diff --git a/processor-samples/pom.xml b/processor-samples/pom.xml
index 869de21..bdf072a 100644
--- a/processor-samples/pom.xml
+++ b/processor-samples/pom.xml
@@ -12,7 +12,8 @@
polled-consumer
reactive-processor-kafka
reactive-processor-rabbit
- sensor-average-reactive
+ sensor-average-reactive-kafka
+ sensor-average-reactive-rabbit
streamlistener-basic
uppercase-transformer
diff --git a/processor-samples/reactive-processor-kafka/pom.xml b/processor-samples/reactive-processor-kafka/pom.xml
index 5c61098..cd29d3b 100644
--- a/processor-samples/reactive-processor-kafka/pom.xml
+++ b/processor-samples/reactive-processor-kafka/pom.xml
@@ -3,7 +3,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- reactive-processor
+ reactive-processor-kafka
0.0.1-SNAPSHOT
jar
reactive-processor-kafka
diff --git a/processor-samples/reactive-processor-rabbit/pom.xml b/processor-samples/reactive-processor-rabbit/pom.xml
index dca3876..3397fba 100644
--- a/processor-samples/reactive-processor-rabbit/pom.xml
+++ b/processor-samples/reactive-processor-rabbit/pom.xml
@@ -3,11 +3,11 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- reactive-processor
+ reactive-processor-rabbit
0.0.1-SNAPSHOT
jar
- reactive-processor-kafka
- Spring Cloud Stream Reactive Processor Kafka
+ reactive-processor-rabbit
+ Spring Cloud Stream Reactive Processor Rabbit
org.springframework.boot
diff --git a/processor-samples/sensor-average-reactive/.jdk8 b/processor-samples/sensor-average-reactive-kafka/.jdk8
similarity index 100%
rename from processor-samples/sensor-average-reactive/.jdk8
rename to processor-samples/sensor-average-reactive-kafka/.jdk8
diff --git a/processor-samples/sensor-average-reactive/.mvn b/processor-samples/sensor-average-reactive-kafka/.mvn
similarity index 100%
rename from processor-samples/sensor-average-reactive/.mvn
rename to processor-samples/sensor-average-reactive-kafka/.mvn
diff --git a/processor-samples/sensor-average-reactive/README.adoc b/processor-samples/sensor-average-reactive-kafka/README.adoc
similarity index 74%
rename from processor-samples/sensor-average-reactive/README.adoc
rename to processor-samples/sensor-average-reactive-kafka/README.adoc
index 1a72489..f9ff72a 100644
--- a/processor-samples/sensor-average-reactive/README.adoc
+++ b/processor-samples/sensor-average-reactive-kafka/README.adoc
@@ -11,10 +11,8 @@ To run this sample, you will need to have installed:
## Running the application
-The following instructions assume that you are running Kafka as a Docker image.
-
* Go to the application root (not the repository root, but this application)
-* `docker-compose up -d`
+* `docker-compose up -d` (Skip this section if you have Kafka running elsewhere)
* `./mvnw clean package`
@@ -43,14 +41,3 @@ Data received: {"id":100200,"average":90.0}
* `docker-compose down`
-## Running the application using Rabbit binder
-
-All the instructions above apply here also, but instead of running the default `docker-compose.yml`, use the command below to start a Rabbitmq cluser.
-
-* `docker-compose -f docker-compose-rabbit.yml up -d`
-
-* `./mvnw clean package -P rabbit-binder`
-
-* `java -jar target/sensor-average-reactive-0.0.1-SNAPSHOT.jar`
-
-Once you are done testing: `docker-compose -f docker-compose-rabbit.yml down`
diff --git a/processor-samples/sensor-average-reactive/docker-compose-rabbit.yml b/processor-samples/sensor-average-reactive-kafka/docker-compose-rabbit.yml
similarity index 100%
rename from processor-samples/sensor-average-reactive/docker-compose-rabbit.yml
rename to processor-samples/sensor-average-reactive-kafka/docker-compose-rabbit.yml
diff --git a/processor-samples/sensor-average-reactive/docker-compose.yml b/processor-samples/sensor-average-reactive-kafka/docker-compose.yml
similarity index 100%
rename from processor-samples/sensor-average-reactive/docker-compose.yml
rename to processor-samples/sensor-average-reactive-kafka/docker-compose.yml
diff --git a/processor-samples/sensor-average-reactive/mvnw b/processor-samples/sensor-average-reactive-kafka/mvnw
similarity index 100%
rename from processor-samples/sensor-average-reactive/mvnw
rename to processor-samples/sensor-average-reactive-kafka/mvnw
diff --git a/processor-samples/sensor-average-reactive/mvnw.cmd b/processor-samples/sensor-average-reactive-kafka/mvnw.cmd
similarity index 100%
rename from processor-samples/sensor-average-reactive/mvnw.cmd
rename to processor-samples/sensor-average-reactive-kafka/mvnw.cmd
diff --git a/processor-samples/sensor-average-reactive-kafka/pom.xml b/processor-samples/sensor-average-reactive-kafka/pom.xml
new file mode 100644
index 0000000..75969f3
--- /dev/null
+++ b/processor-samples/sensor-average-reactive-kafka/pom.xml
@@ -0,0 +1,124 @@
+
+
+ 4.0.0
+
+ sensor-average-reactive-kafka
+ 0.0.1-SNAPSHOT
+ jar
+ sensor-average-reactive-kafka
+ Spring Cloud Stream Reactive Processor for Sensor Average Kafka
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.2.0.RELEASE
+
+
+
+
+ Hoxton.BUILD-SNAPSHOT
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-kafka
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.kafka
+ spring-kafka-test
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/libs-snapshot-local
+
+ true
+
+
+ false
+
+
+
+ spring-milestones
+ Spring Milestones
+ https://repo.spring.io/libs-milestone-local
+
+ false
+
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/libs-snapshot-local
+
+ true
+
+
+ false
+
+
+
+ spring-milestones
+ Spring Milestones
+ https://repo.spring.io/libs-milestone-local
+
+ false
+
+
+
+ spring-releases
+ Spring Releases
+ https://repo.spring.io/libs-release-local
+
+ false
+
+
+
+
+
diff --git a/processor-samples/sensor-average-reactive/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java b/processor-samples/sensor-average-reactive-kafka/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java
similarity index 71%
rename from processor-samples/sensor-average-reactive/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java
rename to processor-samples/sensor-average-reactive-kafka/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java
index 8fa0e33..2fe8082 100644
--- a/processor-samples/sensor-average-reactive/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java
+++ b/processor-samples/sensor-average-reactive-kafka/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java
@@ -1,41 +1,31 @@
package sample.sensor.average;
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.cloud.stream.annotation.EnableBinding;
-import org.springframework.cloud.stream.annotation.Input;
-import org.springframework.cloud.stream.annotation.Output;
-import org.springframework.cloud.stream.annotation.StreamListener;
-import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
-import org.springframework.integration.annotation.InboundChannelAdapter;
-import org.springframework.integration.annotation.Poller;
-import org.springframework.integration.core.MessageSource;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.SubscribableChannel;
-import org.springframework.messaging.support.GenericMessage;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
-import java.time.Duration;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-
@SpringBootApplication
-@EnableBinding(Processor.class)
public class SensorAverageProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(SensorAverageProcessorApplication.class, args);
}
- @StreamListener
- @Output(Processor.OUTPUT)
- public Flux calculateAverage(@Input(Processor.INPUT) Flux data) {
- return data.window(Duration.ofSeconds(3)).flatMap(
+ @Bean
+ public Function, Flux> calculateAverage() {
+ return data -> data.window(Duration.ofSeconds(3)).flatMap(
window -> window.groupBy(Sensor::getId).flatMap(this::calculateAverage));
}
@@ -165,7 +155,6 @@ public class SensorAverageProcessorApplication {
//Test source will send data to the same destination where the processor receives data
//Test sink will consume data from the same destination where the processor produces data
- @EnableBinding(Source.class)
static class TestSource {
private AtomicBoolean semaphore = new AtomicBoolean(true);
@@ -173,8 +162,7 @@ public class SensorAverageProcessorApplication {
private int[] ids = new int[]{100100, 100200, 100300};
@Bean
- @InboundChannelAdapter(channel = "test-source", poller = @Poller(fixedDelay = "100"))
- public MessageSource sendTestData() {
+ public Supplier sendTestData() {
return () -> {
int id = ids[random.nextInt(3)];
@@ -182,29 +170,18 @@ public class SensorAverageProcessorApplication {
Sensor sensor = new Sensor();
sensor.setId(id);
sensor.setTemperature(temperature);
- return new GenericMessage<>(sensor);
+ return sensor;
};
}
}
- @EnableBinding(Sink.class)
static class TestSink {
private final Log logger = LogFactory.getLog(getClass());
- @StreamListener("test-sink")
- public void receive(String payload) {
- logger.info("Data received: " + payload);
+ @Bean
+ public Consumer receive() {
+ return payload -> logger.info("Data received: " + payload);
}
}
-
- public interface Sink {
- @Input("test-sink")
- SubscribableChannel sampleSink();
- }
-
- public interface Source {
- @Output("test-source")
- MessageChannel sampleSource();
- }
}
diff --git a/processor-samples/sensor-average-reactive-kafka/src/main/resources/application.yml b/processor-samples/sensor-average-reactive-kafka/src/main/resources/application.yml
new file mode 100644
index 0000000..66e31b5
--- /dev/null
+++ b/processor-samples/sensor-average-reactive-kafka/src/main/resources/application.yml
@@ -0,0 +1,14 @@
+spring:
+ cloud:
+ stream:
+ function:
+ definition: calculateAverage;receive;sendTestData
+ bindings:
+ calculateAverage-out-0:
+ destination: average
+ receive-in-0:
+ destination: average
+ calculateAverage-in-0:
+ destination: sensor
+ sendTestData-out-0:
+ destination: sensor
diff --git a/processor-samples/sensor-average-reactive-rabbit/.jdk8 b/processor-samples/sensor-average-reactive-rabbit/.jdk8
new file mode 100644
index 0000000..e69de29
diff --git a/processor-samples/sensor-average-reactive-rabbit/.mvn b/processor-samples/sensor-average-reactive-rabbit/.mvn
new file mode 120000
index 0000000..d21aa17
--- /dev/null
+++ b/processor-samples/sensor-average-reactive-rabbit/.mvn
@@ -0,0 +1 @@
+../../.mvn
\ No newline at end of file
diff --git a/processor-samples/sensor-average-reactive-rabbit/README.adoc b/processor-samples/sensor-average-reactive-rabbit/README.adoc
new file mode 100644
index 0000000..103a3f6
--- /dev/null
+++ b/processor-samples/sensor-average-reactive-rabbit/README.adoc
@@ -0,0 +1,42 @@
+Spring Cloud Stream Reactive Processor Sensor Average Sample
+=============================================================
+
+This is a Spring Cloud Stream reactive processor sample that showcase a sensor temperature average calculator.
+
+## Requirements
+
+To run this sample, you will need to have installed:
+
+* Java 8 or Above
+
+## Running the application
+
+* Go to the application root (not the repository root, but this application)
+* `docker-compose up -d` (Skip this section if you have Rabbitmq running elsewhere)
+
+* `./mvnw clean package`
+
+* `java -jar target/sensor-average-reactive-0.0.1-SNAPSHOT.jar`
+
+The main application contains the reactive processor that receives the sensor data for a duration (3 seconds) and averages them.
+It then sends the average data (per sensor id) through the outbound destination of the processor.
+
+The application also provides a source and sink for testing.
+Test source will generate some sensor data every 100 milliseconds and the test sink will verify that the processor has calculated the average.
+Test source is bound to the same broker destination where the processor is listening from.
+Similarly test sink is bound to the same broker destination where the processor is producing to.
+
+You will see output similar to the following on the console every 3 seconds.
+
+```
+Data received: {"id":100100,"average":89.0}
+Data received: {"id":100200,"average":84.0}
+Data received: {"id":100300,"average":88.0}
+Data received: {"id":100100,"average":85.0}
+Data received: {"id":100200,"average":85.0}
+Data received: {"id":100300,"average":83.0}
+Data received: {"id":100100,"average":80.0}
+Data received: {"id":100200,"average":90.0}
+```
+
+* `docker-compose down`
\ No newline at end of file
diff --git a/processor-samples/sensor-average-reactive-rabbit/docker-compose.yml b/processor-samples/sensor-average-reactive-rabbit/docker-compose.yml
new file mode 100644
index 0000000..7c3da92
--- /dev/null
+++ b/processor-samples/sensor-average-reactive-rabbit/docker-compose.yml
@@ -0,0 +1,7 @@
+version: '3'
+services:
+ rabbitmq:
+ image: rabbitmq:management
+ ports:
+ - 5672:5672
+ - 15672:15672
\ No newline at end of file
diff --git a/processor-samples/sensor-average-reactive-rabbit/mvnw b/processor-samples/sensor-average-reactive-rabbit/mvnw
new file mode 100755
index 0000000..0ce08e9
--- /dev/null
+++ b/processor-samples/sensor-average-reactive-rabbit/mvnw
@@ -0,0 +1,226 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Maven2 Start Up Batch script
+#
+# Required ENV vars:
+# ------------------
+# JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+# M2_HOME - location of maven2's installed home dir
+# MAVEN_OPTS - parameters passed to the Java VM when running Maven
+# e.g. to debug Maven itself, use
+# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+ if [ -f /etc/mavenrc ] ; then
+ . /etc/mavenrc
+ fi
+
+ if [ -f "$HOME/.mavenrc" ] ; then
+ . "$HOME/.mavenrc"
+ fi
+
+fi
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+ MINGW*) mingw=true;;
+ Darwin*) darwin=true
+ # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+ # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+ if [ -z "$JAVA_HOME" ]; then
+ if [ -x "/usr/libexec/java_home" ]; then
+ export JAVA_HOME="`/usr/libexec/java_home`"
+ else
+ export JAVA_HOME="/Library/Java/Home"
+ fi
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=`java-config --jre-home`
+ fi
+fi
+
+if [ -z "$M2_HOME" ] ; then
+ ## resolve links - $0 may be a link to maven's home
+ PRG="$0"
+
+ # need this for relative symlinks
+ while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG="`dirname "$PRG"`/$link"
+ fi
+ done
+
+ saveddir=`pwd`
+
+ M2_HOME=`dirname "$PRG"`/..
+
+ # make it fully qualified
+ M2_HOME=`cd "$M2_HOME" && pwd`
+
+ cd "$saveddir"
+ # echo Using m2 at $M2_HOME
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --unix "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# For Migwn, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME="`(cd "$M2_HOME"; pwd)`"
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
+ # TODO classpath?
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+ javaExecutable="`which javac`"
+ if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
+ # readlink(1) is not available as standard on Solaris 10.
+ readLink=`which readlink`
+ if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
+ if $darwin ; then
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
+ else
+ javaExecutable="`readlink -f \"$javaExecutable\"`"
+ fi
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaHome=`expr "$javaHome" : '\(.*\)/bin'`
+ JAVA_HOME="$javaHome"
+ export JAVA_HOME
+ fi
+ fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD="`which java`"
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo "Error: JAVA_HOME is not defined correctly." >&2
+ echo " We cannot execute $JAVACMD" >&2
+ exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+ echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+
+ if [ -z "$1" ]
+ then
+ echo "Path not specified to find_maven_basedir"
+ return 1
+ fi
+
+ basedir="$1"
+ wdir="$1"
+ while [ "$wdir" != '/' ] ; do
+ if [ -d "$wdir"/.mvn ] ; then
+ basedir=$wdir
+ break
+ fi
+ # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+ if [ -d "${wdir}" ]; then
+ wdir=`cd "$wdir/.."; pwd`
+ fi
+ # end of workaround
+ done
+ echo "${basedir}"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+ if [ -f "$1" ]; then
+ echo "$(tr -s '\n' ' ' < "$1")"
+ fi
+}
+
+BASE_DIR=`find_maven_basedir "$(pwd)"`
+if [ -z "$BASE_DIR" ]; then
+ exit 1;
+fi
+
+export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
+echo $MAVEN_PROJECTBASEDIR
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --path --windows "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+ [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+ MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
+fi
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+"$JAVACMD" \
+ $MAVEN_OPTS \
+ -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+ "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+ ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
+
diff --git a/processor-samples/sensor-average-reactive-rabbit/mvnw.cmd b/processor-samples/sensor-average-reactive-rabbit/mvnw.cmd
new file mode 100644
index 0000000..7ecd01d
--- /dev/null
+++ b/processor-samples/sensor-average-reactive-rabbit/mvnw.cmd
@@ -0,0 +1,145 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements. See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership. The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License. You may obtain a copy of the License at
+@REM
+@REM https://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied. See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven2 Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
+if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+set MAVEN_CMD_LINE_ARGS=%*
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+
+set WRAPPER_JAR="".\.mvn\wrapper\maven-wrapper.jar""
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CMD_LINE_ARGS%
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
+if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%" == "on" pause
+
+if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
+
+exit /B %ERROR_CODE%
diff --git a/processor-samples/sensor-average-reactive-rabbit/pom.xml b/processor-samples/sensor-average-reactive-rabbit/pom.xml
new file mode 100644
index 0000000..0d59c67
--- /dev/null
+++ b/processor-samples/sensor-average-reactive-rabbit/pom.xml
@@ -0,0 +1,119 @@
+
+
+ 4.0.0
+
+ sensor-average-reactive-rabbit
+ 0.0.1-SNAPSHOT
+ jar
+ sensor-average-reactive-rabbit
+ Spring Cloud Stream Reactive Processor for Sensor Average Rabbitmq
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.2.0.RELEASE
+
+
+
+
+ Hoxton.BUILD-SNAPSHOT
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-rabbit
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/libs-snapshot-local
+
+ true
+
+
+ false
+
+
+
+ spring-milestones
+ Spring Milestones
+ https://repo.spring.io/libs-milestone-local
+
+ false
+
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/libs-snapshot-local
+
+ true
+
+
+ false
+
+
+
+ spring-milestones
+ Spring Milestones
+ https://repo.spring.io/libs-milestone-local
+
+ false
+
+
+
+ spring-releases
+ Spring Releases
+ https://repo.spring.io/libs-release-local
+
+ false
+
+
+
+
+
diff --git a/processor-samples/sensor-average-reactive-rabbit/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java b/processor-samples/sensor-average-reactive-rabbit/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java
new file mode 100644
index 0000000..2fe8082
--- /dev/null
+++ b/processor-samples/sensor-average-reactive-rabbit/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java
@@ -0,0 +1,187 @@
+package sample.sensor.average;
+
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.GroupedFlux;
+import reactor.core.publisher.Mono;
+
+@SpringBootApplication
+public class SensorAverageProcessorApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(SensorAverageProcessorApplication.class, args);
+ }
+
+ @Bean
+ public Function, Flux> calculateAverage() {
+ return data -> data.window(Duration.ofSeconds(3)).flatMap(
+ window -> window.groupBy(Sensor::getId).flatMap(this::calculateAverage));
+ }
+
+ private Mono calculateAverage(GroupedFlux group) {
+ return group
+ .reduce(new Accumulator(0, 0),
+ (a, d) -> new Accumulator(a.getCount() + 1, a.getTotalValue() + d.getTemperature()))
+ .map(accumulator -> new Average(group.key(), (accumulator.getTotalValue()) / accumulator.getCount()));
+ }
+
+ static class Accumulator {
+
+ private int count;
+
+ private int totalValue;
+
+ public Accumulator(int count, int totalValue) {
+ this.count = count;
+ this.totalValue = totalValue;
+ }
+
+ /**
+ * @return the count
+ */
+ public int getCount() {
+ return count;
+ }
+
+ /**
+ * @param count the count to set
+ */
+ public void setCount(int count) {
+ this.count = count;
+ }
+
+ /**
+ * @return the totalValue
+ */
+ public int getTotalValue() {
+ return totalValue;
+ }
+
+ /**
+ * @param totalValue the totalValue to set
+ */
+ public void setTotalValue(int totalValue) {
+ this.totalValue = totalValue;
+ }
+ }
+
+ static class Average {
+
+ private int id;
+
+ private double average;
+
+ public Average(int id, double average) {
+ this.id = id;
+ this.average = average;
+ }
+
+ /**
+ * @return the id
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * @param id the id to set
+ */
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ /**
+ * @return the average
+ */
+ public double getAverage() {
+ return average;
+ }
+
+ /**
+ * @param average the average to set
+ */
+ public void setAverage(double average) {
+ this.average = average;
+ }
+ }
+
+ static class Sensor {
+
+ private int id;
+
+ private int temperature;
+
+ /**
+ * @return the id
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * @param id the id to set
+ */
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ /**
+ * @return the temperature
+ */
+ public int getTemperature() {
+ return temperature;
+ }
+
+ /**
+ * @param temperature the temperature to set
+ */
+ public void setTemperature(int temperature) {
+ this.temperature = temperature;
+ }
+ }
+
+ //Following source and sinks are used for testing only.
+ //Test source will send data to the same destination where the processor receives data
+ //Test sink will consume data from the same destination where the processor produces data
+
+ static class TestSource {
+
+ private AtomicBoolean semaphore = new AtomicBoolean(true);
+ private Random random = new Random();
+ private int[] ids = new int[]{100100, 100200, 100300};
+
+ @Bean
+ public Supplier sendTestData() {
+
+ return () -> {
+ int id = ids[random.nextInt(3)];
+ int temperature = random.nextInt((102 - 65) + 1) + 65;
+ Sensor sensor = new Sensor();
+ sensor.setId(id);
+ sensor.setTemperature(temperature);
+ return sensor;
+ };
+ }
+ }
+
+ static class TestSink {
+
+ private final Log logger = LogFactory.getLog(getClass());
+
+ @Bean
+ public Consumer receive() {
+ return payload -> logger.info("Data received: " + payload);
+ }
+ }
+}
diff --git a/processor-samples/sensor-average-reactive-rabbit/src/main/resources/application.yml b/processor-samples/sensor-average-reactive-rabbit/src/main/resources/application.yml
new file mode 100644
index 0000000..66e31b5
--- /dev/null
+++ b/processor-samples/sensor-average-reactive-rabbit/src/main/resources/application.yml
@@ -0,0 +1,14 @@
+spring:
+ cloud:
+ stream:
+ function:
+ definition: calculateAverage;receive;sendTestData
+ bindings:
+ calculateAverage-out-0:
+ destination: average
+ receive-in-0:
+ destination: average
+ calculateAverage-in-0:
+ destination: sensor
+ sendTestData-out-0:
+ destination: sensor
diff --git a/processor-samples/sensor-average-reactive/pom.xml b/processor-samples/sensor-average-reactive/pom.xml
deleted file mode 100644
index b06c488..0000000
--- a/processor-samples/sensor-average-reactive/pom.xml
+++ /dev/null
@@ -1,86 +0,0 @@
-
-
- 4.0.0
-
- sensor-average-reactive
- 0.0.1-SNAPSHOT
- jar
- sensor-average-reactive
- Spring Cloud Stream Reactive Processor for Sensor Average
-
-
- io.spring.cloud.stream.sample
- spring-cloud-stream-samples-parent
- 0.0.1-SNAPSHOT
- ../..
-
-
-
-
- org.springframework.cloud
- spring-cloud-stream-reactive
-
-
- org.springframework.boot
- spring-boot-starter-test
- test
-
-
-
-
-
- kafka-binder
-
- true
-
-
-
- org.springframework.cloud
- spring-cloud-stream-binder-kafka
-
-
-
-
-
- org.springframework.boot
- spring-boot-maven-plugin
-
- kafka
-
-
-
-
-
-
- rabbit-binder
-
-
- org.springframework.cloud
- spring-cloud-stream-binder-rabbit
-
-
-
-
-
- org.springframework.boot
- spring-boot-maven-plugin
-
- rabbit
-
-
-
-
-
-
-
-
-
-
- org.springframework.boot
- spring-boot-maven-plugin
-
-
-
-
-
diff --git a/processor-samples/sensor-average-reactive/src/main/resources/application.yml b/processor-samples/sensor-average-reactive/src/main/resources/application.yml
deleted file mode 100644
index bce2b3a..0000000
--- a/processor-samples/sensor-average-reactive/src/main/resources/application.yml
+++ /dev/null
@@ -1,12 +0,0 @@
-spring:
- cloud:
- stream:
- bindings:
- output:
- destination: average
- test-sink:
- destination: average
- input:
- destination: sensor
- test-source:
- destination: sensor