Инструменты пользователя

Инструменты сайта


linux:kafka

Это старая версия документа!


Apache Kafka

Матчасть

Программный брокер сообщений т.е. принимает данные от одной программы, хранит их и по запросу предоставляет другой

Сервер кафки хранит информацию о топиках, поставщиках и потребителях данных. Так же отвечает за хранение и предоставление передаваемых сообщений, по умолчанию сообщения хранятся в течении 7 дней, этот параметр настраивается, длительность хранения не влияет на производительность системы

ZooKeeper

Офф дока

:!: Спойлер

Это самостоятельный продукт, служба для координации распределённых систем. Кафка использует зукипер для хранения всех служебных метаданных, необходимых для работы системы

Роль зукипера, всюду упоминается только про случаи распределенной установки кафки,
тобишь управление именно распределенной системой, что и исходит из описания.
Но вроде как даже сингл-хост без него не работает, или работает ?

В распределенном варианте кафки, зукипер хранит информацию о состоянии распределенной системы, статус узлов кластера, какой брокер является в данный момент контроллером, в каком состоянии находятся лидеры партиций и их реплики.
Зукипер обеспечивает синхронизацию внутри распределенной системы и обеспечивает работу с ней, а так же хранит информацию о клиентах, активен/неактивен и т.д.
В случае его потери, данные превращаются в беспорядочную кучу т.е. сами данные хранятся на брокерах, в кафке, а «ключ» к работе с ними хранится в зукипере

Зукипер фактически является самостоятельной, независимой, распределенной системой хранения данных, за которой нужно следить и поддерживать, падение зукипера это гарантированное падение кафки. С ним работает только кафка, клиенты никогда не коннектятся к нему напрямую

Кластер будет работать пока большая его часть в порядке, т.е. кластер из 3х машин может выдержать падение одной, из 5ти, падение двух и т.д.
Рекомендуется использовать нечетное ко-во машин т.к. например с 4мя машинами зукипер так же способен выдержать потерю только одной, иначе принцип большинства теряется, т.е. в вопросе отказоустойчивости 3 == 4, но 4ый хост добавляет нагрузку фоловера, это доп подключение к лидеру, трафик и т.д.

Конфигурация кластера
Для объединения узлов в кластер, нужно сохранить в спец файле («myid») ИД узла затем в конфиге перечислить адреса всех участников кластера. Есть возможность указать доп адреса для каждого узла

  • Файл ИД должен содержать только цифру (1-255) и находится в папке с данными (dataDir)
  • Перечень адресов должен быть одинаковым для всех узлов, т.е. свой адрес тоже указываем узлу который настраиваем (именно из этого файла он понимает какой порт ему нужно слушать и на каком интерфейсе)

Топики и их партиции

:!: Спойлер

Все сообщения в кафке организованы и хранятся в именованных топиках, каждый топик в свою очередь может состоять из одной или более партиции

Топики это по сути смысловые темы, на которые программист разбивает свои сообщения
А партиция это деление топика на функциональные единицы, 
потребители работают именно с партициями, и если партиций несколько то есть возможность работать с ними параллельно, иначе нет

Партиции не дублируют сообщения, они именно делят топик на части, по какому либо правилу, условно говоря одна партиция обрабатывает сообщения от «А» до «О», другая от «П» до «Я» и тд

Каждая партиция реплицируется указанное кол-во раз, параметр replication_factor, :!: Важно :!: клиенты работают только с лидером партиции, записывают и читают только с него, а реплики (фоловеры) копируют в себя данные с лидера

Сообщения отправляются в топик, по партициям они распределяются автоматом, если специально не задано другое, например с один ключом, сообщения будут гарантированно писаться в одну партицию

Получатели (консьюмеры)

Консьюмер группы как таковые не создаются, создаются только консьюмеры, которые входят в группы. Консольные консьюмеры генерят каждый себе «авто-группы», можно указать статическую

:!: Спойлер

Консьюмеры обычно объединяются в группы. Когда несколько консьюмеров из одной группы читают топик, то каждый из них получает данные из разных партиций, это и есть вторая составляющая для многопоточности

Например если мы видим что три консьюмера не справляются с нагрузкой, создаем еще одну партицию и еще одного консьюмера, тем самым снижаем общую нагрузку

Партиции автоматически распределяются между доступными консьюмерами внутри группы
:!: Важно :!: Партиции назначаются уникально, не повторяются и если консьюмеров больше чем партиций, то остаток будет простаивать, в нашем примере четвертый консьюмер не будет задействован в обработке сообщений

