====== Apache Kafka ====== [[:linux:kafka:old_doc|old_doc]] ===== Матчасть ===== Программный брокер сообщений т.е. принимает данные от одной программы, хранит их и по запросу предоставляет другой\\ Сервер кафки хранит информацию о **топиках**, **поставщиках** и **потребителях** данных. Так же отвечает за хранение и предоставление передаваемых **сообщений**, по умолчанию сообщения хранятся в течении 7 дней, этот параметр настраивается, **длительность хранения не влияет на производительность системы**\\ ==== ZooKeeper ==== [[https://zookeeper.apache.org/doc/current/zookeeperAdmin.html|Офф дока]]\\
:!: Спойлер Это самостоятельный продукт, служба для координации распределённых систем. Кафка использует зукипер для хранения всех служебных метаданных, необходимых для работы системы\\ Роль зукипера, всюду упоминается только про случаи распределенной установки кафки, тобишь управление именно распределенной системой, что и исходит из описания. Но вроде как даже сингл-хост без него не работает, или работает ? В распределенном варианте кафки, зукипер хранит информацию о **состоянии распределенной системы**, статус узлов кластера, какой брокер является в данный момент контроллером, в каком состоянии находятся лидеры партиций и их реплики.\\ Зукипер обеспечивает **синхронизацию внутри распределенной системы** и обеспечивает работу с ней, а так же хранит информацию о клиентах, активен/неактивен и т.д.\\ В случае его потери, данные превращаются в беспорядочную кучу т.е. сами данные хранятся на брокерах, в кафке, а "ключ" к работе с ними хранится в зукипере\\ Зукипер фактически является самостоятельной, независимой, распределенной системой хранения данных, за которой нужно следить и поддерживать, падение зукипера это гарантированное падение кафки. С ним работает только кафка, клиенты никогда не коннектятся к нему напрямую\\ Кластер будет работать пока большая его часть в порядке, т.е. кластер из 3х машин может выдержать падение одной, из 5ти, падение двух и т.д.\\ Рекомендуется использовать нечетное ко-во машин т.к. например с 4мя машинами зукипер так же способен выдержать потерю только одной, иначе принцип большинства теряется, т.е. в вопросе отказоустойчивости 3 == 4, но 4ый хост добавляет нагрузку фоловера, это доп подключение к лидеру, трафик и т.д.\\ **Конфигурация кластера**\\ Для объединения узлов в кластер, нужно сохранить в спец файле ("myid") ИД узла затем в конфиге перечислить адреса всех участников кластера. Есть возможность указать доп адреса для каждого узла\\ * Файл ИД должен содержать только цифру (1-255) и находится в папке с данными (dataDir) * Перечень адресов должен быть одинаковым для всех узлов, т.е. свой адрес тоже указываем узлу который настраиваем (именно из этого файла он понимает какой порт ему нужно слушать и на каком интерфейсе)
==== Топики и их партиции ====
:!: Спойлер Все сообщения в кафке организованы и хранятся в именованных топиках, каждый топик в свою очередь может состоять из одной или более партиции\\ Топики это по сути смысловые темы, на которые программист разбивает свои сообщения А партиция это деление топика на функциональные единицы, потребители работают именно с партициями, и если партиций несколько то есть возможность работать с ними параллельно, иначе нет Партиции не дублируют сообщения, они именно делят топик на части, по какому либо правилу, условно говоря одна партиция обрабатывает сообщения от "А" до "О", другая от "П" до "Я" и тд\\ Каждая партиция **реплицируется** указанное кол-во раз, параметр **replication_factor**, :!: Важно :!: **клиенты работают только с лидером партиции**, записывают и читают только с него, а **реплики** (фоловеры) **копируют в себя данные** с лидера\\ |{{:linux:screenshot_2.jpg?direct&400 |}} | {{:linux:screenshot_1.jpg?direct&400 |}}| Сообщения отправляются в топик, по партициям они распределяются автоматом, если специально не задано другое, например с один ключом, сообщения будут гарантированно писаться в одну партицию\\
==== Получатели (консьюмеры) ==== Консьюмер группы как таковые не создаются, создаются только консьюмеры, которые входят в группы. Консольные консьюмеры генерят каждый себе "авто-группы", можно указать статическую\\
:!: Спойлер Консьюмеры обычно объединяются в группы. Когда несколько консьюмеров **из одной группы** читают топик, то каждый из них получает **данные из разных партиций**, это и есть вторая составляющая для многопоточности\\ | {{ :linux:screenshot_3.jpg?direct&400 |}} | {{ :linux:screenshot_4.jpg?direct&400 |}} | Например если мы видим что три консьюмера не справляются с нагрузкой, создаем еще одну партицию и еще одного консьюмера, тем самым снижаем общую нагрузку\\ Партиции автоматически распределяются между доступными консьюмерами внутри группы\\ :!: Важно :!: Партиции назначаются уникально, не повторяются и если консьюмеров больше чем партиций, то остаток будет простаивать, в нашем примере четвертый консьюмер **не будет задействован** в обработке сообщений\\ Добавлять партиции можно на лету, без перезапуска, клиенты **автоматически их обнаружат**, но у них должен быть включен параметр **auto.offset.reset=earliest** т.к. обновление происходит периодически и чтобы в этот промежуток ничего не потерять\\ :!: Важно :!: Партиции невозможно удалить после создания\\ Партиционирование является инструментом масштабирования, группы инструментом отказоустойчивости\\ {{ :linux:безымянный.jpg?direct&400 |}}
==== Офсеты ====
:!: Спойлер Каждое сообщение партиции имеет свой собственный уникальный, увеличивающийся номер, т.н. офсет, его используются консьюмеры для навигации по потоку сообщений\\ Консьюмер делает т.н. "offset-commit" с указанием своей группы, топика и офсета сообщения, которое должно быть помечено как обработанное, брокер сохраняет эту инфу у себя, после например рестарта, консьюмер запрашивает эту информацию у брокера и продолжает читать очередь там где остановился\\ Консьюмер может коммитить любой допустимый офсет, тем самым передвигаться по истории сообщений.\\ Ключевой момент в том, что у связки "топик партиция - консьюмер группа" может быть только один коммит с офсетом, т.е. один указатель чтения внутри партиции, для одной группы\\ Основная структура данных в кафке это распределенный, реплицируемый лог. Каждая партиция это и есть этот самый лог, который хранится на диске\\ Каждое новое сообщение отправляемое в партицию добавляется в "голову" этого лога и получает свой уникальный номер, "offset", 64-битное число, назначается брокером\\ {{ :linux:screenshot_5.jpg?direct&400 |}} **Сообщение содержит** в себе\\ * Key - опциональный ключ, нужен для распределения сообщений по кластеру, сообщения с одинаковым ключом всегда пишутся в одну партицию, это гарантирует очередность считывания/записи * Value - само сообщение * Timestamp * Headers - пользовательские атрибуты, тоже key/value, которые прикрепляются к сообщению
==== Конфигурация ====
:!: {{:linux:screenshot_1.png?direct&600|}}
:!:
:!:
===== Установка =====
:!: Notes Бинарник скачивается и распаковывается [[https://kafka.apache.org/downloads|оф сайт]]\\ Zookeeper содержится в дистрибутиве, работает отдельной службой, в файле конфига (./config/server.properties) есть блок подключения к зукиперу\\ Файлы обоих служб тоже создаются вручную\\ В ./bin есть множество скриптов для управления, как под линукс так и под винду\\
:!: Самый простой инстанс [[https://www.dmosk.ru/miniinstruktions.php?mini=kafka-linux|Пример]]\\ yum install curl tar wget iptables -I INPUT -p tcp --dport 9092 -j ACCEPT yum install java-11-openjdk-devel wget https://kafka.apache.org/downloads mkdir /opt/kafka tar zxf kafka_*.tgz -C /opt/kafka --strip 1 vi /opt/kafka/config/server.properties delete.topic.enable = true useradd -r -c 'Kafka broker user service' kafka chown -R kafka:kafka /opt/kafka vi /etc/systemd/system/zookeeper.service [Unit] Description=Zookeeper Service Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=kafka ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh ExecReload=/bin/kill -HUP $MAINPID Restart=on-failure [Install] WantedBy=multi-user.target vi /etc/systemd/system/kafka.service [Unit] Description=Kafka Service Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=kafka ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1' ExecStop=/opt/kafka/bin/kafka-server-stop.sh ExecReload=/bin/kill -HUP $MAINPID Restart=on-failure [Install] WantedBy=multi-user.target systemctl daemon-reload systemctl enable zookeeper kafka systemctl start kafka # Для активации JMX, можно в unit файле добавить след переменные Environment=SERVER_JVMFLAGS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=10020 -Djava.rmi.server.hostname=10.200.192.31" # Для зукипера SERVER_JVMFLAGS # Для кафки KAFKA_OPTS Есть спец экспортеры, например [[https://github.com/danielqsj/kafka_exporter|Мониторинг Prometheus]]\\ Но вполне можно использовать общий, "jmx_exporter", скачиваем с гита, компилируем, затем запускаем его вместе с целевой службой, в режиме "java-agent", для этого надо в переменной окружения прописать путь и все nano /etc/systemd/system/kafka-exporter.service [Unit] (...) [Service] (...) Environment=EXTRA_ARGS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent/target/jmx_javaagent.jar=7721:/opt/jmx_exporter/example_configs/zookeeper.yaml
===== Использование =====
:!: Примеры Топики # Создаем топик bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic "myTest" # Список / детализация bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --list bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --describe --topic "myTopic" # Отправляем сообщение (с указанием группы) echo "Hello" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic "myTest" [--group "myGroup"] # Извлекаем сообщение bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "myTest" --from-beginning [--group "myGroup"] # Список групп bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092 # Расширенная инфа по группе bin/kafka-consumer-groups.sh --describe --group "myGroup" --bootstrap-server localhost:9092 # (..) --alter --topic "myTopic" --partitions 100 (..) --alter --topic "myTopic" min.insync.replicas=2 --config retention.ms=172800000 Топики bin/kafka-topics.sh list create describe alter delete # Create topic --replication-factor 2 --partitions 10 --topic "myFirstTopic" # Delete topic --delete --topic first_topic # Alter topic --alter --topic first_topic --partitions 5 ? --alter --topic "myFirstTopic" --replication-factor 2 # Alter topic other params --alter --topic "myFirstTopic" --config retention.ms=50000000 --config compression.type=gzip # Alter topic delete param --alter --topic "myFirstTopic" --delete-config retention.ms
:!: Ручная балансировка Файл для перебалансировки {"version":1, "partitions":[ {"topic":"__consumer_offsets","partition":4,"replicas":[2,0,1]}, {"topic":"__consumer_offsets","partition":5,"replicas":[2,0,1]}, {"topic":"__consumer_offsets","partition":6,"replicas":[2,0,1]} ]} # Применение изменений bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --reassignment-json-file rebalance.json --execute # Еще требуется тригернуть непосредственно миграцию лидеров в соответствии с приоритетами bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --admin.config config/auth_ssl.properties --election-type preferred --all-topic-partitions # Либо это произойдет автоматом при включенном параметре "auto.leader.rebalance.enable=false" Есть еще вроде как перебалансировка в утилите kafkactl\\ но тут что то непонятно, фактически это работа с фактором репликации, который побочно влияет на распределение\\ нормально распределить удалось только снизив кол-во до единицы и снова подняв до 3х\\ kafkactl alter topic brain-finres-events-test --replication-factor 3
:!: Работа с ACL # Список назначенных прав bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --list --transactional-id * (или имя) bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --list --topic * (или имя) --group # Удолить bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --remove --allow-principal User:svc-brain --operation WRITE --operation DESCRIBE --transactional-id * # Можно перечислять несколько сущностей --remove --allow-principal User:svc-brain --allow-principal User:svc-brain-finres --allow-principal User:svc-hyper-mes-prod --operation WRITE --operation READ --operation DESCRIBE --topic brain-finres-events-test # Создание # Роль "--producer", WRITE and DESCRIBE on topic, CREATE on cluster; # --consumer READ and DESCRIBE on topic, READ on consumer group bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --add --allow-principal User:svc-brain --producer --topic brain-trainer-events !!!!!!!!!!!!!!!!!!!!!!!!!! !!!!!!!!!!!!!!!!!!!!!!!!!! В случае звездочек, обязательно нужно использовать кавычки, по крайней мере с пробелом наверно --resource-pattern-type prefixed / literal создают разные объекты, которые не пересекаются # Пользователи список bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --entity-type users --describe добавить bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --alter --add-config 'SCRAM-SHA-512=[password=]' --entity-type users --entity-name удалить bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name
:!: kafkactl [[https://github.com/deviceinsight/kafkactl|Github]]\\ Конфиг файл для работы **~/.config/kafkactl/config.yml**\\ contexts: default: brokers: - localhost:9094 - localhost:9096 current-context: default Еще пример contexts: default: brokers: - bootstrap-kafka1:9092 - bootstrap-kafka2:9092 sasl: enabled: true mechanism: scram-sha512 password: username: admin tls: enabled: true insecure: true current-context: default Некоторые команды kafkactl # Некоторые команды первого уровня create list describe alter delete # Затем broker topic consumer-group # В общем то логика простая # Некоторые примеры kafkactl create topic secondTop --partitions 4 --replicatoin-factor 2 kafkactl describe broker 0 # Send message kafkactl produce firstTop --key=my-key --value=my-value # Describe group kafkactl describe consumer-group test_app # Смена контекста из конфига kafkactl config use-context my_other_contx
:!: Docker Compose файл, zookeeper и два брокера\\ version: "2" services: zookeeper: image: docker.io/bitnami/zookeeper:3.9 ports: - "2181:2181" volumes: - "zookeeper_data:/bitnami" environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka0: image: docker.io/bitnami/kafka:3.4 ports: - "9094:9094" volumes: - "kafka_data0:/bitnami" environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_CFG_BROKER_ID=0 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka0:9092,EXTERNAL://localhost:9094 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT depends_on: - zookeeper kafka1: image: docker.io/bitnami/kafka:3.4 ports: - "9096:9096" volumes: - "kafka_data1:/bitnami" environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_CFG_BROKER_ID=1 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9095,CONTROLLER://:9097,EXTERNAL://:9096 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9095,EXTERNAL://localhost:9096 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT depends_on: - zookeeper volumes: zookeeper_data: driver: local kafka_data0: driver: local kafka_data1: driver: local Без кластера конечно короче\\ version: "2" services: zookeeper: image: docker.io/bitnami/zookeeper:3.9 ports: - "2181:2181" volumes: - "zookeeper_data:/bitnami" environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: docker.io/bitnami/kafka:3.4 ports: - "9092:9092" volumes: - "kafka_data:/bitnami" environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 depends_on: - zookeeper volumes: zookeeper_data: driver: local kafka_data: driver: local
:!:
==== Epoch ==== ==== Overs ==== В Кафке по умолчанию вычитывание сообщений из партиции останавливается, когда получатель доходит до битого сообщения, и до тех пор, пока оно не будет пропущено и закинуто в “карантинную” очередь (также именуемой “dead letter queue”) для последующей обработки, чтение партиции продолжить не получится. ZooKeeper при подключении нового читателя производит перераспределение участников в Consumer Group таким образом, чтобы каждая партиция имела одного и только одного читателя ==== Параметры ==== === ? Параметры потребителей ? === **max.poll.records**\\ По умолчанию 500, максимальное кол-во записей, возвращаемых одним вызовом **poll()**\\ Изменение вроде не сильно влияет, потребитель кэширует записи и постепенно возвращает оттуда\\ **max.poll.interval.ms**\\ По умолчанию 5мин, макс задержка между вызовами **poll()**. т.е. время в течении которого потребитель может бездействовать, если метод **poll()** не вызывался в течении этого времени то клиент считается отвалилшимся и консьюмер группа начинает перебалансировку\\
:!: Быстродействие потребителя Два выше рассмотренных свойства задают требования к приложению клиента, оно должно потреблять "max.poll.records" за "max.poll.interval.ms"\\ Увеличение "max.poll.records" может снижить пропускную способность изза роста накладных расходов\\ Увеличение "max.poll.interval.ms" может замедлить скорость отклика при перебалансировке потребителей\\
**fetch.max.bytes**\\ Дефолт 50мб, макс размер пакета запрашиваемого консьюмером во время чтения. Концептуально связан с "request.timeout.ms"\\ **request.timeout.ms**\\ Таймаут, за который консьюмер ожидает запрошенные данные от брокера\\ **group.instance.id**\\ Что то связано со статическими потребителями, и отвал статического будет по истечении **session.timeout.ms**\\ ==== CLI ==== === Topics ===
:!: Расширенный вывод (desribe) По мимо всего прочего, показано 3 столбца:\\ **Leader**\\ Указан брокер где находится лидер-партиция. Кафка равномерно распределяет лидеров между досутпными брокерами\\ Вроде вручную это не регулируется\\ **Replicas**\\ Указаны брокеры которые реплицируют данные партиции, вне зависимости от лидерства\\ Первый идентификатор представляет предпочтительного лидера, поэтому кафка попытается сделать его лидером партиции\\ :!: В случае отвала брокера, резервный лидер партиции выбирается именно из этого столбца, проверено практикой, надежно\\ **Isr**\\ Означает синхронизированную реплику.\\ Сообщения шлются в лидера, затем если есть репли-фактор то фоловеры копируют новые сообщения себе, вычитывают.\\ Брокер считает засинхронизирован если не сильно отстает (Replica.lag.time.max.ms)\\ Здесь что то тоже упоминается про приоритет выбора, может это про контроллер ? (пока один раз сошлось, хотя нет)\\
==== ==== ==== ==== ==== ==== ==== ====
:!: