From 43cc49e87cd856bdf035e7dc488706be61388e97 Mon Sep 17 00:00:00 2001 From: amrutprabhu Date: Sun, 5 Dec 2021 16:48:17 +0100 Subject: [PATCH] Adding kafka communication using spring cloud streams kafka --- .../pom.xml | 65 +++++++++++++++++++ .../SpringCloudStreamKafkaApplication.java | 27 ++++++++ .../java/com/amrut/prabhu/dto/Message.java | 4 ++ .../dto/coverters/MessageDeSerializer.java | 23 +++++++ .../dto/coverters/MessageSerializer.java | 22 +++++++ .../src/main/resources/application.yml | 27 ++++++++ ...pringCloudStreamKafkaApplicationTests.java | 13 ++++ 7 files changed, 181 insertions(+) create mode 100644 spring-cloud-stream-kafka-communication/pom.xml create mode 100644 spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java create mode 100755 spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/Message.java create mode 100644 spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java create mode 100644 spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageSerializer.java create mode 100644 spring-cloud-stream-kafka-communication/src/main/resources/application.yml create mode 100644 spring-cloud-stream-kafka-communication/src/test/java/com/amrut/prabhu/SpringCloudStreamKafkaApplicationTests.java diff --git a/spring-cloud-stream-kafka-communication/pom.xml b/spring-cloud-stream-kafka-communication/pom.xml new file mode 100644 index 0000000..874172f --- /dev/null +++ b/spring-cloud-stream-kafka-communication/pom.xml @@ -0,0 +1,65 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.6.1 + + + + com.amrut.prabhu + spring-cloud-stream-kafka-communication + 0.0.1-SNAPSHOT + spring-cloud-stream-kafka-communication + Kafka communication with spring cloud Streams + + 17 + 2021.0.0 + + + + + org.apache.kafka + kafka-streams + + + org.springframework.cloud + spring-cloud-stream-binder-kafka + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.cloud + spring-cloud-stream + test + test-binder + test-jar + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java new file mode 100644 index 0000000..c65abd6 --- /dev/null +++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java @@ -0,0 +1,27 @@ +package com.amrut.prabhu; + +import com.amrut.prabhu.dto.Message; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +@SpringBootApplication +public class SpringCloudStreamKafkaApplication { + + public static void main(String[] args) { + SpringApplication.run(SpringCloudStreamKafkaApplication.class, args); + } + + @Bean + public Consumer consumer() { + return message -> System.out.println("received " + message); + } + + @Bean + public Supplier producer() { + return () -> new Message(" jack from Streams"); + } +} diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/Message.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/Message.java new file mode 100755 index 0000000..5ce2b81 --- /dev/null +++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/Message.java @@ -0,0 +1,4 @@ +package com.amrut.prabhu.dto; + +public record Message(String name) { +} diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java new file mode 100644 index 0000000..157ef31 --- /dev/null +++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java @@ -0,0 +1,23 @@ +package com.amrut.prabhu.dto.coverters; + +import com.amrut.prabhu.dto.Message; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.io.IOException; + +public class MessageDeSerializer implements Deserializer { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public Message deserialize(String topic, byte[] data) { + try { + + return objectMapper.readValue(new String(data), Message.class); + } catch (IOException e) { + throw new SerializationException(e); + } + } +} diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageSerializer.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageSerializer.java new file mode 100644 index 0000000..382548c --- /dev/null +++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageSerializer.java @@ -0,0 +1,22 @@ +package com.amrut.prabhu.dto.coverters; + +import com.amrut.prabhu.dto.Message; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +public class MessageSerializer implements Serializer { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public byte[] serialize(String topic, Message data) { + try { + return objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + throw new SerializationException(e); + } + + } +} diff --git a/spring-cloud-stream-kafka-communication/src/main/resources/application.yml b/spring-cloud-stream-kafka-communication/src/main/resources/application.yml new file mode 100644 index 0000000..561edb8 --- /dev/null +++ b/spring-cloud-stream-kafka-communication/src/main/resources/application.yml @@ -0,0 +1,27 @@ +spring: + cloud: + function: + definition: consumer;producer + stream: + kafka: + bindings: + producer-out-0: + producer: + configuration: + value.serializer: com.amrut.prabhu.dto.coverters.MessageSerializer + consumer-in-0: + consumer: + configuration: + value.deserializer: com.amrut.prabhu.dto.coverters.MessageDeSerializer + binder: + brokers: localhost:9092 + + bindings: + producer-out-0: + destination : first-topic + producer: + useNativeEncoding: true # This enables using the custom serializer + consumer-in-0: + destination : first-topic + consumer: + use-native-decoding: true # This enables using the custom deserializer diff --git a/spring-cloud-stream-kafka-communication/src/test/java/com/amrut/prabhu/SpringCloudStreamKafkaApplicationTests.java b/spring-cloud-stream-kafka-communication/src/test/java/com/amrut/prabhu/SpringCloudStreamKafkaApplicationTests.java new file mode 100644 index 0000000..5ae18c4 --- /dev/null +++ b/spring-cloud-stream-kafka-communication/src/test/java/com/amrut/prabhu/SpringCloudStreamKafkaApplicationTests.java @@ -0,0 +1,13 @@ +package com.amrut.prabhu; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class SpringCloudStreamKafkaApplicationTests { + + @Test + void contextLoads() { + } + +}