Инструменты пользователя

Инструменты сайта


develop:java:practics_overall

Различия

Показаны различия между двумя версиями страницы.

Ссылка на это сравнение

Предыдущая версия справа и слева Предыдущая версия
Следующая версия
Предыдущая версия
develop:java:practics_overall [2024/04/14 05:46]
admin [Over]
develop:java:practics_overall [2024/04/27 09:49] (текущий)
admin
Строка 165: Строка 165:
 java -cp jarfilename.jar com.example.MainClass java -cp jarfilename.jar com.example.MainClass
 </code> </code>
 +</details>
 +
 +
 +
 +<details>
 +<summary>:!: Консьюмер для кафки</summary>
 +Применяется сборка Maven\\
 +pom.xml
 +<code xml>
 +<?xml version="1.0" encoding="UTF-8"?>
 +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 +    <modelVersion>4.0.0</modelVersion>
 +    <groupId>com.consumertest</groupId>
 +    <artifactId>ConsumerTest</artifactId>
 +    <version>1.0-SNAPSHOT</version>
 +
 + <dependencies>
 + <dependency>
 + <groupId>org.apache.kafka</groupId>
 + <artifactId>kafka-clients</artifactId>
 + <version>3.4.0</version>
 + </dependency>
 +
 + <dependency>
 + <groupId>org.slf4j</groupId>
 + <artifactId>slf4j-api</artifactId>
 + <version>2.0.13</version>
 + </dependency>
 + <dependency>
 + <groupId>org.slf4j</groupId>
 + <artifactId>slf4j-simple</artifactId>
 + <version>2.0.13</version>
 + </dependency>
 + </dependencies>
 +
 + <build>
 + <plugins>
 + <plugin>
 +                <groupId>org.apache.maven.plugins</groupId>
 +                <artifactId>maven-assembly-plugin</artifactId>
 +                <executions>
 +                    <execution>
 +                        <phase>package</phase>
 +                        <goals>
 +                            <goal>single</goal>
 +                        </goals>
 +                    </execution>
 +                </executions>
 +                <configuration>
 +                    <archive>
 +                        <manifest>
 +                            <mainClass>ConsumerTest</mainClass>
 +                        </manifest>
 +                    </archive>
 +                    <descriptorRefs>
 +                        <descriptorRef>jar-with-dependencies</descriptorRef>
 +                    </descriptorRefs>
 +                </configuration>
 +            </plugin>
 + </plugins>
 + </build>
 +</project>
 +</code>
 +
 +<code java>
 +// Часть импортов похоже не нужна
 +import org.apache.kafka.clients.consumer.*;
 +import org.apache.kafka.clients.consumer.Consumer;
 +import org.apache.kafka.common.serialization.LongDeserializer;
 +import org.apache.kafka.common.serialization.StringDeserializer;
 +
 +import org.apache.kafka.clients.consumer.ConsumerRecord;
 +import org.apache.kafka.clients.consumer.ConsumerRecords;
 +import org.apache.kafka.clients.consumer.KafkaConsumer;
 +import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 +import org.apache.kafka.common.PartitionInfo;
 +import org.apache.kafka.common.TopicPartition;
 +import org.apache.kafka.common.record.TimestampType;
 +
 +import java.util.Collections;
 +import java.util.Properties;
 +import java.time.Duration;
 +import java.util.Arrays;
 +import java.util.Map;
 +import java.util.List;
 +import java.util.stream.Collectors;
 +
 +public class ConsumerTest {
 +    public static void main(String[] args) {
 +
 + Properties consumerProperties = new Properties();
 + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094, localhost:9096");
 + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_app");
 +
 + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 + //props.setProperty("enable.auto.commit", "true");
 + //props.setProperty("auto.commit.interval.ms", "1000");
 +
 + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 +
 + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
 +
 + String topic = "secondTop";
 + /////////////////////////////////////////////////////////
 + /// Ручное назначение партиций
 + /*
 + TopicPartition partition0 = new TopicPartition(topic, 0);
 + TopicPartition partition1 = new TopicPartition(topic, 1);
 + TopicPartition partition2 = new TopicPartition(topic, 2);
 + TopicPartition partition3 = new TopicPartition(topic, 3);
 + consumer.assign(Arrays.asList(partition0, partition1, partition2, partition3));
 + */
 +
 + /////////////////////////////////////////////////////////
 + // Пример с автополучением и назначением всех имеющихся партиций
 + final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
 +
 +        final Map<TopicPartition, Long> partitions = partitionInfos.stream()
 +                .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
 +                .collect(Collectors.toMap(
 +                        partitionInfo -> partitionInfo,
 +                        partitionInfo -> System.currentTimeMillis() - Duration.ofMinutes(1).toMillis())
 +                );
 +
 + // Получаем смещения, ближайшие и большие или равные требуемому, для каждой партиции.
 +        final Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap =
 +                consumer.offsetsForTimes(partitions);
 +
 + // Подписываемся на все партиции в топике.
 +        consumer.assign(topicPartitionOffsetAndTimestampMap.keySet());
 +
 + // Что то не работает, возвращает Null иногда, с-но ловим NPE
 +        // Выставляем сдвиги для каждой партиции.
 +        //topicPartitionOffsetAndTimestampMap.forEach(
 +        //        (topicPartition, offsetAndTimestamp) -> consumer.seek(topicPartition, offsetAndTimestamp.offset())); 
 + /////////////////////////////////////////////////////
 +
 +
 + /// Подписка на топик
 + //consumer.subscribe(Arrays.asList(topic));
 + consumer.seekToBeginning(consumer.assignment());
 +
 + // Read
 + while (true) {
 + ConsumerRecords<String, String> records = consumer.poll(100);
 + for (ConsumerRecord<String, String> record : records) {
 + System.out.printf("\n====================\noffset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
 + consumer.commitAsync();
 +
 + // Этот кусок для искуственной задержки между итерациями
 + try 
 + {
 + Thread.sleep(2000);
 +
 + catch(InterruptedException e)
 + {
 + // this part is executed when an exception (in this example InterruptedException) occurs
 + }
 + }
 + }
 +    }
 +}
 +</code>
 +
 +
 +<code bash>
 +mvn clean install
 +java -jar target/ConsumerTest-1.0-SNAPSHOT-jar-with-dependencies.jar
 +</code>
 +
 +
 +Еще пример, с явным назначением каждой партиции и заданием смещения каждой\\
 +со смещением правда неясно, какое именно задается, вроде как какое то последнее..\\
 +стреляет NPE если в момент назначения получаем на какой либо партиции NULL, надо чтобы в каждой были записи тогда все норм\\
 +<code java>
 +import org.apache.kafka.clients.consumer.*;
 +import org.apache.kafka.common.serialization.StringDeserializer;
 +import org.apache.kafka.common.PartitionInfo;
 +import org.apache.kafka.common.TopicPartition;
 +import java.util.Properties;
 +import java.time.Duration;
 +import java.util.Map;
 +import java.util.List;
 +import java.util.stream.Collectors;
 +
 +public class ConsumerTest {
 +    public static void main(String[] args) throws InterruptedException {
 +
 + Properties consumerProperties = new Properties();
 + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094, localhost:9096");
 + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test_app");
 +
 + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 + //props.setProperty("enable.auto.commit", "true");
 + //props.setProperty("auto.commit.interval.ms", "1000");
 +
 + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 +
 + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
 + String topic = "secondTop";
 +
 + // Автополучение и назначением всех имеющихся партиций
 + // перечень имеющихся партиций
 + final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
 + // собираем в мапу, каждой ставится текущий timestamp 
 +        final Map<TopicPartition, Long> partitions = partitionInfos.stream()
 +                .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
 +                .collect(Collectors.toMap(
 +                        partitionInfo -> partitionInfo,
 +                        partitionInfo -> System.currentTimeMillis() - Duration.ofMinutes(1).toMillis())
 +                );
 + // echo 
 + partitions.forEach((key, value) -> System.out.println(key + ":" + value));
 +
 +
 + // Получаем смещения, ближайшие и большие или равные требуемому, для каждой партиции.
 + // комент из игаса, не совсем явно какие именно задаются смещения, но какие то задаются, каждой партиции
 + final Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(partitions);
 + // echo 
 + System.out.println("<=======>");
 + topicPartitionOffsetAndTimestampMap.forEach((key, value) -> System.out.println(key + ":" + value));
 +
 + // Подписываемся на все партиции в топике.
 +        consumer.assign(topicPartitionOffsetAndTimestampMap.keySet());
 +
 + // выставляем сдвиги на каждой партиции
 + topicPartitionOffsetAndTimestampMap.forEach((topicPartition, offsetAndTimestamp) -> consumer.seek(topicPartition, offsetAndTimestamp.offset())); 
 +
 + // Read topic 
 + while (true) 
 + {
 + // получаем очередную запись
 + ConsumerRecords<String, String> records = consumer.poll(100);
 + for (ConsumerRecord<String, String> record : records) 
 + {
 + System.out.printf("\n====================\noffset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
 + consumer.commitAsync();
 + // Искуственная задержка между итерациями чтения, 1сек
 + Thread.sleep(1000);
 + }
 + }
 +    }
 +}
 +</code>
 +
 </details> </details>
  
  
develop/java/practics_overall.1713073597.txt.gz · Последнее изменение: 2024/04/14 05:46 — admin