====== Practics_GUI ======
===== Логирование =====
:!: Пример с логированием. logback.xml
Для использования функционала logback нужны доп модули, поэтому нужно ставить зависимости\\
В данном примере для установки зависимостей используется Maven\\
Формируется манифест, итог нужно компилировать в jar файл, включая зависимости\\
src/main/java/TestLog.java
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestLog{
public static final Logger logger = LoggerFactory.getLogger(TestLog.class);
public static void main(String[] args){
System.out.println("Hello world");
logger.info("==== Start program ====");
for (int i=0; i != 100; i += 1){
logger.info("{} - write ro log in loop", i);
}
}
}
src/main/recources/logback.xml
System.out
%d{yyyy-MM-dd} %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
log/test_log.log
true
%d{yyyy-MM-dd} %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
log/test_log-%d{yyyy-MM-dd}.%i.zip
200KB
pom.xml
4.0.0
com.example
TestLog
1.2-SNAPSHOT
TestLog
UTF-8
1.8
1.8
ch.qos.logback
logback-core
1.4.7
ch.qos.logback
logback-classic
1.4.7
org.slf4j
slf4j-api
2.0.7
org.apache.maven.plugins
maven-assembly-plugin
3.6.0
package
single
com.example.TestLog
jar-with-dependencies
На выходе, собранная джарка включает в себя все подключенные пакеты, файлы "*.class", logback.xml, в корне, манифест\\
===== Over =====
:!: Минимальный пример
Вывод сообщения\\
файл TestLog.java
public class TestLog{
public static final Logger logger = LoggerFactory.getLogger(TestLog.class);
public static void main(String[] args){
System.out.println("Hello world");
}
}
Далее компилируем и запускаем\\
javac TestLog.jar
java TestLog
# Запуск без манифеста
java -cp jarfilename.jar com.example.MainClass
:!: Консьюмер для кафки
Применяется сборка Maven\\
pom.xml
4.0.0
com.consumertest
ConsumerTest
1.0-SNAPSHOT
org.apache.kafka
kafka-clients
3.4.0
org.slf4j
slf4j-api
2.0.13
org.slf4j
slf4j-simple
2.0.13
org.apache.maven.plugins
maven-assembly-plugin
package
single
ConsumerTest
jar-with-dependencies
// Часть импортов похоже не нужна
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import java.util.Collections;
import java.util.Properties;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.List;
import java.util.stream.Collectors;
public class ConsumerTest {
public static void main(String[] args) {
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094, localhost:9096");
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_app");
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//props.setProperty("enable.auto.commit", "true");
//props.setProperty("auto.commit.interval.ms", "1000");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties);
String topic = "secondTop";
/////////////////////////////////////////////////////////
/// Ручное назначение партиций
/*
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
TopicPartition partition2 = new TopicPartition(topic, 2);
TopicPartition partition3 = new TopicPartition(topic, 3);
consumer.assign(Arrays.asList(partition0, partition1, partition2, partition3));
*/
/////////////////////////////////////////////////////////
// Пример с автополучением и назначением всех имеющихся партиций
final List partitionInfos = consumer.partitionsFor(topic);
final Map partitions = partitionInfos.stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toMap(
partitionInfo -> partitionInfo,
partitionInfo -> System.currentTimeMillis() - Duration.ofMinutes(1).toMillis())
);
// Получаем смещения, ближайшие и большие или равные требуемому, для каждой партиции.
final Map topicPartitionOffsetAndTimestampMap =
consumer.offsetsForTimes(partitions);
// Подписываемся на все партиции в топике.
consumer.assign(topicPartitionOffsetAndTimestampMap.keySet());
// Что то не работает, возвращает Null иногда, с-но ловим NPE
// Выставляем сдвиги для каждой партиции.
//topicPartitionOffsetAndTimestampMap.forEach(
// (topicPartition, offsetAndTimestamp) -> consumer.seek(topicPartition, offsetAndTimestamp.offset()));
/////////////////////////////////////////////////////
/// Подписка на топик
//consumer.subscribe(Arrays.asList(topic));
consumer.seekToBeginning(consumer.assignment());
// Read
while (true) {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
System.out.printf("\n====================\noffset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
consumer.commitAsync();
// Этот кусок для искуственной задержки между итерациями
try
{
Thread.sleep(2000);
}
catch(InterruptedException e)
{
// this part is executed when an exception (in this example InterruptedException) occurs
}
}
}
}
}
mvn clean install
java -jar target/ConsumerTest-1.0-SNAPSHOT-jar-with-dependencies.jar
Еще пример, с явным назначением каждой партиции и заданием смещения каждой\\
со смещением правда неясно, какое именно задается, вроде как какое то последнее..\\
стреляет NPE если в момент назначения получаем на какой либо партиции NULL, надо чтобы в каждой были записи тогда все норм\\
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.util.Properties;
import java.time.Duration;
import java.util.Map;
import java.util.List;
import java.util.stream.Collectors;
public class ConsumerTest {
public static void main(String[] args) throws InterruptedException {
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094, localhost:9096");
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_app");
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//props.setProperty("enable.auto.commit", "true");
//props.setProperty("auto.commit.interval.ms", "1000");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties);
String topic = "secondTop";
// Автополучение и назначением всех имеющихся партиций
// перечень имеющихся партиций
final List partitionInfos = consumer.partitionsFor(topic);
// собираем в мапу, каждой ставится текущий timestamp
final Map partitions = partitionInfos.stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toMap(
partitionInfo -> partitionInfo,
partitionInfo -> System.currentTimeMillis() - Duration.ofMinutes(1).toMillis())
);
// echo
partitions.forEach((key, value) -> System.out.println(key + ":" + value));
// Получаем смещения, ближайшие и большие или равные требуемому, для каждой партиции.
// комент из игаса, не совсем явно какие именно задаются смещения, но какие то задаются, каждой партиции
final Map topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(partitions);
// echo
System.out.println("<=======>");
topicPartitionOffsetAndTimestampMap.forEach((key, value) -> System.out.println(key + ":" + value));
// Подписываемся на все партиции в топике.
consumer.assign(topicPartitionOffsetAndTimestampMap.keySet());
// выставляем сдвиги на каждой партиции
topicPartitionOffsetAndTimestampMap.forEach((topicPartition, offsetAndTimestamp) -> consumer.seek(topicPartition, offsetAndTimestamp.offset()));
// Read topic
while (true)
{
// получаем очередную запись
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records)
{
System.out.printf("\n====================\noffset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
consumer.commitAsync();
// Искуственная задержка между итерациями чтения, 1сек
Thread.sleep(1000);
}
}
}
}