Инструменты пользователя

Инструменты сайта


linux:kafka

Apache Kafka

Матчасть

Программный брокер сообщений т.е. принимает данные от одной программы, хранит их и по запросу предоставляет другой

Сервер кафки хранит информацию о топиках, поставщиках и потребителях данных. Так же отвечает за хранение и предоставление передаваемых сообщений, по умолчанию сообщения хранятся в течении 7 дней, этот параметр настраивается, длительность хранения не влияет на производительность системы

ZooKeeper

Офф дока

:!: Спойлер

Это самостоятельный продукт, служба для координации распределённых систем. Кафка использует зукипер для хранения всех служебных метаданных, необходимых для работы системы

Роль зукипера, всюду упоминается только про случаи распределенной установки кафки,
тобишь управление именно распределенной системой, что и исходит из описания.
Но вроде как даже сингл-хост без него не работает, или работает ?

В распределенном варианте кафки, зукипер хранит информацию о состоянии распределенной системы, статус узлов кластера, какой брокер является в данный момент контроллером, в каком состоянии находятся лидеры партиций и их реплики.
Зукипер обеспечивает синхронизацию внутри распределенной системы и обеспечивает работу с ней, а так же хранит информацию о клиентах, активен/неактивен и т.д.
В случае его потери, данные превращаются в беспорядочную кучу т.е. сами данные хранятся на брокерах, в кафке, а «ключ» к работе с ними хранится в зукипере

Зукипер фактически является самостоятельной, независимой, распределенной системой хранения данных, за которой нужно следить и поддерживать, падение зукипера это гарантированное падение кафки. С ним работает только кафка, клиенты никогда не коннектятся к нему напрямую

Кластер будет работать пока большая его часть в порядке, т.е. кластер из 3х машин может выдержать падение одной, из 5ти, падение двух и т.д.
Рекомендуется использовать нечетное ко-во машин т.к. например с 4мя машинами зукипер так же способен выдержать потерю только одной, иначе принцип большинства теряется, т.е. в вопросе отказоустойчивости 3 == 4, но 4ый хост добавляет нагрузку фоловера, это доп подключение к лидеру, трафик и т.д.

Конфигурация кластера
Для объединения узлов в кластер, нужно сохранить в спец файле («myid») ИД узла затем в конфиге перечислить адреса всех участников кластера. Есть возможность указать доп адреса для каждого узла

  • Файл ИД должен содержать только цифру (1-255) и находится в папке с данными (dataDir)
  • Перечень адресов должен быть одинаковым для всех узлов, т.е. свой адрес тоже указываем узлу который настраиваем (именно из этого файла он понимает какой порт ему нужно слушать и на каком интерфейсе)

Топики и их партиции

:!: Спойлер

Все сообщения в кафке организованы и хранятся в именованных топиках, каждый топик в свою очередь может состоять из одной или более партиции

Топики это по сути смысловые темы, на которые программист разбивает свои сообщения
А партиция это деление топика на функциональные единицы, 
потребители работают именно с партициями, и если партиций несколько то есть возможность работать с ними параллельно, иначе нет

Партиции не дублируют сообщения, они именно делят топик на части, по какому либо правилу, условно говоря одна партиция обрабатывает сообщения от «А» до «О», другая от «П» до «Я» и тд

Каждая партиция реплицируется указанное кол-во раз, параметр replication_factor, :!: Важно :!: клиенты работают только с лидером партиции, записывают и читают только с него, а реплики (фоловеры) копируют в себя данные с лидера

Сообщения отправляются в топик, по партициям они распределяются автоматом, если специально не задано другое, например с один ключом, сообщения будут гарантированно писаться в одну партицию

Получатели (консьюмеры)

Консьюмер группы как таковые не создаются, создаются только консьюмеры, которые входят в группы. Консольные консьюмеры генерят каждый себе «авто-группы», можно указать статическую

:!: Спойлер

Консьюмеры обычно объединяются в группы. Когда несколько консьюмеров из одной группы читают топик, то каждый из них получает данные из разных партиций, это и есть вторая составляющая для многопоточности

Например если мы видим что три консьюмера не справляются с нагрузкой, создаем еще одну партицию и еще одного консьюмера, тем самым снижаем общую нагрузку

Партиции автоматически распределяются между доступными консьюмерами внутри группы
:!: Важно :!: Партиции назначаются уникально, не повторяются и если консьюмеров больше чем партиций, то остаток будет простаивать, в нашем примере четвертый консьюмер не будет задействован в обработке сообщений

