1 Commits

Author SHA1 Message Date
Deogicorgi
8702a9009c 테스트 커밋
테스트커밋
2023-03-23 06:41:23 +09:00
7 changed files with 5 additions and 94 deletions

View File

@@ -13,27 +13,18 @@ import java.util.List;
@Component
public class KafkaMessageReceiver {
private final MessageBroker messageBroker;
/**
* KafkaMessageReceiver가 생성될 때 모든 카프카 리시버 시작
*/
public KafkaMessageReceiver(List<KafkaReceiver<Integer, String>> kafkaReceivers, MessageBroker messageBroker) {
this.messageBroker = messageBroker;
public KafkaMessageReceiver(List<KafkaReceiver<Integer, String>> kafkaReceivers) {
for (KafkaReceiver<Integer, String> receiver : kafkaReceivers) {
this.start(receiver);
}
}
public void start(KafkaReceiver<Integer, String> receiver) {
receiver.receive()
.map(record -> {
messageBroker.distribute();
return record;
}).onErrorContinue((throwable, o) -> {
log.error("{} --- {}",throwable.getClass().getSimpleName(), throwable.getMessage());
})
.subscribe();
receiver.receive().subscribe(record -> {
log.info("Kafka Reciever result : Topic >> [{}], message >> [{}], Offset >> [{}]", record.topic(), record.value(), record.receiverOffset());
});
}
}

View File

@@ -1,25 +0,0 @@
package com.github.deogicorgi.reactive.consumer.listener;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// 비동기 카프카 메시지 중개 클래스
@Component
public class MessageBroker {
private final ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10, 50, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));
public void distribute() {
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(new MessageTask(i));
}
}
}

View File

@@ -1,29 +0,0 @@
package com.github.deogicorgi.reactive.consumer.listener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MessageTask implements Runnable {
private TestService service = new TestService();
private int i;
public MessageTask(int i) {
this.i = i;
}
@Override
public void run() {
try {
Thread.sleep(5);
log.info("{}", i);
service.test();
} catch (Exception e) {
log.error("run에서 잡은거야! {}", e.getMessage());
throw new RuntimeException(Thread.currentThread().getName(), e);
}
}
}

View File

@@ -1,12 +0,0 @@
package com.github.deogicorgi.reactive.consumer.listener;
import org.springframework.stereotype.Service;
@Service
public class TestService {
public void test() {
if (true) {
throw new RuntimeException("test메서드에서 발생");
}
}
}

View File

@@ -6,6 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}

View File

@@ -22,7 +22,6 @@ public class ProducerController {
@PostMapping
public Mono<KafkaProduceResult> send(@RequestBody AbstractKafkaMessage message) {
return produceService.produceMessage(message);
}
}

View File

@@ -21,20 +21,6 @@ public class ProduceService {
private final FailureMessageService failureMessageService;
public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) {
for (int i=0; i<100; i++) {
kafkaService.send(message)
.map(produceResult -> {
log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
if (produceResult.hasError()) {
// TODO 카프카 프로듀싱 실패일 경우 처리
// ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등
log.error("Kafka produce error : {}", produceResult.getErrorMessage());
}
return produceResult;
}).subscribe();
}
return kafkaService.send(message)
.map(produceResult -> {
log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());