diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java index e8aa63a88d..463d3209ea 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/KafkaConsumerConfig.java @@ -1,5 +1,6 @@ package com.baeldung.spring.kafka; +import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.Map; @@ -8,15 +9,20 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.StringJsonMessageConverter; import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper; import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; @EnableKafka @Configuration @@ -25,6 +31,12 @@ public class KafkaConsumerConfig { @Value(value = "${spring.kafka.bootstrap-servers}") private String bootstrapAddress; + @Value(value = "${kafka.backoff.interval}") + private Long interval; + + @Value(value = "${kafka.backoff.max_failure}") + private Long maxAttempts; + public ConsumerFactory consumerFactory(String groupId) { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); @@ -71,7 +83,7 @@ public class KafkaConsumerConfig { public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = kafkaListenerContainerFactory("filter"); factory.setRecordFilterStrategy(record -> record.value() - .contains("World")); + .contains("World")); return factory; } @@ -83,7 +95,7 @@ public class KafkaConsumerConfig { } @Bean - public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory() { + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(greetingConsumerFactory()); return factory; @@ -109,15 +121,32 @@ public class KafkaConsumerConfig { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_test"); return new DefaultKafkaConsumerFactory<>(props); } @Bean - public ConcurrentKafkaListenerContainerFactory multiTypeKafkaListenerContainerFactory() { + @Primary + public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(multiTypeConsumerFactory()); factory.setMessageConverter(multiTypeConverter()); + factory.setCommonErrorHandler(errorHandler()); + factory.getContainerProperties() + .setAckMode(ContainerProperties.AckMode.RECORD); return factory; } + @Bean + public DefaultErrorHandler errorHandler() { + BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts); + DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> { + System.out.println(String.format("consumed record %s because this exception was thrown",consumerRecord.toString(),e.getClass().getName())); + }, fixedBackOff); + //Commented because of the test + //errorHandler.addRetryableExceptions(SocketTimeoutException.class,RuntimeException.class); + errorHandler.addNotRetryableExceptions(NullPointerException.class); + return errorHandler; + } + } diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java index 9afb5ff0b6..4b43c84f15 100644 --- a/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/MultiTypeKafkaListener.java @@ -2,6 +2,7 @@ package com.baeldung.spring.kafka; import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component; @Component @@ -9,7 +10,12 @@ import org.springframework.stereotype.Component; public class MultiTypeKafkaListener { @KafkaHandler + //@RetryableTopic(backoff = @Backoff(value = 3000L), attempts = "5", autoCreateTopics = "false",include = SocketTimeoutException.class, exclude = NullPointerException.class) public void handleGreeting(Greeting greeting) { + if (greeting.getName() + .equalsIgnoreCase("test")) { + throw new MessagingException("test not allowed"); + } System.out.println("Greeting received: " + greeting); } diff --git a/spring-kafka/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java b/spring-kafka/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java new file mode 100644 index 0000000000..e43207829a --- /dev/null +++ b/spring-kafka/src/main/java/com/baeldung/spring/kafka/RetryableApplicationKafkaApp.java @@ -0,0 +1,14 @@ +package com.baeldung.spring.kafka; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Import; + +@SpringBootApplication +@Import(value = { KafkaTopicConfig.class, KafkaConsumerConfig.class, KafkaProducerConfig.class }) +public class RetryableApplicationKafkaApp { + + public static void main(String[] args) { + SpringApplication.run(RetryableApplicationKafkaApp.class, args); + } +} diff --git a/spring-kafka/src/main/resources/application.properties b/spring-kafka/src/main/resources/application.properties index c57537e2af..691b6f55b7 100644 --- a/spring-kafka/src/main/resources/application.properties +++ b/spring-kafka/src/main/resources/application.properties @@ -14,4 +14,7 @@ monitor.producer.simulate=true monitor.consumer.simulate=true monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate test.topic=testtopic1 +kafka.backoff.interval=9000 +kafka.backoff.max_failure=5 + diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java index eebcf778be..030d166ca4 100644 --- a/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java +++ b/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java @@ -1,7 +1,6 @@ package com.baeldung.kafka.embedded; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -16,6 +15,8 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.annotation.DirtiesContext; +import com.fasterxml.jackson.databind.ObjectMapper; + @SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) @@ -33,6 +34,8 @@ class EmbeddedKafkaIntegrationTest { @Value("${test.topic}") private String topic; + private ObjectMapper objectMapper = new ObjectMapper(); + @BeforeEach void setup() { consumer.resetLatch(); @@ -44,7 +47,8 @@ class EmbeddedKafkaIntegrationTest { template.send(topic, data); - boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS); + boolean messageConsumed = consumer.getLatch() + .await(10, TimeUnit.SECONDS); assertTrue(messageConsumed); assertThat(consumer.getPayload(), containsString(data)); } @@ -55,7 +59,8 @@ class EmbeddedKafkaIntegrationTest { producer.send(topic, data); - boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS); + boolean messageConsumed = consumer.getLatch() + .await(10, TimeUnit.SECONDS); assertTrue(messageConsumed); assertThat(consumer.getPayload(), containsString(data)); } diff --git a/spring-kafka/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java b/spring-kafka/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java new file mode 100644 index 0000000000..029031923e --- /dev/null +++ b/spring-kafka/src/test/java/com/baeldung/spring/kafka/KafkaRetryableIntegrationTest.java @@ -0,0 +1,84 @@ +package com.baeldung.spring.kafka; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import com.fasterxml.jackson.databind.ObjectMapper; + +@SpringBootTest(classes = RetryableApplicationKafkaApp.class) +@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) +public class KafkaRetryableIntegrationTest { + @ClassRule + public static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, true, "multitype"); + + @Autowired + private KafkaListenerEndpointRegistry registry; + + @Autowired + private KafkaTemplate template; + + private ObjectMapper objectMapper = new ObjectMapper(); + + private static final String CONTAINER_GROUP = "multiGroup"; + + private static final String TOPIC = "topic"; + + @Before + public void setup() { + System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString()); + } + + @Test + public void givenEmbeddedKafkaBroker_whenSendingAWellFormedMessage_thenMessageIsConsumed() throws Exception { + ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer) registry.getListenerContainer(CONTAINER_GROUP); + container.stop(); + @SuppressWarnings("unchecked") AcknowledgingConsumerAwareMessageListener messageListener = (AcknowledgingConsumerAwareMessageListener) container.getContainerProperties() + .getMessageListener(); + CountDownLatch latch = new CountDownLatch(1); + container.getContainerProperties() + .setMessageListener((AcknowledgingConsumerAwareMessageListener) (data, acknowledgment, consumer) -> { + messageListener.onMessage(data, acknowledgment, consumer); + latch.countDown(); + }); + Greeting greeting = new Greeting("test1", "test2"); + container.start(); + template.send(TOPIC, objectMapper.writeValueAsString(greeting)); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @Test + public void givenEmbeddedKafkaBroker_whenSendingAMalFormedMessage_thenMessageIsConsumedAfterRetry() throws Exception { + ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer) registry.getListenerContainer(CONTAINER_GROUP); + container.stop(); + @SuppressWarnings("unchecked") AcknowledgingConsumerAwareMessageListener messageListener = (AcknowledgingConsumerAwareMessageListener) container.getContainerProperties() + .getMessageListener(); + CountDownLatch latch = new CountDownLatch(1); + container.getContainerProperties() + .setMessageListener((AcknowledgingConsumerAwareMessageListener) (data, acknowledgment, consumer) -> { + messageListener.onMessage(data, acknowledgment, consumer); + latch.countDown(); + }); + container.start(); + Greeting greeting = new Greeting("test", "test"); + template.send(TOPIC, objectMapper.writeValueAsString(greeting)); + //this message will go on error + Greeting greeting2 = new Greeting("test2", "test2"); + template.send(TOPIC, objectMapper.writeValueAsString(greeting2)); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + +}