Показаны различия между двумя версиями страницы.
Предыдущая версия справа и слева Предыдущая версия Следующая версия | Предыдущая версия | ||
develop:java:practics_overall [2024/03/02 12:53] admin |
develop:java:practics_overall [2024/04/27 09:49] (текущий) admin |
||
---|---|---|---|
Строка 133: | Строка 133: | ||
</ | </ | ||
</ | </ | ||
+ | |||
+ | На выходе, | ||
</ | </ | ||
Строка 159: | Строка 161: | ||
javac TestLog.jar | javac TestLog.jar | ||
java TestLog | java TestLog | ||
+ | |||
+ | # Запуск без манифеста | ||
+ | java -cp jarfilename.jar com.example.MainClass | ||
</ | </ | ||
+ | </ | ||
+ | |||
+ | |||
+ | |||
+ | < | ||
+ | < | ||
+ | Применяется сборка Maven\\ | ||
+ | pom.xml | ||
+ | <code xml> | ||
+ | <?xml version=" | ||
+ | <project xmlns=" | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | |||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | </ | ||
+ | |||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | </ | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | </ | ||
+ | </ | ||
+ | |||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | </ | ||
+ | </ | ||
+ | </ | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | < | ||
+ | </ | ||
+ | </ | ||
+ | < | ||
+ | < | ||
+ | </ | ||
+ | </ | ||
+ | </ | ||
+ | </ | ||
+ | </ | ||
+ | </ | ||
+ | </ | ||
+ | |||
+ | <code java> | ||
+ | // Часть импортов похоже не нужна | ||
+ | import org.apache.kafka.clients.consumer.*; | ||
+ | import org.apache.kafka.clients.consumer.Consumer; | ||
+ | import org.apache.kafka.common.serialization.LongDeserializer; | ||
+ | import org.apache.kafka.common.serialization.StringDeserializer; | ||
+ | |||
+ | import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
+ | import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
+ | import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
+ | import org.apache.kafka.clients.consumer.OffsetAndTimestamp; | ||
+ | import org.apache.kafka.common.PartitionInfo; | ||
+ | import org.apache.kafka.common.TopicPartition; | ||
+ | import org.apache.kafka.common.record.TimestampType; | ||
+ | |||
+ | import java.util.Collections; | ||
+ | import java.util.Properties; | ||
+ | import java.time.Duration; | ||
+ | import java.util.Arrays; | ||
+ | import java.util.Map; | ||
+ | import java.util.List; | ||
+ | import java.util.stream.Collectors; | ||
+ | |||
+ | public class ConsumerTest { | ||
+ | public static void main(String[] args) { | ||
+ | |||
+ | Properties consumerProperties = new Properties(); | ||
+ | consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, | ||
+ | consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, | ||
+ | |||
+ | consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, | ||
+ | // | ||
+ | // | ||
+ | |||
+ | consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, | ||
+ | consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, | ||
+ | |||
+ | KafkaConsumer< | ||
+ | |||
+ | String topic = " | ||
+ | ///////////////////////////////////////////////////////// | ||
+ | /// Ручное назначение партиций | ||
+ | /* | ||
+ | TopicPartition partition0 = new TopicPartition(topic, | ||
+ | TopicPartition partition1 = new TopicPartition(topic, | ||
+ | TopicPartition partition2 = new TopicPartition(topic, | ||
+ | TopicPartition partition3 = new TopicPartition(topic, | ||
+ | consumer.assign(Arrays.asList(partition0, | ||
+ | */ | ||
+ | |||
+ | ///////////////////////////////////////////////////////// | ||
+ | // Пример с автополучением и назначением всех имеющихся партиций | ||
+ | final List< | ||
+ | |||
+ | final Map< | ||
+ | .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), | ||
+ | .collect(Collectors.toMap( | ||
+ | partitionInfo -> partitionInfo, | ||
+ | partitionInfo -> System.currentTimeMillis() - Duration.ofMinutes(1).toMillis()) | ||
+ | ); | ||
+ | |||
+ | // Получаем смещения, | ||
+ | final Map< | ||
+ | consumer.offsetsForTimes(partitions); | ||
+ | |||
+ | // Подписываемся на все партиции в топике. | ||
+ | consumer.assign(topicPartitionOffsetAndTimestampMap.keySet()); | ||
+ | |||
+ | // Что то не работает, | ||
+ | // Выставляем сдвиги для каждой партиции. | ||
+ | // | ||
+ | // (topicPartition, | ||
+ | ///////////////////////////////////////////////////// | ||
+ | |||
+ | |||
+ | /// Подписка на топик | ||
+ | // | ||
+ | consumer.seekToBeginning(consumer.assignment()); | ||
+ | |||
+ | // Read | ||
+ | while (true) { | ||
+ | ConsumerRecords< | ||
+ | for (ConsumerRecord< | ||
+ | System.out.printf(" | ||
+ | consumer.commitAsync(); | ||
+ | |||
+ | // Этот кусок для искуственной задержки между итерациями | ||
+ | try | ||
+ | { | ||
+ | Thread.sleep(2000); | ||
+ | } | ||
+ | catch(InterruptedException e) | ||
+ | { | ||
+ | // this part is executed when an exception (in this example InterruptedException) occurs | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | |||
+ | <code bash> | ||
+ | mvn clean install | ||
+ | java -jar target/ | ||
+ | </ | ||
+ | |||
+ | |||
+ | Еще пример, | ||
+ | со смещением правда неясно, | ||
+ | стреляет NPE если в момент назначения получаем на какой либо партиции NULL, надо чтобы в каждой были записи тогда все норм\\ | ||
+ | <code java> | ||
+ | import org.apache.kafka.clients.consumer.*; | ||
+ | import org.apache.kafka.common.serialization.StringDeserializer; | ||
+ | import org.apache.kafka.common.PartitionInfo; | ||
+ | import org.apache.kafka.common.TopicPartition; | ||
+ | import java.util.Properties; | ||
+ | import java.time.Duration; | ||
+ | import java.util.Map; | ||
+ | import java.util.List; | ||
+ | import java.util.stream.Collectors; | ||
+ | |||
+ | public class ConsumerTest { | ||
+ | public static void main(String[] args) throws InterruptedException { | ||
+ | |||
+ | Properties consumerProperties = new Properties(); | ||
+ | consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, | ||
+ | consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, | ||
+ | |||
+ | consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, | ||
+ | // | ||
+ | // | ||
+ | |||
+ | consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, | ||
+ | consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, | ||
+ | |||
+ | KafkaConsumer< | ||
+ | String topic = " | ||
+ | |||
+ | // Автополучение и назначением всех имеющихся партиций | ||
+ | // перечень имеющихся партиций | ||
+ | final List< | ||
+ | // собираем в мапу, каждой ставится текущий timestamp | ||
+ | final Map< | ||
+ | .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), | ||
+ | .collect(Collectors.toMap( | ||
+ | partitionInfo -> partitionInfo, | ||
+ | partitionInfo -> System.currentTimeMillis() - Duration.ofMinutes(1).toMillis()) | ||
+ | ); | ||
+ | // echo | ||
+ | partitions.forEach((key, | ||
+ | |||
+ | |||
+ | // Получаем смещения, | ||
+ | // комент из игаса, не совсем явно какие именно задаются смещения, | ||
+ | final Map< | ||
+ | // echo | ||
+ | System.out.println("< | ||
+ | topicPartitionOffsetAndTimestampMap.forEach((key, | ||
+ | |||
+ | // Подписываемся на все партиции в топике. | ||
+ | consumer.assign(topicPartitionOffsetAndTimestampMap.keySet()); | ||
+ | |||
+ | // выставляем сдвиги на каждой партиции | ||
+ | topicPartitionOffsetAndTimestampMap.forEach((topicPartition, | ||
+ | |||
+ | // Read topic | ||
+ | while (true) | ||
+ | { | ||
+ | // получаем очередную запись | ||
+ | ConsumerRecords< | ||
+ | for (ConsumerRecord< | ||
+ | { | ||
+ | System.out.printf(" | ||
+ | consumer.commitAsync(); | ||
+ | // Искуственная задержка между итерациями чтения, | ||
+ | Thread.sleep(1000); | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | </ | ||
+ | |||
</ | </ | ||