Это старая версия документа!
Программный брокер сообщений т.е. принимает данные от одной программы, хранит их и по запросу предоставляет другой
Сервер кафки хранит информацию о топиках, поставщиках и потребителях данных. Так же отвечает за хранение и предоставление передаваемых сообщений, по умолчанию сообщения хранятся в течении 7 дней, этот параметр настраивается, длительность хранения не влияет на производительность системы
Это самостоятельный продукт, служба для координации распределённых систем. Кафка использует зукипер для хранения всех служебных метаданных, необходимых для работы системы
Роль зукипера, всюду упоминается только про случаи распределенной установки кафки, тобишь управление именно распределенной системой, что и исходит из описания. Но вроде как даже сингл-хост без него не работает, или работает ?
В распределенном варианте кафки, зукипер хранит информацию о состоянии распределенной системы, статус узлов кластера, какой брокер является в данный момент контроллером, в каком состоянии находятся лидеры партиций и их реплики.
Зукипер обеспечивает синхронизацию внутри распределенной системы и обеспечивает работу с ней, а так же хранит информацию о клиентах, активен/неактивен и т.д.
В случае его потери, данные превращаются в беспорядочную кучу т.е. сами данные хранятся на брокерах, в кафке, а «ключ» к работе с ними хранится в зукипере
Зукипер фактически является самостоятельной, независимой, распределенной системой хранения данных, за которой нужно следить и поддерживать, падение зукипера это гарантированное падение кафки. С ним работает только кафка, клиенты никогда не коннектятся к нему напрямую
Все сообщения в кафке организованы и хранятся в именованных топиках, каждый топик в свою очередь может состоять из одной или более партиции
Топики это по сути смысловые темы, на которые программист разбивает свои сообщения А партиция это деление топика на функциональные единицы, потребители работают именно с партициями, и если партиций несколько то есть возможность работать с ними параллельно, иначе нет
Партиции не дублируют сообщения, они именно делят топик на части, по какому либо правилу, условно говоря одна партиция обрабатывает сообщения от «А» до «О», другая от «П» до «Я» и тд
Каждая партиция реплицируется указанное кол-во раз, параметр replication_factor, Важно
клиенты работают только с лидером партиции, записывают и читают только с него, а реплики (фоловеры) копируют в себя данные с лидера
Консьюмеры обычно объединяются в группы. Когда несколько консьюмеров из одной группы читают топик, то каждый из них получает данные из разных партиций, это и есть вторая составляющая для многопоточности
Например если мы видим что три консьюмера не справляются с нагрузкой, создаем еще одну партицию и еще одного консьюмера, тем самым снижаем общую нагрузку
Партиции автоматически распределяются между доступными консьюмерами внутри группы
Важно
Партиции назначаются уникально, не повторяются и если консьюмеров больше чем партиций, то остаток будет простаивать, в нашем примере четвертый консьюмер не будет задействован в обработке сообщений
Добавлять партиции можно на лету, без перезапуска, клиенты автоматически их обнаружат, но у них должен быть включен параметр auto.offset.reset=earliest т.к. обновление происходит периодически и чтобы в этот промежуток ничего не потерять
Важно
Партиции невозможно удалить после создания
Партиционирование является инструментом масштабирования, группы инструментом отказоустойчивости
Каждое сообщение партиции имеет свой собственный уникальный, увеличивающийся номер, т.н. офсет, его используются консьюмеры для навигации по потоку сообщений
Консьюмер делает т.н. «offset-commit» с указанием своей группы, топика и офсета сообщения, которое должно быть помечено как обработанное, брокер сохраняет эту инфу у себя, после например рестарта, консьюмер запрашивает эту информацию у брокера и продолжает читать очередь там где остановился
Консьюмер может коммитить любой допустимый офсет, тем самым передвигаться по истории сообщений.
Ключевой момент в том, что у связки «топик партиция - консьюмер группа» может быть только один коммит с офсетом, т.е. один указатель чтения внутри партиции, для одной группы
Основная структура данных в кафке это распределенный, реплицируемый лог. Каждая партиция это и есть этот самый лог, который хранится на диске
Каждое новое сообщение отправляемое в партицию добавляется в «голову» этого лога и получает свой уникальный номер, «offset», 64-битное число, назначается брокером
Сообщение содержит в себе
Бинарник скачивается и распаковывается оф сайт
Zookeeper содержится в дистрибутиве, работает отдельной службой, в файле конфига (./config/server.properties) есть блок подключения к зукиперу
Файлы обоих служб тоже создаются вручную
В ./bin есть множество скриптов для управления, как под линукс так и под винду
yum install curl tar wget iptables -I INPUT -p tcp --dport 9092 -j ACCEPT yum install java-11-openjdk-devel wget https://kafka.apache.org/downloads mkdir /opt/kafka tar zxf kafka_*.tgz -C /opt/kafka --strip 1 vi /opt/kafka/config/server.properties delete.topic.enable = true useradd -r -c 'Kafka broker user service' kafka chown -R kafka:kafka /opt/kafka vi /etc/systemd/system/zookeeper.service [Unit] Description=Zookeeper Service Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=kafka ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh ExecReload=/bin/kill -HUP $MAINPID Restart=on-failure [Install] WantedBy=multi-user.target vi /etc/systemd/system/kafka.service [Unit] Description=Kafka Service Requires=zookeeper.service After=zookeeper.service [Service] Type=simple User=kafka ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1' ExecStop=/opt/kafka/bin/kafka-server-stop.sh ExecReload=/bin/kill -HUP $MAINPID Restart=on-failure [Install] WantedBy=multi-user.target systemctl daemon-reload systemctl enable zookeeper kafka systemctl start kafka
# Создаем топик /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Test # Отправляем сообщение echo "Hello, World from Kafka" | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Test # Извлекаем сообщение /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Test --from-beginning
Есть спец экспортеры, например Мониторинг Prometheus
Но вполне можно использовать общий, «jmx_exporter», скачиваем с гита, компилируем, затем запускаем его вместе с целевой службой, в режиме «java-agent», для этого надо в переменной окружения прописать путь и все
nano /etc/systemd/system/kafka-exporter.service [Unit] (...) [Service] (...) Environment=EXTRA_ARGS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent/target/jmx_javaagent.jar=7721:/opt/jmx_exporter/example_configs/zookeeper.yaml