Update KafkaApplication.java (#12424)
* Update KafkaApplication.java
Added partition offset to the listener so that it can start consuming the messages from the beginning of the topic.
* Revert "Update KafkaApplication.java"
This reverts commit 28801f8517.
* Counting Messages
1. Counting messages from Producer offset
2. Counting messages from Consumer
3. Rest APIs to view the count
* Refactoring
* Adding 3rd programmatic method to count number of messages in Kafka topic
* Removing irrelevant code
* Server port removed
This commit is contained in:
committed by
GitHub
parent
112fda8603
commit
a998854249
@@ -0,0 +1,11 @@
|
||||
package com.baeldung.countingmessages;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class Application {
|
||||
public static void main(String[] args){
|
||||
SpringApplication.run(Application.class,args);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package com.baeldung.countingmessages;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@Component
|
||||
public class KafkaCountingMessagesComponent {
|
||||
@Value(value = "${kafka.bootstrapAddress}")
|
||||
private String bootstrapAddress;
|
||||
|
||||
public static Map<String, Object> props = new HashMap<>();
|
||||
|
||||
@PostConstruct
|
||||
public void init(){
|
||||
System.out.println(getTotalNumberOfMessagesInATopic("baeldung"));
|
||||
}
|
||||
|
||||
public Long getTotalNumberOfMessagesInATopic(String topic){
|
||||
org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(getProps());
|
||||
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
|
||||
.map(p -> new TopicPartition(topic, p.partition()))
|
||||
.collect(Collectors.toList());
|
||||
consumer.assign(partitions);
|
||||
consumer.seekToEnd(Collections.emptySet());
|
||||
Map<TopicPartition, Long> endPartitions = partitions.stream()
|
||||
.collect(Collectors.toMap(Function.identity(), consumer::position));
|
||||
return partitions.stream().mapToLong(p -> endPartitions.get(p)).sum();
|
||||
}
|
||||
|
||||
public Map<String, Object> getProps() {
|
||||
if (props.isEmpty()) {
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
|
||||
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
|
||||
}
|
||||
return props;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user