Для использования функционала logback нужны доп модули, поэтому нужно ставить зависимости
В данном примере для установки зависимостей используется Maven
Формируется манифест, итог нужно компилировать в jar файл, включая зависимости
src/main/java/TestLog.java
package com.example; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestLog{ public static final Logger logger = LoggerFactory.getLogger(TestLog.class); public static void main(String[] args){ System.out.println("Hello world"); logger.info("==== Start program ===="); for (int i=0; i != 100; i += 1){ logger.info("{} - write ro log in loop", i); } } }
src/main/recources/logback.xml
<?xml version="1.0" encoding="UTF-8" ?> <configuration scan="true" scanPeriod="5 seconds" debug="true"> <appender name="ConsoleAppender" class="ch.qos.logback.core.ConsoleAppender"> <target>System.out</target> <encoder> <pattern>%d{yyyy-MM-dd} %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n </pattern> </encoder> </appender> <appender name="TestLogAppender" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>log/test_log.log</file> <append>true</append> <encoder> <pattern>%d{yyyy-MM-dd} %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n </pattern> </encoder> <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> <fileNamePattern>log/test_log-%d{yyyy-MM-dd}.%i.zip</fileNamePattern> <maxFileSize>200KB</maxFileSize> </rollingPolicy> </appender> <root level="DEBUG"> <appender-ref ref="TestLogAppender"/> <appender-ref ref="ConsoleAppender"/> </root> </configuration>
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>TestLog</artifactId> <version>1.2-SNAPSHOT</version> <name>TestLog</name> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.4.7</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.4.7</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.6.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> <configuration> <archive> <manifest> <mainClass>com.example.TestLog</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> </build> </project>
На выходе, собранная джарка включает в себя все подключенные пакеты, файлы «*.class», logback.xml, в корне, манифест
Вывод сообщения
файл TestLog.java
public class TestLog{ public static final Logger logger = LoggerFactory.getLogger(TestLog.class); public static void main(String[] args){ System.out.println("Hello world"); } }
Далее компилируем и запускаем
javac TestLog.jar java TestLog # Запуск без манифеста java -cp jarfilename.jar com.example.MainClass
Применяется сборка Maven
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.consumertest</groupId> <artifactId>ConsumerTest</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.4.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.13</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>2.0.13</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> <configuration> <archive> <manifest> <mainClass>ConsumerTest</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> </build> </project>
// Часть импортов похоже не нужна import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; import java.util.Collections; import java.util.Properties; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.List; import java.util.stream.Collectors; public class ConsumerTest { public static void main(String[] args) { Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094, localhost:9096"); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_app"); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //props.setProperty("enable.auto.commit", "true"); //props.setProperty("auto.commit.interval.ms", "1000"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties); String topic = "secondTop"; ///////////////////////////////////////////////////////// /// Ручное назначение партиций /* TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); TopicPartition partition2 = new TopicPartition(topic, 2); TopicPartition partition3 = new TopicPartition(topic, 3); consumer.assign(Arrays.asList(partition0, partition1, partition2, partition3)); */ ///////////////////////////////////////////////////////// // Пример с автополучением и назначением всех имеющихся партиций final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); final Map<TopicPartition, Long> partitions = partitionInfos.stream() .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) .collect(Collectors.toMap( partitionInfo -> partitionInfo, partitionInfo -> System.currentTimeMillis() - Duration.ofMinutes(1).toMillis()) ); // Получаем смещения, ближайшие и большие или равные требуемому, для каждой партиции. final Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(partitions); // Подписываемся на все партиции в топике. consumer.assign(topicPartitionOffsetAndTimestampMap.keySet()); // Что то не работает, возвращает Null иногда, с-но ловим NPE // Выставляем сдвиги для каждой партиции. //topicPartitionOffsetAndTimestampMap.forEach( // (topicPartition, offsetAndTimestamp) -> consumer.seek(topicPartition, offsetAndTimestamp.offset())); ///////////////////////////////////////////////////// /// Подписка на топик //consumer.subscribe(Arrays.asList(topic)); consumer.seekToBeginning(consumer.assignment()); // Read while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("\n====================\noffset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); consumer.commitAsync(); // Этот кусок для искуственной задержки между итерациями try { Thread.sleep(2000); } catch(InterruptedException e) { // this part is executed when an exception (in this example InterruptedException) occurs } } } } }
mvn clean install java -jar target/ConsumerTest-1.0-SNAPSHOT-jar-with-dependencies.jar
Еще пример, с явным назначением каждой партиции и заданием смещения каждой
со смещением правда неясно, какое именно задается, вроде как какое то последнее..
стреляет NPE если в момент назначения получаем на какой либо партиции NULL, надо чтобы в каждой были записи тогда все норм
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import java.util.Properties; import java.time.Duration; import java.util.Map; import java.util.List; import java.util.stream.Collectors; public class ConsumerTest { public static void main(String[] args) throws InterruptedException { Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094, localhost:9096"); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_app"); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //props.setProperty("enable.auto.commit", "true"); //props.setProperty("auto.commit.interval.ms", "1000"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties); String topic = "secondTop"; // Автополучение и назначением всех имеющихся партиций // перечень имеющихся партиций final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); // собираем в мапу, каждой ставится текущий timestamp final Map<TopicPartition, Long> partitions = partitionInfos.stream() .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) .collect(Collectors.toMap( partitionInfo -> partitionInfo, partitionInfo -> System.currentTimeMillis() - Duration.ofMinutes(1).toMillis()) ); // echo partitions.forEach((key, value) -> System.out.println(key + ":" + value)); // Получаем смещения, ближайшие и большие или равные требуемому, для каждой партиции. // комент из игаса, не совсем явно какие именно задаются смещения, но какие то задаются, каждой партиции final Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(partitions); // echo System.out.println("<=======>"); topicPartitionOffsetAndTimestampMap.forEach((key, value) -> System.out.println(key + ":" + value)); // Подписываемся на все партиции в топике. consumer.assign(topicPartitionOffsetAndTimestampMap.keySet()); // выставляем сдвиги на каждой партиции topicPartitionOffsetAndTimestampMap.forEach((topicPartition, offsetAndTimestamp) -> consumer.seek(topicPartition, offsetAndTimestamp.offset())); // Read topic while (true) { // получаем очередную запись ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("\n====================\noffset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); consumer.commitAsync(); // Искуственная задержка между итерациями чтения, 1сек Thread.sleep(1000); } } } }