[Update] spring cloud stream kafka sample

This commit is contained in:
abel
2021-11-16 00:52:48 +09:00
parent 1b1fbced36
commit 67d9955fe2
6 changed files with 105 additions and 8 deletions

View File

@@ -1,5 +1,5 @@
plugins {
id 'org.springframework.boot' version '2.5.6'
id 'org.springframework.boot' version '2.4.1'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}
@@ -13,7 +13,7 @@ repositories {
}
ext {
set('springCloudVersion', "2020.0.4")
set('springCloudVersion', "Hoxton.SR9")
}
dependencies {

View File

@@ -1,23 +1,25 @@
package com.spring.kafka.consumer;
import com.spring.kafka.domain.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class KafkaMessageConsumer {
@KafkaListener(topics = "domain-event")
public void consumeMessage(@Headers MessageHeaders headers, @Payload String message) {
System.out.println("Received Headers : "+headers);
System.out.println("Received Payloads : "+message);
log.debug("Received Headers : " + headers);
log.debug("Received Payloads : " + message);
}
@KafkaListener(topics = "domain-event-user")
public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) {
System.out.println("Received Headers : "+headers);
System.out.println("Received Payloads : "+user.toString());
log.debug("Received Headers : " + headers);
log.debug("Received Payloads : " + user.toString());
}
}

View File

@@ -0,0 +1,26 @@
package com.spring.kafka.consumer;
import com.spring.kafka.domain.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
@Slf4j
@Component
public class KafkaSpringCloudMessageConsumer {
@Bean
Consumer<String> domainEventString() {
return input -> {
System.out.println("Received Message : " + input);
};
}
@Bean
Consumer<User> domainEventModel() {
return input -> {
System.out.println("Received Message : " + input);
};
}
}

View File

@@ -3,7 +3,6 @@ package com.spring.kafka.controller;
import com.spring.kafka.domain.model.User;
import com.spring.kafka.producer.KafkaMessageProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@@ -37,4 +36,28 @@ public class MessageSendController {
public void sendMessage(@RequestBody User payload) {
producer.sendMessage(payload);
}
/*
curl --location --request POST 'http://localhost:8080/send-message-by-spring-cloud' \
--header 'Content-Type: text/plain' \
--data-raw 'message'
*/
@PostMapping("/send-message-by-spring-cloud")
public void sendMessageBySpringCloud(@RequestBody String payload) {
producer.sendMessageBySpringCloud(payload);
}
/*
curl --location --request POST 'http://localhost:8080/send-message-by-spring-cloud-user' \
--header 'Content-Type: application/json' \
--data-raw '{
"id": "happydaddy@naver.com",
"name": "happydaddy",
"age": 28
}'
*/
@PostMapping("/send-message-by-spring-cloud-user")
public void sendMessageBySpringCloud(@RequestBody User payload) {
producer.sendMessageBySpringCloud(payload);
}
}

View File

@@ -1,15 +1,18 @@
package com.spring.kafka.producer;
import com.spring.kafka.domain.model.User;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;
import java.util.Properties;
@Component
@RequiredArgsConstructor
public class KafkaMessageProducer {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@@ -18,6 +21,8 @@ public class KafkaMessageProducer {
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
private final StreamBridge streamBridge;
public void sendMessage(String payload) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -37,4 +42,12 @@ public class KafkaMessageProducer {
ProducerRecord<String, User> message = new ProducerRecord<>("domain-event-user", user);
producer.send(message);
}
public void sendMessageBySpringCloud(String payload) {
streamBridge.send("domainEventString-out-0", payload);
}
public void sendMessageBySpringCloud(User payload) {
streamBridge.send("domainEventModel-out-0", payload);
}
}

View File

@@ -1,3 +1,8 @@
logging:
level:
org.springframework: WARN
com.spring.kafka: DEBUG
# spring-kafka configuration
#spring:
# kafka:
# bootstrap-servers: localhost:9093,localhost:9094,localhost:9095
@@ -12,7 +17,7 @@
spring:
kafka:
bootstrap-servers: localhost:9092,localhost:9094,localhost:9095
bootstrap-servers: localhost:9093,localhost:9094,localhost:9095
consumer:
group-id: consumer-group-1
auto-offset-reset: earliest
@@ -23,3 +28,31 @@ spring:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# spring-cloud-stream configuration
cloud.stream:
bindings:
domainEventString-in-0:
content-type: text/plain
destination: domain-event-string
group: consumer-group-string
domainEventString-out-0:
destination: domain-event-string
group: consumer-group-string
domainEventModel-in-0:
destination: domain-event-model
group: consumer-group-model
domainEventModel-out-0:
destination: domain-event-model
group: consumer-group-model
kafka:
binder:
brokers: localhost:9093,localhost:9094,localhost:9095
configuration:
auto.offset.reset: earliest
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
cloud:
stream:
function:
definition: domainEventString;domainEventModel