Показаны различия между двумя версиями страницы.
Предыдущая версия справа и слева Предыдущая версия Следующая версия | Предыдущая версия | ||
linux:kafka [2023/02/12 05:54] admin |
linux:kafka [2024/05/10 05:57] (текущий) admin |
||
---|---|---|---|
Строка 9: | Строка 9: | ||
- | **ZooKeeper**\\ | + | ==== ZooKeeper |
+ | [[https:// | ||
+ | < | ||
+ | < | ||
Это самостоятельный продукт, | Это самостоятельный продукт, | ||
Роль зукипера, | Роль зукипера, | ||
Строка 21: | Строка 24: | ||
Зукипер фактически является самостоятельной, | Зукипер фактически является самостоятельной, | ||
+ | Кластер будет работать пока большая его часть в порядке, | ||
+ | Рекомендуется | ||
- | **Топики и их партиции**\\ | + | **Конфигурация кластера**\\ |
+ | Для объединения узлов в кластер, | ||
+ | |||
+ | * Файл ИД должен содержать только цифру (1-255) и находится в папке с данными (dataDir) | ||
+ | * Перечень адресов должен быть одинаковым для всех узлов, т.е. свой адрес тоже указываем узлу который настраиваем (именно из этого файла он понимает какой порт ему нужно слушать и на каком интерфейсе) | ||
+ | </ | ||
+ | |||
+ | |||
+ | |||
+ | ==== Топики и их партиции | ||
+ | < | ||
+ | < | ||
Все сообщения в кафке организованы и хранятся в именованных топиках, | Все сообщения в кафке организованы и хранятся в именованных топиках, | ||
Топики это по сути смысловые темы, на которые программист разбивает свои сообщения | Топики это по сути смысловые темы, на которые программист разбивает свои сообщения | ||
Строка 33: | Строка 49: | ||
|{{: | |{{: | ||
+ | Сообщения отправляются в топик, по партициям они распределяются автоматом, | ||
+ | </ | ||
- | **Получатели (консьюмеры)**\\ | + | |
+ | |||
+ | ==== Получатели (консьюмеры) | ||
+ | Консьюмер группы как таковые не создаются, | ||
+ | < | ||
+ | < | ||
Консьюмеры обычно объединяются в группы. Когда несколько консьюмеров **из одной группы** читают топик, то каждый из них получает **данные из разных партиций**, | Консьюмеры обычно объединяются в группы. Когда несколько консьюмеров **из одной группы** читают топик, то каждый из них получает **данные из разных партиций**, | ||
| {{ : | | {{ : | ||
Строка 48: | Строка 71: | ||
Партиционирование является инструментом масштабирования, | Партиционирование является инструментом масштабирования, | ||
{{ : | {{ : | ||
+ | </ | ||
- | **Офсеты**\\ | + | |
+ | ==== Офсеты | ||
+ | < | ||
+ | < | ||
Каждое сообщение партиции имеет свой собственный уникальный, | Каждое сообщение партиции имеет свой собственный уникальный, | ||
Консьюмер делает т.н. " | Консьюмер делает т.н. " | ||
Строка 60: | Строка 87: | ||
Каждое новое сообщение отправляемое в партицию добавляется в " | Каждое новое сообщение отправляемое в партицию добавляется в " | ||
{{ : | {{ : | ||
- | |||
- | |||
- | |||
**Сообщение содержит** в себе\\ | **Сообщение содержит** в себе\\ | ||
Строка 69: | Строка 93: | ||
* Timestamp | * Timestamp | ||
* Headers - пользовательские атрибуты, | * Headers - пользовательские атрибуты, | ||
+ | </ | ||
+ | ==== Конфигурация ==== | ||
- | ===== Установка ===== | + | < |
+ | < | ||
+ | {{: | ||
+ | </ | ||
+ | |||
+ | < | ||
+ | < | ||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | ===== Установка ===== | ||
< | < | ||
< | < | ||
Строка 123: | Строка 168: | ||
[Unit] | [Unit] | ||
Description=Kafka Service | Description=Kafka Service | ||
- | Requires=zookeeper.service | + | Requires=network.target remote-fs.target |
- | After=zookeeper.service | + | After=network.target remote-fs.target |
[Service] | [Service] | ||
Строка 140: | Строка 185: | ||
systemctl enable zookeeper kafka | systemctl enable zookeeper kafka | ||
systemctl start kafka | systemctl start kafka | ||
- | </ | ||
- | <code bash> | ||
- | # Создаем топик | ||
- | / | ||
- | # Отправляем сообщение | + | |
- | echo "Hello, World from Kafka" | / | + | Environment=SERVER_JVMFLAGS="-Dcom.sun.management.jmxremote=true |
+ | |||
+ | # Для зукипера | ||
+ | SERVER_JVMFLAGS | ||
- | # Извлекаем сообщение | + | |
- | / | + | KAFKA_OPTS |
</ | </ | ||
- | [[https:// | + | Есть спец экспортеры, |
+ | Но вполне можно использовать общий, " | ||
<code bash> | <code bash> | ||
nano / | nano / | ||
[Unit] | [Unit] | ||
- | Description=Kafka prometheus exporter | + | (...) |
- | After=kafka.service | + | |
[Service] | [Service] | ||
- | Type=simple | + | (...) |
- | User=kafka | + | |
- | ExecStart=/opt/kafka/bin/start_exporter.sh | + | </ |
+ | </ | ||
- | [Install] | ||
- | WantedBy=multi-user.target | ||
- | nano /opt/kafka/bin/start_exporter.sh | + | |
- | #!/bin/bash | + | ===== Использование ===== |
- | ./opt/kafka/bin/kafka_exporter | + | |
+ | < | ||
+ | < | ||
+ | Топики | ||
+ | <code bash> | ||
+ | # Создаем топик | ||
+ | bin/kafka-topics.sh --create --bootstrap-server localhost: | ||
+ | --topic " | ||
+ | |||
+ | # Список | ||
+ | bin/kafka-topics.sh --bootstrap-server localhost: | ||
+ | bin/ | ||
+ | </ | ||
+ | |||
+ | |||
+ | |||
+ | <code bash> | ||
+ | # Отправляем сообщение (с указанием группы) | ||
+ | echo " | ||
+ | |||
+ | # Извлекаем сообщение | ||
+ | bin/kafka-console-consumer.sh --bootstrap-server localhost: | ||
+ | --from-beginning [--group " | ||
+ | |||
+ | # Список групп | ||
+ | bin/kafka-consumer-groups.sh --list --bootstrap-server localhost: | ||
+ | |||
+ | # Расширенная инфа по группе | ||
+ | bin/kafka-consumer-groups.sh --describe --group " | ||
+ | |||
+ | # | ||
+ | (..) --alter --topic " | ||
+ | (..) --alter --topic " | ||
+ | </ | ||
+ | |||
+ | |||
+ | Топики | ||
+ | <code bash> | ||
+ | bin/ | ||
+ | list | ||
+ | create | ||
+ | describe | ||
+ | alter | ||
+ | delete | ||
+ | |||
+ | # Create topic | ||
+ | | ||
+ | |||
+ | # Delete topic | ||
+ | | ||
+ | |||
+ | # Alter topic | ||
+ | | ||
+ | |||
+ | ? --alter --topic " | ||
+ | |||
+ | |||
+ | # Alter topic other params | ||
+ | | ||
+ | |||
+ | # Alter topic delete param | ||
+ | | ||
</ | </ | ||
</ | </ | ||
+ | < | ||
+ | < | ||
+ | Файл для перебалансировки | ||
+ | <code json> | ||
+ | {" | ||
+ | " | ||
+ | | ||
+ | | ||
+ | | ||
+ | ]} | ||
+ | </ | ||
+ | |||
+ | <code bash> | ||
+ | # Применение изменений | ||
+ | bin/ | ||
+ | |||
+ | # Еще требуется тригернуть непосредственно миграцию лидеров в соответствии с приоритетами | ||
+ | bin/ | ||
+ | |||
+ | # Либо это произойдет автоматом при включенном параметре " | ||
+ | </ | ||
+ | |||
+ | |||
+ | Есть еще вроде как перебалансировка в утилите kafkactl\\ | ||
+ | но тут что то непонятно, | ||
+ | нормально распределить удалось только снизив кол-во до единицы и снова подняв до 3х\\ | ||
+ | <code bash> | ||
+ | kafkactl alter topic brain-finres-events-test --replication-factor 3 | ||
+ | </ | ||
+ | |||
+ | </ | ||
- | ===== Использование ===== | ||
< | < | ||
- | < | + | < |
+ | <code bash> | ||
+ | # Список назначенных прав | ||
+ | bin/ | ||
+ | --command-config config/ | ||
- | <code> </ | + | bin/ |
+ | --command-config config/ | ||
+ | |||
+ | --group | ||
+ | |||
+ | # Удолить | ||
+ | bin/ | ||
+ | --command-config config/ | ||
+ | --operation WRITE --operation DESCRIBE --transactional-id * | ||
+ | |||
+ | # Можно перечислять несколько сущностей | ||
+ | | ||
+ | | ||
+ | | ||
+ | |||
+ | |||
+ | # Создание | ||
+ | # Роль " | ||
+ | # --consumer READ and DESCRIBE on topic, READ on consumer group | ||
+ | bin/ | ||
+ | --command-config config/ | ||
+ | --producer --topic brain-trainer-events | ||
+ | |||
+ | |||
+ | !!!!!!!!!!!!!!!!!!!!!!!!!! | ||
+ | !!!!!!!!!!!!!!!!!!!!!!!!!! | ||
+ | В случае звездочек, | ||
+ | --resource-pattern-type prefixed / literal создают разные объекты, | ||
+ | |||
+ | # Пользователи | ||
+ | список | ||
+ | bin/ | ||
+ | --command-config config/ | ||
+ | |||
+ | добавить | ||
+ | bin/ | ||
+ | --command-config config/ | ||
+ | --entity-type users --entity-name < | ||
+ | |||
+ | удалить | ||
+ | bin/ | ||
+ | --command-config config/ | ||
+ | --entity-type users --entity-name < | ||
+ | </ | ||
</ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | [[https:// | ||
+ | Конфиг файл для работы **~/ | ||
+ | <code bash> | ||
+ | contexts: | ||
+ | default: | ||
+ | brokers: | ||
+ | - localhost: | ||
+ | - localhost: | ||
+ | |||
+ | current-context: | ||
+ | </ | ||
+ | |||
+ | Еще пример | ||
+ | <code bash> | ||
+ | contexts: | ||
+ | default: | ||
+ | brokers: | ||
+ | - bootstrap-kafka1: | ||
+ | - bootstrap-kafka2: | ||
+ | sasl: | ||
+ | enabled: true | ||
+ | mechanism: scram-sha512 | ||
+ | password: | ||
+ | username: admin | ||
+ | tls: | ||
+ | enabled: true | ||
+ | insecure: true | ||
+ | |||
+ | |||
+ | current-context: | ||
+ | </ | ||
+ | |||
+ | |||
+ | Некоторые команды | ||
+ | <code bash> | ||
+ | kafkactl | ||
+ | # Некоторые команды первого уровня | ||
+ | create | ||
+ | list | ||
+ | describe | ||
+ | alter | ||
+ | delete | ||
+ | # Затем | ||
+ | broker | ||
+ | topic | ||
+ | consumer-group | ||
+ | # В общем то логика простая | ||
+ | |||
+ | # Некоторые примеры | ||
+ | kafkactl create topic secondTop --partitions 4 --replicatoin-factor 2 | ||
+ | kafkactl describe broker 0 | ||
+ | |||
+ | # Send message | ||
+ | kafkactl produce firstTop --key=my-key --value=my-value | ||
+ | |||
+ | # Describe group | ||
+ | kafkactl describe consumer-group test_app | ||
+ | |||
+ | |||
+ | # Смена контекста из конфига | ||
+ | kafkactl config use-context my_other_contx | ||
+ | </ | ||
+ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | Compose файл, zookeeper и два брокера\\ | ||
+ | <code yaml> | ||
+ | version: " | ||
+ | |||
+ | services: | ||
+ | zookeeper: | ||
+ | image: docker.io/ | ||
+ | ports: | ||
+ | - " | ||
+ | volumes: | ||
+ | - " | ||
+ | environment: | ||
+ | - ALLOW_ANONYMOUS_LOGIN=yes | ||
+ | |||
+ | kafka0: | ||
+ | image: docker.io/ | ||
+ | ports: | ||
+ | - " | ||
+ | volumes: | ||
+ | - " | ||
+ | environment: | ||
+ | - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper: | ||
+ | - KAFKA_CFG_BROKER_ID=0 | ||
+ | |||
+ | - KAFKA_CFG_LISTENERS=PLAINTEXT://: | ||
+ | - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT:// | ||
+ | - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER: | ||
+ | |||
+ | depends_on: | ||
+ | - zookeeper | ||
+ | |||
+ | kafka1: | ||
+ | image: docker.io/ | ||
+ | ports: | ||
+ | - " | ||
+ | volumes: | ||
+ | - " | ||
+ | environment: | ||
+ | - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper: | ||
+ | - KAFKA_CFG_BROKER_ID=1 | ||
+ | |||
+ | - KAFKA_CFG_LISTENERS=PLAINTEXT://: | ||
+ | - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT:// | ||
+ | - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER: | ||
+ | |||
+ | depends_on: | ||
+ | - zookeeper | ||
+ | |||
+ | volumes: | ||
+ | zookeeper_data: | ||
+ | driver: local | ||
+ | kafka_data0: | ||
+ | driver: local | ||
+ | kafka_data1: | ||
+ | driver: local | ||
+ | </ | ||
+ | |||
+ | Без кластера конечно короче\\ | ||
+ | <code yml> | ||
+ | version: " | ||
+ | |||
+ | services: | ||
+ | zookeeper: | ||
+ | image: docker.io/ | ||
+ | ports: | ||
+ | - " | ||
+ | volumes: | ||
+ | - " | ||
+ | environment: | ||
+ | - ALLOW_ANONYMOUS_LOGIN=yes | ||
+ | kafka: | ||
+ | image: docker.io/ | ||
+ | ports: | ||
+ | - " | ||
+ | volumes: | ||
+ | - " | ||
+ | environment: | ||
+ | - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper: | ||
+ | depends_on: | ||
+ | - zookeeper | ||
+ | |||
+ | volumes: | ||
+ | zookeeper_data: | ||
+ | driver: local | ||
+ | kafka_data: | ||
+ | driver: local | ||
+ | </ | ||
+ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | <code bash> | ||
+ | |||
+ | </ | ||
+ | </ | ||
+ | |||
+ | |||
+ | ==== Epoch ==== | ||
+ | |||
+ | |||
+ | ==== Overs ==== | ||
+ | В Кафке по умолчанию вычитывание сообщений из партиции останавливается, | ||
+ | |||
+ | ZooKeeper при подключении нового читателя производит перераспределение участников в Consumer Group таким образом, | ||
+ | |||
+ | |||
+ | ==== Параметры ==== | ||
+ | |||
+ | === ? Параметры потребителей ? === | ||
+ | |||
+ | **max.poll.records**\\ | ||
+ | По умолчанию 500, максимальное кол-во записей, | ||
+ | Изменение вроде не сильно влияет, | ||
+ | |||
+ | |||
+ | **max.poll.interval.ms**\\ | ||
+ | По умолчанию 5мин, макс задержка между вызовами **poll()**. т.е. время в течении которого потребитель может бездействовать, | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | Два выше рассмотренных свойства задают требования к приложению клиента, | ||
+ | |||
+ | Увеличение " | ||
+ | Увеличение " | ||
+ | </ | ||
+ | |||
+ | |||
+ | **fetch.max.bytes**\\ | ||
+ | Дефолт 50мб, макс размер пакета запрашиваемого консьюмером во время чтения. Концептуально связан с " | ||
+ | |||
+ | |||
+ | **request.timeout.ms**\\ | ||
+ | Таймаут, | ||
+ | |||
+ | |||
+ | **group.instance.id**\\ | ||
+ | Что то связано со статическими потребителями, | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | ==== CLI ==== | ||
+ | === Topics === | ||
+ | |||
+ | < | ||
+ | < | ||
+ | По мимо всего прочего, | ||
+ | **Leader**\\ | ||
+ | Указан брокер где находится лидер-партиция. Кафка равномерно распределяет лидеров между досутпными брокерами\\ | ||
+ | Вроде вручную это не регулируется\\ | ||
+ | |||
+ | |||
+ | **Replicas**\\ | ||
+ | Указаны брокеры которые реплицируют данные партиции, | ||
+ | Первый идентификатор представляет предпочтительного лидера, | ||
+ | :!: В случае отвала брокера, | ||
+ | |||
+ | |||
+ | **Isr**\\ | ||
+ | Означает синхронизированную реплику.\\ | ||
+ | Сообщения шлются в лидера, | ||
+ | Брокер считает засинхронизирован если не сильно отстает (Replica.lag.time.max.ms)\\ | ||
+ | Здесь что то тоже упоминается про приоритет выбора, | ||
+ | |||
+ | |||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | ==== ==== | ||
+ | ==== ==== | ||
+ | ==== ==== | ||
+ | ==== ==== | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ |