From 28801f85174af8bf2d2bb89e1fb39b52c49a6cf0 Mon Sep 17 00:00:00 2001 From: Muhammad Abdullah Azam Khan Date: Wed, 29 Jun 2022 02:57:35 +0400 Subject: [PATCH] Update KafkaApplication.java Added partition offset to the listener so that it can start consuming the messages from the beginning of the topic. --- .../java/com/baeldung/spring/kafka/KafkaApplication.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java index 9b79f716e9..2e86ae20b6 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java @@ -18,6 +18,7 @@ import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.kafka.annotation.PartitionOffset; @SpringBootApplication public class KafkaApplication { @@ -142,7 +143,10 @@ public class KafkaApplication { private CountDownLatch greetingLatch = new CountDownLatch(1); - @KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory") + @KafkaListener(topicPartitions = + { @TopicPartition(topic = "${message.topic.name}", + partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}, + topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory") public void listenGroupFoo(String message) { System.out.println("Received Message in group 'foo': " + message); latch.countDown();