diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/KafkaProducer.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/KafkaProducer.java new file mode 100644 index 0000000..a330547 --- /dev/null +++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/KafkaProducer.java @@ -0,0 +1,19 @@ +package com.amrut.prabhu; + +import com.amrut.prabhu.dto.Message; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Component +public class KafkaProducer { + + @Autowired + private StreamBridge streamBridge; + + @Scheduled(cron = "*/2 * * * * *") + public void sendMessage(){ + streamBridge.send("producer-out-0",new Message(" jack from Stream bridge")); + } +} diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java index c65abd6..c9398a3 100644 --- a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java +++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/SpringCloudStreamKafkaApplication.java @@ -4,11 +4,13 @@ import com.amrut.prabhu.dto.Message; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableScheduling; import java.util.function.Consumer; import java.util.function.Supplier; @SpringBootApplication +@EnableScheduling public class SpringCloudStreamKafkaApplication { public static void main(String[] args) { diff --git a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java index 157ef31..f7847a5 100644 --- a/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java +++ b/spring-cloud-stream-kafka-communication/src/main/java/com/amrut/prabhu/dto/coverters/MessageDeSerializer.java @@ -14,7 +14,6 @@ public class MessageDeSerializer implements Deserializer { @Override public Message deserialize(String topic, byte[] data) { try { - return objectMapper.readValue(new String(data), Message.class); } catch (IOException e) { throw new SerializationException(e);