From 2c77131e0d8e7e1064f9903f7f30580310b6b6dc Mon Sep 17 00:00:00 2001 From: anuragkumawat Date: Sat, 25 Feb 2023 15:06:03 +0530 Subject: [PATCH] JAVA-18598 Update Intro to Apache Kafka with Spring article --- spring-kafka/pom.xml | 20 +++++++++++++++++-- .../KafkaCountingMessagesComponent.java | 2 +- .../com/baeldung/kafka/ssl/KafkaProducer.java | 11 ++++++---- .../baeldung/kafka/streams/KafkaProducer.java | 11 ++++++---- .../spring/kafka/KafkaApplication.java | 20 +++++++------------ .../KafkaStreamsApplicationLiveTest.java | 2 +- 6 files changed, 41 insertions(+), 25 deletions(-) diff --git a/spring-kafka/pom.xml b/spring-kafka/pom.xml index d51c2e300f..7b0bb0a8b7 100644 --- a/spring-kafka/pom.xml +++ b/spring-kafka/pom.xml @@ -9,9 +9,9 @@ com.baeldung - parent-boot-2 + parent-boot-3 0.0.1-SNAPSHOT - ../parent-boot-2 + ../parent-boot-3 @@ -61,8 +61,24 @@ awaitility test + + org.apache.commons + commons-lang3 + + + + + org.springframework.boot + spring-boot-maven-plugin + + com.baeldung.spring.kafka.KafkaApplication + + + + + 1.16.2 diff --git a/spring-kafka/src/main/java/com/baeldung/countingmessages/KafkaCountingMessagesComponent.java b/spring-kafka/src/main/java/com/baeldung/countingmessages/KafkaCountingMessagesComponent.java index 89cd1c8dac..f76be95c1c 100644 --- a/spring-kafka/src/main/java/com/baeldung/countingmessages/KafkaCountingMessagesComponent.java +++ b/spring-kafka/src/main/java/com/baeldung/countingmessages/KafkaCountingMessagesComponent.java @@ -6,7 +6,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; +import jakarta.annotation.PostConstruct; import java.util.Collections; import java.util.HashMap; import java.util.List; diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaProducer.java index 895d437c6b..38ce366355 100644 --- a/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaProducer.java +++ b/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaProducer.java @@ -15,9 +15,12 @@ public class KafkaProducer { public void sendMessage(String message, String topic) { log.info("Producing message: {}", message); kafkaTemplate.send(topic, "key", message) - .addCallback( - result -> log.info("Message sent to topic: {}", message), - ex -> log.error("Failed to send message", ex) - ); + .whenComplete((result, ex) -> { + if (ex == null) { + log.info("Message sent to topic: {}", message); + } else { + log.error("Failed to send message", ex); + } + }); } } diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java index 2b8e9bbfbd..664758a052 100644 --- a/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java +++ b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java @@ -15,9 +15,12 @@ public class KafkaProducer { public void sendMessage(String message) { kafkaTemplate.send("input-topic", message) - .addCallback( - result -> log.info("Message sent to topic: {}", message), - ex -> log.error("Failed to send message", ex) - ); + .whenComplete((result, ex) -> { + if (ex == null) { + log.info("Message sent to topic: {}", message); + } else { + log.error("Failed to send message", ex); + } + }); } } diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java index 9b79f716e9..ff2d21668f 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java @@ -1,5 +1,6 @@ package com.baeldung.spring.kafka; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -16,8 +17,6 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.SendResult; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; @SpringBootApplication public class KafkaApplication { @@ -102,18 +101,13 @@ public class KafkaApplication { public void sendMessage(String message) { - ListenableFuture> future = kafkaTemplate.send(topicName, message); + CompletableFuture> future = kafkaTemplate.send(topicName, message); + future.whenComplete((result, ex) -> { - future.addCallback(new ListenableFutureCallback>() { - - @Override - public void onSuccess(SendResult result) { + if (ex == null) { System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata() .offset() + "]"); - } - - @Override - public void onFailure(Throwable ex) { + } else { System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); } }); @@ -155,13 +149,13 @@ public class KafkaApplication { } @KafkaListener(topics = "${message.topic.name}", containerFactory = "headersKafkaListenerContainerFactory") - public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { + public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) { System.out.println("Received Message: " + message + " from partition: " + partition); latch.countDown(); } @KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }), containerFactory = "partitionsKafkaListenerContainerFactory") - public void listenToPartition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { + public void listenToPartition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) { System.out.println("Received Message: " + message + " from partition: " + partition); this.partitionLatch.countDown(); } diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java index 85df8485d2..aee3c2c0dc 100644 --- a/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java +++ b/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java @@ -20,7 +20,7 @@ import org.junit.jupiter.api.io.TempDir; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.web.client.TestRestTemplate; -import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.boot.test.web.server.LocalServerPort; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod;