From 67d9955fe2cdd73b993a8989160d4b5e2d54a486 Mon Sep 17 00:00:00 2001 From: abel Date: Tue, 16 Nov 2021 00:52:48 +0900 Subject: [PATCH] [Update] spring cloud stream kafka sample --- build.gradle | 4 +-- .../kafka/consumer/KafkaMessageConsumer.java | 10 +++--- .../KafkaSpringCloudMessageConsumer.java | 26 ++++++++++++++ .../controller/MessageSendController.java | 25 ++++++++++++- .../kafka/producer/KafkaMessageProducer.java | 13 +++++++ src/main/resources/application.yaml | 35 ++++++++++++++++++- 6 files changed, 105 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/spring/kafka/consumer/KafkaSpringCloudMessageConsumer.java diff --git a/build.gradle b/build.gradle index 784e27e..fc9b6d3 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ plugins { - id 'org.springframework.boot' version '2.5.6' + id 'org.springframework.boot' version '2.4.1' id 'io.spring.dependency-management' version '1.0.11.RELEASE' id 'java' } @@ -13,7 +13,7 @@ repositories { } ext { - set('springCloudVersion', "2020.0.4") + set('springCloudVersion', "Hoxton.SR9") } dependencies { diff --git a/src/main/java/com/spring/kafka/consumer/KafkaMessageConsumer.java b/src/main/java/com/spring/kafka/consumer/KafkaMessageConsumer.java index ffe1f6b..6825e42 100644 --- a/src/main/java/com/spring/kafka/consumer/KafkaMessageConsumer.java +++ b/src/main/java/com/spring/kafka/consumer/KafkaMessageConsumer.java @@ -1,23 +1,25 @@ package com.spring.kafka.consumer; import com.spring.kafka.domain.model.User; +import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; +@Slf4j @Component public class KafkaMessageConsumer { @KafkaListener(topics = "domain-event") public void consumeMessage(@Headers MessageHeaders headers, @Payload String message) { - System.out.println("Received Headers : "+headers); - System.out.println("Received Payloads : "+message); + log.debug("Received Headers : " + headers); + log.debug("Received Payloads : " + message); } @KafkaListener(topics = "domain-event-user") public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) { - System.out.println("Received Headers : "+headers); - System.out.println("Received Payloads : "+user.toString()); + log.debug("Received Headers : " + headers); + log.debug("Received Payloads : " + user.toString()); } } diff --git a/src/main/java/com/spring/kafka/consumer/KafkaSpringCloudMessageConsumer.java b/src/main/java/com/spring/kafka/consumer/KafkaSpringCloudMessageConsumer.java new file mode 100644 index 0000000..7f2b148 --- /dev/null +++ b/src/main/java/com/spring/kafka/consumer/KafkaSpringCloudMessageConsumer.java @@ -0,0 +1,26 @@ +package com.spring.kafka.consumer; + +import com.spring.kafka.domain.model.User; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.util.function.Consumer; + +@Slf4j +@Component +public class KafkaSpringCloudMessageConsumer { + @Bean + Consumer domainEventString() { + return input -> { + System.out.println("Received Message : " + input); + }; + } + + @Bean + Consumer domainEventModel() { + return input -> { + System.out.println("Received Message : " + input); + }; + } +} diff --git a/src/main/java/com/spring/kafka/controller/MessageSendController.java b/src/main/java/com/spring/kafka/controller/MessageSendController.java index 18c8125..653af7e 100644 --- a/src/main/java/com/spring/kafka/controller/MessageSendController.java +++ b/src/main/java/com/spring/kafka/controller/MessageSendController.java @@ -3,7 +3,6 @@ package com.spring.kafka.controller; import com.spring.kafka.domain.model.User; import com.spring.kafka.producer.KafkaMessageProducer; import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @@ -37,4 +36,28 @@ public class MessageSendController { public void sendMessage(@RequestBody User payload) { producer.sendMessage(payload); } + + /* + curl --location --request POST 'http://localhost:8080/send-message-by-spring-cloud' \ + --header 'Content-Type: text/plain' \ + --data-raw 'message' + */ + @PostMapping("/send-message-by-spring-cloud") + public void sendMessageBySpringCloud(@RequestBody String payload) { + producer.sendMessageBySpringCloud(payload); + } + + /* + curl --location --request POST 'http://localhost:8080/send-message-by-spring-cloud-user' \ + --header 'Content-Type: application/json' \ + --data-raw '{ + "id": "happydaddy@naver.com", + "name": "happydaddy", + "age": 28 + }' + */ + @PostMapping("/send-message-by-spring-cloud-user") + public void sendMessageBySpringCloud(@RequestBody User payload) { + producer.sendMessageBySpringCloud(payload); + } } diff --git a/src/main/java/com/spring/kafka/producer/KafkaMessageProducer.java b/src/main/java/com/spring/kafka/producer/KafkaMessageProducer.java index 3968bc0..a024345 100644 --- a/src/main/java/com/spring/kafka/producer/KafkaMessageProducer.java +++ b/src/main/java/com/spring/kafka/producer/KafkaMessageProducer.java @@ -1,15 +1,18 @@ package com.spring.kafka.producer; import com.spring.kafka.domain.model.User; +import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.stereotype.Component; import java.util.Properties; @Component +@RequiredArgsConstructor public class KafkaMessageProducer { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @@ -18,6 +21,8 @@ public class KafkaMessageProducer { @Value("${spring.kafka.producer.value-serializer}") private String valueSerializer; + private final StreamBridge streamBridge; + public void sendMessage(String payload) { Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); @@ -37,4 +42,12 @@ public class KafkaMessageProducer { ProducerRecord message = new ProducerRecord<>("domain-event-user", user); producer.send(message); } + + public void sendMessageBySpringCloud(String payload) { + streamBridge.send("domainEventString-out-0", payload); + } + + public void sendMessageBySpringCloud(User payload) { + streamBridge.send("domainEventModel-out-0", payload); + } } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index f773390..aab1e3c 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -1,3 +1,8 @@ +logging: + level: + org.springframework: WARN + com.spring.kafka: DEBUG +# spring-kafka configuration #spring: # kafka: # bootstrap-servers: localhost:9093,localhost:9094,localhost:9095 @@ -12,7 +17,7 @@ spring: kafka: - bootstrap-servers: localhost:9092,localhost:9094,localhost:9095 + bootstrap-servers: localhost:9093,localhost:9094,localhost:9095 consumer: group-id: consumer-group-1 auto-offset-reset: earliest @@ -23,3 +28,31 @@ spring: producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + +# spring-cloud-stream configuration + cloud.stream: + bindings: + domainEventString-in-0: + content-type: text/plain + destination: domain-event-string + group: consumer-group-string + domainEventString-out-0: + destination: domain-event-string + group: consumer-group-string + domainEventModel-in-0: + destination: domain-event-model + group: consumer-group-model + domainEventModel-out-0: + destination: domain-event-model + group: consumer-group-model + kafka: + binder: + brokers: localhost:9093,localhost:9094,localhost:9095 + configuration: + auto.offset.reset: earliest + key.deserializer: org.apache.kafka.common.serialization.StringDeserializer + value.deserializer: org.apache.kafka.common.serialization.StringDeserializer + cloud: + stream: + function: + definition: domainEventString;domainEventModel \ No newline at end of file