Compare commits
1 Commits
test-branc
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8702a9009c |
@@ -13,27 +13,18 @@ import java.util.List;
|
||||
@Component
|
||||
public class KafkaMessageReceiver {
|
||||
|
||||
private final MessageBroker messageBroker;
|
||||
|
||||
/**
|
||||
* KafkaMessageReceiver가 생성될 때 모든 카프카 리시버 시작
|
||||
*/
|
||||
public KafkaMessageReceiver(List<KafkaReceiver<Integer, String>> kafkaReceivers, MessageBroker messageBroker) {
|
||||
this.messageBroker = messageBroker;
|
||||
public KafkaMessageReceiver(List<KafkaReceiver<Integer, String>> kafkaReceivers) {
|
||||
for (KafkaReceiver<Integer, String> receiver : kafkaReceivers) {
|
||||
this.start(receiver);
|
||||
}
|
||||
}
|
||||
|
||||
public void start(KafkaReceiver<Integer, String> receiver) {
|
||||
receiver.receive()
|
||||
.map(record -> {
|
||||
messageBroker.distribute();
|
||||
return record;
|
||||
}).onErrorContinue((throwable, o) -> {
|
||||
log.error("{} --- {}",throwable.getClass().getSimpleName(), throwable.getMessage());
|
||||
|
||||
})
|
||||
.subscribe();
|
||||
receiver.receive().subscribe(record -> {
|
||||
log.info("Kafka Reciever result : Topic >> [{}], message >> [{}], Offset >> [{}]", record.topic(), record.value(), record.receiverOffset());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,25 +0,0 @@
|
||||
package com.github.deogicorgi.reactive.consumer.listener;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
// 비동기 카프카 메시지 중개 클래스
|
||||
@Component
|
||||
public class MessageBroker {
|
||||
|
||||
private final ThreadPoolExecutor threadPoolExecutor =
|
||||
new ThreadPoolExecutor(10, 50, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10));
|
||||
|
||||
|
||||
public void distribute() {
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
threadPoolExecutor.submit(new MessageTask(i));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
package com.github.deogicorgi.reactive.consumer.listener;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class MessageTask implements Runnable {
|
||||
|
||||
private TestService service = new TestService();
|
||||
|
||||
private int i;
|
||||
|
||||
public MessageTask(int i) {
|
||||
this.i = i;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
|
||||
Thread.sleep(5);
|
||||
log.info("{}", i);
|
||||
service.test();
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("run에서 잡은거야! {}", e.getMessage());
|
||||
throw new RuntimeException(Thread.currentThread().getName(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
package com.github.deogicorgi.reactive.consumer.listener;
|
||||
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class TestService {
|
||||
public void test() {
|
||||
if (true) {
|
||||
throw new RuntimeException("test메서드에서 발생");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
@SpringBootApplication
|
||||
public class ProducerApplication {
|
||||
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ProducerApplication.class, args);
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@ public class ProducerController {
|
||||
|
||||
@PostMapping
|
||||
public Mono<KafkaProduceResult> send(@RequestBody AbstractKafkaMessage message) {
|
||||
|
||||
return produceService.produceMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,20 +21,6 @@ public class ProduceService {
|
||||
private final FailureMessageService failureMessageService;
|
||||
|
||||
public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) {
|
||||
|
||||
for (int i=0; i<100; i++) {
|
||||
kafkaService.send(message)
|
||||
.map(produceResult -> {
|
||||
log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
|
||||
if (produceResult.hasError()) {
|
||||
// TODO 카프카 프로듀싱 실패일 경우 처리
|
||||
// ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등
|
||||
log.error("Kafka produce error : {}", produceResult.getErrorMessage());
|
||||
}
|
||||
return produceResult;
|
||||
}).subscribe();
|
||||
}
|
||||
|
||||
return kafkaService.send(message)
|
||||
.map(produceResult -> {
|
||||
log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
|
||||
|
||||
Reference in New Issue
Block a user