Compare commits

1 Commits

Author SHA1 Message Date
deogicorgi
5414a49bfd 테스트중...
테스트중...
2022-04-14 22:43:49 +09:00
6 changed files with 94 additions and 4 deletions

View File

@@ -13,18 +13,27 @@ import java.util.List;
@Component
public class KafkaMessageReceiver {
private final MessageBroker messageBroker;
/**
* KafkaMessageReceiver가 생성될 때 모든 카프카 리시버 시작
*/
public KafkaMessageReceiver(List<KafkaReceiver<Integer, String>> kafkaReceivers) {
public KafkaMessageReceiver(List<KafkaReceiver<Integer, String>> kafkaReceivers, MessageBroker messageBroker) {
this.messageBroker = messageBroker;
for (KafkaReceiver<Integer, String> receiver : kafkaReceivers) {
this.start(receiver);
}
}
public void start(KafkaReceiver<Integer, String> receiver) {
receiver.receive().subscribe(record -> {
log.info("Kafka Reciever result : Topic >> [{}], message >> [{}], Offset >> [{}]", record.topic(), record.value(), record.receiverOffset());
});
receiver.receive()
.map(record -> {
messageBroker.distribute();
return record;
}).onErrorContinue((throwable, o) -> {
log.error("{} --- {}",throwable.getClass().getSimpleName(), throwable.getMessage());
})
.subscribe();
}
}

View File

@@ -0,0 +1,25 @@
package com.github.deogicorgi.reactive.consumer.listener;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// 비동기 카프카 메시지 중개 클래스
@Component
public class MessageBroker {
private final ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10, 50, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));
public void distribute() {
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(new MessageTask(i));
}
}
}

View File

@@ -0,0 +1,29 @@
package com.github.deogicorgi.reactive.consumer.listener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MessageTask implements Runnable {
private TestService service = new TestService();
private int i;
public MessageTask(int i) {
this.i = i;
}
@Override
public void run() {
try {
Thread.sleep(5);
log.info("{}", i);
service.test();
} catch (Exception e) {
log.error("run에서 잡은거야! {}", e.getMessage());
throw new RuntimeException(Thread.currentThread().getName(), e);
}
}
}

View File

@@ -0,0 +1,12 @@
package com.github.deogicorgi.reactive.consumer.listener;
import org.springframework.stereotype.Service;
@Service
public class TestService {
public void test() {
if (true) {
throw new RuntimeException("test메서드에서 발생");
}
}
}

View File

@@ -22,6 +22,7 @@ public class ProducerController {
@PostMapping
public Mono<KafkaProduceResult> send(@RequestBody AbstractKafkaMessage message) {
return produceService.produceMessage(message);
}
}

View File

@@ -21,6 +21,20 @@ public class ProduceService {
private final FailureMessageService failureMessageService;
public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) {
for (int i=0; i<100; i++) {
kafkaService.send(message)
.map(produceResult -> {
log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
if (produceResult.hasError()) {
// TODO 카프카 프로듀싱 실패일 경우 처리
// ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등
log.error("Kafka produce error : {}", produceResult.getErrorMessage());
}
return produceResult;
}).subscribe();
}
return kafkaService.send(message)
.map(produceResult -> {
log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());