From 547d1da2af05fb6d4ba01a1368984117ba2bc2ce Mon Sep 17 00:00:00 2001 From: deogicorgi Date: Sun, 3 Apr 2022 00:17:32 +0900 Subject: [PATCH] =?UTF-8?q?=ED=81=B4=EB=9E=98=EC=8A=A4=EB=AA=85=20?= =?UTF-8?q?=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...oduceMessage.java => AbstractKafkaMessage.java} | 8 ++++---- ...dyProduceMessage.java => KafkaBodyMessage.java} | 4 ++-- ...UriProduceMessage.java => KafkaUriMessage.java} | 4 ++-- .../reactive/common/model/KafkaProduceResult.java | 10 +++++----- .../reactive/producer/config/KafkaConfig.java | 2 ++ .../reactive/producer/service/KafkaService.java | 7 +++---- .../web/controller/ProducerController.java | 4 ++-- .../web/service/FailureMessageService.java | 14 ++++++++++++++ .../producer/web/service/ProduceService.java | 6 ++++-- 9 files changed, 38 insertions(+), 21 deletions(-) rename common/src/main/java/com/github/deogicorgi/reactive/common/message/{AbstractKafkaProduceMessage.java => AbstractKafkaMessage.java} (72%) rename common/src/main/java/com/github/deogicorgi/reactive/common/message/{KafkaBodyProduceMessage.java => KafkaBodyMessage.java} (79%) rename common/src/main/java/com/github/deogicorgi/reactive/common/message/{KafkaUriProduceMessage.java => KafkaUriMessage.java} (78%) create mode 100644 producer/src/main/java/com/github/deogicorgi/reactive/producer/web/service/FailureMessageService.java diff --git a/common/src/main/java/com/github/deogicorgi/reactive/common/message/AbstractKafkaProduceMessage.java b/common/src/main/java/com/github/deogicorgi/reactive/common/message/AbstractKafkaMessage.java similarity index 72% rename from common/src/main/java/com/github/deogicorgi/reactive/common/message/AbstractKafkaProduceMessage.java rename to common/src/main/java/com/github/deogicorgi/reactive/common/message/AbstractKafkaMessage.java index 0b44107..a9454c2 100644 --- a/common/src/main/java/com/github/deogicorgi/reactive/common/message/AbstractKafkaProduceMessage.java +++ b/common/src/main/java/com/github/deogicorgi/reactive/common/message/AbstractKafkaMessage.java @@ -17,12 +17,12 @@ import java.time.LocalDateTime; @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, property = "type", - defaultImpl = KafkaUriProduceMessage.class) + defaultImpl = KafkaUriMessage.class) @JsonSubTypes({ - @JsonSubTypes.Type(value = KafkaUriProduceMessage.class, names = {"uri", "Uri", "URI"}), - @JsonSubTypes.Type(value = KafkaBodyProduceMessage.class, names = {"message", "Message", "MESSAGE"}) + @JsonSubTypes.Type(value = KafkaUriMessage.class, names = {"uri", "Uri", "URI"}), + @JsonSubTypes.Type(value = KafkaBodyMessage.class, names = {"message", "Message", "MESSAGE"}) }) -public abstract class AbstractKafkaProduceMessage { +public abstract class AbstractKafkaMessage { // 요청 토픽 protected String topic; diff --git a/common/src/main/java/com/github/deogicorgi/reactive/common/message/KafkaBodyProduceMessage.java b/common/src/main/java/com/github/deogicorgi/reactive/common/message/KafkaBodyMessage.java similarity index 79% rename from common/src/main/java/com/github/deogicorgi/reactive/common/message/KafkaBodyProduceMessage.java rename to common/src/main/java/com/github/deogicorgi/reactive/common/message/KafkaBodyMessage.java index a91599c..518d26f 100644 --- a/common/src/main/java/com/github/deogicorgi/reactive/common/message/KafkaBodyProduceMessage.java +++ b/common/src/main/java/com/github/deogicorgi/reactive/common/message/KafkaBodyMessage.java @@ -9,12 +9,12 @@ import lombok.Setter; */ @Getter @Setter -public class KafkaBodyProduceMessage extends AbstractKafkaProduceMessage { +public class KafkaBodyMessage extends AbstractKafkaMessage { // 요청 메시지 private String message; - public KafkaBodyProduceMessage() { + public KafkaBodyMessage() { super.type = ProduceMessageType.Message; } diff --git a/common/src/main/java/com/github/deogicorgi/reactive/common/message/KafkaUriProduceMessage.java b/common/src/main/java/com/github/deogicorgi/reactive/common/message/KafkaUriMessage.java similarity index 78% rename from common/src/main/java/com/github/deogicorgi/reactive/common/message/KafkaUriProduceMessage.java rename to common/src/main/java/com/github/deogicorgi/reactive/common/message/KafkaUriMessage.java index 20040ae..22c3d9e 100644 --- a/common/src/main/java/com/github/deogicorgi/reactive/common/message/KafkaUriProduceMessage.java +++ b/common/src/main/java/com/github/deogicorgi/reactive/common/message/KafkaUriMessage.java @@ -9,12 +9,12 @@ import lombok.Setter; */ @Getter @Setter -public class KafkaUriProduceMessage extends AbstractKafkaProduceMessage { +public class KafkaUriMessage extends AbstractKafkaMessage { // 요청 URI private String uri; - public KafkaUriProduceMessage() { + public KafkaUriMessage() { super.type = ProduceMessageType.URI; } diff --git a/common/src/main/java/com/github/deogicorgi/reactive/common/model/KafkaProduceResult.java b/common/src/main/java/com/github/deogicorgi/reactive/common/model/KafkaProduceResult.java index cbff84a..625ef36 100644 --- a/common/src/main/java/com/github/deogicorgi/reactive/common/model/KafkaProduceResult.java +++ b/common/src/main/java/com/github/deogicorgi/reactive/common/model/KafkaProduceResult.java @@ -1,14 +1,14 @@ package com.github.deogicorgi.reactive.common.model; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.github.deogicorgi.reactive.common.message.AbstractKafkaProduceMessage; +import com.github.deogicorgi.reactive.common.message.AbstractKafkaMessage; import com.github.deogicorgi.reactive.common.value.ProduceMessageType; import lombok.Getter; import java.time.LocalDateTime; /** - * 카프카 메시지 클래스 + * 카프카 메시지 전송결과 클래스 */ @Getter public class KafkaProduceResult { @@ -38,11 +38,11 @@ public class KafkaProduceResult { // 메시지를 처리한 시간 private LocalDateTime producedAt; - public KafkaProduceResult(AbstractKafkaProduceMessage message) { + public KafkaProduceResult(AbstractKafkaMessage message) { this.setRequestedMessage(message); } - public KafkaProduceResult(AbstractKafkaProduceMessage message, Throwable e) { + public KafkaProduceResult(AbstractKafkaMessage message, Throwable e) { this.setRequestedMessage(message); this.status = false; this.error = e; @@ -54,7 +54,7 @@ public class KafkaProduceResult { return error != null; } - private void setRequestedMessage(AbstractKafkaProduceMessage requestedMessage) { + private void setRequestedMessage(AbstractKafkaMessage requestedMessage) { this.topic = requestedMessage.getTopic(); this.messageType = requestedMessage.getType(); this.requestedMessage = requestedMessage.getRequestedMessage(); diff --git a/producer/src/main/java/com/github/deogicorgi/reactive/producer/config/KafkaConfig.java b/producer/src/main/java/com/github/deogicorgi/reactive/producer/config/KafkaConfig.java index f173876..339ead5 100644 --- a/producer/src/main/java/com/github/deogicorgi/reactive/producer/config/KafkaConfig.java +++ b/producer/src/main/java/com/github/deogicorgi/reactive/producer/config/KafkaConfig.java @@ -27,6 +27,7 @@ public class KafkaConfig { ************************ Producer Options ************************ ******************************************************************/ + // 기본 설정들로 구성 @Bean("kafkaSender") public KafkaSender kafkaSender() { SenderOptions senderOptions = SenderOptions.create(getProducerProps()); @@ -35,6 +36,7 @@ public class KafkaConfig { return KafkaSender.create(senderOptions); } + // 프로듀서 옵션 private Map getProducerProps() { return new HashMap<>() {{ put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts()); diff --git a/producer/src/main/java/com/github/deogicorgi/reactive/producer/service/KafkaService.java b/producer/src/main/java/com/github/deogicorgi/reactive/producer/service/KafkaService.java index 88a01a6..dad8310 100644 --- a/producer/src/main/java/com/github/deogicorgi/reactive/producer/service/KafkaService.java +++ b/producer/src/main/java/com/github/deogicorgi/reactive/producer/service/KafkaService.java @@ -1,6 +1,6 @@ package com.github.deogicorgi.reactive.producer.service; -import com.github.deogicorgi.reactive.common.message.AbstractKafkaProduceMessage; +import com.github.deogicorgi.reactive.common.message.AbstractKafkaMessage; import com.github.deogicorgi.reactive.common.model.KafkaProduceResult; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -20,16 +20,15 @@ public class KafkaService { private final KafkaSender producer; - public Mono send(AbstractKafkaProduceMessage message) { + public Mono send(AbstractKafkaMessage message) { + return producer.createOutbound() // 지정된 토픽으로 메시지 전송 .send(Mono.just(new ProducerRecord<>(message.getTopic(), null, message.getRequestedMessage()))) - // 전송완료 된 레코드를 Outbound로 리턴 .then() // 에러 없이 전송이 완료 되었을 경우 .thenReturn(new KafkaProduceResult(message)) // 에러가 발생했을 경우 .onErrorResume(e -> Mono.just(new KafkaProduceResult(message, e))); } - } diff --git a/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/controller/ProducerController.java b/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/controller/ProducerController.java index f2eae68..8e79c20 100644 --- a/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/controller/ProducerController.java +++ b/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/controller/ProducerController.java @@ -1,6 +1,6 @@ package com.github.deogicorgi.reactive.producer.web.controller; -import com.github.deogicorgi.reactive.common.message.AbstractKafkaProduceMessage; +import com.github.deogicorgi.reactive.common.message.AbstractKafkaMessage; import com.github.deogicorgi.reactive.common.model.KafkaProduceResult; import com.github.deogicorgi.reactive.producer.web.service.ProduceService; import lombok.RequiredArgsConstructor; @@ -21,7 +21,7 @@ public class ProducerController { private final ProduceService produceService; @PostMapping - public Mono send(@RequestBody AbstractKafkaProduceMessage message) { + public Mono send(@RequestBody AbstractKafkaMessage message) { return produceService.produceMessage(message); } } diff --git a/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/service/FailureMessageService.java b/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/service/FailureMessageService.java new file mode 100644 index 0000000..18715ad --- /dev/null +++ b/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/service/FailureMessageService.java @@ -0,0 +1,14 @@ +package com.github.deogicorgi.reactive.producer.web.service; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class FailureMessageService { + + public void produceFailure() { + } +} diff --git a/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/service/ProduceService.java b/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/service/ProduceService.java index b67772a..654b02d 100644 --- a/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/service/ProduceService.java +++ b/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/service/ProduceService.java @@ -1,6 +1,6 @@ package com.github.deogicorgi.reactive.producer.web.service; -import com.github.deogicorgi.reactive.common.message.AbstractKafkaProduceMessage; +import com.github.deogicorgi.reactive.common.message.AbstractKafkaMessage; import com.github.deogicorgi.reactive.common.model.KafkaProduceResult; import com.github.deogicorgi.reactive.producer.service.KafkaService; import lombok.RequiredArgsConstructor; @@ -18,10 +18,12 @@ import reactor.core.publisher.Mono; public class ProduceService { private final KafkaService kafkaService; + private final FailureMessageService failureMessageService; - public Mono produceMessage(AbstractKafkaProduceMessage message) { + public Mono produceMessage(AbstractKafkaMessage message) { return kafkaService.send(message) .map(produceResult -> { + failureMessageService.produceFailure(); if (produceResult.hasError()) { // TODO 카프카 프로듀싱 실패일 경우 처리 // ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등