From acbde5b879b3048922a274ad8d814069227fe5c8 Mon Sep 17 00:00:00 2001 From: abel Date: Wed, 17 Nov 2021 15:03:24 +0900 Subject: [PATCH] =?UTF-8?q?[Update]=20kafka=20retry,=20deadletter=20?= =?UTF-8?q?=EC=84=A4=EC=A0=95=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../configuration/KafkaConfiguration.java | 87 +++++++++++++++++++ .../kafka/consumer/KafkaMessageConsumer.java | 4 +- .../KafkaSpringCloudMessageConsumer.java | 2 + .../controller/MessageSendController.java | 2 +- src/main/resources/application.yaml | 7 ++ 5 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/spring/kafka/configuration/KafkaConfiguration.java diff --git a/src/main/java/com/spring/kafka/configuration/KafkaConfiguration.java b/src/main/java/com/spring/kafka/configuration/KafkaConfiguration.java new file mode 100644 index 0000000..10b5f79 --- /dev/null +++ b/src/main/java/com/spring/kafka/configuration/KafkaConfiguration.java @@ -0,0 +1,87 @@ +package com.spring.kafka.configuration; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.retry.RecoveryCallback; +import org.springframework.retry.backoff.FixedBackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; + +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +@Slf4j +@Configuration +public class KafkaConfiguration { + + private RetryTemplate retryTemplate() { + RetryTemplate retryTemplate = new RetryTemplate(); + // 재시도시 1초 후에 재 시도하도록 backoff delay 시간을 설정한다. + FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); + fixedBackOffPolicy.setBackOffPeriod(1000L); + retryTemplate.setBackOffPolicy(fixedBackOffPolicy); + // 최대 재시도 횟수 설정 + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); + retryPolicy.setMaxAttempts(2); + retryTemplate.setRetryPolicy(retryPolicy); + return retryTemplate; + } + + private RecoveryCallback recoveryCallback(KafkaTemplate kafkaTemplate) { + return context -> { + final var retryCount = context.getRetryCount(); + final var acknowledgment = (Acknowledgment) context.getAttribute("acknowledgment"); + final var record = (ConsumerRecord) context.getAttribute("record"); + final var topic = "dlt_" + record.topic(); + final var value = record.value().toString(); + try { + log.warn("[Send to dead letter topic] {} - retryCount: {} - value: {}.", topic, retryCount, value); + kafkaTemplate.send(topic, value); + } catch (Exception e) { + log.error("[Fail to dead letter topic]: {}" , topic, e); + } + if (Objects.nonNull(acknowledgment)) { + acknowledgment.acknowledge(); + } + return Optional.empty(); + }; + } + + private ConsumerFactory consumerFactory(KafkaProperties kafkaProperties) { + final var props = Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId(), + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset(), + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false, + ConsumerConfig.ISOLATION_LEVEL_CONFIG, KafkaProperties.IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class + ); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public KafkaListenerContainerFactory> kafkaListenerContainerFactory(KafkaProperties kafkaProperties, KafkaTemplate kafkaTemplate) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory(kafkaProperties)); + factory.setRetryTemplate(retryTemplate()); + factory.setRecoveryCallback(recoveryCallback(kafkaTemplate)); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + return factory; + } +} diff --git a/src/main/java/com/spring/kafka/consumer/KafkaMessageConsumer.java b/src/main/java/com/spring/kafka/consumer/KafkaMessageConsumer.java index 6825e42..1b60fb5 100644 --- a/src/main/java/com/spring/kafka/consumer/KafkaMessageConsumer.java +++ b/src/main/java/com/spring/kafka/consumer/KafkaMessageConsumer.java @@ -17,9 +17,11 @@ public class KafkaMessageConsumer { log.debug("Received Payloads : " + message); } - @KafkaListener(topics = "domain-event-user") + @KafkaListener(topics = "domain-event-user", containerFactory = "kafkaListenerContainerFactory") public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) { log.debug("Received Headers : " + headers); log.debug("Received Payloads : " + user.toString()); + // 재시도 테스트를 위해 예외 처리 추가 + throw new UnknownError("unexpected error"); } } diff --git a/src/main/java/com/spring/kafka/consumer/KafkaSpringCloudMessageConsumer.java b/src/main/java/com/spring/kafka/consumer/KafkaSpringCloudMessageConsumer.java index 7f2b148..0608619 100644 --- a/src/main/java/com/spring/kafka/consumer/KafkaSpringCloudMessageConsumer.java +++ b/src/main/java/com/spring/kafka/consumer/KafkaSpringCloudMessageConsumer.java @@ -21,6 +21,8 @@ public class KafkaSpringCloudMessageConsumer { Consumer domainEventModel() { return input -> { System.out.println("Received Message : " + input); + // 재시도 테스트를 위해 예외 처리 추가 + throw new UnknownError("unexpected error"); }; } } diff --git a/src/main/java/com/spring/kafka/controller/MessageSendController.java b/src/main/java/com/spring/kafka/controller/MessageSendController.java index 653af7e..1d5a9fc 100644 --- a/src/main/java/com/spring/kafka/controller/MessageSendController.java +++ b/src/main/java/com/spring/kafka/controller/MessageSendController.java @@ -24,7 +24,7 @@ public class MessageSendController { } /* - curl --location --request POST 'http://localhost:8080/send-message' \ + curl --location --request POST 'http://localhost:8080/send-message-user' \ --header 'Content-Type: application/json' \ --data-raw '{ "id": "happydaddy@naver.com", diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index aab1e3c..3e3f644 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -42,10 +42,17 @@ spring: domainEventModel-in-0: destination: domain-event-model group: consumer-group-model + consumer: + max-attempts: 2 domainEventModel-out-0: destination: domain-event-model group: consumer-group-model kafka: + bindings: + domainEventModel-in-0: + consumer: + enableDlq: true + dlqName: dlt_domain-event-model binder: brokers: localhost:9093,localhost:9094,localhost:9095 configuration: