Update KafkaApplication.java
Added partition offset to the listener so that it can start consuming the messages from the beginning of the topic.
This commit is contained in:
committed by
GitHub
parent
2f91a6f66f
commit
28801f8517
@@ -18,6 +18,7 @@ import org.springframework.messaging.handler.annotation.Header;
|
|||||||
import org.springframework.messaging.handler.annotation.Payload;
|
import org.springframework.messaging.handler.annotation.Payload;
|
||||||
import org.springframework.util.concurrent.ListenableFuture;
|
import org.springframework.util.concurrent.ListenableFuture;
|
||||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||||
|
import org.springframework.kafka.annotation.PartitionOffset;
|
||||||
|
|
||||||
@SpringBootApplication
|
@SpringBootApplication
|
||||||
public class KafkaApplication {
|
public class KafkaApplication {
|
||||||
@@ -142,7 +143,10 @@ public class KafkaApplication {
|
|||||||
|
|
||||||
private CountDownLatch greetingLatch = new CountDownLatch(1);
|
private CountDownLatch greetingLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
|
@KafkaListener(topicPartitions =
|
||||||
|
{ @TopicPartition(topic = "${message.topic.name}",
|
||||||
|
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))},
|
||||||
|
topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
|
||||||
public void listenGroupFoo(String message) {
|
public void listenGroupFoo(String message) {
|
||||||
System.out.println("Received Message in group 'foo': " + message);
|
System.out.println("Received Message in group 'foo': " + message);
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
|
|||||||
Reference in New Issue
Block a user