Добавлять партиции можно на лету, без перезапуска, клиенты автоматически их обнаружат, но у них должен быть включен параметр auto.offset.reset=earliest т.к. обновление происходит периодически и чтобы в этот промежуток ничего не потерять
:!: Важно :!: Партиции невозможно удалить после создания

Партиционирование является инструментом масштабирования, группы инструментом отказоустойчивости

Офсеты

:!: Спойлер

Каждое сообщение партиции имеет свой собственный уникальный, увеличивающийся номер, т.н. офсет, его используются консьюмеры для навигации по потоку сообщений
Консьюмер делает т.н. «offset-commit» с указанием своей группы, топика и офсета сообщения, которое должно быть помечено как обработанное, брокер сохраняет эту инфу у себя, после например рестарта, консьюмер запрашивает эту информацию у брокера и продолжает читать очередь там где остановился

Консьюмер может коммитить любой допустимый офсет, тем самым передвигаться по истории сообщений.
Ключевой момент в том, что у связки «топик партиция - консьюмер группа» может быть только один коммит с офсетом, т.е. один указатель чтения внутри партиции, для одной группы

Основная структура данных в кафке это распределенный, реплицируемый лог. Каждая партиция это и есть этот самый лог, который хранится на диске
Каждое новое сообщение отправляемое в партицию добавляется в «голову» этого лога и получает свой уникальный номер, «offset», 64-битное число, назначается брокером

Сообщение содержит в себе

  • Key - опциональный ключ, нужен для распределения сообщений по кластеру, сообщения с одинаковым ключом всегда пишутся в одну партицию, это гарантирует очередность считывания/записи
  • Value - само сообщение
  • Timestamp
  • Headers - пользовательские атрибуты, тоже key/value, которые прикрепляются к сообщению

Конфигурация

:!:

:!:
:!:

Установка

:!: Notes

Бинарник скачивается и распаковывается оф сайт
Zookeeper содержится в дистрибутиве, работает отдельной службой, в файле конфига (./config/server.properties) есть блок подключения к зукиперу
Файлы обоих служб тоже создаются вручную
В ./bin есть множество скриптов для управления, как под линукс так и под винду

:!: Самый простой инстанс

Пример

yum install curl tar wget
iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
yum install java-11-openjdk-devel
wget https://kafka.apache.org/downloads
 
mkdir /opt/kafka
tar zxf kafka_*.tgz -C /opt/kafka --strip 1
 
vi /opt/kafka/config/server.properties
	delete.topic.enable = true
 
useradd -r -c 'Kafka broker user service' kafka
chown -R kafka:kafka /opt/kafka
 
vi /etc/systemd/system/zookeeper.service
	[Unit]
	Description=Zookeeper Service
	Requires=network.target remote-fs.target
	After=network.target remote-fs.target
 
	[Service]
	Type=simple
	User=kafka
	ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
	ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
	ExecReload=/bin/kill -HUP $MAINPID
	Restart=on-failure
 
	[Install]
	WantedBy=multi-user.target
 
vi /etc/systemd/system/kafka.service
	[Unit]
	Description=Kafka Service
	Requires=network.target remote-fs.target
	After=network.target remote-fs.target
 
	[Service]
	Type=simple
	User=kafka
	ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1'
	ExecStop=/opt/kafka/bin/kafka-server-stop.sh
	ExecReload=/bin/kill -HUP $MAINPID
	Restart=on-failure
 
	[Install]
	WantedBy=multi-user.target
 
systemctl daemon-reload
systemctl enable zookeeper kafka
systemctl start kafka
 
 
  # Для активации JMX, можно в unit файле добавить след переменные
Environment=SERVER_JVMFLAGS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=10020 -Djava.rmi.server.hostname=10.200.192.31"
 
  # Для зукипера
SERVER_JVMFLAGS
 
  # Для кафки
KAFKA_OPTS

Есть спец экспортеры, например Мониторинг Prometheus
Но вполне можно использовать общий, «jmx_exporter», скачиваем с гита, компилируем, затем запускаем его вместе с целевой службой, в режиме «java-agent», для этого надо в переменной окружения прописать путь и все

nano /etc/systemd/system/kafka-exporter.service
	[Unit]
	(...)
 
	[Service]
	(...)
        Environment=EXTRA_ARGS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent/target/jmx_javaagent.jar=7721:/opt/jmx_exporter/example_configs/zookeeper.yaml

Использование

:!: Примеры

Топики

        # Создаем топик
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 
--topic "myTest"
 
	# Список / детализация
bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --list
bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --describe --topic "myTopic"
    # Отправляем сообщение (с указанием группы)
echo "Hello" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic "myTest" [--group "myGroup"]
 
    # Извлекаем сообщение
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "myTest" 
--from-beginning [--group "myGroup"]
 
    # Список групп
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
 
    # Расширенная инфа по группе
bin/kafka-consumer-groups.sh --describe --group "myGroup" --bootstrap-server localhost:9092
 
#
(..) --alter --topic "myTopic" --partitions 100
(..) --alter --topic "myTopic" min.insync.replicas=2 --config retention.ms=172800000

Топики

bin/kafka-topics.sh
	list
	create
	describe
	alter
	delete
 
	# Create topic
 --replication-factor 2 --partitions 10 --topic "myFirstTopic"
 
	# Delete topic
 --delete --topic first_topic
 
	# Alter topic
 --alter --topic first_topic --partitions 5
 
? --alter --topic "myFirstTopic" --replication-factor 2
 
 
	# Alter topic other params
 --alter --topic "myFirstTopic" --config retention.ms=50000000 --config compression.type=gzip
 
	# Alter topic delete param
 --alter --topic "myFirstTopic" --delete-config retention.ms
:!: Ручная балансировка

Файл для перебалансировки

{"version":1,
 "partitions":[
     {"topic":"__consumer_offsets","partition":4,"replicas":[2,0,1]},
     {"topic":"__consumer_offsets","partition":5,"replicas":[2,0,1]},
     {"topic":"__consumer_offsets","partition":6,"replicas":[2,0,1]}
]}
    # Применение изменений
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --reassignment-json-file rebalance.json --execute
 
   # Еще требуется тригернуть непосредственно миграцию лидеров в соответствии с приоритетами
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --admin.config config/auth_ssl.properties --election-type preferred --all-topic-partitions
 
# Либо это произойдет автоматом при включенном параметре "auto.leader.rebalance.enable=false"

Есть еще вроде как перебалансировка в утилите kafkactl
но тут что то непонятно, фактически это работа с фактором репликации, который побочно влияет на распределение
нормально распределить удалось только снизив кол-во до единицы и снова подняв до 3х

kafkactl alter topic brain-finres-events-test --replication-factor 3
:!: Работа с ACL
  # Список назначенных прав
bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 
--command-config config/auth_ssl.properties --list --transactional-id * (или имя)
 
bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 
--command-config config/auth_ssl.properties --list --topic * (или имя)
 
--group
 
  # Удолить
bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 
--command-config config/auth_ssl.properties --remove --allow-principal User:svc-brain 
--operation WRITE --operation DESCRIBE --transactional-id *
 
  # Можно перечислять несколько сущностей
 --remove --allow-principal User:svc-brain --allow-principal User:svc-brain-finres 
 --allow-principal User:svc-hyper-mes-prod  --operation WRITE --operation READ --operation DESCRIBE 
 --topic brain-finres-events-test
 
 
  # Создание
  # Роль "--producer", WRITE and DESCRIBE on topic, CREATE on cluster; 
  # --consumer READ and DESCRIBE on topic, READ on consumer group
bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 
--command-config config/auth_ssl.properties --add --allow-principal User:svc-brain 
--producer --topic brain-trainer-events
 
 
!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!
В случае звездочек, обязательно нужно использовать кавычки, по крайней мере с пробелом наверно
--resource-pattern-type prefixed / literal создают разные объекты, которые не пересекаются
 
# Пользователи
	список
bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 
--command-config config/auth_ssl.properties --entity-type users --describe
 
	добавить
bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 
--command-config config/auth_ssl.properties --alter --add-config 'SCRAM-SHA-512=[password=<myPass>]' 
--entity-type users --entity-name <myName>
 
	удалить
bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 
--command-config config/auth_ssl.properties --alter --delete-config 'SCRAM-SHA-512' 
--entity-type users --entity-name <myName>
:!: kafkactl

Github
Конфиг файл для работы ~/.config/kafkactl/config.yml

contexts:
  default:
    brokers:
      - localhost:9094
      - localhost:9096
 
current-context: default

Еще пример

