전반적인 프로듀싱 로직 작성

This commit is contained in:
deogicorgi
2022-04-01 22:52:23 +09:00
parent de3600b476
commit 433a76a721
10 changed files with 173 additions and 4 deletions

View File

@@ -0,0 +1,8 @@
package com.github.deogicorgi.reactor.kafka.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.WebFluxConfigurer;
@Configuration
public class WebFluxConfig implements WebFluxConfigurer {
}

View File

@@ -1,7 +1,28 @@
package com.github.deogicorgi.reactor.kafka.producer.message; package com.github.deogicorgi.reactor.kafka.producer.message;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
@Getter
@Setter
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
property = "type",
defaultImpl = KafkaUriProduceMessage.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = KafkaUriProduceMessage.class, name = "uri"),
@JsonSubTypes.Type(value = KafkaBodyProduceMessage.class, name = "message")
})
public abstract class AbstractKafkaProduceMessage { public abstract class AbstractKafkaProduceMessage {
protected String message; protected String topic;
protected String type;
protected LocalDateTime requestedAt;
public abstract String getRequestedMessage();
} }

View File

@@ -0,0 +1,15 @@
package com.github.deogicorgi.reactor.kafka.producer.message;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class KafkaBodyProduceMessage extends AbstractKafkaProduceMessage {
private String message;
@Override
public String getRequestedMessage() {
return this.message;
}
}

View File

@@ -0,0 +1,15 @@
package com.github.deogicorgi.reactor.kafka.producer.message;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class KafkaUriProduceMessage extends AbstractKafkaProduceMessage {
private String uri;
@Override
public String getRequestedMessage() {
return this.uri;
}
}

View File

@@ -0,0 +1,40 @@
package com.github.deogicorgi.reactor.kafka.producer.model;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.github.deogicorgi.reactor.kafka.producer.message.AbstractKafkaProduceMessage;
import lombok.Getter;
import java.time.LocalDateTime;
@Getter
public class KafkaProduceResult {
private Boolean status = true;
private String message;
@JsonIgnore
private Throwable error = null;
private String errorMessage = null;
private LocalDateTime requestedAt;
private LocalDateTime producedAt;
public KafkaProduceResult(AbstractKafkaProduceMessage message) {
this.setMessage(message);
}
public KafkaProduceResult(AbstractKafkaProduceMessage message, Throwable e) {
this.setMessage(message);
this.status = false;
this.error = e;
this.errorMessage = e.getMessage();
}
public Boolean hasError() {
return error != null;
}
private void setMessage(AbstractKafkaProduceMessage message) {
this.message = message.getRequestedMessage();
this.producedAt = LocalDateTime.now();
this.requestedAt = message.getRequestedAt();
}
}

View File

@@ -1,13 +1,27 @@
package com.github.deogicorgi.reactor.kafka.web.controller; package com.github.deogicorgi.reactor.kafka.web.controller;
import com.github.deogicorgi.reactor.kafka.producer.message.AbstractKafkaProduceMessage;
import com.github.deogicorgi.reactor.kafka.producer.model.KafkaProduceResult;
import com.github.deogicorgi.reactor.kafka.web.service.ProduceService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
/** /**
* 카프카 프로듀서 Controller * 카프카 프로듀서 Controller
*/ */
@RestController @RestController
@RequiredArgsConstructor @RequiredArgsConstructor
@RequestMapping("/produce")
public class ProducerController { public class ProducerController {
private final ProduceService produceService;
@PostMapping
public Mono<KafkaProduceResult> send(@RequestBody AbstractKafkaProduceMessage message) {
return produceService.produceMessage(message);
}
} }

View File

@@ -0,0 +1,32 @@
package com.github.deogicorgi.reactor.kafka.web.service;
import com.github.deogicorgi.reactor.kafka.producer.message.AbstractKafkaProduceMessage;
import com.github.deogicorgi.reactor.kafka.producer.message.KafkaUriProduceMessage;
import com.github.deogicorgi.reactor.kafka.producer.model.KafkaProduceResult;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
/**
* 카프카 서비스
* 실제 카프카로 메시지 프로듀싱
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaService {
private final KafkaSender<String, Object> producer;
public Mono<KafkaProduceResult> send(AbstractKafkaProduceMessage message) {
return producer.createOutbound()
.send(Mono.just(new ProducerRecord<>(message.getTopic(), null, message.getRequestedMessage()))) // 해당 topic으로 message 전송
.then()
.map(ret -> new KafkaProduceResult(message))
.onErrorResume(e -> Mono.just(new KafkaProduceResult(message, e)));
}
}

View File

@@ -1,9 +1,33 @@
package com.github.deogicorgi.reactor.kafka.web.service; package com.github.deogicorgi.reactor.kafka.web.service;
import com.github.deogicorgi.reactor.kafka.producer.message.AbstractKafkaProduceMessage;
import com.github.deogicorgi.reactor.kafka.producer.model.KafkaProduceResult;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
/**
* 프로듀싱 서비스
* Kafka 프로듀싱 전 로직 처리
*/
@Slf4j
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
public class ProduceService { public class ProduceService {
private final KafkaService kafkaService;
public Mono<KafkaProduceResult> produceMessage(AbstractKafkaProduceMessage message) {
return kafkaService.send(message).map(produceResult -> {
if (produceResult.hasError()) {
// TODO 카프카 프로듀싱 실패일 경우 처리
// ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등
log.error("Kafka produce error : {}", produceResult.getErrorMessage());
}
return produceResult;
});
}
} }