Инструменты пользователя

Инструменты сайта


develop:java:practics_overall

Различия

Показаны различия между двумя версиями страницы.

Ссылка на это сравнение

Предыдущая версия справа и слева Предыдущая версия
develop:java:practics_overall [2024/04/27 08:07]
admin [Over]
develop:java:practics_overall [2024/04/27 09:49] (текущий)
admin
Строка 335: Строка 335:
 java -jar target/ConsumerTest-1.0-SNAPSHOT-jar-with-dependencies.jar java -jar target/ConsumerTest-1.0-SNAPSHOT-jar-with-dependencies.jar
 </code> </code>
 +
 +
 +Еще пример, с явным назначением каждой партиции и заданием смещения каждой\\
 +со смещением правда неясно, какое именно задается, вроде как какое то последнее..\\
 +стреляет NPE если в момент назначения получаем на какой либо партиции NULL, надо чтобы в каждой были записи тогда все норм\\
 +<code java>
 +import org.apache.kafka.clients.consumer.*;
 +import org.apache.kafka.common.serialization.StringDeserializer;
 +import org.apache.kafka.common.PartitionInfo;
 +import org.apache.kafka.common.TopicPartition;
 +import java.util.Properties;
 +import java.time.Duration;
 +import java.util.Map;
 +import java.util.List;
 +import java.util.stream.Collectors;
 +
 +public class ConsumerTest {
 +    public static void main(String[] args) throws InterruptedException {
 +
 + Properties consumerProperties = new Properties();
 + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094, localhost:9096");
 + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_app");
 +
 + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 + //props.setProperty("enable.auto.commit", "true");
 + //props.setProperty("auto.commit.interval.ms", "1000");
 +
 + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 +
 + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
 + String topic = "secondTop";
 +
 + // Автополучение и назначением всех имеющихся партиций
 + // перечень имеющихся партиций
 + final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
 + // собираем в мапу, каждой ставится текущий timestamp 
 +        final Map<TopicPartition, Long> partitions = partitionInfos.stream()
 +                .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
 +                .collect(Collectors.toMap(
 +                        partitionInfo -> partitionInfo,
 +                        partitionInfo -> System.currentTimeMillis() - Duration.ofMinutes(1).toMillis())
 +                );
 + // echo 
 + partitions.forEach((key, value) -> System.out.println(key + ":" + value));
 +
 +
 + // Получаем смещения, ближайшие и большие или равные требуемому, для каждой партиции.
 + // комент из игаса, не совсем явно какие именно задаются смещения, но какие то задаются, каждой партиции
 + final Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(partitions);
 + // echo 
 + System.out.println("<=======>");
 + topicPartitionOffsetAndTimestampMap.forEach((key, value) -> System.out.println(key + ":" + value));
 +
 + // Подписываемся на все партиции в топике.
 +        consumer.assign(topicPartitionOffsetAndTimestampMap.keySet());
 +
 + // выставляем сдвиги на каждой партиции
 + topicPartitionOffsetAndTimestampMap.forEach((topicPartition, offsetAndTimestamp) -> consumer.seek(topicPartition, offsetAndTimestamp.offset())); 
 +
 + // Read topic 
 + while (true) 
 + {
 + // получаем очередную запись
 + ConsumerRecords<String, String> records = consumer.poll(100);
 + for (ConsumerRecord<String, String> record : records) 
 + {
 + System.out.printf("\n====================\noffset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
 + consumer.commitAsync();
 + // Искуственная задержка между итерациями чтения, 1сек
 + Thread.sleep(1000);
 + }
 + }
 +    }
 +}
 +</code>
 +
 </details> </details>
  
  
develop/java/practics_overall.1714205232.txt.gz · Последнее изменение: 2024/04/27 08:07 — admin