====== Practics_GUI ====== ===== Логирование =====
:!: Пример с логированием. logback.xml Для использования функционала logback нужны доп модули, поэтому нужно ставить зависимости\\ В данном примере для установки зависимостей используется Maven\\ Формируется манифест, итог нужно компилировать в jar файл, включая зависимости\\ src/main/java/TestLog.java package com.example; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestLog{ public static final Logger logger = LoggerFactory.getLogger(TestLog.class); public static void main(String[] args){ System.out.println("Hello world"); logger.info("==== Start program ===="); for (int i=0; i != 100; i += 1){ logger.info("{} - write ro log in loop", i); } } } src/main/recources/logback.xml System.out %d{yyyy-MM-dd} %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n log/test_log.log true %d{yyyy-MM-dd} %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n log/test_log-%d{yyyy-MM-dd}.%i.zip 200KB pom.xml 4.0.0 com.example TestLog 1.2-SNAPSHOT TestLog UTF-8 1.8 1.8 ch.qos.logback logback-core 1.4.7 ch.qos.logback logback-classic 1.4.7 org.slf4j slf4j-api 2.0.7 org.apache.maven.plugins maven-assembly-plugin 3.6.0 package single com.example.TestLog jar-with-dependencies На выходе, собранная джарка включает в себя все подключенные пакеты, файлы "*.class", logback.xml, в корне, манифест\\
===== Over =====
:!: Минимальный пример Вывод сообщения\\ файл TestLog.java public class TestLog{ public static final Logger logger = LoggerFactory.getLogger(TestLog.class); public static void main(String[] args){ System.out.println("Hello world"); } } Далее компилируем и запускаем\\ javac TestLog.jar java TestLog # Запуск без манифеста java -cp jarfilename.jar com.example.MainClass
:!: Консьюмер для кафки Применяется сборка Maven\\ pom.xml 4.0.0 com.consumertest ConsumerTest 1.0-SNAPSHOT org.apache.kafka kafka-clients 3.4.0 org.slf4j slf4j-api 2.0.13 org.slf4j slf4j-simple 2.0.13 org.apache.maven.plugins maven-assembly-plugin package single ConsumerTest jar-with-dependencies // Часть импортов похоже не нужна import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; import java.util.Collections; import java.util.Properties; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.List; import java.util.stream.Collectors; public class ConsumerTest { public static void main(String[] args) { Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094, localhost:9096"); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_app"); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //props.setProperty("enable.auto.commit", "true"); //props.setProperty("auto.commit.interval.ms", "1000"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties); String topic = "secondTop"; ///////////////////////////////////////////////////////// /// Ручное назначение партиций /* TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); TopicPartition partition2 = new TopicPartition(topic, 2); TopicPartition partition3 = new TopicPartition(topic, 3); consumer.assign(Arrays.asList(partition0, partition1, partition2, partition3)); */ ///////////////////////////////////////////////////////// // Пример с автополучением и назначением всех имеющихся партиций final List partitionInfos = consumer.partitionsFor(topic); final Map partitions = partitionInfos.stream() .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) .collect(Collectors.toMap( partitionInfo -> partitionInfo, partitionInfo -> System.currentTimeMillis() - Duration.ofMinutes(1).toMillis()) ); // Получаем смещения, ближайшие и большие или равные требуемому, для каждой партиции. final Map topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(partitions); // Подписываемся на все партиции в топике. consumer.assign(topicPartitionOffsetAndTimestampMap.keySet()); // Что то не работает, возвращает Null иногда, с-но ловим NPE // Выставляем сдвиги для каждой партиции. //topicPartitionOffsetAndTimestampMap.forEach( // (topicPartition, offsetAndTimestamp) -> consumer.seek(topicPartition, offsetAndTimestamp.offset())); ///////////////////////////////////////////////////// /// Подписка на топик //consumer.subscribe(Arrays.asList(topic)); consumer.seekToBeginning(consumer.assignment()); // Read while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.printf("\n====================\noffset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); consumer.commitAsync(); // Этот кусок для искуственной задержки между итерациями try { Thread.sleep(2000); } catch(InterruptedException e) { // this part is executed when an exception (in this example InterruptedException) occurs } } } } } mvn clean install java -jar target/ConsumerTest-1.0-SNAPSHOT-jar-with-dependencies.jar Еще пример, с явным назначением каждой партиции и заданием смещения каждой\\ со смещением правда неясно, какое именно задается, вроде как какое то последнее..\\ стреляет NPE если в момент назначения получаем на какой либо партиции NULL, надо чтобы в каждой были записи тогда все норм\\ import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import java.util.Properties; import java.time.Duration; import java.util.Map; import java.util.List; import java.util.stream.Collectors; public class ConsumerTest { public static void main(String[] args) throws InterruptedException { Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094, localhost:9096"); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_app"); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //props.setProperty("enable.auto.commit", "true"); //props.setProperty("auto.commit.interval.ms", "1000"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties); String topic = "secondTop"; // Автополучение и назначением всех имеющихся партиций // перечень имеющихся партиций final List partitionInfos = consumer.partitionsFor(topic); // собираем в мапу, каждой ставится текущий timestamp final Map partitions = partitionInfos.stream() .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) .collect(Collectors.toMap( partitionInfo -> partitionInfo, partitionInfo -> System.currentTimeMillis() - Duration.ofMinutes(1).toMillis()) ); // echo partitions.forEach((key, value) -> System.out.println(key + ":" + value)); // Получаем смещения, ближайшие и большие или равные требуемому, для каждой партиции. // комент из игаса, не совсем явно какие именно задаются смещения, но какие то задаются, каждой партиции final Map topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(partitions); // echo System.out.println("<=======>"); topicPartitionOffsetAndTimestampMap.forEach((key, value) -> System.out.println(key + ":" + value)); // Подписываемся на все партиции в топике. consumer.assign(topicPartitionOffsetAndTimestampMap.keySet()); // выставляем сдвиги на каждой партиции topicPartitionOffsetAndTimestampMap.forEach((topicPartition, offsetAndTimestamp) -> consumer.seek(topicPartition, offsetAndTimestamp.offset())); // Read topic while (true) { // получаем очередную запись ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.printf("\n====================\noffset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); consumer.commitAsync(); // Искуственная задержка между итерациями чтения, 1сек Thread.sleep(1000); } } } }