카프카 설정 추가

This commit is contained in:
deogicorgi
2022-04-01 20:29:38 +09:00
parent a854fd0f35
commit 95104fad1e
4 changed files with 47 additions and 12 deletions

View File

@@ -19,17 +19,24 @@ repositories {
}
dependencies {
// Spring boot
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'io.projectreactor.kafka:reactor-kafka:1.3.11'
// swagger ui
implementation 'io.springfox:springfox-boot-starter:3.0.0'
implementation 'io.springfox:springfox-swagger-ui:3.0.0'
// logging
implementation('org.slf4j:jcl-over-slf4j')
implementation('ch.qos.logback:logback-classic')
// lombok
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
// test
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
}

View File

@@ -1,10 +1,45 @@
package com.github.deogicorgi.reactor.kafka.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
/**
* Kafka 설정
*/
@Configuration
public class KafkaConfig {
private String host;
private String groupId;
/******************************************************************
************************ Producer Options ************************
******************************************************************/
@Bean("kafkaSender")
public KafkaSender<String, Object> kafkaSender() {
SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps());
senderOptions.scheduler(Schedulers.parallel());
senderOptions.closeTimeout(Duration.ofSeconds(5));
return KafkaSender.create(senderOptions);
}
private Map<String, Object> getProducerProps() {
return new HashMap<>() {{
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000);
}};
}
}

View File

@@ -1,19 +1,7 @@
package com.github.deogicorgi.reactor.kafka.producer.message;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.deogicorgi.reactor.kafka.exception.ProducerServiceException;
public abstract class AbstractKafkaProduceMessage {
protected String message;
public void setMessage(Object message) throws ProducerServiceException {
ObjectMapper objectMapper = new ObjectMapper();
try {
this.message = objectMapper.writeValueAsString(message);
} catch (Exception e) {
throw new ProducerServiceException(e);
}
}
}

View File

@@ -1 +1,6 @@
server:
port: 18080
kafka :
host : test