diff --git a/consumer/build.gradle b/consumer/build.gradle index e3f3c7f..875b3e0 100644 --- a/consumer/build.gradle +++ b/consumer/build.gradle @@ -4,7 +4,7 @@ plugins { id 'java' } -group = 'com.github.deogicorgi.reactive' +group = 'com.github.deogicorgi.reactive.consumer' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' @@ -20,9 +20,9 @@ repositories { dependencies { implementation 'org.springframework.boot:spring-boot-starter' - compileOnly 'org.projectlombok:lombok' - annotationProcessor 'org.projectlombok:lombok' - testImplementation 'org.springframework.boot:spring-boot-starter-test' + implementation 'io.projectreactor.kafka:reactor-kafka:1.3.11' + + annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' } tasks.named('test') { diff --git a/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/ConsumerApplication.java b/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/ConsumerApplication.java index cea11d9..818178f 100644 --- a/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/ConsumerApplication.java +++ b/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/ConsumerApplication.java @@ -5,7 +5,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ConsumerApplication { - public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } diff --git a/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/config/KafkaConfig.java b/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/config/KafkaConfig.java new file mode 100644 index 0000000..1bc9e19 --- /dev/null +++ b/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/config/KafkaConfig.java @@ -0,0 +1,76 @@ +package com.github.deogicorgi.reactive.consumer.config; + +import com.github.deogicorgi.reactive.consumer.config.model.KafkaReceiverProperty; +import com.github.deogicorgi.reactive.consumer.config.properties.KafkaProperties; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.ObjectUtils; +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverOptions; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@Configuration +@RequiredArgsConstructor +public class KafkaConfig { + + private final KafkaProperties properties; + + // deogicorgi-uri 리시버 + @Bean("uriMessageReceiver") + public KafkaReceiver uriMessageReceiver() throws Exception { + Map.Entry deogicorgiUri = properties.getProperty("deogicorgiUri").orElse(null); + + if (ObjectUtils.isEmpty(deogicorgiUri)) { + throw new Exception("property is null"); + } + + KafkaReceiverProperty property = deogicorgiUri.getValue(); + + ReceiverOptions receiverOptions = + ReceiverOptions.create(getConsumerProps(property)) + .subscription(Collections.singleton(property.getTopic())); + + return KafkaReceiver.create(receiverOptions); + } + + // deogicorgi-message 리시버 + @Bean("messageReceiver") + public KafkaReceiver messageReceiver() throws Exception { + Map.Entry deogicorgiUri = properties.getProperty("deogicorgiMessage").orElse(null); + + if (ObjectUtils.isEmpty(deogicorgiUri)) { + throw new Exception("property is null"); + } + + KafkaReceiverProperty property = deogicorgiUri.getValue(); + + ReceiverOptions receiverOptions = + ReceiverOptions.create(getConsumerProps(property)) + .subscription(Collections.singleton(property.getTopic())); + + return KafkaReceiver.create(receiverOptions); + } + + /****************************************************************** + ************************ Consumer Options ************************ + ******************************************************************/ + + // 컨슈머 옵션 + private Map getConsumerProps(KafkaReceiverProperty property) { + return new HashMap<>() {{ + put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts()); + put(ConsumerConfig.GROUP_ID_CONFIG, property.getGroupId()); + put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000); + }}; + } +} diff --git a/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/config/model/KafkaReceiverProperty.java b/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/config/model/KafkaReceiverProperty.java new file mode 100644 index 0000000..bc9940d --- /dev/null +++ b/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/config/model/KafkaReceiverProperty.java @@ -0,0 +1,25 @@ +package com.github.deogicorgi.reactive.consumer.config.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * 카프카 리시버 설정 프로퍼티 + */ +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class KafkaReceiverProperty { + + // 리시버 이름 + private String name; + + // 담당할 토픽 + private String topic; + + // 리시버 그룹 아이디 + private String groupId; +} diff --git a/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/config/properties/KafkaProperties.java b/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/config/properties/KafkaProperties.java new file mode 100644 index 0000000..9056a53 --- /dev/null +++ b/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/config/properties/KafkaProperties.java @@ -0,0 +1,36 @@ +package com.github.deogicorgi.reactive.consumer.config.properties; + +import com.github.deogicorgi.reactive.consumer.config.model.KafkaReceiverProperty; +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +@Getter +@Setter +@Component +@EnableConfigurationProperties +@ConfigurationProperties("kafka") +public class KafkaProperties { + + // 카프카 호스트 + private String hosts; + + // 리시버 프로퍼티 맵 + private Map receiver = new HashMap<>(); + + public void setReceiver(Map receivers) { + this.receiver = receivers; + } + + public Optional> getProperty(String key) { + return this.receiver.entrySet() + .stream().filter(entry -> entry.getValue().getName().equals(key)) + .findFirst(); + } +} diff --git a/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/listener/KafkaMessageReceiver.java b/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/listener/KafkaMessageReceiver.java new file mode 100644 index 0000000..67d5a8c --- /dev/null +++ b/consumer/src/main/java/com/github/deogicorgi/reactive/consumer/listener/KafkaMessageReceiver.java @@ -0,0 +1,30 @@ +package com.github.deogicorgi.reactive.consumer.listener; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import reactor.kafka.receiver.KafkaReceiver; + +import java.util.List; + +/** + * 카프카 리시버 + */ +@Slf4j +@Component +public class KafkaMessageReceiver { + + /** + * KafkaMessageReceiver가 생성될 때 모든 카프카 리시버 시작 + */ + public KafkaMessageReceiver(List> kafkaReceivers) { + for (KafkaReceiver receiver : kafkaReceivers) { + this.start(receiver); + } + } + + public void start(KafkaReceiver receiver) { + receiver.receive().subscribe(record -> { + log.info("Kafka Reciever result : Topic >> [{}], message >> [{}], Offset >> [{}]", record.topic(), record.value(), record.receiverOffset()); + }); + } +} diff --git a/consumer/src/main/resources/application.properties b/consumer/src/main/resources/application.properties deleted file mode 100644 index 8b13789..0000000 --- a/consumer/src/main/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ - diff --git a/consumer/src/main/resources/application.yml b/consumer/src/main/resources/application.yml new file mode 100644 index 0000000..845ae9c --- /dev/null +++ b/consumer/src/main/resources/application.yml @@ -0,0 +1,11 @@ +kafka: + hosts: deogicorgi.home:29092 + receiver : + uri: + name : deogicorgiUri + topic : deogicorgi-uri + groupId : deogicorgi-uri-1 + message: + name : deogicorgiMessage + topic : deogicorgi-message + groupId : deogicorgi-message-1 diff --git a/consumer/src/main/resources/logback.xml b/consumer/src/main/resources/logback.xml new file mode 100644 index 0000000..66a5f32 --- /dev/null +++ b/consumer/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %highlight(%-5level) %cyan(%logger{36}) - %msg %n + + + + + + + \ No newline at end of file diff --git a/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/service/ProduceService.java b/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/service/ProduceService.java index 654b02d..5eb7b54 100644 --- a/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/service/ProduceService.java +++ b/producer/src/main/java/com/github/deogicorgi/reactive/producer/web/service/ProduceService.java @@ -23,7 +23,7 @@ public class ProduceService { public Mono produceMessage(AbstractKafkaMessage message) { return kafkaService.send(message) .map(produceResult -> { - failureMessageService.produceFailure(); + log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage()); if (produceResult.hasError()) { // TODO 카프카 프로듀싱 실패일 경우 처리 // ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등