Compare commits
1 Commits
master
...
test-branc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5414a49bfd |
@@ -13,18 +13,27 @@ import java.util.List;
|
|||||||
@Component
|
@Component
|
||||||
public class KafkaMessageReceiver {
|
public class KafkaMessageReceiver {
|
||||||
|
|
||||||
|
private final MessageBroker messageBroker;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* KafkaMessageReceiver가 생성될 때 모든 카프카 리시버 시작
|
* KafkaMessageReceiver가 생성될 때 모든 카프카 리시버 시작
|
||||||
*/
|
*/
|
||||||
public KafkaMessageReceiver(List<KafkaReceiver<Integer, String>> kafkaReceivers) {
|
public KafkaMessageReceiver(List<KafkaReceiver<Integer, String>> kafkaReceivers, MessageBroker messageBroker) {
|
||||||
|
this.messageBroker = messageBroker;
|
||||||
for (KafkaReceiver<Integer, String> receiver : kafkaReceivers) {
|
for (KafkaReceiver<Integer, String> receiver : kafkaReceivers) {
|
||||||
this.start(receiver);
|
this.start(receiver);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start(KafkaReceiver<Integer, String> receiver) {
|
public void start(KafkaReceiver<Integer, String> receiver) {
|
||||||
receiver.receive().subscribe(record -> {
|
receiver.receive()
|
||||||
log.info("Kafka Reciever result : Topic >> [{}], message >> [{}], Offset >> [{}]", record.topic(), record.value(), record.receiverOffset());
|
.map(record -> {
|
||||||
});
|
messageBroker.distribute();
|
||||||
|
return record;
|
||||||
|
}).onErrorContinue((throwable, o) -> {
|
||||||
|
log.error("{} --- {}",throwable.getClass().getSimpleName(), throwable.getMessage());
|
||||||
|
|
||||||
|
})
|
||||||
|
.subscribe();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,25 @@
|
|||||||
|
package com.github.deogicorgi.reactive.consumer.listener;
|
||||||
|
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
// 비동기 카프카 메시지 중개 클래스
|
||||||
|
@Component
|
||||||
|
public class MessageBroker {
|
||||||
|
|
||||||
|
private final ThreadPoolExecutor threadPoolExecutor =
|
||||||
|
new ThreadPoolExecutor(10, 50, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));
|
||||||
|
|
||||||
|
|
||||||
|
public void distribute() {
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
threadPoolExecutor.submit(new MessageTask(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
package com.github.deogicorgi.reactive.consumer.listener;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class MessageTask implements Runnable {
|
||||||
|
|
||||||
|
private TestService service = new TestService();
|
||||||
|
|
||||||
|
private int i;
|
||||||
|
|
||||||
|
public MessageTask(int i) {
|
||||||
|
this.i = i;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
|
||||||
|
Thread.sleep(5);
|
||||||
|
log.info("{}", i);
|
||||||
|
service.test();
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("run에서 잡은거야! {}", e.getMessage());
|
||||||
|
throw new RuntimeException(Thread.currentThread().getName(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
package com.github.deogicorgi.reactive.consumer.listener;
|
||||||
|
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class TestService {
|
||||||
|
public void test() {
|
||||||
|
if (true) {
|
||||||
|
throw new RuntimeException("test메서드에서 발생");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -22,6 +22,7 @@ public class ProducerController {
|
|||||||
|
|
||||||
@PostMapping
|
@PostMapping
|
||||||
public Mono<KafkaProduceResult> send(@RequestBody AbstractKafkaMessage message) {
|
public Mono<KafkaProduceResult> send(@RequestBody AbstractKafkaMessage message) {
|
||||||
|
|
||||||
return produceService.produceMessage(message);
|
return produceService.produceMessage(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,6 +21,20 @@ public class ProduceService {
|
|||||||
private final FailureMessageService failureMessageService;
|
private final FailureMessageService failureMessageService;
|
||||||
|
|
||||||
public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) {
|
public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) {
|
||||||
|
|
||||||
|
for (int i=0; i<100; i++) {
|
||||||
|
kafkaService.send(message)
|
||||||
|
.map(produceResult -> {
|
||||||
|
log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
|
||||||
|
if (produceResult.hasError()) {
|
||||||
|
// TODO 카프카 프로듀싱 실패일 경우 처리
|
||||||
|
// ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등
|
||||||
|
log.error("Kafka produce error : {}", produceResult.getErrorMessage());
|
||||||
|
}
|
||||||
|
return produceResult;
|
||||||
|
}).subscribe();
|
||||||
|
}
|
||||||
|
|
||||||
return kafkaService.send(message)
|
return kafkaService.send(message)
|
||||||
.map(produceResult -> {
|
.map(produceResult -> {
|
||||||
log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
|
log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
|
||||||
|
|||||||
Reference in New Issue
Block a user