주석 추가

This commit is contained in:
deogicorgi
2022-04-01 23:43:51 +09:00
parent 78be3a6b20
commit 7c179f693b
7 changed files with 85 additions and 22 deletions

View File

@@ -2,11 +2,16 @@ package com.github.deogicorgi.reactor.kafka.producer.message;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.github.deogicorgi.reactor.kafka.producer.values.ProduceMessageType;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
/**
* 카프카 메시지 베이스
* 프로듀서 내 에러 발생시 처리를 쉽게하기 위해 URI 형태와 Message 형태로 나눔
*/
@Getter
@Setter
@JsonTypeInfo(
@@ -14,13 +19,18 @@ import java.time.LocalDateTime;
property = "type",
defaultImpl = KafkaUriProduceMessage.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = KafkaUriProduceMessage.class, name = "uri"),
@JsonSubTypes.Type(value = KafkaBodyProduceMessage.class, name = "message")
@JsonSubTypes.Type(value = KafkaUriProduceMessage.class, names = {"uri", "Uri", "URI"}),
@JsonSubTypes.Type(value = KafkaBodyProduceMessage.class, names = {"message", "Message", "MESSAGE"})
})
public abstract class AbstractKafkaProduceMessage {
// 요청 토픽
protected String topic;
protected String type;
// 메시지 타입 (uri , message)
protected ProduceMessageType type;
// 요청 시간
protected LocalDateTime requestedAt;
public abstract String getRequestedMessage();

View File

@@ -1,13 +1,23 @@
package com.github.deogicorgi.reactor.kafka.producer.message;
import com.github.deogicorgi.reactor.kafka.producer.values.ProduceMessageType;
import lombok.Getter;
import lombok.Setter;
/**
* JSON String message 카프카 메시지
*/
@Getter
@Setter
public class KafkaBodyProduceMessage extends AbstractKafkaProduceMessage {
// 요청 메시지
private String message;
public KafkaBodyProduceMessage () {
super.type = ProduceMessageType.Message;
}
@Override
public String getRequestedMessage() {
return this.message;

View File

@@ -1,13 +1,23 @@
package com.github.deogicorgi.reactor.kafka.producer.message;
import com.github.deogicorgi.reactor.kafka.producer.values.ProduceMessageType;
import lombok.Getter;
import lombok.Setter;
/**
* URI 카프카 메시지
*/
@Getter
@Setter
public class KafkaUriProduceMessage extends AbstractKafkaProduceMessage {
// 요청 URI
private String uri;
public KafkaUriProduceMessage () {
super.type = ProduceMessageType.URI;
}
@Override
public String getRequestedMessage() {
return this.uri;

View File

@@ -2,39 +2,64 @@ package com.github.deogicorgi.reactor.kafka.producer.model;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.github.deogicorgi.reactor.kafka.producer.message.AbstractKafkaProduceMessage;
import com.github.deogicorgi.reactor.kafka.producer.values.ProduceMessageType;
import lombok.Getter;
import java.time.LocalDateTime;
/**
* 카프카 메시지 클래스
*/
@Getter
public class KafkaProduceResult {
// 메시지 전송 상태 - true : 전송완료, false : 전송실패
private Boolean status = true;
private String message;
// 메시지 전송 토픽
private String topic;
// 요청받은 메시지 타입 (uri, message)
private ProduceMessageType messageType;
// 요청받은 메시지 - URI 또는 JSON String
private String requestedMessage;
// 에러 - 전송과정 중 발생된 에러, 전송완료 일 경우 null
@JsonIgnore
private Throwable error = null;
// 에러 메시지 - 전송과정 중 발생된 에러, 전송완료 일 경우 null
private String errorMessage = null;
// 메시지를 요청받은 시간
private LocalDateTime requestedAt;
// 메시지를 처리한 시간
private LocalDateTime producedAt;
public KafkaProduceResult(AbstractKafkaProduceMessage message) {
this.setMessage(message);
this.setRequestedMessage(message);
}
public KafkaProduceResult(AbstractKafkaProduceMessage message, Throwable e) {
this.setMessage(message);
this.setRequestedMessage(message);
this.status = false;
this.error = e;
this.errorMessage = e.getMessage();
this.producedAt = null;
}
public Boolean hasError() {
return error != null;
}
private void setMessage(AbstractKafkaProduceMessage message) {
this.message = message.getRequestedMessage();
private void setRequestedMessage(AbstractKafkaProduceMessage requestedMessage) {
this.topic = requestedMessage.getTopic();
this.messageType = requestedMessage.getType();
this.requestedMessage = requestedMessage.getRequestedMessage();
this.producedAt = LocalDateTime.now();
this.requestedAt = message.getRequestedAt();
this.requestedAt = requestedMessage.getRequestedAt();
}
}

View File

@@ -0,0 +1,6 @@
package com.github.deogicorgi.reactor.kafka.producer.values;
public enum ProduceMessageType {
URI,
Message;
}

View File

@@ -1,7 +1,6 @@
package com.github.deogicorgi.reactor.kafka.web.service;
import com.github.deogicorgi.reactor.kafka.producer.message.AbstractKafkaProduceMessage;
import com.github.deogicorgi.reactor.kafka.producer.message.KafkaUriProduceMessage;
import com.github.deogicorgi.reactor.kafka.producer.model.KafkaProduceResult;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -23,9 +22,13 @@ public class KafkaService {
public Mono<KafkaProduceResult> send(AbstractKafkaProduceMessage message) {
return producer.createOutbound()
.send(Mono.just(new ProducerRecord<>(message.getTopic(), null, message.getRequestedMessage()))) // 해당 topic으로 message 전송
// 지정된 토픽으로 메시지 전송
.send(Mono.just(new ProducerRecord<>(message.getTopic(), null, message.getRequestedMessage())))
// 전송완료 된 레코드를 Outbound로 리턴
.then()
.map(ret -> new KafkaProduceResult(message))
// 에러 없이 전송이 완료 되었을 경우
.map(v -> new KafkaProduceResult(message))
// 에러가 발생하였을 경우
.onErrorResume(e -> Mono.just(new KafkaProduceResult(message, e)));
}

View File

@@ -19,15 +19,14 @@ public class ProduceService {
private final KafkaService kafkaService;
public Mono<KafkaProduceResult> produceMessage(AbstractKafkaProduceMessage message) {
return kafkaService.send(message).map(produceResult -> {
if (produceResult.hasError()) {
// TODO 카프카 프로듀싱 실패일 경우 처리
// ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등
log.error("Kafka produce error : {}", produceResult.getErrorMessage());
}
return produceResult;
});
return kafkaService.send(message)
.map(produceResult -> {
if (produceResult.hasError()) {
// TODO 카프카 프로듀싱 실패일 경우 처리
// ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등
log.error("Kafka produce error : {}", produceResult.getErrorMessage());
}
return produceResult;
});
}
}