BAEL-6057 - Implementing Retry In Kafka Consumer (#13274)

* BAEL-6057 - Implementing Retry In Kafka Consumer

(moved code example to new module)

* BAEL-6057 - Implementing Retry In Kafka Consumer

(fix on README.md)

Co-authored-by: Cesare <cesare.valenti@hotmail.com>
This commit is contained in:
cesarevalenti90
2023-01-11 22:25:43 +01:00
committed by GitHub
parent e8cf7721dc
commit b468cffb3f
17 changed files with 548 additions and 41 deletions

View File

@@ -1,6 +1,5 @@
package com.baeldung.spring.kafka;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
@@ -9,20 +8,15 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
@EnableKafka
@Configuration
@@ -31,12 +25,6 @@ public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${kafka.backoff.interval}")
private Long interval;
@Value(value = "${kafka.backoff.max_failure}")
private Long maxAttempts;
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
@@ -83,7 +71,7 @@ public class KafkaConsumerConfig {
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory("filter");
factory.setRecordFilterStrategy(record -> record.value()
.contains("World"));
.contains("World"));
return factory;
}
@@ -95,7 +83,7 @@ public class KafkaConsumerConfig {
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> kafkaListenerContainerFactory() {
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
@@ -121,32 +109,15 @@ public class KafkaConsumerConfig {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_test");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
@Primary
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(multiTypeConsumerFactory());
factory.setMessageConverter(multiTypeConverter());
factory.setCommonErrorHandler(errorHandler());
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.RECORD);
return factory;
}
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
System.out.println(String.format("consumed record %s because this exception was thrown",consumerRecord.toString(),e.getClass().getName()));
}, fixedBackOff);
//Commented because of the test
//errorHandler.addRetryableExceptions(SocketTimeoutException.class,RuntimeException.class);
errorHandler.addNotRetryableExceptions(NullPointerException.class);
return errorHandler;
}
}

View File

@@ -2,7 +2,6 @@ package com.baeldung.spring.kafka;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
@Component
@@ -10,12 +9,7 @@ import org.springframework.stereotype.Component;
public class MultiTypeKafkaListener {
@KafkaHandler
//@RetryableTopic(backoff = @Backoff(value = 3000L), attempts = "5", autoCreateTopics = "false",include = SocketTimeoutException.class, exclude = NullPointerException.class)
public void handleGreeting(Greeting greeting) {
if (greeting.getName()
.equalsIgnoreCase("test")) {
throw new MessagingException("test not allowed");
}
System.out.println("Greeting received: " + greeting);
}

View File

@@ -1,14 +0,0 @@
package com.baeldung.spring.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
@SpringBootApplication
@Import(value = { KafkaTopicConfig.class, KafkaConsumerConfig.class, KafkaProducerConfig.class })
public class RetryableApplicationKafkaApp {
public static void main(String[] args) {
SpringApplication.run(RetryableApplicationKafkaApp.class, args);
}
}

View File

@@ -14,7 +14,4 @@ monitor.producer.simulate=true
monitor.consumer.simulate=true
monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
test.topic=testtopic1
kafka.backoff.interval=9000
kafka.backoff.max_failure=5

View File

@@ -1,84 +0,0 @@
package com.baeldung.spring.kafka;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import com.fasterxml.jackson.databind.ObjectMapper;
@SpringBootTest(classes = RetryableApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class KafkaRetryableIntegrationTest {
@ClassRule
public static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, true, "multitype");
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private KafkaTemplate<String, String> template;
private ObjectMapper objectMapper = new ObjectMapper();
private static final String CONTAINER_GROUP = "multiGroup";
private static final String TOPIC = "topic";
@Before
public void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
}
@Test
public void givenEmbeddedKafkaBroker_whenSendingAWellFormedMessage_thenMessageIsConsumed() throws Exception {
ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer(CONTAINER_GROUP);
container.stop();
@SuppressWarnings("unchecked") AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container.getContainerProperties()
.getMessageListener();
CountDownLatch latch = new CountDownLatch(1);
container.getContainerProperties()
.setMessageListener((AcknowledgingConsumerAwareMessageListener<String, String>) (data, acknowledgment, consumer) -> {
messageListener.onMessage(data, acknowledgment, consumer);
latch.countDown();
});
Greeting greeting = new Greeting("test1", "test2");
container.start();
template.send(TOPIC, objectMapper.writeValueAsString(greeting));
assertThat(latch.await(10, TimeUnit.SECONDS)).isFalse();
}
@Test
public void givenEmbeddedKafkaBroker_whenSendingAMalFormedMessage_thenMessageIsConsumedAfterRetry() throws Exception {
ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer(CONTAINER_GROUP);
container.stop();
@SuppressWarnings("unchecked") AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container.getContainerProperties()
.getMessageListener();
CountDownLatch latch = new CountDownLatch(1);
container.getContainerProperties()
.setMessageListener((AcknowledgingConsumerAwareMessageListener<String, String>) (data, acknowledgment, consumer) -> {
messageListener.onMessage(data, acknowledgment, consumer);
latch.countDown();
});
container.start();
Greeting greeting = new Greeting("test", "test");
template.send(TOPIC, objectMapper.writeValueAsString(greeting));
//this message will go on error
Greeting greeting2 = new Greeting("test2", "test2");
template.send(TOPIC, objectMapper.writeValueAsString(greeting2));
assertThat(latch.getCount()).isEqualTo(1);
}
}