contexts:
  default:
    brokers:
      - bootstrap-kafka1:9092
      - bootstrap-kafka2:9092
    sasl:
      enabled: true
      mechanism: scram-sha512
      password: 
      username: admin
    tls:
      enabled: true
      insecure: true
 
 
current-context: default

Некоторые команды

kafkactl 
# Некоторые команды первого уровня
	create
	list
	describe
	alter
	delete
# Затем
	broker
	topic
	consumer-group
# В общем то логика простая
 
# Некоторые примеры
kafkactl create topic secondTop --partitions 4 --replicatoin-factor 2
kafkactl describe broker 0
 
	# Send message
kafkactl produce firstTop --key=my-key --value=my-value
 
	# Describe group
kafkactl describe consumer-group test_app
 
 
  # Смена контекста из конфига
kafkactl config use-context my_other_contx
:!: Docker

Compose файл, zookeeper и два брокера

version: "2"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.9
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka0:
    image: docker.io/bitnami/kafka:3.4
    ports:
      - "9094:9094"
    volumes:
      - "kafka_data0:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=0
 
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka0:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT

    depends_on:
      - zookeeper

  kafka1:
    image: docker.io/bitnami/kafka:3.4
    ports:
      - "9096:9096"
    volumes:
      - "kafka_data1:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=1
 
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9095,CONTROLLER://:9097,EXTERNAL://:9096
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9095,EXTERNAL://localhost:9096
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT

    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data0:
    driver: local
  kafka_data1:
    driver: local

Без кластера конечно короче

version: "2"
 
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.9
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:3.4
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
    depends_on:
      - zookeeper
 
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
:!:
 

Epoch

Overs

В Кафке по умолчанию вычитывание сообщений из партиции останавливается, когда получатель доходит до битого сообщения, и до тех пор, пока оно не будет пропущено и закинуто в “карантинную” очередь (также именуемой “dead letter queue”) для последующей обработки, чтение партиции продолжить не получится.

ZooKeeper при подключении нового читателя производит перераспределение участников в Consumer Group таким образом, чтобы каждая партиция имела одного и только одного читателя

Параметры

? Параметры потребителей ?

max.poll.records
По умолчанию 500, максимальное кол-во записей, возвращаемых одним вызовом poll()
Изменение вроде не сильно влияет, потребитель кэширует записи и постепенно возвращает оттуда

max.poll.interval.ms
По умолчанию 5мин, макс задержка между вызовами poll(). т.е. время в течении которого потребитель может бездействовать, если метод poll() не вызывался в течении этого времени то клиент считается отвалилшимся и консьюмер группа начинает перебалансировку

:!: Быстродействие потребителя

Два выше рассмотренных свойства задают требования к приложению клиента, оно должно потреблять «max.poll.records» за «max.poll.interval.ms»

Увеличение «max.poll.records» может снижить пропускную способность изза роста накладных расходов
Увеличение «max.poll.interval.ms» может замедлить скорость отклика при перебалансировке потребителей

fetch.max.bytes
Дефолт 50мб, макс размер пакета запрашиваемого консьюмером во время чтения. Концептуально связан с «request.timeout.ms»

request.timeout.ms
Таймаут, за который консьюмер ожидает запрошенные данные от брокера

group.instance.id
Что то связано со статическими потребителями, и отвал статического будет по истечении session.timeout.ms

CLI

Topics

:!: Расширенный вывод (desribe)

По мимо всего прочего, показано 3 столбца:
Leader
Указан брокер где находится лидер-партиция. Кафка равномерно распределяет лидеров между досутпными брокерами
Вроде вручную это не регулируется

Replicas
Указаны брокеры которые реплицируют данные партиции, вне зависимости от лидерства
Первый идентификатор представляет предпочтительного лидера, поэтому кафка попытается сделать его лидером партиции
:!: В случае отвала брокера, резервный лидер партиции выбирается именно из этого столбца, проверено практикой, надежно

Isr
Означает синхронизированную реплику.
Сообщения шлются в лидера, затем если есть репли-фактор то фоловеры копируют новые сообщения себе, вычитывают.
Брокер считает засинхронизирован если не сильно отстает (Replica.lag.time.max.ms)
Здесь что то тоже упоминается про приоритет выбора, может это про контроллер ? (пока один раз сошлось, хотя нет)

:!:
linux/kafka.txt · Последнее изменение: 2024/05/10 05:57 — admin