Инструменты пользователя

Инструменты сайта


linux:kafka

Различия

Показаны различия между двумя версиями страницы.

Ссылка на это сравнение

Предыдущая версия справа и слева Предыдущая версия
Следующая версия
Предыдущая версия
linux:kafka [2023/05/17 03:11]
admin
linux:kafka [2024/05/10 05:57] (текущий)
admin
Строка 94: Строка 94:
   * Headers - пользовательские атрибуты, тоже key/value, которые прикрепляются к сообщению   * Headers - пользовательские атрибуты, тоже key/value, которые прикрепляются к сообщению
 </details> </details>
 +
 +
 +
 +==== Конфигурация ====
 +
 +<details>
 +<summary> :!: </summary>
 +{{:linux:screenshot_1.png?direct&600|}}
 +</details>
 +
 +
 +<details>
 +<summary> :!: </summary>
 +
 +</details>
 +
 +
 +<details>
 +<summary> :!: </summary>
 +
 +</details>
 +
  
  
Строка 203: Строка 225:
 bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --list bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --list
 bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --describe --topic "myTopic" bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --describe --topic "myTopic"
 +</code>
  
- # Перебалансировка 
-bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --reassignment-json-file rebalance.json --execute 
- 
- # Сам файл для перебалансировки 
-{"version":1, 
- "partitions":[ 
-     {"topic":"__consumer_offsets","partition":4,"replicas":[2,0,1]} 
- 
-]} 
- 
-</bash> 
  
  
Строка 235: Строка 247:
 (..) --alter --topic "myTopic" min.insync.replicas=2 --config retention.ms=172800000 (..) --alter --topic "myTopic" min.insync.replicas=2 --config retention.ms=172800000
 </code> </code>
 +
 +
 +Топики
 +<code bash>
 +bin/kafka-topics.sh
 + list
 + create
 + describe
 + alter
 + delete
 +
 + # Create topic
 + --replication-factor 2 --partitions 10 --topic "myFirstTopic"
 +
 + # Delete topic
 + --delete --topic first_topic
 +
 + # Alter topic
 + --alter --topic first_topic --partitions 5
 +
 +? --alter --topic "myFirstTopic" --replication-factor 2
 +
 +
 + # Alter topic other params
 + --alter --topic "myFirstTopic" --config retention.ms=50000000 --config compression.type=gzip
 +
 + # Alter topic delete param
 + --alter --topic "myFirstTopic" --delete-config retention.ms
 +</code>
 +</details>
 +
 +
 <details> <details>
 +<summary>:!: Ручная балансировка</summary>
 +Файл для перебалансировки
 +<code json>
 +{"version":1,
 + "partitions":[
 +     {"topic":"__consumer_offsets","partition":4,"replicas":[2,0,1]},
 +     {"topic":"__consumer_offsets","partition":5,"replicas":[2,0,1]},
 +     {"topic":"__consumer_offsets","partition":6,"replicas":[2,0,1]}
 +]}
 +</code>
 +
 +<code bash>
 +    # Применение изменений
 +bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --command-config config/credentials.conf --reassignment-json-file rebalance.json --execute
 +
 +   # Еще требуется тригернуть непосредственно миграцию лидеров в соответствии с приоритетами
 +bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --admin.config config/auth_ssl.properties --election-type preferred --all-topic-partitions
 +
 +# Либо это произойдет автоматом при включенном параметре "auto.leader.rebalance.enable=false"
 +</code>
 +
 +
 +Есть еще вроде как перебалансировка в утилите kafkactl\\
 +но тут что то непонятно, фактически это работа с фактором репликации, который побочно влияет на распределение\\
 +нормально распределить удалось только снизив кол-во до единицы и снова подняв до 3х\\
 +<code bash>
 +kafkactl alter topic brain-finres-events-test --replication-factor 3
 +</code>
 +
 +</details>
 +
  
  
Строка 257: Строка 332:
   # Можно перечислять несколько сущностей   # Можно перечислять несколько сущностей
  --remove --allow-principal User:svc-brain --allow-principal User:svc-brain-finres   --remove --allow-principal User:svc-brain --allow-principal User:svc-brain-finres 
