Инструменты пользователя

Инструменты сайта


linux:kafka

Различия

Показаны различия между двумя версиями страницы.

Ссылка на это сравнение

Предыдущая версия справа и слева Предыдущая версия
Следующая версия
Предыдущая версия
linux:kafka [2023/05/04 05:32]
admin
linux:kafka [2024/05/10 05:57] (текущий)
admin
Строка 94: Строка 94:
   * Headers - пользовательские атрибуты, тоже key/value, которые прикрепляются к сообщению   * Headers - пользовательские атрибуты, тоже key/value, которые прикрепляются к сообщению
 </details> </details>
 +
 +
 +
 +==== Конфигурация ====
 +
 +<details>
 +<summary> :!: </summary>
 +{{:linux:screenshot_1.png?direct&600|}}
 +</details>
 +
 +
 +<details>
 +<summary> :!: </summary>
 +
 +</details>
 +
 +
 +<details>
 +<summary> :!: </summary>
 +
 +</details>
 +
  
  
Строка 192: Строка 214:
 ===== Использование =====  ===== Использование ===== 
  
 +<details>
 +<summary>:!: Примеры </summary>
 +Топики
 <code bash> <code bash>
-# Создаем топик +        # Создаем топик 
-bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic "myTest"+bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1  
 +--topic "myTest"
  
-# Отправляем сообщение + # Список / детализация 
-echo "Hello" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic "myTest"+bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --list 
 +bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --describe --topic "myTopic" 
 +</code> 
 + 
 + 
 + 
 +<code bash> 
 +    # Отправляем сообщение (с указанием группы) 
 +echo "Hello" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic "myTest" [--group "myGroup"]
  
-# Извлекаем сообщение +    # Извлекаем сообщение 
-bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "myTest" --from-beginning [--group "myGroup"]+bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "myTest"  
 +--from-beginning [--group "myGroup"]
  
-# Список групп+    # Список групп
 bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092 bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
  
-# Расширенная инфа по группе+    # Расширенная инфа по группе
 bin/kafka-consumer-groups.sh --describe --group "myGroup" --bootstrap-server localhost:9092 bin/kafka-consumer-groups.sh --describe --group "myGroup" --bootstrap-server localhost:9092
  
Строка 213: Строка 248:
 </code> </code>
  
-Работа с ACL+ 
 +Топики 
 +<code bash> 
 +bin/kafka-topics.sh 
 + list 
 + create 
 + describe 
 + alter 
 + delete 
 + 
 + # Create topic 
 + --replication-factor 2 --partitions 10 --topic "myFirstTopic" 
 + 
 + # Delete topic 
 + --delete --topic first_topic 
 + 
 + # Alter topic 
 + --alter --topic first_topic --partitions 5 
 + 
 +? --alter --topic "myFirstTopic" --replication-factor 2 
 + 
 + 
 + # Alter topic other params 
 + --alter --topic "myFirstTopic" --config retention.ms=50000000 --config compression.type=gzip 
 + 
 + # Alter topic delete param 
 + --alter --topic "myFirstTopic" --delete-config retention.ms 
 +</code> 
 +</details> 
 + 
 + 
 +<details> 
 +<summary>:!: Ручная балансировка</summary> 
 +Файл для перебалансировки 
 +<code json> 
 +{"version":1, 
 + "partitions":
 +     {"topic":"__consumer_offsets","partition":4,"replicas":[2,0,1]}, 
 +     {"topic":"__consumer_offsets","partition":5,"replicas":[2,0,1]}, 
 +     {"topic":"__consumer_offsets","partition":6,"replicas":[2,0,1]} 
 +]} 
 +</code> 
 + 
 +<code bash> 
 +    # Применение изменений 
 +bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --reassignment-json-file rebalance.json --execute 
 + 
 +   # Еще требуется тригернуть непосредственно миграцию лидеров в соответствии с приоритетами 
 +bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --admin.config config/auth_ssl.properties --election-type preferred --all-topic-partitions 
 + 
 +# Либо это произойдет автоматом при включенном параметре "auto.leader.rebalance.enable=false" 
 +</code> 
 + 
 + 
 +Есть еще вроде как перебалансировка в утилите kafkactl\\ 
 +но тут что то непонятно, фактически это работа с фактором репликации, который побочно влияет на распределение\\ 
 +нормально распределить удалось только снизив кол-во до единицы и снова подняв до 3х\\ 
 +<code bash> 
 +kafkactl alter topic brain-finres-events-test --replication-factor 3 
 +</code> 
 + 
 +</details> 
 + 
 + 
 + 
 +<details> 
 +<summary>:!: Работа с ACL</summary>
 <code bash> <code bash>
   # Список назначенных прав   # Список назначенных прав
-bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --list --transactional-id * (или имя)+bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --list --transactional-id * (или имя)
  
-bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --list --topic * (или имя)+bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --list --topic * (или имя)
  
 --group --group
  
   # Удолить   # Удолить
-bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --remove --allow-principal User:svc-brain --operation WRITE --operation DESCRIBE --transactional-id *+bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --remove --allow-principal User:svc-brain  
 +--operation WRITE --operation DESCRIBE --transactional-id *
  
   # Можно перечислять несколько сущностей   # Можно перечислять несколько сущностей
- --remove --allow-principal User:svc-brain --allow-principal User:svc-brain-finres --allow-principal User:svc-hyper-mes-prod  --operation WRITE --operation READ --operation DESCRIBE --topic brain-finres-events-test+ --remove --allow-principal User:svc-brain --allow-principal User:svc-brain-finres  
 + --allow-principal User:svc-hyper-mes-prod  --operation WRITE --operation READ --operation DESCRIBE  
 + --topic brain-finres-events-test
  
  
Строка 232: Строка 339:
   # Роль "--producer", WRITE and DESCRIBE on topic, CREATE on cluster;    # Роль "--producer", WRITE and DESCRIBE on topic, CREATE on cluster; 
   # --consumer READ and DESCRIBE on topic, READ on consumer group   # --consumer READ and DESCRIBE on topic, READ on consumer group
-bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --add --allow-principal User:svc-brain --producer --topic brain-trainer-events+bin/kafka-acls.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --add --allow-principal User:svc-brain  
 +--producer --topic brain-trainer-events
  
  
Строка 242: Строка 351:
 # Пользователи # Пользователи
  список  список
-bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --entity-type users --describe+bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --entity-type users --describe
  
  добавить  добавить
-bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --alter --add-config 'SCRAM-SHA-512=[password=<myPass>]' --entity-type users --entity-name <myName>+bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --alter --add-config 'SCRAM-SHA-512=[password=<myPass>]'  
 +--entity-type users --entity-name <myName>
  
  удалить  удалить
-bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093 --command-config config/auth_ssl.properties --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name <myName>+bin/kafka-configs.sh --bootstrap-server bootstrap-kafka.ai.getcom.pw:9093  
 +--command-config config/auth_ssl.properties --alter --delete-config 'SCRAM-SHA-512'  
 +--entity-type users --entity-name <myName>
 </code> </code>
 +</details>
 +
 +
 +<details>
 +<summary>:!: kafkactl</summary>
 +[[https://github.com/deviceinsight/kafkactl|Github]]\\
 +Конфиг файл для работы **~/.config/kafkactl/config.yml**\\
 +<code bash>
 +contexts:
 +  default:
 +    brokers:
 +      - localhost:9094
 +      - localhost:9096
 +
 +current-context: default
 +</code>
 +
 +Еще пример
 +<code bash>
 +contexts:
 +  default:
 +    brokers:
 +      - bootstrap-kafka1:9092
 +      - bootstrap-kafka2:9092
 +    sasl:
 +      enabled: true
 +      mechanism: scram-sha512
 +      password: 
 +      username: admin
 +    tls:
 +      enabled: true
 +      insecure: true
 +
 +
 +current-context: default
 +</code>
 +
 +
 +Некоторые команды
 +<code bash>
 +kafkactl 
 +# Некоторые команды первого уровня
 + create
 + list
 + describe
 + alter
 + delete
 +# Затем
 + broker
 + topic
 + consumer-group
 +# В общем то логика простая
 +
 +# Некоторые примеры
 +kafkactl create topic secondTop --partitions 4 --replicatoin-factor 2
 +kafkactl describe broker 0
 +
 + # Send message
 +kafkactl produce firstTop --key=my-key --value=my-value
 +
 + # Describe group
 +kafkactl describe consumer-group test_app
 +
 +
 +  # Смена контекста из конфига
 +kafkactl config use-context my_other_contx
 +</code>
 +</details>
 +
 +
 +<details>
 +<summary>:!: Docker</summary>
 +Compose файл, zookeeper и два брокера\\
 +<code yaml>
 +version: "2"
 +
 +services:
 +  zookeeper:
 +    image: docker.io/bitnami/zookeeper:3.9
 +    ports:
 +      - "2181:2181"
 +    volumes:
 +      - "zookeeper_data:/bitnami"
 +    environment:
 +      - ALLOW_ANONYMOUS_LOGIN=yes
 +
 +  kafka0:
 +    image: docker.io/bitnami/kafka:3.4
 +    ports:
 +      - "9094:9094"
 +    volumes:
 +      - "kafka_data0:/bitnami"
 +    environment:
 +      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
 +      - KAFKA_CFG_BROKER_ID=0
 + 
 +      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
 +      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka0:9092,EXTERNAL://localhost:9094
 +      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
 +
 +    depends_on:
 +      - zookeeper
 +
 +  kafka1:
 +    image: docker.io/bitnami/kafka:3.4
 +    ports:
 +      - "9096:9096"
 +    volumes:
 +      - "kafka_data1:/bitnami"
 +    environment:
 +      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
 +      - KAFKA_CFG_BROKER_ID=1
 +
 +      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9095,CONTROLLER://:9097,EXTERNAL://:9096
 +      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9095,EXTERNAL://localhost:9096
 +      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
 +
 +    depends_on:
 +      - zookeeper
 +
 +volumes:
 +  zookeeper_data:
 +    driver: local
 +  kafka_data0:
 +    driver: local
 +  kafka_data1:
 +    driver: local
 +</code>
 +
 +Без кластера конечно короче\\
 +<code yml>
 +version: "2"
 +
 +services:
 +  zookeeper:
 +    image: docker.io/bitnami/zookeeper:3.9
 +    ports:
 +      - "2181:2181"
 +    volumes:
 +      - "zookeeper_data:/bitnami"
 +    environment:
 +      - ALLOW_ANONYMOUS_LOGIN=yes
 +  kafka:
 +    image: docker.io/bitnami/kafka:3.4
 +    ports:
 +      - "9092:9092"
 +    volumes:
 +      - "kafka_data:/bitnami"
 +    environment:
 +      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
 +    depends_on:
 +      - zookeeper
 +
 +volumes:
 +  zookeeper_data:
 +    driver: local
 +  kafka_data:
 +    driver: local
 +</code>
 +</details>
 +
 +
 +<details>
 +<summary>:!: </summary>
 +<code bash>
 +
 +</code>
 +</details>
 +
 +
 +==== Epoch ====
 +
 +
 +==== Overs ====
 +В Кафке по умолчанию вычитывание сообщений из партиции останавливается, когда получатель доходит до битого сообщения, и до тех пор, пока оно не будет пропущено и закинуто в “карантинную” очередь (также именуемой “dead letter queue”) для последующей обработки, чтение партиции продолжить не получится.
 +
 +ZooKeeper при подключении нового читателя производит перераспределение участников в Consumer Group таким образом, чтобы каждая партиция имела одного и только одного читателя
 +
 +
 +==== Параметры ====
 +
 +=== ? Параметры потребителей ? ===
 +
 +**max.poll.records**\\
 +По умолчанию 500, максимальное кол-во записей, возвращаемых одним вызовом **poll()**\\
 +Изменение вроде не сильно влияет, потребитель кэширует записи и постепенно возвращает оттуда\\
 +
 +
 +**max.poll.interval.ms**\\
 +По умолчанию 5мин, макс задержка между вызовами **poll()**. т.е. время в течении которого потребитель может бездействовать, если метод **poll()** не вызывался в течении этого времени то клиент считается отвалилшимся и консьюмер группа начинает перебалансировку\\
 +
 +
 +<details>
 +<summary>:!: Быстродействие потребителя</summary>
 +Два выше рассмотренных свойства задают требования к приложению клиента, оно должно потреблять "max.poll.records" за "max.poll.interval.ms"\\
 +
 +Увеличение "max.poll.records" может снижить пропускную способность изза роста накладных расходов\\
 +Увеличение "max.poll.interval.ms" может замедлить скорость отклика при перебалансировке потребителей\\
 +</details>
 +
 +
 +**fetch.max.bytes**\\
 +Дефолт 50мб, макс размер пакета запрашиваемого консьюмером во время чтения. Концептуально связан с "request.timeout.ms"\\
 +
 +
 +**request.timeout.ms**\\
 +Таймаут, за который консьюмер ожидает запрошенные данные от брокера\\
 +
 +
 +**group.instance.id**\\
 +Что то связано со статическими потребителями, и отвал статического будет по истечении **session.timeout.ms**\\
 +
 +
 +
 +
 +==== CLI ====
 +=== Topics ===
 +
 +<details>
 +<summary>:!: Расширенный вывод (desribe)</summary>
 +По мимо всего прочего, показано 3 столбца:\\
 +**Leader**\\
 +Указан брокер где находится лидер-партиция. Кафка равномерно распределяет лидеров между досутпными брокерами\\
 +Вроде вручную это не регулируется\\
 +
 +
 +**Replicas**\\
 +Указаны брокеры которые реплицируют данные партиции, вне зависимости от лидерства\\
 +Первый идентификатор представляет предпочтительного лидера, поэтому кафка попытается сделать его лидером партиции\\
 +:!: В случае отвала брокера, резервный лидер партиции выбирается именно из этого столбца, проверено практикой, надежно\\
 +
 +
 +**Isr**\\
 +Означает синхронизированную реплику.\\
 +Сообщения шлются в лидера, затем если есть репли-фактор то фоловеры копируют новые сообщения себе, вычитывают.\\
 +Брокер считает засинхронизирован если не сильно отстает (Replica.lag.time.max.ms)\\
 +Здесь что то тоже упоминается про приоритет выбора, может это про контроллер ? (пока один раз сошлось, хотя нет)\\
 +
 +
 +
 +</details>
 +
 +
 +
 +
 +====  ====
 +====  ====
 +====  ====
 +====  ====
 +
 +
 +<details>
 +<summary>:!: </summary>
 +
 +</details>
 +
 +
 +
 +
 +
 +
 +
 +
 +
  
  
linux/kafka.1683178341.txt.gz · Последнее изменение: 2023/05/04 05:32 — admin