Показаны различия между двумя версиями страницы.
Предыдущая версия справа и слева Предыдущая версия Следующая версия | Предыдущая версия | ||
linux:kafka [2023/04/12 03:04] admin |
linux:kafka [2024/05/10 05:57] (текущий) admin |
||
---|---|---|---|
Строка 94: | Строка 94: | ||
* Headers - пользовательские атрибуты, | * Headers - пользовательские атрибуты, | ||
</ | </ | ||
+ | |||
+ | |||
+ | |||
+ | ==== Конфигурация ==== | ||
+ | |||
+ | < | ||
+ | < | ||
+ | {{: | ||
+ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | |||
+ | </ | ||
+ | |||
Строка 192: | Строка 214: | ||
===== Использование ===== | ===== Использование ===== | ||
+ | < | ||
+ | < | ||
+ | Топики | ||
<code bash> | <code bash> | ||
- | # Создаем топик | + | |
- | bin/ | + | bin/ |
+ | --topic " | ||
- | # Отправляем | + | # Список / детализация |
- | echo " | + | bin/kafka-topics.sh --bootstrap-server localhost: |
+ | bin/ | ||
+ | </ | ||
- | # Извлекаем сообщение | ||
- | bin/ | ||
- | # Список групп | + | |
+ | <code bash> | ||
+ | # Отправляем сообщение (с указанием группы) | ||
+ | echo " | ||
+ | |||
+ | # Извлекаем сообщение | ||
+ | bin/ | ||
+ | --from-beginning [--group " | ||
+ | |||
+ | | ||
bin/ | bin/ | ||
- | # Расширенная инфа по группе | + | |
bin/ | bin/ | ||
Строка 211: | Строка 246: | ||
(..) --alter --topic " | (..) --alter --topic " | ||
(..) --alter --topic " | (..) --alter --topic " | ||
+ | </ | ||
+ | |||
+ | Топики | ||
+ | <code bash> | ||
+ | bin/ | ||
+ | list | ||
+ | create | ||
+ | describe | ||
+ | alter | ||
+ | delete | ||
+ | |||
+ | # Create topic | ||
+ | | ||
+ | |||
+ | # Delete topic | ||
+ | | ||
+ | |||
+ | # Alter topic | ||
+ | | ||
+ | |||
+ | ? --alter --topic " | ||
+ | |||
+ | |||
+ | # Alter topic other params | ||
+ | | ||
+ | |||
+ | # Alter topic delete param | ||
+ | | ||
</ | </ | ||
+ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | Файл для перебалансировки | ||
+ | <code json> | ||
+ | {" | ||
+ | " | ||
+ | | ||
+ | | ||
+ | | ||
+ | ]} | ||
+ | </ | ||
+ | |||
+ | <code bash> | ||
+ | # Применение изменений | ||
+ | bin/ | ||
+ | |||
+ | # Еще требуется тригернуть непосредственно миграцию лидеров в соответствии с приоритетами | ||
+ | bin/ | ||
+ | |||
+ | # Либо это произойдет автоматом при включенном параметре " | ||
+ | </ | ||
+ | |||
+ | |||
+ | Есть еще вроде как перебалансировка в утилите kafkactl\\ | ||
+ | но тут что то непонятно, | ||
+ | нормально распределить удалось только снизив кол-во до единицы и снова подняв до 3х\\ | ||
+ | <code bash> | ||
+ | kafkactl alter topic brain-finres-events-test --replication-factor 3 | ||
+ | </ | ||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | <code bash> | ||
+ | # Список назначенных прав | ||
+ | bin/ | ||
+ | --command-config config/ | ||
+ | |||
+ | bin/ | ||
+ | --command-config config/ | ||
+ | |||
+ | --group | ||
+ | |||
+ | # Удолить | ||
+ | bin/ | ||
+ | --command-config config/ | ||
+ | --operation WRITE --operation DESCRIBE --transactional-id * | ||
+ | |||
+ | # Можно перечислять несколько сущностей | ||
+ | | ||
+ | | ||
+ | | ||
+ | |||
+ | |||
+ | # Создание | ||
+ | # Роль " | ||
+ | # --consumer READ and DESCRIBE on topic, READ on consumer group | ||
+ | bin/ | ||
+ | --command-config config/ | ||
+ | --producer --topic brain-trainer-events | ||
+ | |||
+ | |||
+ | !!!!!!!!!!!!!!!!!!!!!!!!!! | ||
+ | !!!!!!!!!!!!!!!!!!!!!!!!!! | ||
+ | В случае звездочек, | ||
+ | --resource-pattern-type prefixed / literal создают разные объекты, | ||
+ | |||
+ | # Пользователи | ||
+ | список | ||
+ | bin/ | ||
+ | --command-config config/ | ||
+ | |||
+ | добавить | ||
+ | bin/ | ||
+ | --command-config config/ | ||
+ | --entity-type users --entity-name < | ||
+ | |||
+ | удалить | ||
+ | bin/ | ||
+ | --command-config config/ | ||
+ | --entity-type users --entity-name < | ||
+ | </ | ||
+ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | [[https:// | ||
+ | Конфиг файл для работы **~/ | ||
+ | <code bash> | ||
+ | contexts: | ||
+ | default: | ||
+ | brokers: | ||
+ | - localhost: | ||
+ | - localhost: | ||
+ | |||
+ | current-context: | ||
+ | </ | ||
+ | |||
+ | Еще пример | ||
+ | <code bash> | ||
+ | contexts: | ||
+ | default: | ||
+ | brokers: | ||
+ | - bootstrap-kafka1: | ||
+ | - bootstrap-kafka2: | ||
+ | sasl: | ||
+ | enabled: true | ||
+ | mechanism: scram-sha512 | ||
+ | password: | ||
+ | username: admin | ||
+ | tls: | ||
+ | enabled: true | ||
+ | insecure: true | ||
+ | |||
+ | |||
+ | current-context: | ||
+ | </ | ||
+ | |||
+ | |||
+ | Некоторые команды | ||
+ | <code bash> | ||
+ | kafkactl | ||
+ | # Некоторые команды первого уровня | ||
+ | create | ||
+ | list | ||
+ | describe | ||
+ | alter | ||
+ | delete | ||
+ | # Затем | ||
+ | broker | ||
+ | topic | ||
+ | consumer-group | ||
+ | # В общем то логика простая | ||
+ | |||
+ | # Некоторые примеры | ||
+ | kafkactl create topic secondTop --partitions 4 --replicatoin-factor 2 | ||
+ | kafkactl describe broker 0 | ||
+ | |||
+ | # Send message | ||
+ | kafkactl produce firstTop --key=my-key --value=my-value | ||
+ | |||
+ | # Describe group | ||
+ | kafkactl describe consumer-group test_app | ||
+ | |||
+ | |||
+ | # Смена контекста из конфига | ||
+ | kafkactl config use-context my_other_contx | ||
+ | </ | ||
+ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | Compose файл, zookeeper и два брокера\\ | ||
+ | <code yaml> | ||
+ | version: " | ||
+ | |||
+ | services: | ||
+ | zookeeper: | ||
+ | image: docker.io/ | ||
+ | ports: | ||
+ | - " | ||
+ | volumes: | ||
+ | - " | ||
+ | environment: | ||
+ | - ALLOW_ANONYMOUS_LOGIN=yes | ||
+ | |||
+ | kafka0: | ||
+ | image: docker.io/ | ||
+ | ports: | ||
+ | - " | ||
+ | volumes: | ||
+ | - " | ||
+ | environment: | ||
+ | - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper: | ||
+ | - KAFKA_CFG_BROKER_ID=0 | ||
+ | |||
+ | - KAFKA_CFG_LISTENERS=PLAINTEXT://: | ||
+ | - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT:// | ||
+ | - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER: | ||
+ | |||
+ | depends_on: | ||
+ | - zookeeper | ||
+ | |||
+ | kafka1: | ||
+ | image: docker.io/ | ||
+ | ports: | ||
+ | - " | ||
+ | volumes: | ||
+ | - " | ||
+ | environment: | ||
+ | - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper: | ||
+ | - KAFKA_CFG_BROKER_ID=1 | ||
+ | |||
+ | - KAFKA_CFG_LISTENERS=PLAINTEXT://: | ||
+ | - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT:// | ||
+ | - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER: | ||
+ | |||
+ | depends_on: | ||
+ | - zookeeper | ||
+ | |||
+ | volumes: | ||
+ | zookeeper_data: | ||
+ | driver: local | ||
+ | kafka_data0: | ||
+ | driver: local | ||
+ | kafka_data1: | ||
+ | driver: local | ||
+ | </ | ||
+ | |||
+ | Без кластера конечно короче\\ | ||
+ | <code yml> | ||
+ | version: " | ||
+ | |||
+ | services: | ||
+ | zookeeper: | ||
+ | image: docker.io/ | ||
+ | ports: | ||
+ | - " | ||
+ | volumes: | ||
+ | - " | ||
+ | environment: | ||
+ | - ALLOW_ANONYMOUS_LOGIN=yes | ||
+ | kafka: | ||
+ | image: docker.io/ | ||
+ | ports: | ||
+ | - " | ||
+ | volumes: | ||
+ | - " | ||
+ | environment: | ||
+ | - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper: | ||
+ | depends_on: | ||
+ | - zookeeper | ||
+ | |||
+ | volumes: | ||
+ | zookeeper_data: | ||
+ | driver: local | ||
+ | kafka_data: | ||
+ | driver: local | ||
+ | </ | ||
+ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | <code bash> | ||
+ | |||
+ | </ | ||
+ | </ | ||
+ | |||
+ | |||
+ | ==== Epoch ==== | ||
+ | |||
+ | |||
+ | ==== Overs ==== | ||
+ | В Кафке по умолчанию вычитывание сообщений из партиции останавливается, | ||
+ | |||
+ | ZooKeeper при подключении нового читателя производит перераспределение участников в Consumer Group таким образом, | ||
+ | |||
+ | |||
+ | ==== Параметры ==== | ||
+ | |||
+ | === ? Параметры потребителей ? === | ||
+ | |||
+ | **max.poll.records**\\ | ||
+ | По умолчанию 500, максимальное кол-во записей, | ||
+ | Изменение вроде не сильно влияет, | ||
+ | |||
+ | |||
+ | **max.poll.interval.ms**\\ | ||
+ | По умолчанию 5мин, макс задержка между вызовами **poll()**. т.е. время в течении которого потребитель может бездействовать, | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | Два выше рассмотренных свойства задают требования к приложению клиента, | ||
+ | |||
+ | Увеличение " | ||
+ | Увеличение " | ||
+ | </ | ||
+ | |||
+ | |||
+ | **fetch.max.bytes**\\ | ||
+ | Дефолт 50мб, макс размер пакета запрашиваемого консьюмером во время чтения. Концептуально связан с " | ||
+ | |||
+ | |||
+ | **request.timeout.ms**\\ | ||
+ | Таймаут, | ||
+ | |||
+ | |||
+ | **group.instance.id**\\ | ||
+ | Что то связано со статическими потребителями, | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | ==== CLI ==== | ||
+ | === Topics === | ||
+ | |||
+ | < | ||
+ | < | ||
+ | По мимо всего прочего, | ||
+ | **Leader**\\ | ||
+ | Указан брокер где находится лидер-партиция. Кафка равномерно распределяет лидеров между досутпными брокерами\\ | ||
+ | Вроде вручную это не регулируется\\ | ||
+ | |||
+ | |||
+ | **Replicas**\\ | ||
+ | Указаны брокеры которые реплицируют данные партиции, | ||
+ | Первый идентификатор представляет предпочтительного лидера, | ||
+ | :!: В случае отвала брокера, | ||
+ | |||
+ | |||
+ | **Isr**\\ | ||
+ | Означает синхронизированную реплику.\\ | ||
+ | Сообщения шлются в лидера, | ||
+ | Брокер считает засинхронизирован если не сильно отстает (Replica.lag.time.max.ms)\\ | ||
+ | Здесь что то тоже упоминается про приоритет выбора, | ||
+ | |||
+ | |||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | ==== ==== | ||
+ | ==== ==== | ||
+ | ==== ==== | ||
+ | ==== ==== | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||