diff --git a/spring-kafka/pom.xml b/spring-kafka/pom.xml
index d51c2e300f..7b0bb0a8b7 100644
--- a/spring-kafka/pom.xml
+++ b/spring-kafka/pom.xml
@@ -9,9 +9,9 @@
com.baeldung
- parent-boot-2
+ parent-boot-3
0.0.1-SNAPSHOT
- ../parent-boot-2
+ ../parent-boot-3
@@ -61,8 +61,24 @@
awaitility
test
+
+ org.apache.commons
+ commons-lang3
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ com.baeldung.spring.kafka.KafkaApplication
+
+
+
+
+
1.16.2
diff --git a/spring-kafka/src/main/java/com/baeldung/countingmessages/KafkaCountingMessagesComponent.java b/spring-kafka/src/main/java/com/baeldung/countingmessages/KafkaCountingMessagesComponent.java
index 89cd1c8dac..f76be95c1c 100644
--- a/spring-kafka/src/main/java/com/baeldung/countingmessages/KafkaCountingMessagesComponent.java
+++ b/spring-kafka/src/main/java/com/baeldung/countingmessages/KafkaCountingMessagesComponent.java
@@ -6,7 +6,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
-import javax.annotation.PostConstruct;
+import jakarta.annotation.PostConstruct;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaProducer.java
index 895d437c6b..38ce366355 100644
--- a/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaProducer.java
+++ b/spring-kafka/src/main/java/com/baeldung/kafka/ssl/KafkaProducer.java
@@ -15,9 +15,12 @@ public class KafkaProducer {
public void sendMessage(String message, String topic) {
log.info("Producing message: {}", message);
kafkaTemplate.send(topic, "key", message)
- .addCallback(
- result -> log.info("Message sent to topic: {}", message),
- ex -> log.error("Failed to send message", ex)
- );
+ .whenComplete((result, ex) -> {
+ if (ex == null) {
+ log.info("Message sent to topic: {}", message);
+ } else {
+ log.error("Failed to send message", ex);
+ }
+ });
}
}
diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java
index 2b8e9bbfbd..664758a052 100644
--- a/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java
+++ b/spring-kafka/src/main/java/com/baeldung/kafka/streams/KafkaProducer.java
@@ -15,9 +15,12 @@ public class KafkaProducer {
public void sendMessage(String message) {
kafkaTemplate.send("input-topic", message)
- .addCallback(
- result -> log.info("Message sent to topic: {}", message),
- ex -> log.error("Failed to send message", ex)
- );
+ .whenComplete((result, ex) -> {
+ if (ex == null) {
+ log.info("Message sent to topic: {}", message);
+ } else {
+ log.error("Failed to send message", ex);
+ }
+ });
}
}
diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java
index 9b79f716e9..ff2d21668f 100644
--- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java
+++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaApplication.java
@@ -1,5 +1,6 @@
package com.baeldung.spring.kafka;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -16,8 +17,6 @@ import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.ListenableFutureCallback;
@SpringBootApplication
public class KafkaApplication {
@@ -102,18 +101,13 @@ public class KafkaApplication {
public void sendMessage(String message) {
- ListenableFuture> future = kafkaTemplate.send(topicName, message);
+ CompletableFuture> future = kafkaTemplate.send(topicName, message);
+ future.whenComplete((result, ex) -> {
- future.addCallback(new ListenableFutureCallback>() {
-
- @Override
- public void onSuccess(SendResult result) {
+ if (ex == null) {
System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata()
.offset() + "]");
- }
-
- @Override
- public void onFailure(Throwable ex) {
+ } else {
System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
}
});
@@ -155,13 +149,13 @@ public class KafkaApplication {
}
@KafkaListener(topics = "${message.topic.name}", containerFactory = "headersKafkaListenerContainerFactory")
- public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
+ public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
System.out.println("Received Message: " + message + " from partition: " + partition);
latch.countDown();
}
@KafkaListener(topicPartitions = @TopicPartition(topic = "${partitioned.topic.name}", partitions = { "0", "3" }), containerFactory = "partitionsKafkaListenerContainerFactory")
- public void listenToPartition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
+ public void listenToPartition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
System.out.println("Received Message: " + message + " from partition: " + partition);
this.partitionLatch.countDown();
}
diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java
index 85df8485d2..aee3c2c0dc 100644
--- a/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java
+++ b/spring-kafka/src/test/java/com/baeldung/kafka/streams/KafkaStreamsApplicationLiveTest.java
@@ -20,7 +20,7 @@ import org.junit.jupiter.api.io.TempDir;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
-import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;