From a9988542494aa918c4467c6ee0774a2088f602d0 Mon Sep 17 00:00:00 2001 From: Muhammad Abdullah Azam Khan Date: Mon, 8 Aug 2022 08:03:20 +0400 Subject: [PATCH] Update KafkaApplication.java (#12424) * Update KafkaApplication.java Added partition offset to the listener so that it can start consuming the messages from the beginning of the topic. * Revert "Update KafkaApplication.java" This reverts commit 28801f85174af8bf2d2bb89e1fb39b52c49a6cf0. * Counting Messages 1. Counting messages from Producer offset 2. Counting messages from Consumer 3. Rest APIs to view the count * Refactoring * Adding 3rd programmatic method to count number of messages in Kafka topic * Removing irrelevant code * Server port removed --- .../countingmessages/Application.java | 11 ++++ .../KafkaCountingMessagesComponent.java | 52 +++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 spring-kafka/src/main/java/com/baeldung/countingmessages/Application.java create mode 100644 spring-kafka/src/main/java/com/baeldung/countingmessages/KafkaCountingMessagesComponent.java diff --git a/spring-kafka/src/main/java/com/baeldung/countingmessages/Application.java b/spring-kafka/src/main/java/com/baeldung/countingmessages/Application.java new file mode 100644 index 0000000000..7649494438 --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/countingmessages/Application.java @@ -0,0 +1,11 @@ +package com.baeldung.countingmessages; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + public static void main(String[] args){ + SpringApplication.run(Application.class,args); + } +} diff --git a/spring-kafka/src/main/java/com/baeldung/countingmessages/KafkaCountingMessagesComponent.java b/spring-kafka/src/main/java/com/baeldung/countingmessages/KafkaCountingMessagesComponent.java new file mode 100644 index 0000000000..91ce1b0cbe --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/countingmessages/KafkaCountingMessagesComponent.java @@ -0,0 +1,52 @@ +package com.baeldung.countingmessages; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + + +@Component +public class KafkaCountingMessagesComponent { + @Value(value = "${kafka.bootstrapAddress}") + private String bootstrapAddress; + + public static Map props = new HashMap<>(); + + @PostConstruct + public void init(){ + System.out.println(getTotalNumberOfMessagesInATopic("baeldung")); + } + + public Long getTotalNumberOfMessagesInATopic(String topic){ + org.apache.kafka.clients.consumer.KafkaConsumer consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(getProps()); + List partitions = consumer.partitionsFor(topic).stream() + .map(p -> new TopicPartition(topic, p.partition())) + .collect(Collectors.toList()); + consumer.assign(partitions); + consumer.seekToEnd(Collections.emptySet()); + Map endPartitions = partitions.stream() + .collect(Collectors.toMap(Function.identity(), consumer::position)); + return partitions.stream().mapToLong(p -> endPartitions.get(p)).sum(); + } + + public Map getProps() { + if (props.isEmpty()) { + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520"); + props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520"); + } + return props; + } +}