Показаны различия между двумя версиями страницы.
Предыдущая версия справа и слева Предыдущая версия Следующая версия | Предыдущая версия | ||
linux:kafka [2024/04/14 09:51] admin |
linux:kafka [2024/05/10 05:57] (текущий) admin |
||
---|---|---|---|
Строка 94: | Строка 94: | ||
* Headers - пользовательские атрибуты, | * Headers - пользовательские атрибуты, | ||
</ | </ | ||
+ | |||
+ | |||
+ | |||
+ | ==== Конфигурация ==== | ||
+ | |||
+ | < | ||
+ | < | ||
+ | {{: | ||
+ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | |||
+ | </ | ||
+ | |||
Строка 203: | Строка 225: | ||
bin/ | bin/ | ||
bin/ | bin/ | ||
+ | </ | ||
- | # Перебалансировка | ||
- | bin/ | ||
- | |||
- | # Сам файл для перебалансировки | ||
- | {" | ||
- | " | ||
- | | ||
- | |||
- | ]} | ||
- | </ | ||
Строка 263: | Строка 276: | ||
| | ||
</ | </ | ||
+ | </ | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | Файл для перебалансировки | ||
+ | <code json> | ||
+ | {" | ||
+ | " | ||
+ | | ||
+ | | ||
+ | | ||
+ | ]} | ||
+ | </ | ||
+ | |||
+ | <code bash> | ||
+ | # Применение изменений | ||
+ | bin/ | ||
+ | |||
+ | # Еще требуется тригернуть непосредственно миграцию лидеров в соответствии с приоритетами | ||
+ | bin/ | ||
+ | |||
+ | # Либо это произойдет автоматом при включенном параметре " | ||
+ | </ | ||
+ | |||
+ | |||
+ | Есть еще вроде как перебалансировка в утилите kafkactl\\ | ||
+ | но тут что то непонятно, | ||
+ | нормально распределить удалось только снизив кол-во до единицы и снова подняв до 3х\\ | ||
+ | <code bash> | ||
+ | kafkactl alter topic brain-finres-events-test --replication-factor 3 | ||
+ | </ | ||
+ | |||
</ | </ | ||
Строка 334: | Строка 380: | ||
current-context: | current-context: | ||
</ | </ | ||
+ | |||
+ | Еще пример | ||
+ | <code bash> | ||
+ | contexts: | ||
+ | default: | ||
+ | brokers: | ||
+ | - bootstrap-kafka1: | ||
+ | - bootstrap-kafka2: | ||
+ | sasl: | ||
+ | enabled: true | ||
+ | mechanism: scram-sha512 | ||
+ | password: | ||
+ | username: admin | ||
+ | tls: | ||
+ | enabled: true | ||
+ | insecure: true | ||
+ | |||
+ | |||
+ | current-context: | ||
+ | </ | ||
+ | |||
Некоторые команды | Некоторые команды | ||
Строка 359: | Строка 426: | ||
# Describe group | # Describe group | ||
kafkactl describe consumer-group test_app | kafkactl describe consumer-group test_app | ||
+ | |||
+ | |||
+ | # Смена контекста из конфига | ||
+ | kafkactl config use-context my_other_contx | ||
</ | </ | ||
</ | </ | ||
Строка 419: | Строка 490: | ||
driver: local | driver: local | ||
kafka_data1: | kafka_data1: | ||
+ | driver: local | ||
+ | </ | ||
+ | |||
+ | Без кластера конечно короче\\ | ||
+ | <code yml> | ||
+ | version: " | ||
+ | |||
+ | services: | ||
+ | zookeeper: | ||
+ | image: docker.io/ | ||
+ | ports: | ||
+ | - " | ||
+ | volumes: | ||
+ | - " | ||
+ | environment: | ||
+ | - ALLOW_ANONYMOUS_LOGIN=yes | ||
+ | kafka: | ||
+ | image: docker.io/ | ||
+ | ports: | ||
+ | - " | ||
+ | volumes: | ||
+ | - " | ||
+ | environment: | ||
+ | - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper: | ||
+ | depends_on: | ||
+ | - zookeeper | ||
+ | |||
+ | volumes: | ||
+ | zookeeper_data: | ||
+ | driver: local | ||
+ | kafka_data: | ||
driver: local | driver: local | ||
</ | </ | ||
Строка 430: | Строка 532: | ||
</ | </ | ||
</ | </ | ||
+ | |||
+ | |||
+ | ==== Epoch ==== | ||
+ | |||
+ | |||
+ | ==== Overs ==== | ||
+ | В Кафке по умолчанию вычитывание сообщений из партиции останавливается, | ||
+ | |||
+ | ZooKeeper при подключении нового читателя производит перераспределение участников в Consumer Group таким образом, | ||
+ | |||
+ | |||
+ | ==== Параметры ==== | ||
+ | |||
+ | === ? Параметры потребителей ? === | ||
+ | |||
+ | **max.poll.records**\\ | ||
+ | По умолчанию 500, максимальное кол-во записей, | ||
+ | Изменение вроде не сильно влияет, | ||
+ | |||
+ | |||
+ | **max.poll.interval.ms**\\ | ||
+ | По умолчанию 5мин, макс задержка между вызовами **poll()**. т.е. время в течении которого потребитель может бездействовать, | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | Два выше рассмотренных свойства задают требования к приложению клиента, | ||
+ | |||
+ | Увеличение " | ||
+ | Увеличение " | ||
+ | </ | ||
+ | |||
+ | |||
+ | **fetch.max.bytes**\\ | ||
+ | Дефолт 50мб, макс размер пакета запрашиваемого консьюмером во время чтения. Концептуально связан с " | ||
+ | |||
+ | |||
+ | **request.timeout.ms**\\ | ||
+ | Таймаут, | ||
+ | |||
+ | |||
+ | **group.instance.id**\\ | ||
+ | Что то связано со статическими потребителями, | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | ==== CLI ==== | ||
+ | === Topics === | ||
+ | |||
+ | < | ||
+ | < | ||
+ | По мимо всего прочего, | ||
+ | **Leader**\\ | ||
+ | Указан брокер где находится лидер-партиция. Кафка равномерно распределяет лидеров между досутпными брокерами\\ | ||
+ | Вроде вручную это не регулируется\\ | ||
+ | |||
+ | |||
+ | **Replicas**\\ | ||
+ | Указаны брокеры которые реплицируют данные партиции, | ||
+ | Первый идентификатор представляет предпочтительного лидера, | ||
+ | :!: В случае отвала брокера, | ||
+ | |||
+ | |||
+ | **Isr**\\ | ||
+ | Означает синхронизированную реплику.\\ | ||
+ | Сообщения шлются в лидера, | ||
+ | Брокер считает засинхронизирован если не сильно отстает (Replica.lag.time.max.ms)\\ | ||
+ | Здесь что то тоже упоминается про приоритет выбора, | ||
+ | |||
+ | |||
+ | |||
+ | </ | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | ==== ==== | ||
+ | ==== ==== | ||
+ | ==== ==== | ||
+ | ==== ==== | ||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | |||
+ | </ | ||
+ | |||
+ | |||