diff --git a/spring-cloud-stream-kafka-communication/pom.xml b/spring-cloud-stream-kafka-communication/pom.xml
new file mode 100644
index 0000000..874172f
--- /dev/null
+++ b/spring-cloud-stream-kafka-communication/pom.xml
@@ -0,0 +1,65 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.6.1
+
+
+
+ com.amrut.prabhu
+ spring-cloud-stream-kafka-communication
+ 0.0.1-SNAPSHOT
+ spring-cloud-stream-kafka-communication
+ Kafka communication with spring cloud Streams
+
+ 17
+ 2021.0.0
+
+
+
+
+ org.apache.kafka
+ kafka-streams
+
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-kafka
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.cloud
+ spring-cloud-stream
+ test
+ test-binder
+ test-jar
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
+
diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java
new file mode 100644
index 0000000..c65abd6
--- /dev/null
+++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java
@@ -0,0 +1,27 @@
+package com.amrut.prabhu;
+
+import com.amrut.prabhu.dto.Message;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+@SpringBootApplication
+public class SpringCloudStreamKafkaApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(SpringCloudStreamKafkaApplication.class, args);
+ }
+
+ @Bean
+ public Consumer consumer() {
+ return message -> System.out.println("received " + message);
+ }
+
+ @Bean
+ public Supplier producer() {
+ return () -> new Message(" jack from Streams");
+ }
+}
diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/Message.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/Message.java
new file mode 100755
index 0000000..5ce2b81
--- /dev/null
+++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/Message.java
@@ -0,0 +1,4 @@
+package com.amrut.prabhu.dto;
+
+public record Message(String name) {
+}
diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java
new file mode 100644
index 0000000..157ef31
--- /dev/null
+++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java
@@ -0,0 +1,23 @@
+package com.amrut.prabhu.dto.coverters;
+
+import com.amrut.prabhu.dto.Message;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.io.IOException;
+
+public class MessageDeSerializer implements Deserializer {
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ @Override
+ public Message deserialize(String topic, byte[] data) {
+ try {
+
+ return objectMapper.readValue(new String(data), Message.class);
+ } catch (IOException e) {
+ throw new SerializationException(e);
+ }
+ }
+}
diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageSerializer.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageSerializer.java
new file mode 100644
index 0000000..382548c
--- /dev/null
+++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageSerializer.java
@@ -0,0 +1,22 @@
+package com.amrut.prabhu.dto.coverters;
+
+import com.amrut.prabhu.dto.Message;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+public class MessageSerializer implements Serializer {
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ @Override
+ public byte[] serialize(String topic, Message data) {
+ try {
+ return objectMapper.writeValueAsBytes(data);
+ } catch (JsonProcessingException e) {
+ throw new SerializationException(e);
+ }
+
+ }
+}
diff --git a/spring-cloud-stream-kafka-communication/src/main/resources/application.yml b/spring-cloud-stream-kafka-communication/src/main/resources/application.yml
new file mode 100644
index 0000000..561edb8
--- /dev/null
+++ b/spring-cloud-stream-kafka-communication/src/main/resources/application.yml
@@ -0,0 +1,27 @@
+spring:
+ cloud:
+ function:
+ definition: consumer;producer
+ stream:
+ kafka:
+ bindings:
+ producer-out-0:
+ producer:
+ configuration:
+ value.serializer: com.amrut.prabhu.dto.coverters.MessageSerializer
+ consumer-in-0:
+ consumer:
+ configuration:
+ value.deserializer: com.amrut.prabhu.dto.coverters.MessageDeSerializer
+ binder:
+ brokers: localhost:9092
+
+ bindings:
+ producer-out-0:
+ destination : first-topic
+ producer:
+ useNativeEncoding: true # This enables using the custom serializer
+ consumer-in-0:
+ destination : first-topic
+ consumer:
+ use-native-decoding: true # This enables using the custom deserializer
diff --git a/spring-cloud-stream-kafka-communication/src/test/java/com/amrut/prabhu/SpringCloudStreamKafkaApplicationTests.java b/spring-cloud-stream-kafka-communication/src/test/java/com/amrut/prabhu/SpringCloudStreamKafkaApplicationTests.java
new file mode 100644
index 0000000..5ae18c4
--- /dev/null
+++ b/spring-cloud-stream-kafka-communication/src/test/java/com/amrut/prabhu/SpringCloudStreamKafkaApplicationTests.java
@@ -0,0 +1,13 @@
+package com.amrut.prabhu;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class SpringCloudStreamKafkaApplicationTests {
+
+ @Test
+ void contextLoads() {
+ }
+
+}