diff --git a/rxjava-processor/pom.xml b/rxjava-processor/pom.xml index 3a98b14..bfa7131 100644 --- a/rxjava-processor/pom.xml +++ b/rxjava-processor/pom.xml @@ -29,6 +29,20 @@ org.springframework.cloud spring-cloud-stream-binder-rabbit + + org.springframework.cloud + spring-cloud-stream-reactive + + + io.projectreactor + reactor-core + 3.0.1.BUILD-SNAPSHOT + + + io.reactivex + rxjava + 1.1.10 + org.springframework.boot spring-boot-starter-redis diff --git a/rxjava-processor/src/main/java/demo/RxJavaTransformer.java b/rxjava-processor/src/main/java/demo/RxJavaTransformer.java index 8ed8146..e39633e 100644 --- a/rxjava-processor/src/main/java/demo/RxJavaTransformer.java +++ b/rxjava-processor/src/main/java/demo/RxJavaTransformer.java @@ -20,19 +20,22 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rx.Observable; -import org.springframework.cloud.stream.annotation.rxjava.EnableRxJavaProcessor; -import org.springframework.cloud.stream.annotation.rxjava.RxJavaProcessor; -import org.springframework.context.annotation.Bean; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.messaging.handler.annotation.SendTo; -@EnableRxJavaProcessor +@EnableBinding(Processor.class) public class RxJavaTransformer { private static Logger logger = LoggerFactory.getLogger(RxJavaTransformer.class); - @Bean - public RxJavaProcessor processor() { - return inputStream -> inputStream.map(data -> { + @StreamListener(Processor.INPUT) + @SendTo(Processor.OUTPUT) + public Observable processor(Observable inputStream) { + return inputStream.map(data -> { logger.info("Got data = " + data); return data; }).buffer(5).map(data -> String.valueOf(avg(data)));