[Update] spring-kafka sample

This commit is contained in:
abel
2021-11-15 17:39:32 +09:00
parent 47d63878ee
commit 1b1fbced36
9 changed files with 148 additions and 2 deletions

View File

@@ -21,6 +21,10 @@ dependencies {
implementation 'org.springframework.cloud:spring-cloud-stream' implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka' implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test' testImplementation 'org.springframework.kafka:spring-kafka-test'
} }

View File

@@ -1,4 +1,4 @@
package com.spring.kafka.consumer; package com.spring.kafka;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;

View File

@@ -0,0 +1,23 @@
package com.spring.kafka.consumer;
import com.spring.kafka.domain.model.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class KafkaMessageConsumer {
@KafkaListener(topics = "domain-event")
public void consumeMessage(@Headers MessageHeaders headers, @Payload String message) {
System.out.println("Received Headers : "+headers);
System.out.println("Received Payloads : "+message);
}
@KafkaListener(topics = "domain-event-user")
public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) {
System.out.println("Received Headers : "+headers);
System.out.println("Received Payloads : "+user.toString());
}
}

View File

@@ -0,0 +1,40 @@
package com.spring.kafka.controller;
import com.spring.kafka.domain.model.User;
import com.spring.kafka.producer.KafkaMessageProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
public class MessageSendController {
private final KafkaMessageProducer producer;
/*
curl --location --request POST 'http://localhost:8080/send-message' \
--header 'Content-Type: text/plain' \
--data-raw 'message'
*/
@PostMapping("/send-message")
public void sendMessage(@RequestBody String payload) {
producer.sendMessage(payload);
}
/*
curl --location --request POST 'http://localhost:8080/send-message' \
--header 'Content-Type: application/json' \
--data-raw '{
"id": "happydaddy@naver.com",
"name": "happydaddy",
"age": 28
}'
*/
@PostMapping("/send-message-user")
public void sendMessage(@RequestBody User payload) {
producer.sendMessage(payload);
}
}

View File

@@ -0,0 +1,14 @@
package com.spring.kafka.domain.model;
import lombok.*;
@Getter
@Builder
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class User {
private String id;
private String name;
private int age;
}

View File

@@ -0,0 +1,40 @@
package com.spring.kafka.producer;
import com.spring.kafka.domain.model.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Properties;
@Component
public class KafkaMessageProducer {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
public void sendMessage(String payload) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> message = new ProducerRecord<>("domain-event", payload);
producer.send(message);
}
public void sendMessage(User user) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
KafkaProducer<String, User> producer = new KafkaProducer<>(properties);
ProducerRecord<String, User> message = new ProducerRecord<>("domain-event-user", user);
producer.send(message);
}
}

View File

@@ -0,0 +1,25 @@
#spring:
# kafka:
# bootstrap-servers: localhost:9093,localhost:9094,localhost:9095
# consumer:
# group-id: consumer-group-1
# auto-offset-reset: earliest
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# producer:
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
spring:
kafka:
bootstrap-servers: localhost:9092,localhost:9094,localhost:9095
consumer:
group-id: consumer-group-1
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: com.spring.kafka.domain.model
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

View File

@@ -1,4 +1,4 @@
package com.spring.kafka.consumer; package com.spring.kafka;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;