실제 카프카 세팅

This commit is contained in:
deogicorgi
2022-04-02 19:20:02 +09:00
parent 543354e39c
commit b5f2b70c76
6 changed files with 48 additions and 8 deletions

22
docker-compose.yml Normal file
View File

@@ -0,0 +1,22 @@
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://deogicorgi.home:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

View File

@@ -31,6 +31,7 @@ dependencies {
// // logging
// implementation('org.slf4j:jcl-over-slf4j')
// implementation('ch.qos.logback:logback-classic')
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
// test
testImplementation 'org.springframework.boot:spring-boot-starter-test'

View File

@@ -1,5 +1,7 @@
package com.github.deogicorgi.reactive.producer.config;
import com.github.deogicorgi.reactive.producer.config.properties.KafkaProperties;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
@@ -16,10 +18,10 @@ import java.util.Map;
* Kafka 설정
*/
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {
private String host;
private String groupId;
private final KafkaProperties properties;
/******************************************************************
************************ Producer Options ************************
@@ -30,16 +32,15 @@ public class KafkaConfig {
SenderOptions<String, Object> senderOptions = SenderOptions.create(getProducerProps());
senderOptions.scheduler(Schedulers.parallel());
senderOptions.closeTimeout(Duration.ofSeconds(5));
return KafkaSender.create(senderOptions);
}
private Map<String, Object> getProducerProps() {
return new HashMap<>() {{
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts());
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000);
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
}};
}
}

View File

@@ -0,0 +1,16 @@
package com.github.deogicorgi.reactive.producer.config.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
@Getter
@Setter
@Component
@EnableConfigurationProperties
@ConfigurationProperties("kafka")
public class KafkaProperties {
private String hosts;
}

View File

@@ -27,8 +27,8 @@ public class KafkaService {
// 전송완료 된 레코드를 Outbound로 리턴
.then()
// 에러 없이 전송이 완료 되었을 경우
.map(v -> new KafkaProduceResult(message))
// 에러가 발생하였을 경우
.thenReturn(new KafkaProduceResult(message))
// 에러가 발생을 경우
.onErrorResume(e -> Mono.just(new KafkaProduceResult(message, e)));
}

View File

@@ -2,5 +2,5 @@ server:
port: 18080
kafka:
host: test
hosts: deogicorgi.home:29092