diff --git a/src/main/java/com/github/deogicorgi/reactor/kafka/config/WebFluxConfig.java b/src/main/java/com/github/deogicorgi/reactor/kafka/config/WebFluxConfig.java new file mode 100644 index 0000000..95b25e3 --- /dev/null +++ b/src/main/java/com/github/deogicorgi/reactor/kafka/config/WebFluxConfig.java @@ -0,0 +1,8 @@ +package com.github.deogicorgi.reactor.kafka.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.config.WebFluxConfigurer; + +@Configuration +public class WebFluxConfig implements WebFluxConfigurer { +} diff --git a/src/main/java/com/github/deogicorgi/reactor/kafka/exception/ProducerServiceException.java b/src/main/java/com/github/deogicorgi/reactor/kafka/exception/ProducerServiceException.java index 02a3938..78262ba 100644 --- a/src/main/java/com/github/deogicorgi/reactor/kafka/exception/ProducerServiceException.java +++ b/src/main/java/com/github/deogicorgi/reactor/kafka/exception/ProducerServiceException.java @@ -1,6 +1,6 @@ package com.github.deogicorgi.reactor.kafka.exception; -public class ProducerServiceException extends Exception{ +public class ProducerServiceException extends Exception { public ProducerServiceException(Exception e) { super(e); } diff --git a/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/AbstractKafkaProduceMessage.java b/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/AbstractKafkaProduceMessage.java index e3d4179..5872466 100644 --- a/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/AbstractKafkaProduceMessage.java +++ b/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/AbstractKafkaProduceMessage.java @@ -1,7 +1,28 @@ package com.github.deogicorgi.reactor.kafka.producer.message; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.Getter; +import lombok.Setter; + +import java.time.LocalDateTime; + +@Getter +@Setter +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + property = "type", + defaultImpl = KafkaUriProduceMessage.class) +@JsonSubTypes({ + @JsonSubTypes.Type(value = KafkaUriProduceMessage.class, name = "uri"), + @JsonSubTypes.Type(value = KafkaBodyProduceMessage.class, name = "message") +}) public abstract class AbstractKafkaProduceMessage { - protected String message; + protected String topic; + protected String type; + protected LocalDateTime requestedAt; + + public abstract String getRequestedMessage(); } diff --git a/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/KafkaBodyProduceMessage.java b/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/KafkaBodyProduceMessage.java new file mode 100644 index 0000000..95e8317 --- /dev/null +++ b/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/KafkaBodyProduceMessage.java @@ -0,0 +1,15 @@ +package com.github.deogicorgi.reactor.kafka.producer.message; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class KafkaBodyProduceMessage extends AbstractKafkaProduceMessage { + private String message; + + @Override + public String getRequestedMessage() { + return this.message; + } +} diff --git a/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/KafkaUriProduceMessage.java b/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/KafkaUriProduceMessage.java new file mode 100644 index 0000000..6abe0a9 --- /dev/null +++ b/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/KafkaUriProduceMessage.java @@ -0,0 +1,15 @@ +package com.github.deogicorgi.reactor.kafka.producer.message; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class KafkaUriProduceMessage extends AbstractKafkaProduceMessage { + private String uri; + + @Override + public String getRequestedMessage() { + return this.uri; + } +} diff --git a/src/main/java/com/github/deogicorgi/reactor/kafka/producer/model/KafkaProduceResult.java b/src/main/java/com/github/deogicorgi/reactor/kafka/producer/model/KafkaProduceResult.java new file mode 100644 index 0000000..f85dc76 --- /dev/null +++ b/src/main/java/com/github/deogicorgi/reactor/kafka/producer/model/KafkaProduceResult.java @@ -0,0 +1,40 @@ +package com.github.deogicorgi.reactor.kafka.producer.model; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.github.deogicorgi.reactor.kafka.producer.message.AbstractKafkaProduceMessage; +import lombok.Getter; + +import java.time.LocalDateTime; + +@Getter +public class KafkaProduceResult { + private Boolean status = true; + private String message; + @JsonIgnore + private Throwable error = null; + private String errorMessage = null; + private LocalDateTime requestedAt; + private LocalDateTime producedAt; + + public KafkaProduceResult(AbstractKafkaProduceMessage message) { + this.setMessage(message); + } + + public KafkaProduceResult(AbstractKafkaProduceMessage message, Throwable e) { + this.setMessage(message); + this.status = false; + this.error = e; + this.errorMessage = e.getMessage(); + } + + public Boolean hasError() { + return error != null; + } + + private void setMessage(AbstractKafkaProduceMessage message) { + this.message = message.getRequestedMessage(); + this.producedAt = LocalDateTime.now(); + this.requestedAt = message.getRequestedAt(); + } + +} diff --git a/src/main/java/com/github/deogicorgi/reactor/kafka/web/controller/ProducerController.java b/src/main/java/com/github/deogicorgi/reactor/kafka/web/controller/ProducerController.java index f6d4836..66a811e 100644 --- a/src/main/java/com/github/deogicorgi/reactor/kafka/web/controller/ProducerController.java +++ b/src/main/java/com/github/deogicorgi/reactor/kafka/web/controller/ProducerController.java @@ -1,13 +1,27 @@ package com.github.deogicorgi.reactor.kafka.web.controller; +import com.github.deogicorgi.reactor.kafka.producer.message.AbstractKafkaProduceMessage; +import com.github.deogicorgi.reactor.kafka.producer.model.KafkaProduceResult; +import com.github.deogicorgi.reactor.kafka.web.service.ProduceService; import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; /** * 카프카 프로듀서 Controller */ @RestController @RequiredArgsConstructor +@RequestMapping("/produce") public class ProducerController { + private final ProduceService produceService; + + @PostMapping + public Mono send(@RequestBody AbstractKafkaProduceMessage message) { + return produceService.produceMessage(message); + } } diff --git a/src/main/java/com/github/deogicorgi/reactor/kafka/web/service/KafkaService.java b/src/main/java/com/github/deogicorgi/reactor/kafka/web/service/KafkaService.java new file mode 100644 index 0000000..1725b85 --- /dev/null +++ b/src/main/java/com/github/deogicorgi/reactor/kafka/web/service/KafkaService.java @@ -0,0 +1,32 @@ +package com.github.deogicorgi.reactor.kafka.web.service; + +import com.github.deogicorgi.reactor.kafka.producer.message.AbstractKafkaProduceMessage; +import com.github.deogicorgi.reactor.kafka.producer.message.KafkaUriProduceMessage; +import com.github.deogicorgi.reactor.kafka.producer.model.KafkaProduceResult; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; +import reactor.kafka.sender.KafkaSender; + +/** + * 카프카 서비스 + * 실제 카프카로 메시지 프로듀싱 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class KafkaService { + + private final KafkaSender producer; + + public Mono send(AbstractKafkaProduceMessage message) { + return producer.createOutbound() + .send(Mono.just(new ProducerRecord<>(message.getTopic(), null, message.getRequestedMessage()))) // 해당 topic으로 message 전송 + .then() + .map(ret -> new KafkaProduceResult(message)) + .onErrorResume(e -> Mono.just(new KafkaProduceResult(message, e))); + } + +} diff --git a/src/main/java/com/github/deogicorgi/reactor/kafka/web/service/ProduceService.java b/src/main/java/com/github/deogicorgi/reactor/kafka/web/service/ProduceService.java index 432b007..e23fb6d 100644 --- a/src/main/java/com/github/deogicorgi/reactor/kafka/web/service/ProduceService.java +++ b/src/main/java/com/github/deogicorgi/reactor/kafka/web/service/ProduceService.java @@ -1,9 +1,33 @@ package com.github.deogicorgi.reactor.kafka.web.service; +import com.github.deogicorgi.reactor.kafka.producer.message.AbstractKafkaProduceMessage; +import com.github.deogicorgi.reactor.kafka.producer.model.KafkaProduceResult; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; +/** + * 프로듀싱 서비스 + * Kafka 프로듀싱 전 로직 처리 + */ +@Slf4j @Service @RequiredArgsConstructor public class ProduceService { + + private final KafkaService kafkaService; + + public Mono produceMessage(AbstractKafkaProduceMessage message) { + return kafkaService.send(message).map(produceResult -> { + + if (produceResult.hasError()) { + // TODO 카프카 프로듀싱 실패일 경우 처리 + // ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등 + log.error("Kafka produce error : {}", produceResult.getErrorMessage()); + } + + return produceResult; + }); + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 27a9101..47692f3 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,6 @@ server: port: 18080 -kafka : - host : test +kafka: + host: test