클래스명 수정

This commit is contained in:
deogicorgi
2022-04-03 00:17:32 +09:00
parent b5f2b70c76
commit 547d1da2af
9 changed files with 38 additions and 21 deletions

View File

@@ -17,12 +17,12 @@ import java.time.LocalDateTime;
@JsonTypeInfo( @JsonTypeInfo(
use = JsonTypeInfo.Id.NAME, use = JsonTypeInfo.Id.NAME,
property = "type", property = "type",
defaultImpl = KafkaUriProduceMessage.class) defaultImpl = KafkaUriMessage.class)
@JsonSubTypes({ @JsonSubTypes({
@JsonSubTypes.Type(value = KafkaUriProduceMessage.class, names = {"uri", "Uri", "URI"}), @JsonSubTypes.Type(value = KafkaUriMessage.class, names = {"uri", "Uri", "URI"}),
@JsonSubTypes.Type(value = KafkaBodyProduceMessage.class, names = {"message", "Message", "MESSAGE"}) @JsonSubTypes.Type(value = KafkaBodyMessage.class, names = {"message", "Message", "MESSAGE"})
}) })
public abstract class AbstractKafkaProduceMessage { public abstract class AbstractKafkaMessage {
// 요청 토픽 // 요청 토픽
protected String topic; protected String topic;

View File

@@ -9,12 +9,12 @@ import lombok.Setter;
*/ */
@Getter @Getter
@Setter @Setter
public class KafkaBodyProduceMessage extends AbstractKafkaProduceMessage { public class KafkaBodyMessage extends AbstractKafkaMessage {
// 요청 메시지 // 요청 메시지
private String message; private String message;
public KafkaBodyProduceMessage() { public KafkaBodyMessage() {
super.type = ProduceMessageType.Message; super.type = ProduceMessageType.Message;
} }

View File

@@ -9,12 +9,12 @@ import lombok.Setter;
*/ */
@Getter @Getter
@Setter @Setter
public class KafkaUriProduceMessage extends AbstractKafkaProduceMessage { public class KafkaUriMessage extends AbstractKafkaMessage {
// 요청 URI // 요청 URI
private String uri; private String uri;
public KafkaUriProduceMessage() { public KafkaUriMessage() {
super.type = ProduceMessageType.URI; super.type = ProduceMessageType.URI;
} }

View File

@@ -1,14 +1,14 @@
package com.github.deogicorgi.reactive.common.model; package com.github.deogicorgi.reactive.common.model;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.github.deogicorgi.reactive.common.message.AbstractKafkaProduceMessage; import com.github.deogicorgi.reactive.common.message.AbstractKafkaMessage;
import com.github.deogicorgi.reactive.common.value.ProduceMessageType; import com.github.deogicorgi.reactive.common.value.ProduceMessageType;
import lombok.Getter; import lombok.Getter;
import java.time.LocalDateTime; import java.time.LocalDateTime;
/** /**
* 카프카 메시지 클래스 * 카프카 메시지 전송결과 클래스
*/ */
@Getter @Getter
public class KafkaProduceResult { public class KafkaProduceResult {
@@ -38,11 +38,11 @@ public class KafkaProduceResult {
// 메시지를 처리한 시간 // 메시지를 처리한 시간
private LocalDateTime producedAt; private LocalDateTime producedAt;
public KafkaProduceResult(AbstractKafkaProduceMessage message) { public KafkaProduceResult(AbstractKafkaMessage message) {
this.setRequestedMessage(message); this.setRequestedMessage(message);
} }
public KafkaProduceResult(AbstractKafkaProduceMessage message, Throwable e) { public KafkaProduceResult(AbstractKafkaMessage message, Throwable e) {
this.setRequestedMessage(message); this.setRequestedMessage(message);
this.status = false; this.status = false;
this.error = e; this.error = e;
@@ -54,7 +54,7 @@ public class KafkaProduceResult {
return error != null; return error != null;
} }
private void setRequestedMessage(AbstractKafkaProduceMessage requestedMessage) { private void setRequestedMessage(AbstractKafkaMessage requestedMessage) {
this.topic = requestedMessage.getTopic(); this.topic = requestedMessage.getTopic();
this.messageType = requestedMessage.getType(); this.messageType = requestedMessage.getType();
this.requestedMessage = requestedMessage.getRequestedMessage(); this.requestedMessage = requestedMessage.getRequestedMessage();

View File

@@ -27,6 +27,7 @@ public class KafkaConfig {
************************ Producer Options ************************ ************************ Producer Options ************************
******************************************************************/ ******************************************************************/
// 기본 설정들로 구성
@Bean("kafkaSender") @Bean("kafkaSender")
public KafkaSender<String, Object> kafkaSender() { public KafkaSender<String, Object> kafkaSender() {
SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps()); SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps());
@@ -35,6 +36,7 @@ public class KafkaConfig {
return KafkaSender.create(senderOptions); return KafkaSender.create(senderOptions);
} }
// 프로듀서 옵션
private Map<String, Object> getProducerProps() { private Map<String, Object> getProducerProps() {
return new HashMap<>() {{ return new HashMap<>() {{
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts()); put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts());

View File

@@ -1,6 +1,6 @@
package com.github.deogicorgi.reactive.producer.service; package com.github.deogicorgi.reactive.producer.service;
import com.github.deogicorgi.reactive.common.message.AbstractKafkaProduceMessage; import com.github.deogicorgi.reactive.common.message.AbstractKafkaMessage;
import com.github.deogicorgi.reactive.common.model.KafkaProduceResult; import com.github.deogicorgi.reactive.common.model.KafkaProduceResult;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -20,16 +20,15 @@ public class KafkaService {
private final KafkaSender<String, Object> producer; private final KafkaSender<String, Object> producer;
public Mono<KafkaProduceResult> send(AbstractKafkaProduceMessage message) { public Mono<KafkaProduceResult> send(AbstractKafkaMessage message) {
return producer.createOutbound() return producer.createOutbound()
// 지정된 토픽으로 메시지 전송 // 지정된 토픽으로 메시지 전송
.send(Mono.just(new ProducerRecord<>(message.getTopic(), null, message.getRequestedMessage()))) .send(Mono.just(new ProducerRecord<>(message.getTopic(), null, message.getRequestedMessage())))
// 전송완료 된 레코드를 Outbound로 리턴
.then() .then()
// 에러 없이 전송이 완료 되었을 경우 // 에러 없이 전송이 완료 되었을 경우
.thenReturn(new KafkaProduceResult(message)) .thenReturn(new KafkaProduceResult(message))
// 에러가 발생했을 경우 // 에러가 발생했을 경우
.onErrorResume(e -> Mono.just(new KafkaProduceResult(message, e))); .onErrorResume(e -> Mono.just(new KafkaProduceResult(message, e)));
} }
} }

View File

@@ -1,6 +1,6 @@
package com.github.deogicorgi.reactive.producer.web.controller; package com.github.deogicorgi.reactive.producer.web.controller;
import com.github.deogicorgi.reactive.common.message.AbstractKafkaProduceMessage; import com.github.deogicorgi.reactive.common.message.AbstractKafkaMessage;
import com.github.deogicorgi.reactive.common.model.KafkaProduceResult; import com.github.deogicorgi.reactive.common.model.KafkaProduceResult;
import com.github.deogicorgi.reactive.producer.web.service.ProduceService; import com.github.deogicorgi.reactive.producer.web.service.ProduceService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@@ -21,7 +21,7 @@ public class ProducerController {
private final ProduceService produceService; private final ProduceService produceService;
@PostMapping @PostMapping
public Mono<KafkaProduceResult> send(@RequestBody AbstractKafkaProduceMessage message) { public Mono<KafkaProduceResult> send(@RequestBody AbstractKafkaMessage message) {
return produceService.produceMessage(message); return produceService.produceMessage(message);
} }
} }

View File

@@ -0,0 +1,14 @@
package com.github.deogicorgi.reactive.producer.web.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class FailureMessageService {
public void produceFailure() {
}
}

View File

@@ -1,6 +1,6 @@
package com.github.deogicorgi.reactive.producer.web.service; package com.github.deogicorgi.reactive.producer.web.service;
import com.github.deogicorgi.reactive.common.message.AbstractKafkaProduceMessage; import com.github.deogicorgi.reactive.common.message.AbstractKafkaMessage;
import com.github.deogicorgi.reactive.common.model.KafkaProduceResult; import com.github.deogicorgi.reactive.common.model.KafkaProduceResult;
import com.github.deogicorgi.reactive.producer.service.KafkaService; import com.github.deogicorgi.reactive.producer.service.KafkaService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@@ -18,10 +18,12 @@ import reactor.core.publisher.Mono;
public class ProduceService { public class ProduceService {
private final KafkaService kafkaService; private final KafkaService kafkaService;
private final FailureMessageService failureMessageService;
public Mono<KafkaProduceResult> produceMessage(AbstractKafkaProduceMessage message) { public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) {
return kafkaService.send(message) return kafkaService.send(message)
.map(produceResult -> { .map(produceResult -> {
failureMessageService.produceFailure();
if (produceResult.hasError()) { if (produceResult.hasError()) {
// TODO 카프카 프로듀싱 실패일 경우 처리 // TODO 카프카 프로듀싱 실패일 경우 처리
// ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등 // ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등