Инструменты пользователя

Инструменты сайта


linux:kafka

Различия

Показаны различия между двумя версиями страницы.

Ссылка на это сравнение

Предыдущая версия справа и слева Предыдущая версия
Следующая версия
Предыдущая версия
linux:kafka [2023/02/12 05:54]
admin
linux:kafka [2024/05/10 05:57] (текущий)
admin
Строка 9: Строка 9:
  
  
-**ZooKeeper**\\+==== ZooKeeper ==== 
 +[[https://zookeeper.apache.org/doc/current/zookeeperAdmin.html|Офф дока]]\\ 
 +<details> 
 +<summary> :!: Спойлер </summary>
 Это самостоятельный продукт, служба для координации распределённых систем. Кафка использует зукипер для хранения всех служебных метаданных, необходимых для работы системы\\ Это самостоятельный продукт, служба для координации распределённых систем. Кафка использует зукипер для хранения всех служебных метаданных, необходимых для работы системы\\
   Роль зукипера, всюду упоминается только про случаи распределенной установки кафки,   Роль зукипера, всюду упоминается только про случаи распределенной установки кафки,
Строка 21: Строка 24:
 Зукипер фактически является самостоятельной, независимой, распределенной системой хранения данных, за которой нужно следить и поддерживать, падение зукипера это гарантированное падение кафки. С ним работает только кафка, клиенты никогда не коннектятся к нему напрямую\\ Зукипер фактически является самостоятельной, независимой, распределенной системой хранения данных, за которой нужно следить и поддерживать, падение зукипера это гарантированное падение кафки. С ним работает только кафка, клиенты никогда не коннектятся к нему напрямую\\
  
 +Кластер будет работать пока большая его часть в порядке, т.е. кластер из 3х машин может выдержать падение одной, из 5ти, падение двух и т.д.\\
 +Рекомендуется  использовать нечетное ко-во машин т.к. например с 4мя машинами зукипер так же способен выдержать потерю только одной, иначе принцип большинства теряется, т.е. в вопросе отказоустойчивости 3 == 4, но 4ый хост добавляет нагрузку фоловера, это доп подключение к лидеру, трафик и т.д.\\
  
-**Топики и их партиции**\\+**Конфигурация кластера**\\ 
 +Для объединения узлов в кластер, нужно сохранить в спец файле ("myid") ИД узла затем в конфиге перечислить адреса всех участников кластера. Есть возможность указать доп адреса для каждого узла\\ 
 + 
 +  * Файл ИД должен содержать только цифру (1-255) и находится в папке с данными (dataDir) 
 +  * Перечень адресов должен быть одинаковым для всех узлов, т.е. свой адрес тоже указываем узлу который настраиваем (именно из этого файла он понимает какой порт ему нужно слушать и на каком интерфейсе) 
 +</details> 
 + 
 + 
 + 
 +==== Топики и их партиции ==== 
 +<details> 
 +<summary> :!: Спойлер </summary>
 Все сообщения в кафке организованы и хранятся в именованных топиках, каждый топик в свою очередь может состоять из одной или более партиции\\ Все сообщения в кафке организованы и хранятся в именованных топиках, каждый топик в свою очередь может состоять из одной или более партиции\\
   Топики это по сути смысловые темы, на которые программист разбивает свои сообщения   Топики это по сути смысловые темы, на которые программист разбивает свои сообщения
Строка 33: Строка 49:
 |{{:linux:screenshot_2.jpg?direct&400 |}} | {{:linux:screenshot_1.jpg?direct&400 |}}| |{{:linux:screenshot_2.jpg?direct&400 |}} | {{:linux:screenshot_1.jpg?direct&400 |}}|
  
 +Сообщения отправляются в топик, по партициям они распределяются автоматом, если специально не задано другое, например с один ключом, сообщения будут гарантированно писаться в одну партицию\\
 +</details>
  
-**Получатели (консьюмеры)**\\+ 
 + 
 +==== Получатели (консьюмеры) ==== 
 +Консьюмер группы как таковые не создаются, создаются только консьюмеры, которые входят в группы. Консольные консьюмеры генерят каждый себе "авто-группы", можно указать статическую\\ 
 +<details> 
 +<summary> :!: Спойлер </summary>
 Консьюмеры обычно объединяются в группы. Когда несколько консьюмеров **из одной группы** читают топик, то каждый из них получает **данные из разных партиций**, это и есть вторая составляющая для многопоточности\\ Консьюмеры обычно объединяются в группы. Когда несколько консьюмеров **из одной группы** читают топик, то каждый из них получает **данные из разных партиций**, это и есть вторая составляющая для многопоточности\\
 | {{ :linux:screenshot_3.jpg?direct&400 |}} | {{ :linux:screenshot_4.jpg?direct&400 |}} | | {{ :linux:screenshot_3.jpg?direct&400 |}} | {{ :linux:screenshot_4.jpg?direct&400 |}} |
Строка 48: Строка 71:
 Партиционирование является инструментом масштабирования, группы инструментом отказоустойчивости\\ Партиционирование является инструментом масштабирования, группы инструментом отказоустойчивости\\
 {{ :linux:безымянный.jpg?direct&400 |}} {{ :linux:безымянный.jpg?direct&400 |}}
 +</details>
  
  
-**Офсеты**\\+ 
 +==== Офсеты ==== 
 +<details> 
 +<summary> :!: Спойлер </summary>
 Каждое сообщение партиции имеет свой собственный уникальный, увеличивающийся номер, т.н. офсет, его используются консьюмеры для навигации по потоку сообщений\\ Каждое сообщение партиции имеет свой собственный уникальный, увеличивающийся номер, т.н. офсет, его используются консьюмеры для навигации по потоку сообщений\\
 Консьюмер делает т.н. "offset-commit" с указанием своей группы, топика и офсета сообщения, которое должно быть помечено как обработанное, брокер сохраняет эту инфу у себя, после например рестарта, консьюмер запрашивает эту информацию у брокера и продолжает читать очередь там где остановился\\ Консьюмер делает т.н. "offset-commit" с указанием своей группы, топика и офсета сообщения, которое должно быть помечено как обработанное, брокер сохраняет эту инфу у себя, после например рестарта, консьюмер запрашивает эту информацию у брокера и продолжает читать очередь там где остановился\\
Строка 60: Строка 87:
 Каждое новое сообщение отправляемое в партицию добавляется в "голову" этого лога и получает свой уникальный номер, "offset", 64-битное число, назначается брокером\\ Каждое новое сообщение отправляемое в партицию добавляется в "голову" этого лога и получает свой уникальный номер, "offset", 64-битное число, назначается брокером\\
 {{ :linux:screenshot_5.jpg?direct&400 |}} {{ :linux:screenshot_5.jpg?direct&400 |}}
- 
- 
- 
  
 **Сообщение содержит** в себе\\ **Сообщение содержит** в себе\\
Строка 69: Строка 93:
   * Timestamp   * Timestamp
   * Headers - пользовательские атрибуты, тоже key/value, которые прикрепляются к сообщению   * Headers - пользовательские атрибуты, тоже key/value, которые прикрепляются к сообщению
 +</details>
  
  
  
 +==== Конфигурация ====
  
-===== Установка ===== +<details> 
 +<summary> :!: </summary> 
 +{{:linux:screenshot_1.png?direct&600|}} 
 +</details>
  
 +
 +<details>
 +<summary> :!: </summary>
 +
 +</details>
 +
 +
 +<details>
 +<summary> :!: </summary>
 +
 +</details>
 +
 +
 +
 +
 +===== Установка ===== 
 <details> <details>
 <summary> :!: Notes </summary> <summary> :!: Notes </summary>
Строка 123: Строка 168:
  [Unit]  [Unit]
  Description=Kafka Service  Description=Kafka Service
- Requires=zookeeper.service + Requires=network.target remote-fs.target 
- After=zookeeper.service+ After=network.target remote-fs.target
  
  [Service]  [Service]
Строка 140: Строка 185:
 systemctl enable zookeeper kafka systemctl enable zookeeper kafka
 systemctl start kafka systemctl start kafka
-</code> 
  
-<code bash> 
-# Создаем топик 
-/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Test 
  
-Отправляем сообщение +  Для активации JMX, можно в unit файле добавить след переменные 
-echo "Hello, World from Kafka" | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Test+Environment=SERVER_JVMFLAGS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=10020 -Djava.rmi.server.hostname=10.200.192.31" 
 + 
 +  # Для зукипера 
 +SERVER_JVMFLAGS
  
-Извлекаем сообщение +  Для кафки 
-/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Test --from-beginning+KAFKA_OPTS
 </code> </code>
  
-[[https://github.com/danielqsj/kafka_exporter|Мониторинг Prometheus]]\\+Есть спец экспортеры, например [[https://github.com/danielqsj/kafka_exporter|Мониторинг Prometheus]]\\ 
 +Но вполне можно использовать общий, "jmx_exporter", скачиваем с гита, компилируем, затем запускаем его вместе с целевой службой, в режиме "java-agent", для этого надо в переменной окружения прописать путь и все
 <code bash> <code bash>
 nano /etc/systemd/system/kafka-exporter.service nano /etc/systemd/system/kafka-exporter.service
  [Unit]  [Unit]
- Description=Kafka prometheus exporter + (...)
- After=kafka.service+
  
  [Service]  [Service]
- Type=simple + (...) 
- User=kafka +        Environment=EXTRA_ARGS=-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent/target/jmx_javaagent.jar=7721:/opt/jmx_exporter/example_configs/zookeeper.yaml 
- ExecStart=/opt/kafka/bin/start_exporter.sh+</code> 
 +</details>
  
- [Install] 
- WantedBy=multi-user.target 
  
-nano /opt/kafka/bin/start_exporter.sh + 
- #!/bin/bash +===== Использование =====  
- ./opt/kafka/bin/kafka_exporter --kafka.server=localhost:9092 --web.listen-address=":7777"+ 
 +<details> 
 +<summary>:!: Примеры </summary> 
 +Топики 
 +<code bash> 
 +        # Создаем топик 
 +bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1  
 +--topic "myTest" 
 + 
 +Список детализация 
 +bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --list 
 +bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --describe --topic "myTopic" 
 +</code> 
 + 
 + 
 + 
 +<code bash> 
 +    # Отправляем сообщение (с указанием группы) 
 +echo "Hello" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic "myTest" [--group "myGroup"
 + 
 +    # Извлекаем сообщение 
 +bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "myTest"  
 +--from-beginning [--group "myGroup"
 + 
 +    # Список групп 
 +bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092 
 + 
 +    # Расширенная инфа по группе 
 +bin/kafka-consumer-groups.sh --describe --group "myGroup" --bootstrap-server localhost:9092 
 + 
 +
 +(..) --alter --topic "myTopic" --partitions 100 
 +(..) --alter --topic "myTopic" min.insync.replicas=2 --config retention.ms=172800000 
 +</code> 
 + 
 + 
 +Топики 
 +<code bash> 
 +bin/kafka-topics.sh 
 + list 
 + create 
 + describe 
 + alter 
 + delete 
 + 
 + # Create topic 
 + --replication-factor 2 --partitions 10 --topic "myFirstTopic" 
 + 
 + # Delete topic 
 + --delete --topic first_topic 
 + 
 + # Alter topic 
 + --alter --topic first_topic --partitions 5 
 + 
 +? --alter --topic "myFirstTopic" --replication-factor 2 
 + 
 + 
 + # Alter topic other params 
 + --alter --topic "myFirstTopic" --config retention.ms=50000000 --config compression.type=gzip 
 + 
 + # Alter topic delete param 
 + --alter --topic "myFirstTopic" --delete-config retention.ms
 </code> </code>
 </details> </details>
  
  
 +<details>
 +<summary>:!: Ручная балансировка</summary>
 +Файл для перебалансировки
 +<code json>
 +{"version":1,
 + "partitions":[
 +     {"topic":"__consumer_offsets","partition":4,"replicas":[2,0,1]},
 +     {"topic":"__consumer_offsets","partition":5,"replicas":[2,0,1]},
 +     {"topic":"__consumer_offsets","partition":6,"replicas":[2,0,1]}
 +]}
 +</code>
 +
 +<code bash>
 +    # Применение изменений
 +bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --reassignment-json-file rebalance.json --execute
 +
 +   # Еще требуется тригернуть непосредственно миграцию лидеров в соответствии с приоритетами
 +bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --admin.config config/auth_ssl.properties --election-type preferred --all-topic-partitions
 +
 +# Либо это произойдет автоматом при включенном параметре "auto.leader.rebalance.enable=false"
 +</code>
 +
 +
 +Есть еще вроде как перебалансировка в утилите kafkactl\\
 +но тут что то непонятно, фактически это работа с фактором репликации, который побочно влияет на распределение\\
 +нормально распределить удалось только снизив кол-во до единицы и снова подняв до 3х\\
 +<code bash>
 +kafkactl alter topic brain-finres-events-test --replication-factor 3
 +</code>
 +
 +</details>
  
  
-===== Использование =====  
  
 <details> <details>
-<summary> </summary>+<summary>:!: Работа с ACL</summary> 
 +<code bash> 
 +  # Список назначенных прав 
 +bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --list --transactional-id * (или имя)
  
-<code> </code>+bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --list --topic * (или имя) 
 + 
 +--group 
 + 
 +  # Удолить 
 +bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --remove --allow-principal User:svc-brain  
 +--operation WRITE --operation DESCRIBE --transactional-id * 
 + 
 +  # Можно перечислять несколько сущностей 
 + --remove --allow-principal User:svc-brain --allow-principal User:svc-brain-finres  
 + --allow-principal User:svc-hyper-mes-prod  --operation WRITE --operation READ --operation DESCRIBE  
 + --topic brain-finres-events-test 
 + 
 + 
 +  # Создание 
 +  # Роль "--producer", WRITE and DESCRIBE on topic, CREATE on cluster;  
 +  # --consumer READ and DESCRIBE on topic, READ on consumer group 
 +bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --add --allow-principal User:svc-brain  
 +--producer --topic brain-trainer-events 
 + 
 + 
 +!!!!!!!!!!!!!!!!!!!!!!!!!! 
 +!!!!!!!!!!!!!!!!!!!!!!!!!! 
 +В случае звездочек, обязательно нужно использовать кавычки, по крайней мере с пробелом наверно 
 +--resource-pattern-type prefixed / literal создают разные объекты, которые не пересекаются 
 + 
 +# Пользователи 
 + список 
 +bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --entity-type users --describe 
 + 
 + добавить 
 +bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --alter --add-config 'SCRAM-SHA-512=[password=<myPass>]'  
 +--entity-type users --entity-name <myName> 
 + 
 + удалить 
 +bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --alter --delete-config 'SCRAM-SHA-512'  
 +--entity-type users --entity-name <myName> 
 +</code>
 </details> </details>
 +
 +
 +<details>
 +<summary>:!: kafkactl</summary>
 +[[https://github.com/deviceinsight/kafkactl|Github]]\\
 +Конфиг файл для работы **~/.config/kafkactl/config.yml**\\
 +<code bash>
 +contexts:
 +  default:
 +    brokers:
 +      - localhost:9094
 +      - localhost:9096
 +
 +current-context: default
 +</code>
 +
 +Еще пример
 +<code bash>
 +contexts:
 +  default:
 +    brokers:
 +      - bootstrap-kafka1:9092
 +      - bootstrap-kafka2:9092
 +    sasl:
 +      enabled: true
 +      mechanism: scram-sha512
 +      password: 
 +      username: admin
 +    tls:
 +      enabled: true
 +      insecure: true
 +
 +
 +current-context: default
 +</code>
 +
 +
 +Некоторые команды
 +<code bash>
 +kafkactl 
 +# Некоторые команды первого уровня
 + create
 + list
 + describe
 + alter
 + delete
 +# Затем
 + broker
 + topic
 + consumer-group
 +# В общем то логика простая
 +
 +# Некоторые примеры
 +kafkactl create topic secondTop --partitions 4 --replicatoin-factor 2
 +kafkactl describe broker 0
 +
 + # Send message
 +kafkactl produce firstTop --key=my-key --value=my-value
 +
 + # Describe group
 +kafkactl describe consumer-group test_app
 +
 +
 +  # Смена контекста из конфига
 +kafkactl config use-context my_other_contx
 +</code>
 +</details>
 +
 +
 +<details>
 +<summary>:!: Docker</summary>
 +Compose файл, zookeeper и два брокера\\
 +<code yaml>
 +version: "2"
 +
 +services:
 +  zookeeper:
 +    image: docker.io/bitnami/zookeeper:3.9
 +    ports:
 +      - "2181:2181"
 +    volumes:
 +      - "zookeeper_data:/bitnami"
 +    environment:
 +      - ALLOW_ANONYMOUS_LOGIN=yes
 +
 +  kafka0:
 +    image: docker.io/bitnami/kafka:3.4
 +    ports:
 +      - "9094:9094"
 +    volumes:
 +      - "kafka_data0:/bitnami"
 +    environment:
 +      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
 +      - KAFKA_CFG_BROKER_ID=0
 + 
 +      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
 +      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka0:9092,EXTERNAL://localhost:9094
 +      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
 +
 +    depends_on:
 +      - zookeeper
 +
 +  kafka1:
 +    image: docker.io/bitnami/kafka:3.4
 +    ports:
 +      - "9096:9096"
 +    volumes:
 +      - "kafka_data1:/bitnami"
 +    environment:
 +      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
 +      - KAFKA_CFG_BROKER_ID=1
 +
 +      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9095,CONTROLLER://:9097,EXTERNAL://:9096
 +      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9095,EXTERNAL://localhost:9096
 +      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
 +
 +    depends_on:
 +      - zookeeper
 +
 +volumes:
 +  zookeeper_data:
 +    driver: local
 +  kafka_data0:
 +    driver: local
 +  kafka_data1:
 +    driver: local
 +</code>
 +
 +Без кластера конечно короче\\
 +<code yml>
 +version: "2"
 +
 +services:
 +  zookeeper:
 +    image: docker.io/bitnami/zookeeper:3.9
 +    ports:
 +      - "2181:2181"
 +    volumes:
 +      - "zookeeper_data:/bitnami"
 +    environment:
 +      - ALLOW_ANONYMOUS_LOGIN=yes
 +  kafka:
 +    image: docker.io/bitnami/kafka:3.4
 +    ports:
 +      - "9092:9092"
 +    volumes:
 +      - "kafka_data:/bitnami"
 +    environment:
 +      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
 +    depends_on:
 +      - zookeeper
 +
 +volumes:
 +  zookeeper_data:
 +    driver: local
 +  kafka_data:
 +    driver: local
 +</code>
 +</details>
 +
 +
 +<details>
 +<summary>:!: </summary>
 +<code bash>
 +
 +</code>
 +</details>
 +
 +
 +==== Epoch ====
 +
 +
 +==== Overs ====
 +В Кафке по умолчанию вычитывание сообщений из партиции останавливается, когда получатель доходит до битого сообщения, и до тех пор, пока оно не будет пропущено и закинуто в “карантинную” очередь (также именуемой “dead letter queue”) для последующей обработки, чтение партиции продолжить не получится.
 +
 +ZooKeeper при подключении нового читателя производит перераспределение участников в Consumer Group таким образом, чтобы каждая партиция имела одного и только одного читателя
 +
 +
 +==== Параметры ====
 +
 +=== ? Параметры потребителей ? ===
 +
 +**max.poll.records**\\
 +По умолчанию 500, максимальное кол-во записей, возвращаемых одним вызовом **poll()**\\
 +Изменение вроде не сильно влияет, потребитель кэширует записи и постепенно возвращает оттуда\\
 +
 +
 +**max.poll.interval.ms**\\
 +По умолчанию 5мин, макс задержка между вызовами **poll()**. т.е. время в течении которого потребитель может бездействовать, если метод **poll()** не вызывался в течении этого времени то клиент считается отвалилшимся и консьюмер группа начинает перебалансировку\\
 +
 +
 +<details>
 +<summary>:!: Быстродействие потребителя</summary>
 +Два выше рассмотренных свойства задают требования к приложению клиента, оно должно потреблять "max.poll.records" за "max.poll.interval.ms"\\
 +
 +Увеличение "max.poll.records" может снижить пропускную способность изза роста накладных расходов\\
 +Увеличение "max.poll.interval.ms" может замедлить скорость отклика при перебалансировке потребителей\\
 +</details>
 +
 +
 +**fetch.max.bytes**\\
 +Дефолт 50мб, макс размер пакета запрашиваемого консьюмером во время чтения. Концептуально связан с "request.timeout.ms"\\
 +
 +
 +**request.timeout.ms**\\
 +Таймаут, за который консьюмер ожидает запрошенные данные от брокера\\
 +
 +
 +**group.instance.id**\\
 +Что то связано со статическими потребителями, и отвал статического будет по истечении **session.timeout.ms**\\
 +
 +
 +
 +
 +==== CLI ====
 +=== Topics ===
 +
 +<details>
 +<summary>:!: Расширенный вывод (desribe)</summary>
 +По мимо всего прочего, показано 3 столбца:\\
 +**Leader**\\
 +Указан брокер где находится лидер-партиция. Кафка равномерно распределяет лидеров между досутпными брокерами\\
 +Вроде вручную это не регулируется\\
 +
 +
 +**Replicas**\\
 +Указаны брокеры которые реплицируют данные партиции, вне зависимости от лидерства\\
 +Первый идентификатор представляет предпочтительного лидера, поэтому кафка попытается сделать его лидером партиции\\
 +:!: В случае отвала брокера, резервный лидер партиции выбирается именно из этого столбца, проверено практикой, надежно\\
 +
 +
 +**Isr**\\
 +Означает синхронизированную реплику.\\
 +Сообщения шлются в лидера, затем если есть репли-фактор то фоловеры копируют новые сообщения себе, вычитывают.\\
 +Брокер считает засинхронизирован если не сильно отстает (Replica.lag.time.max.ms)\\
 +Здесь что то тоже упоминается про приоритет выбора, может это про контроллер ? (пока один раз сошлось, хотя нет)\\
 +
 +
 +
 +</details>
 +
 +
 +
 +
 +====  ====
 +====  ====
 +====  ====
 +====  ====
 +
 +
 +<details>
 +<summary>:!: </summary>
 +
 +</details>
 +
 +
 +
 +
 +
 +
 +
 +
 +
 +
 +
linux/kafka.1676181245.txt.gz · Последнее изменение: 2023/02/12 05:54 — admin