Показаны различия между двумя версиями страницы.
Предыдущая версия справа и слева Предыдущая версия Следующая версия | Предыдущая версия | ||
develop:java:practics_overall [2024/04/14 09:57] admin |
develop:java:practics_overall [2024/04/27 09:49] (текущий) admin |
||
---|---|---|---|
Строка 332: | Строка 332: | ||
<code bash> | <code bash> | ||
- | mvn clear install | + | mvn clean install |
java -jar target/ | java -jar target/ | ||
</ | </ | ||
+ | |||
+ | |||
+ | Еще пример, | ||
+ | со смещением правда неясно, | ||
+ | стреляет NPE если в момент назначения получаем на какой либо партиции NULL, надо чтобы в каждой были записи тогда все норм\\ | ||
+ | <code java> | ||
+ | import org.apache.kafka.clients.consumer.*; | ||
+ | import org.apache.kafka.common.serialization.StringDeserializer; | ||
+ | import org.apache.kafka.common.PartitionInfo; | ||
+ | import org.apache.kafka.common.TopicPartition; | ||
+ | import java.util.Properties; | ||
+ | import java.time.Duration; | ||
+ | import java.util.Map; | ||
+ | import java.util.List; | ||
+ | import java.util.stream.Collectors; | ||
+ | |||
+ | public class ConsumerTest { | ||
+ | public static void main(String[] args) throws InterruptedException { | ||
+ | |||
+ | Properties consumerProperties = new Properties(); | ||
+ | consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, | ||
+ | consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, | ||
+ | |||
+ | consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, | ||
+ | // | ||
+ | // | ||
+ | |||
+ | consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, | ||
+ | consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, | ||
+ | |||
+ | KafkaConsumer< | ||
+ | String topic = " | ||
+ | |||
+ | // Автополучение и назначением всех имеющихся партиций | ||
+ | // перечень имеющихся партиций | ||
+ | final List< | ||
+ | // собираем в мапу, каждой ставится текущий timestamp | ||
+ | final Map< | ||
+ | .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), | ||
+ | .collect(Collectors.toMap( | ||
+ | partitionInfo -> partitionInfo, | ||
+ | partitionInfo -> System.currentTimeMillis() - Duration.ofMinutes(1).toMillis()) | ||
+ | ); | ||
+ | // echo | ||
+ | partitions.forEach((key, | ||
+ | |||
+ | |||
+ | // Получаем смещения, | ||
+ | // комент из игаса, не совсем явно какие именно задаются смещения, | ||
+ | final Map< | ||
+ | // echo | ||
+ | System.out.println("< | ||
+ | topicPartitionOffsetAndTimestampMap.forEach((key, | ||
+ | |||
+ | // Подписываемся на все партиции в топике. | ||
+ | consumer.assign(topicPartitionOffsetAndTimestampMap.keySet()); | ||
+ | |||
+ | // выставляем сдвиги на каждой партиции | ||
+ | topicPartitionOffsetAndTimestampMap.forEach((topicPartition, | ||
+ | |||
+ | // Read topic | ||
+ | while (true) | ||
+ | { | ||
+ | // получаем очередную запись | ||
+ | ConsumerRecords< | ||
+ | for (ConsumerRecord< | ||
+ | { | ||
+ | System.out.printf(" | ||
+ | consumer.commitAsync(); | ||
+ | // Искуственная задержка между итерациями чтения, | ||
+ | Thread.sleep(1000); | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | </ | ||
+ | |||
</ | </ | ||