Consumer 추가

This commit is contained in:
deogicorgi
2022-04-03 21:54:59 +09:00
parent 547d1da2af
commit b09375afba
10 changed files with 196 additions and 7 deletions

View File

@@ -4,7 +4,7 @@ plugins {
id 'java' id 'java'
} }
group = 'com.github.deogicorgi.reactive' group = 'com.github.deogicorgi.reactive.consumer'
version = '0.0.1-SNAPSHOT' version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11' sourceCompatibility = '11'
@@ -20,9 +20,9 @@ repositories {
dependencies { dependencies {
implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter'
compileOnly 'org.projectlombok:lombok' implementation 'io.projectreactor.kafka:reactor-kafka:1.3.11'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test' annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
} }
tasks.named('test') { tasks.named('test') {

View File

@@ -5,7 +5,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication @SpringBootApplication
public class ConsumerApplication { public class ConsumerApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args); SpringApplication.run(ConsumerApplication.class, args);
} }

View File

@@ -0,0 +1,76 @@
package com.github.deogicorgi.reactive.consumer.config;
import com.github.deogicorgi.reactive.consumer.config.model.KafkaReceiverProperty;
import com.github.deogicorgi.reactive.consumer.config.properties.KafkaProperties;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Declares one reactive Kafka receiver bean per topic configured under
 * {@code kafka.receiver.*}.
 */
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {

    private final KafkaProperties properties;

    // Receiver for the "deogicorgiUri" entry (deogicorgi-uri topic)
    @Bean("uriMessageReceiver")
    public KafkaReceiver<Integer, String> uriMessageReceiver() throws Exception {
        return createReceiver("deogicorgiUri");
    }

    // Receiver for the "deogicorgiMessage" entry (deogicorgi-message topic)
    @Bean("messageReceiver")
    public KafkaReceiver<Integer, String> messageReceiver() throws Exception {
        return createReceiver("deogicorgiMessage");
    }

    /**
     * Builds a KafkaReceiver subscribed to the topic configured under the given
     * logical receiver name. Shared by both bean methods (they were copy-paste
     * duplicates before).
     *
     * @param propertyName logical receiver name as declared in application config
     * @return a receiver subscribed to that entry's topic
     * @throws IllegalStateException when no receiver property exists for the name
     */
    private KafkaReceiver<Integer, String> createReceiver(String propertyName) {
        KafkaReceiverProperty property = properties.getProperty(propertyName)
                .map(Map.Entry::getValue)
                .orElseThrow(() -> new IllegalStateException(
                        "No Kafka receiver property configured for [" + propertyName + "]"));
        ReceiverOptions<Integer, String> receiverOptions =
                ReceiverOptions.<Integer, String>create(getConsumerProps(property))
                        .subscription(Collections.singleton(property.getTopic()));
        return KafkaReceiver.create(receiverOptions);
    }

    /******************************************************************
     ************************ Consumer Options ************************
     ******************************************************************/

    /**
     * Consumer options shared by all receivers.
     * Note: the previous version also set {@code ProducerConfig.MAX_BLOCK_MS_CONFIG}
     * ("max.block.ms") here — that is a producer-only setting which consumers
     * ignore, so it has been removed. The double-brace initializer (an anonymous
     * subclass capturing {@code this}) was replaced with a plain HashMap.
     */
    private Map<String, Object> getConsumerProps(KafkaReceiverProperty property) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getHosts());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, property.getGroupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

View File

@@ -0,0 +1,25 @@
package com.github.deogicorgi.reactive.consumer.config.model;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * Kafka receiver configuration property.
 * One instance is bound per entry under {@code kafka.receiver.*} in the
 * application config (see application.yml: uri / message).
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class KafkaReceiverProperty {
    // Logical receiver name; KafkaProperties.getProperty() looks entries up by this field
    private String name;
    // Topic this receiver subscribes to
    private String topic;
    // Consumer group id used for this receiver
    private String groupId;
}

View File

@@ -0,0 +1,36 @@
package com.github.deogicorgi.reactive.consumer.config.properties;
import com.github.deogicorgi.reactive.consumer.config.model.KafkaReceiverProperty;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Kafka settings bound from the {@code kafka.*} section of application config.
 *
 * Changes from the previous version:
 * - Dropped the misplaced {@code @EnableConfigurationProperties}: that annotation
 *   enables binding of *other* properties classes and does nothing useful on the
 *   properties class itself; {@code @Component} + {@code @ConfigurationProperties}
 *   already registers and binds this bean.
 * - Dropped the hand-written {@code setReceiver} — {@code @Setter} generates an
 *   identical method, so the external interface is unchanged.
 */
@Getter
@Setter
@Component
@ConfigurationProperties("kafka")
public class KafkaProperties {

    // Kafka bootstrap servers (host:port list)
    private String hosts;

    // Receiver properties keyed by the config entry name (e.g. "uri", "message")
    private Map<String, KafkaReceiverProperty> receiver = new HashMap<>();

    /**
     * Looks up a receiver entry whose nested {@code name} field equals the given
     * key (note: matches the {@code name} property, NOT the map key the entry was
     * declared under).
     *
     * @param key logical receiver name to search for
     * @return the first matching entry, or empty if none matches
     */
    public Optional<Map.Entry<String, KafkaReceiverProperty>> getProperty(String key) {
        return this.receiver.entrySet().stream()
                // key.equals(...) instead of getName().equals(key): avoids an NPE
                // when a config entry omits its "name" field
                .filter(entry -> key.equals(entry.getValue().getName()))
                .findFirst();
    }
}

View File

@@ -0,0 +1,30 @@
package com.github.deogicorgi.reactive.consumer.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.kafka.receiver.KafkaReceiver;
import java.util.List;
/**
* 카프카 리시버
*/
@Slf4j
@Component
public class KafkaMessageReceiver {
/**
* KafkaMessageReceiver가 생성될 때 모든 카프카 리시버 시작
*/
public KafkaMessageReceiver(List<KafkaReceiver<Integer, String>> kafkaReceivers) {
for (KafkaReceiver<Integer, String> receiver : kafkaReceivers) {
this.start(receiver);
}
}
public void start(KafkaReceiver<Integer, String> receiver) {
receiver.receive().subscribe(record -> {
log.info("Kafka Reciever result : Topic >> [{}], message >> [{}], Offset >> [{}]", record.topic(), record.value(), record.receiverOffset());
});
}
}

View File

@@ -0,0 +1,11 @@
kafka:
  hosts: deogicorgi.home:29092
  receiver:
    uri:
      name: deogicorgiUri
      topic: deogicorgi-uri
      groupId: deogicorgi-uri-1
    message:
      name: deogicorgiMessage
      topic: deogicorgi-message
      groupId: deogicorgi-message-1

View File

@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!-- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>-->
<pattern>%d{HH:mm:ss.SSS} [%thread] %highlight(%-5level) %cyan(%logger{36}) - %msg %n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
</configuration>

View File

@@ -23,7 +23,7 @@ public class ProduceService {
public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) { public Mono<KafkaProduceResult> produceMessage(AbstractKafkaMessage message) {
return kafkaService.send(message) return kafkaService.send(message)
.map(produceResult -> { .map(produceResult -> {
failureMessageService.produceFailure(); log.info("Kafka Sender result : Topic >> [{}], message >> [{}]", produceResult.getTopic(), produceResult.getRequestedMessage());
if (produceResult.hasError()) { if (produceResult.hasError()) {
// TODO 카프카 프로듀싱 실패일 경우 처리 // TODO 카프카 프로듀싱 실패일 경우 처리
// ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등 // ex ) 처리하지못한 요청을 몽고등에 저장 후 재시도, 로깅 등등