diff --git a/build.gradle b/build.gradle index c6a464d..4aa5df6 100644 --- a/build.gradle +++ b/build.gradle @@ -19,17 +19,24 @@ repositories { } dependencies { + // Spring boot implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation 'io.projectreactor.kafka:reactor-kafka:1.3.11' + // swagger ui implementation 'io.springfox:springfox-boot-starter:3.0.0' implementation 'io.springfox:springfox-swagger-ui:3.0.0' + // logging + implementation('org.slf4j:jcl-over-slf4j') + implementation('ch.qos.logback:logback-classic') + // lombok compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' + // test testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.kafka:spring-kafka-test' } diff --git a/src/main/java/com/github/deogicorgi/reactor/kafka/config/KafkaConfig.java b/src/main/java/com/github/deogicorgi/reactor/kafka/config/KafkaConfig.java index 772f1fa..c76b186 100644 --- a/src/main/java/com/github/deogicorgi/reactor/kafka/config/KafkaConfig.java +++ b/src/main/java/com/github/deogicorgi/reactor/kafka/config/KafkaConfig.java @@ -1,10 +1,45 @@ package com.github.deogicorgi.reactor.kafka.config; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import reactor.core.scheduler.Schedulers; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderOptions; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; /** * Kafka 설정 */ @Configuration public class KafkaConfig { + + private String host; + private String groupId; + + /****************************************************************** + ************************ Producer Options ************************ + ******************************************************************/ + + @Bean("kafkaSender") + public KafkaSender kafkaSender() { + SenderOptions senderOptions = SenderOptions.create(getProducerProps()); + senderOptions.scheduler(Schedulers.parallel()); + senderOptions.closeTimeout(Duration.ofSeconds(5)); + + return KafkaSender.create(senderOptions); + } + + private Map getProducerProps() { + return new HashMap<>() {{ + put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host); + put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); + }}; + } } diff --git a/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/AbstractKafkaProduceMessage.java b/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/AbstractKafkaProduceMessage.java index 64d5041..e3d4179 100644 --- a/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/AbstractKafkaProduceMessage.java +++ b/src/main/java/com/github/deogicorgi/reactor/kafka/producer/message/AbstractKafkaProduceMessage.java @@ -1,19 +1,7 @@ package com.github.deogicorgi.reactor.kafka.producer.message; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.deogicorgi.reactor.kafka.exception.ProducerServiceException; - public abstract class AbstractKafkaProduceMessage { protected String message; - public void setMessage(Object message) throws ProducerServiceException { - ObjectMapper objectMapper = new ObjectMapper(); - try { - this.message = objectMapper.writeValueAsString(message); - } catch (Exception e) { - throw new ProducerServiceException(e); - } - - } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8b13789..27a9101 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1 +1,6 @@ +server: + port: 18080 + +kafka : + host : test