From 29cbea478bf68f71365ca7833e54ad44e2a09c09 Mon Sep 17 00:00:00 2001 From: amrutprabhu Date: Sat, 11 Dec 2021 12:29:24 +0100 Subject: [PATCH] Adding on demand producer to send messages. --- .../java/com/amrut/prabhu/KafkaProducer.java | 19 +++++++++++++++++++ .../SpringCloudStreamKafkaApplication.java | 2 ++ .../dto/coverters/MessageDeSerializer.java | 1 - 3 files changed, 21 insertions(+), 1 deletion(-) create mode 100644 spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/KafkaProducer.java diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/KafkaProducer.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/KafkaProducer.java new file mode 100644 index 0000000..a330547 --- /dev/null +++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/KafkaProducer.java @@ -0,0 +1,19 @@ +package com.amrut.prabhu; + +import com.amrut.prabhu.dto.Message; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Component +public class KafkaProducer { + + @Autowired + private StreamBridge streamBridge; + + @Scheduled(cron = "*/2 * * * * *") + public void sendMessage(){ + streamBridge.send("producer-out-0",new Message(" jack from Stream bridge")); + } +} diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java index c65abd6..c9398a3 100644 --- a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java +++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java @@ -4,11 +4,13 @@ import com.amrut.prabhu.dto.Message; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableScheduling; import java.util.function.Consumer; import java.util.function.Supplier; @SpringBootApplication +@EnableScheduling public class SpringCloudStreamKafkaApplication { public static void main(String[] args) { diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java index 157ef31..f7847a5 100644 --- a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java +++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java @@ -14,7 +14,6 @@ public class MessageDeSerializer implements Deserializer { @Override public Message deserialize(String topic, byte[] data) { try { - return objectMapper.readValue(new String(data), Message.class); } catch (IOException e) { throw new SerializationException(e);