diff --git a/spring-kafka/README.md b/spring-kafka/README.md
index 2731eca042..c8f01cc28b 100644
--- a/spring-kafka/README.md
+++ b/spring-kafka/README.md
@@ -2,8 +2,28 @@
This is a simple Spring Boot app to demonstrate sending and receiving of messages in Kafka using spring-kafka.
-As Kafka topics are not created automatically by default, this application requires that a topic named 'baeldung' is created manually.
+As Kafka topics are not created automatically by default, this application requires that you create the following topics manually.
-`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic baeldung`
+`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic baeldung`
+`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic partitioned`
+`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic filtered`
+`$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic greeting`
-Two listeners with group Ids **foo** and **bar** are configured. When run successfully, the *Hello World!* message will be received by both the listeners and logged on console.
+When the application runs successfully, following output is logged on to console (along with spring logs):
+
+#### Message received from the 'baeldung' topic by the basic listeners with groups foo and bar
+>Received Messasge in group 'foo': Hello, World!
+Received Messasge in group 'bar': Hello, World!
+
+#### Message received from the 'baeldung' topic, with the partition info
+>Received Messasge: Hello, World! from partition: 0
+
+#### Message received from the 'partitioned' topic, only from specific partitions
+>Received Message: Hello To Partioned Topic! from partition: 0
+Received Message: Hello To Partioned Topic! from partition: 3
+
+#### Message received from the 'filtered' topic after filtering
+>Recieved Message in filtered listener: Hello Baeldung!
+
+#### Message (Serialized Java Object) received from the 'greeting' topic
+>Recieved greeting message: Greetings, World!!
\ No newline at end of file
diff --git a/spring-kafka/pom.xml b/spring-kafka/pom.xml
index 73eaf3acff..11810a17dd 100644
--- a/spring-kafka/pom.xml
+++ b/spring-kafka/pom.xml
@@ -12,6 +12,7 @@
1.8
1.1.3.RELEASE
+ 2.6.7
@@ -21,17 +22,22 @@
-
+
org.springframework.boot
spring-boot-starter
-
+
org.springframework.kafka
spring-kafka
-
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/Greeting.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/Greeting.java
new file mode 100644
index 0000000000..b4633e802a
--- /dev/null
+++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/Greeting.java
@@ -0,0 +1,37 @@
+package com.baeldung.spring.kafka;
+
+public class Greeting {
+
+ private String msg;
+ private String name;
+
+ public Greeting() {
+
+ }
+
+ public Greeting(String msg, String name) {
+ this.msg = msg;
+ this.name = name;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return msg + ", " + name + "!";
+ }
+}
diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java
index 252054a9f1..50978d5ea9 100644
--- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java
+++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java
@@ -10,21 +10,61 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) throws Exception {
+
ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
+
MessageProducer producer = context.getBean(MessageProducer.class);
- producer.sendMessage("Hello, World!");
-
MessageListener listener = context.getBean(MessageListener.class);
- listener.latch.await(20, TimeUnit.SECONDS);
- Thread.sleep(60000);
- context.close();
+ /*
+ * Sending a Hello World message to topic 'baeldung'.
+ * Must be recieved by both listeners with group foo
+ * and bar with containerFactory fooKafkaListenerContainerFactory
+ * and barKafkaListenerContainerFactory respectively.
+ * It will also be recieved by the listener with
+ * headersKafkaListenerContainerFactory as container factory
+ */
+ producer.sendMessage("Hello, World!");
+ listener.latch.await(10, TimeUnit.SECONDS);
+ /*
+ * Sending message to a topic with 5 partition,
+ * each message to a different partition. But as per
+ * listener configuration, only the messages from
+ * partition 0 and 3 will be consumed.
+ */
+ for (int i = 0; i < 5; i++) {
+ producer.sendMessageToPartion("Hello To Partioned Topic!", i);
+ }
+ listener.partitionLatch.await(10, TimeUnit.SECONDS);
+
+ /*
+ * Sending message to 'filtered' topic. As per listener
+ * configuration, all messages with char sequence
+ * 'World' will be discarded.
+ */
+ producer.sendMessageToFiltered("Hello Baeldung!");
+ producer.sendMessageToFiltered("Hello World!");
+ listener.filterLatch.await(10, TimeUnit.SECONDS);
+
+ /*
+ * Sending message to 'greeting' topic. This will send
+ * and recieved a java object with the help of
+ * greetingKafkaListenerContainerFactory.
+ */
+ producer.sendGreetingMessage(new Greeting("Greetings", "World!"));
+ listener.greetingLatch.await(10, TimeUnit.SECONDS);
+
+ context.close();
}
@Bean
@@ -42,18 +82,47 @@ public class KafkaApplication {
@Autowired
private KafkaTemplate kafkaTemplate;
+ @Autowired
+ private KafkaTemplate greetingKafkaTemplate;
+
@Value(value = "${message.topic.name}")
private String topicName;
+ @Value(value = "${partitioned.topic.name}")
+ private String partionedTopicName;
+
+ @Value(value = "${filtered.topic.name}")
+ private String filteredTopicName;
+
+ @Value(value = "${greeting.topic.name}")
+ private String greetingTopicName;
+
public void sendMessage(String message) {
kafkaTemplate.send(topicName, message);
}
+ public void sendMessageToPartion(String message, int partition) {
+ kafkaTemplate.send(partionedTopicName, partition, message);
+ }
+
+ public void sendMessageToFiltered(String message) {
+ kafkaTemplate.send(filteredTopicName, message);
+ }
+
+ public void sendGreetingMessage(Greeting greeting) {
+ greetingKafkaTemplate.send(greetingTopicName, greeting);
+ }
}
public static class MessageListener {
- private CountDownLatch latch = new CountDownLatch(2);
+ private CountDownLatch latch = new CountDownLatch(3);
+
+ private CountDownLatch partitionLatch = new CountDownLatch(2);
+
+ private CountDownLatch filterLatch = new CountDownLatch(2);
+
+ private CountDownLatch greetingLatch = new CountDownLatch(1);
@KafkaListener(topics = "${message.topic.name}", group = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
@@ -67,6 +136,30 @@ public class KafkaApplication {
latch.countDown();
}
+ @KafkaListener(topics = "${message.topic.name}", containerFactory = "headersKafkaListenerContainerFactory")
+ public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
+ System.out.println("Received Messasge: " + message + " from partition: " + partition);
+ latch.countDown();
+ }
+
+ @KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }))
+ public void listenToParition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
+ System.out.println("Received Message: " + message + " from partition: " + partition);
+ this.partitionLatch.countDown();
+ }
+
+ @KafkaListener(topics = "${filtered.topic.name}", containerFactory = "filterKafkaListenerContainerFactory")
+ public void listenWithFilter(String message) {
+ System.out.println("Recieved Message in filtered listener: " + message);
+ this.filterLatch.countDown();
+ }
+
+ @KafkaListener(topics = "${greeting.topic.name}", containerFactory = "greetingKafkaListenerContainerFactory")
+ public void greetingListener(Greeting greeting) {
+ System.out.println("Recieved greeting message: " + greeting);
+ this.greetingLatch.countDown();
+ }
+
}
}
diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java
index f9edda2435..9353e63ff6 100644
--- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java
+++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java
@@ -12,6 +12,7 @@ import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
@EnableKafka
@Configuration
@@ -35,11 +36,49 @@ public class KafkaConsumerConfig {
factory.setConsumerFactory(consumerFactory("foo"));
return factory;
}
-
+
@Bean
public ConcurrentKafkaListenerContainerFactory barKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("bar"));
return factory;
}
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory headersKafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory("headers"));
+ return factory;
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory partitionsKafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory("partitions"));
+ return factory;
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory("filter"));
+ factory.setRecordFilterStrategy(record -> record.value()
+ .contains("World"));
+ return factory;
+ }
+
+ public ConsumerFactory greetingConsumerFactory() {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
+ return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(greetingConsumerFactory());
+ return factory;
+ }
+
}
diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java
index 4f9f9719ee..84d57c9e92 100644
--- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java
+++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaProducerConfig.java
@@ -11,6 +11,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
public class KafkaProducerConfig {
@@ -29,8 +30,21 @@ public class KafkaProducerConfig {
@Bean
public KafkaTemplate kafkaTemplate() {
- KafkaTemplate template =
- new KafkaTemplate(producerFactory());
- return template;
+ return new KafkaTemplate(producerFactory());
}
+
+ @Bean
+ public ProducerFactory greetingProducerFactory() {
+ Map configProps = new HashMap();
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ return new DefaultKafkaProducerFactory(configProps);
+ }
+
+ @Bean
+ public KafkaTemplate greetingKafkaTemplate() {
+ return new KafkaTemplate(greetingProducerFactory());
+ }
+
}
diff --git a/spring-kafka/src/main/resources/application.properties b/spring-kafka/src/main/resources/application.properties
index a1d73b204c..eaf113191e 100644
--- a/spring-kafka/src/main/resources/application.properties
+++ b/spring-kafka/src/main/resources/application.properties
@@ -1,2 +1,5 @@
kafka.bootstrapAddress=localhost:9092
message.topic.name=baeldung
+greeting.topic.name=greeting
+filtered.topic.name=filtered
+partitioned.topic.name=partitioned
\ No newline at end of file