---allow-principal User:svc-hyper-mes-prod  --operation WRITE --operation READ --operation DESCRIBE  + --allow-principal User:svc-hyper-mes-prod  --operation WRITE --operation READ --operation DESCRIBE  
---topic brain-finres-events-test+ --topic brain-finres-events-test
  
  
Строка 290: Строка 365:
 </code> </code>
 </details> </details>
 +
 +
 +<details>
 +<summary>:!: kafkactl</summary>
 +[[https://github.com/deviceinsight/kafkactl|Github]]\\
 +Конфиг файл для работы **~/.config/kafkactl/config.yml**\\
 +<code bash>
 +contexts:
 +  default:
 +    brokers:
 +      - localhost:9094
 +      - localhost:9096
 +
 +current-context: default
 +</code>
 +
 +Еще пример
 +<code bash>
 +contexts:
 +  default:
 +    brokers:
 +      - bootstrap-kafka1:9092
 +      - bootstrap-kafka2:9092
 +    sasl:
 +      enabled: true
 +      mechanism: scram-sha512
 +      password: 
 +      username: admin
 +    tls:
 +      enabled: true
 +      insecure: true
 +
 +
 +current-context: default
 +</code>
 +
 +
 +Некоторые команды
 +<code bash>
 +kafkactl 
 +# Некоторые команды первого уровня
 + create
 + list
 + describe
 + alter
 + delete
 +# Затем
 + broker
 + topic
 + consumer-group
 +# В общем то логика простая
 +
 +# Некоторые примеры
 +kafkactl create topic secondTop --partitions 4 --replicatoin-factor 2
 +kafkactl describe broker 0
 +
 + # Send message
 +kafkactl produce firstTop --key=my-key --value=my-value
 +
 + # Describe group
 +kafkactl describe consumer-group test_app
 +
 +
 +  # Смена контекста из конфига
 +kafkactl config use-context my_other_contx
 +</code>
 +</details>
 +
 +
 +<details>
 +<summary>:!: Docker</summary>
 +Compose файл, zookeeper и два брокера\\
 +<code yaml>
 +version: "2"
 +
 +services:
 +  zookeeper:
 +    image: docker.io/bitnami/zookeeper:3.9
 +    ports:
 +      - "2181:2181"
 +    volumes:
 +      - "zookeeper_data:/bitnami"
 +    environment:
 +      - ALLOW_ANONYMOUS_LOGIN=yes
 +
 +  kafka0:
 +    image: docker.io/bitnami/kafka:3.4
 +    ports:
 +      - "9094:9094"
 +    volumes:
 +      - "kafka_data0:/bitnami"
 +    environment:
 +      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
 +      - KAFKA_CFG_BROKER_ID=0
 + 
 +      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
 +      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka0:9092,EXTERNAL://localhost:9094
 +      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
 +
 +    depends_on:
 +      - zookeeper
 +
 +  kafka1:
 +    image: docker.io/bitnami/kafka:3.4
 +    ports:
 +      - "9096:9096"
 +    volumes:
 +      - "kafka_data1:/bitnami"
 +    environment:
 +      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
 +      - KAFKA_CFG_BROKER_ID=1
 +
 +      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9095,CONTROLLER://:9097,EXTERNAL://:9096
 +      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9095,EXTERNAL://localhost:9096
 +      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
 +
 +    depends_on:
 +      - zookeeper
 +
 +volumes:
 +  zookeeper_data:
 +    driver: local
 +  kafka_data0:
 +    driver: local
 +  kafka_data1:
 +    driver: local
 +</code>
 +
 +Без кластера конечно короче\\
 +<code yml>
 +version: "2"
 +
 +services:
 +  zookeeper:
 +    image: docker.io/bitnami/zookeeper:3.9
 +    ports:
 +      - "2181:2181"
 +    volumes:
 +      - "zookeeper_data:/bitnami"
 +    environment:
 +      - ALLOW_ANONYMOUS_LOGIN=yes
 +  kafka:
 +    image: docker.io/bitnami/kafka:3.4
 +    ports:
 +      - "9092:9092"
 +    volumes:
 +      - "kafka_data:/bitnami"
 +    environment:
 +      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
 +    depends_on:
 +      - zookeeper
 +
 +volumes:
 +  zookeeper_data:
 +    driver: local
 +  kafka_data:
 +    driver: local
 +</code>
 +</details>
 +
 +
 +<details>
 +<summary>:!: </summary>
 +<code bash>
 +
 +</code>
 +</details>
 +
 +
 +==== Epoch ====
 +
 +
 +==== Overs ====
 +В Кафке по умолчанию вычитывание сообщений из партиции останавливается, когда получатель доходит до битого сообщения, и до тех пор, пока оно не будет пропущено и закинуто в “карантинную” очередь (также именуемой “dead letter queue”) для последующей обработки, чтение партиции продолжить не получится.
 +
 +ZooKeeper при подключении нового читателя производит перераспределение участников в Consumer Group таким образом, чтобы каждая партиция имела одного и только одного читателя
 +
 +
 +==== Параметры ====
 +
 +=== ? Параметры потребителей ? ===
 +
 +**max.poll.records**\\
 +По умолчанию 500, максимальное кол-во записей, возвращаемых одним вызовом **poll()**\\
 +Изменение вроде не сильно влияет, потребитель кэширует записи и постепенно возвращает оттуда\\
 +
 +
 +**max.poll.interval.ms**\\
 +По умолчанию 5мин, макс задержка между вызовами **poll()**. т.е. время в течении которого потребитель может бездействовать, если метод **poll()** не вызывался в течении этого времени то клиент считается отвалилшимся и консьюмер группа начинает перебалансировку\\
 +
 +
 +<details>
 +<summary>:!: Быстродействие потребителя</summary>
 +Два выше рассмотренных свойства задают требования к приложению клиента, оно должно потреблять "max.poll.records" за "max.poll.interval.ms"\\
 +
 +Увеличение "max.poll.records" может снижить пропускную способность изза роста накладных расходов\\
 +Увеличение "max.poll.interval.ms" может замедлить скорость отклика при перебалансировке потребителей\\
 +</details>
 +
 +
 +**fetch.max.bytes**\\
 +Дефолт 50мб, макс размер пакета запрашиваемого консьюмером во время чтения. Концептуально связан с "request.timeout.ms"\\
 +
 +
 +**request.timeout.ms**\\
 +Таймаут, за который консьюмер ожидает запрошенные данные от брокера\\
 +
 +
 +**group.instance.id**\\
 +Что то связано со статическими потребителями, и отвал статического будет по истечении **session.timeout.ms**\\
 +
 +
 +
 +
 +==== CLI ====
 +=== Topics ===
 +
 +<details>
 +<summary>:!: Расширенный вывод (desribe)</summary>
 +По мимо всего прочего, показано 3 столбца:\\
 +**Leader**\\
 +Указан брокер где находится лидер-партиция. Кафка равномерно распределяет лидеров между досутпными брокерами\\
 +Вроде вручную это не регулируется\\
 +
 +
 +**Replicas**\\
 +Указаны брокеры которые реплицируют данные партиции, вне зависимости от лидерства\\
 +Первый идентификатор представляет предпочтительного лидера, поэтому кафка попытается сделать его лидером партиции\\
 +:!: В случае отвала брокера, резервный лидер партиции выбирается именно из этого столбца, проверено практикой, надежно\\
 +
 +
 +**Isr**\\
 +Означает синхронизированную реплику.\\
 +Сообщения шлются в лидера, затем если есть репли-фактор то фоловеры копируют новые сообщения себе, вычитывают.\\
 +Брокер считает засинхронизирован если не сильно отстает (Replica.lag.time.max.ms)\\
 +Здесь что то тоже упоминается про приоритет выбора, может это про контроллер ? (пока один раз сошлось, хотя нет)\\
 +
 +
 +
 +</details>
 +
 +
 +
 +
 +====  ====
 +====  ====
 +====  ====
 +====  ====
 +
 +
 +<details>
 +<summary>:!: </summary>
 +
 +</details>
 +
 +
 +
 +
 +
 +
 +
 +
 +
 +
  
linux/kafka.1684293063.txt.gz · Последнее изменение: 2023/05/17 03:11 — admin