Добавлять партиции можно на лету, без перезапуска, клиенты автоматически их обнаружат, но у них должен быть включен параметр auto.offset.reset=earliest т.к. обновление происходит периодически и чтобы в этот промежуток ничего не потерять
:!: Важно :!: Партиции невозможно удалить после создания

Партиционирование является инструментом масштабирования, группы инструментом отказоустойчивости

Офсеты

:!: Спойлер

Каждое сообщение партиции имеет свой собственный уникальный, увеличивающийся номер, т.н. офсет, его используются консьюмеры для навигации по потоку сообщений
Консьюмер делает т.н. «offset-commit» с указанием своей группы, топика и офсета сообщения, которое должно быть помечено как обработанное, брокер сохраняет эту инфу у себя, после например рестарта, консьюмер запрашивает эту информацию у брокера и продолжает читать очередь там где остановился

Консьюмер может коммитить любой допустимый офсет, тем самым передвигаться по истории сообщений.
Ключевой момент в том, что у связки «топик партиция - консьюмер группа» может быть только один коммит с офсетом, т.е. один указатель чтения внутри партиции, для одной группы

Основная структура данных в кафке это распределенный, реплицируемый лог. Каждая партиция это и есть этот самый лог, который хранится на диске
Каждое новое сообщение отправляемое в партицию добавляется в «голову» этого лога и получает свой уникальный номер, «offset», 64-битное число, назначается брокером

Сообщение содержит в себе

  • Key - опциональный ключ, нужен для распределения сообщений по кластеру, сообщения с одинаковым ключом всегда пишутся в одну партицию, это гарантирует очередность считывания/записи
  • Value - само сообщение
  • Timestamp
  • Headers - пользовательские атрибуты, тоже key/value, которые прикрепляются к сообщению

Установка

:!: Notes

Бинарник скачивается и распаковывается оф сайт
Zookeeper содержится в дистрибутиве, работает отдельной службой, в файле конфига (./config/server.properties) есть блок подключения к зукиперу
Файлы обоих служб тоже создаются вручную
В ./bin есть множество скриптов для управления, как под линукс так и под винду

:!: Самый простой инстанс

Пример

yum install curl tar wget
iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
yum install java-11-openjdk-devel
wget https://kafka.apache.org/downloads
 
mkdir /opt/kafka
tar zxf kafka_*.tgz -C /opt/kafka --strip 1
 
vi /opt/kafka/config/server.properties
	delete.topic.enable = true
 
useradd -r -c 'Kafka broker user service' kafka
chown -R kafka:kafka /opt/kafka
 
vi /etc/systemd/system/zookeeper.service
	[Unit]
	Description=Zookeeper Service
	Requires=network.target remote-fs.target
	After=network.target remote-fs.target
 
	[Service]
	Type=simple
	User=kafka
	ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
	ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
	ExecReload=/bin/kill -HUP $MAINPID
	Restart=on-failure
 
	[Install]
	WantedBy=multi-user.target
 
vi /etc/systemd/system/kafka.service
	[Unit]
	Description=Kafka Service
	Requires=network.target remote-fs.target
	After=network.target remote-fs.target
 
	[Service]
	Type=simple
	User=kafka
	ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1'
	ExecStop=/opt/kafka/bin/kafka-server-stop.sh
	ExecReload=/bin/kill -HUP $MAINPID
	Restart=on-failure
 
	[Install]
	WantedBy=multi-user.target
 
systemctl daemon-reload
systemctl enable zookeeper kafka
systemctl start kafka
# Создаем топик
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic "myTest"
 
# Отправляем сообщение
echo "Hello" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic "myTest"
 
# Извлекаем сообщение
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "myTest" --from-beginning [--group "myGroup"]
 
# Список групп
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
 
# Расширенная инфа по группе
bin/kafka-consumer-groups.sh --describe --group "myGroup" --bootstrap-server localhost:9092

Есть спец экспортеры, например Мониторинг Prometheus
Но вполне можно использовать общий, «jmx_exporter», скачиваем с гита, компилируем, затем запускаем его вместе с целевой службой, в режиме «java-agent», для этого надо в переменной окружения прописать путь и все

nano /etc/systemd/system/kafka-exporter.service
	[Unit]
	(...)
 
	[Service]
	(...)
        Environment=EXTRA_ARGS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent/target/jmx_javaagent.jar=7721:/opt/jmx_exporter/example_configs/zookeeper.yaml

Использование

 
linux/kafka.1678025167.txt.gz · Последнее изменение: 2023/03/05 14:06 — admin