diff --git a/build.gradle b/build.gradle index f4cabee..784e27e 100644 --- a/build.gradle +++ b/build.gradle @@ -21,6 +21,10 @@ dependencies { implementation 'org.springframework.cloud:spring-cloud-stream' implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka' implementation 'org.springframework.kafka:spring-kafka' + implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka' + + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.kafka:spring-kafka-test' } diff --git a/src/main/java/com/spring/kafka/consumer/SpringKafkaConsumerApplication.java b/src/main/java/com/spring/kafka/SpringKafkaConsumerApplication.java similarity index 90% rename from src/main/java/com/spring/kafka/consumer/SpringKafkaConsumerApplication.java rename to src/main/java/com/spring/kafka/SpringKafkaConsumerApplication.java index 2e7beda..28cc282 100644 --- a/src/main/java/com/spring/kafka/consumer/SpringKafkaConsumerApplication.java +++ b/src/main/java/com/spring/kafka/SpringKafkaConsumerApplication.java @@ -1,4 +1,4 @@ -package com.spring.kafka.consumer; +package com.spring.kafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; diff --git a/src/main/java/com/spring/kafka/consumer/KafkaMessageConsumer.java b/src/main/java/com/spring/kafka/consumer/KafkaMessageConsumer.java new file mode 100644 index 0000000..ffe1f6b --- /dev/null +++ b/src/main/java/com/spring/kafka/consumer/KafkaMessageConsumer.java @@ -0,0 +1,23 @@ +package com.spring.kafka.consumer; + +import com.spring.kafka.domain.model.User; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.handler.annotation.Headers; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; + +@Component +public class KafkaMessageConsumer { + @KafkaListener(topics = "domain-event") + public void consumeMessage(@Headers MessageHeaders headers, @Payload String message) { + System.out.println("Received Headers : "+headers); + System.out.println("Received Payloads : "+message); + } + + @KafkaListener(topics = "domain-event-user") + public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) { + System.out.println("Received Headers : "+headers); + System.out.println("Received Payloads : "+user.toString()); + } +} diff --git a/src/main/java/com/spring/kafka/controller/MessageSendController.java b/src/main/java/com/spring/kafka/controller/MessageSendController.java new file mode 100644 index 0000000..18c8125 --- /dev/null +++ b/src/main/java/com/spring/kafka/controller/MessageSendController.java @@ -0,0 +1,40 @@ +package com.spring.kafka.controller; + +import com.spring.kafka.domain.model.User; +import com.spring.kafka.producer.KafkaMessageProducer; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequiredArgsConstructor +public class MessageSendController { + + private final KafkaMessageProducer producer; + + /* + curl --location --request POST 'http://localhost:8080/send-message' \ + --header 'Content-Type: text/plain' \ + --data-raw 'message' + */ + @PostMapping("/send-message") + public void sendMessage(@RequestBody String payload) { + producer.sendMessage(payload); + } + + /* + curl --location --request POST 'http://localhost:8080/send-message' \ + --header 'Content-Type: application/json' \ + --data-raw '{ + "id": "happydaddy@naver.com", + "name": "happydaddy", + "age": 28 + }' + */ + @PostMapping("/send-message-user") + public void sendMessage(@RequestBody User payload) { + producer.sendMessage(payload); + } +} diff --git a/src/main/java/com/spring/kafka/domain/model/User.java b/src/main/java/com/spring/kafka/domain/model/User.java new file mode 100644 index 0000000..866272b --- /dev/null +++ b/src/main/java/com/spring/kafka/domain/model/User.java @@ -0,0 +1,14 @@ +package com.spring.kafka.domain.model; + +import lombok.*; + +@Getter +@Builder +@ToString +@AllArgsConstructor +@NoArgsConstructor +public class User { + private String id; + private String name; + private int age; +} diff --git a/src/main/java/com/spring/kafka/producer/KafkaMessageProducer.java b/src/main/java/com/spring/kafka/producer/KafkaMessageProducer.java new file mode 100644 index 0000000..3968bc0 --- /dev/null +++ b/src/main/java/com/spring/kafka/producer/KafkaMessageProducer.java @@ -0,0 +1,40 @@ +package com.spring.kafka.producer; + +import com.spring.kafka.domain.model.User; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.util.Properties; + +@Component +public class KafkaMessageProducer { + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + @Value("${spring.kafka.producer.key-serializer}") + private String keySerializer; + @Value("${spring.kafka.producer.value-serializer}") + private String valueSerializer; + + public void sendMessage(String payload) { + Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); + KafkaProducer producer = new KafkaProducer<>(properties); + ProducerRecord message = new ProducerRecord<>("domain-event", payload); + producer.send(message); + } + + public void sendMessage(User user) { + Properties properties = new Properties(); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); + properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); + KafkaProducer producer = new KafkaProducer<>(properties); + ProducerRecord message = new ProducerRecord<>("domain-event-user", user); + producer.send(message); + } +} diff --git a/src/main/resources/applicaion.yaml b/src/main/resources/applicaion.yaml deleted file mode 100644 index e69de29..0000000 diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml new file mode 100644 index 0000000..f773390 --- /dev/null +++ b/src/main/resources/application.yaml @@ -0,0 +1,25 @@ +#spring: +# kafka: +# bootstrap-servers: localhost:9093,localhost:9094,localhost:9095 +# consumer: +# group-id: consumer-group-1 +# auto-offset-reset: earliest +# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer +# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer +# producer: +# key-serializer: org.apache.kafka.common.serialization.StringSerializer +# value-serializer: org.apache.kafka.common.serialization.StringSerializer + +spring: + kafka: + bootstrap-servers: localhost:9092,localhost:9094,localhost:9095 + consumer: + group-id: consumer-group-1 + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + spring.json.trusted.packages: com.spring.kafka.domain.model + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer diff --git a/src/test/java/com/spring/kafka/consumer/SpringKafkaConsumerApplicationTests.java b/src/test/java/com/spring/kafka/SpringKafkaConsumerApplicationTests.java similarity index 85% rename from src/test/java/com/spring/kafka/consumer/SpringKafkaConsumerApplicationTests.java rename to src/test/java/com/spring/kafka/SpringKafkaConsumerApplicationTests.java index 67c434e..c3fb4df 100644 --- a/src/test/java/com/spring/kafka/consumer/SpringKafkaConsumerApplicationTests.java +++ b/src/test/java/com/spring/kafka/SpringKafkaConsumerApplicationTests.java @@ -1,4 +1,4 @@ -package com.spring.kafka.consumer; +package com.spring.kafka; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest;