[Update] kafka retry, deadletter 설정 추가

This commit is contained in:
abel
2021-11-17 15:03:24 +09:00
parent 67d9955fe2
commit acbde5b879
5 changed files with 100 additions and 2 deletions

View File

@@ -0,0 +1,87 @@
package com.spring.kafka.configuration;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@Slf4j
@Configuration
public class KafkaConfiguration {

    /** Prefix prepended to the source topic name to form the dead-letter topic name. */
    private static final String DEAD_LETTER_TOPIC_PREFIX = "dlt_";
    /** Delay between consecutive delivery attempts, in milliseconds. */
    private static final long BACK_OFF_PERIOD_MS = 1000L;
    /** Total delivery attempts (initial + retries) before the recovery callback runs. */
    private static final int MAX_ATTEMPTS = 2;

    /**
     * Builds the retry behaviour applied to each listener invocation:
     * a fixed back-off of {@value #BACK_OFF_PERIOD_MS} ms between attempts and
     * at most {@value #MAX_ATTEMPTS} attempts in total.
     */
    private RetryTemplate retryTemplate() {
        // Wait a fixed period before each re-delivery attempt.
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(BACK_OFF_PERIOD_MS);

        // Cap the number of delivery attempts.
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(MAX_ATTEMPTS);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.setRetryPolicy(retryPolicy);
        return retryTemplate;
    }

    /**
     * Invoked once all retry attempts are exhausted: forwards the failed record's
     * value to a dead-letter topic ({@code dlt_<originalTopic>}) and acknowledges
     * the offset so the consumer can move past the poison record.
     *
     * @param kafkaTemplate template used to publish to the dead-letter topic
     */
    private RecoveryCallback<Object> recoveryCallback(KafkaTemplate<String, String> kafkaTemplate) {
        return context -> {
            final var retryCount = context.getRetryCount();
            // Attributes placed in the retry context by the listener adapter for each delivery.
            final var acknowledgment = (Acknowledgment) context.getAttribute("acknowledgment");
            final var failedRecord = (ConsumerRecord<?, ?>) context.getAttribute("record");
            final var topic = DEAD_LETTER_TOPIC_PREFIX + failedRecord.topic();
            // Objects.toString keeps a null (tombstone) value as null instead of throwing NPE.
            final var value = Objects.toString(failedRecord.value(), null);
            try {
                log.warn("[Send to dead letter topic] {} - retryCount: {} - value: {}.", topic, retryCount, value);
                kafkaTemplate.send(topic, value);
            } catch (Exception e) {
                log.error("[Fail to dead letter topic]: {}" , topic, e);
            }
            // Acknowledge even if the DLT publish failed, so the original record is
            // not re-consumed forever (the container uses AckMode.MANUAL below).
            if (Objects.nonNull(acknowledgment)) {
                acknowledgment.acknowledge();
            }
            return Optional.empty();
        };
    }

    /**
     * Consumer factory with manual offset commits and read-committed isolation;
     * broker address, group id and offset-reset policy come from application properties.
     */
    private ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        final var props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers(),
                ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId(),
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset(),
                // Offsets are committed explicitly via Acknowledgment (AckMode.MANUAL).
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
                ConsumerConfig.ISOLATION_LEVEL_CONFIG, KafkaProperties.IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT),
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
        );
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener container factory wiring retry, dead-letter recovery and manual
     * acknowledgment together; referenced by name from {@code @KafkaListener}.
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(KafkaProperties kafkaProperties, KafkaTemplate<String, String> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        factory.setRetryTemplate(retryTemplate());
        factory.setRecoveryCallback(recoveryCallback(kafkaTemplate));
        // Offsets are committed only when the listener (or recovery callback) acknowledges.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

View File

@@ -17,9 +17,11 @@ public class KafkaMessageConsumer {
log.debug("Received Payloads : " + message);
}
@KafkaListener(topics = "domain-event-user")
// Consumes "domain-event-user" records through the retry-enabled container factory
// defined in KafkaConfiguration; logs headers and payload, then always fails so the
// retry + dead-letter path can be exercised.
@KafkaListener(topics = "domain-event-user", containerFactory = "kafkaListenerContainerFactory")
public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) {
log.debug("Received Headers : " + headers);
log.debug("Received Payloads : " + user.toString());
// Deliberate failure added to test the retry/dead-letter handling.
// NOTE(review): UnknownError is a JVM Error, not an Exception — a RuntimeException
// would be the conventional choice for simulating a processing failure; confirm intent.
throw new UnknownError("unexpected error");
}
}

View File

@@ -21,6 +21,8 @@ public class KafkaSpringCloudMessageConsumer {
// Spring Cloud Stream functional consumer for the domainEventModel-in-0 binding;
// prints the payload, then always fails so the binder's retry/DLQ settings
// (max-attempts, enableDlq) can be exercised.
Consumer<User> domainEventModel() {
return input -> {
System.out.println("Received Message : " + input);
// Deliberate failure added to test the retry/dead-letter handling.
// NOTE(review): UnknownError is a JVM Error, not an Exception — confirm intent.
throw new UnknownError("unexpected error");
};
}
}

View File

@@ -24,7 +24,7 @@ public class MessageSendController {
}
/*
curl --location --request POST 'http://localhost:8080/send-message' \
curl --location --request POST 'http://localhost:8080/send-message-user' \
--header 'Content-Type: application/json' \
--data-raw '{
"id": "happydaddy@naver.com",

View File

@@ -42,10 +42,17 @@ spring:
domainEventModel-in-0:
destination: domain-event-model
group: consumer-group-model
consumer:
max-attempts: 2
domainEventModel-out-0:
destination: domain-event-model
group: consumer-group-model
kafka:
bindings:
domainEventModel-in-0:
consumer:
enableDlq: true
dlqName: dlt_domain-event-model
binder:
brokers: localhost:9093,localhost:9094,localhost:9095
configuration: