From 175018af5ebee65277b63a95be21c180ca2119ab Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 29 Oct 2019 11:24:02 -0400 Subject: [PATCH] Processor samples refactoring --- processor-samples/pom.xml | 3 +- .../reactive-processor-kafka/pom.xml | 2 +- .../reactive-processor-rabbit/pom.xml | 6 +- .../.jdk8 | 0 .../.mvn | 0 .../README.adoc | 15 +- .../docker-compose-rabbit.yml | 0 .../docker-compose.yml | 0 .../mvnw | 0 .../mvnw.cmd | 0 .../sensor-average-reactive-kafka/pom.xml | 124 ++++++++++ .../SensorAverageProcessorApplication.java | 53 ++-- .../src/main/resources/application.yml | 14 ++ .../sensor-average-reactive-rabbit/.jdk8 | 0 .../sensor-average-reactive-rabbit/.mvn | 1 + .../README.adoc | 42 ++++ .../docker-compose.yml | 7 + .../sensor-average-reactive-rabbit/mvnw | 226 ++++++++++++++++++ .../sensor-average-reactive-rabbit/mvnw.cmd | 145 +++++++++++ .../sensor-average-reactive-rabbit/pom.xml | 119 +++++++++ .../SensorAverageProcessorApplication.java | 187 +++++++++++++++ .../src/main/resources/application.yml | 14 ++ .../sensor-average-reactive/pom.xml | 86 ------- .../src/main/resources/application.yml | 12 - 24 files changed, 901 insertions(+), 155 deletions(-) rename processor-samples/{sensor-average-reactive => sensor-average-reactive-kafka}/.jdk8 (100%) rename processor-samples/{sensor-average-reactive => sensor-average-reactive-kafka}/.mvn (100%) rename processor-samples/{sensor-average-reactive => sensor-average-reactive-kafka}/README.adoc (74%) rename processor-samples/{sensor-average-reactive => sensor-average-reactive-kafka}/docker-compose-rabbit.yml (100%) rename processor-samples/{sensor-average-reactive => sensor-average-reactive-kafka}/docker-compose.yml (100%) rename processor-samples/{sensor-average-reactive => sensor-average-reactive-kafka}/mvnw (100%) rename processor-samples/{sensor-average-reactive => sensor-average-reactive-kafka}/mvnw.cmd (100%) create mode 100644 processor-samples/sensor-average-reactive-kafka/pom.xml rename processor-samples/{sensor-average-reactive => sensor-average-reactive-kafka}/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java (71%) create mode 100644 processor-samples/sensor-average-reactive-kafka/src/main/resources/application.yml create mode 100644 processor-samples/sensor-average-reactive-rabbit/.jdk8 create mode 120000 processor-samples/sensor-average-reactive-rabbit/.mvn create mode 100644 processor-samples/sensor-average-reactive-rabbit/README.adoc create mode 100644 processor-samples/sensor-average-reactive-rabbit/docker-compose.yml create mode 100755 processor-samples/sensor-average-reactive-rabbit/mvnw create mode 100644 processor-samples/sensor-average-reactive-rabbit/mvnw.cmd create mode 100644 processor-samples/sensor-average-reactive-rabbit/pom.xml create mode 100644 processor-samples/sensor-average-reactive-rabbit/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java create mode 100644 processor-samples/sensor-average-reactive-rabbit/src/main/resources/application.yml delete mode 100644 processor-samples/sensor-average-reactive/pom.xml delete mode 100644 processor-samples/sensor-average-reactive/src/main/resources/application.yml diff --git a/processor-samples/pom.xml b/processor-samples/pom.xml index 869de21..bdf072a 100644 --- a/processor-samples/pom.xml +++ b/processor-samples/pom.xml @@ -12,7 +12,8 @@ polled-consumer reactive-processor-kafka reactive-processor-rabbit - sensor-average-reactive + sensor-average-reactive-kafka + sensor-average-reactive-rabbit streamlistener-basic uppercase-transformer diff --git a/processor-samples/reactive-processor-kafka/pom.xml b/processor-samples/reactive-processor-kafka/pom.xml index 5c61098..cd29d3b 100644 --- a/processor-samples/reactive-processor-kafka/pom.xml +++ b/processor-samples/reactive-processor-kafka/pom.xml @@ -3,7 +3,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - reactive-processor + reactive-processor-kafka 0.0.1-SNAPSHOT jar reactive-processor-kafka diff --git a/processor-samples/reactive-processor-rabbit/pom.xml b/processor-samples/reactive-processor-rabbit/pom.xml index dca3876..3397fba 100644 --- a/processor-samples/reactive-processor-rabbit/pom.xml +++ b/processor-samples/reactive-processor-rabbit/pom.xml @@ -3,11 +3,11 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - reactive-processor + reactive-processor-rabbit 0.0.1-SNAPSHOT jar - reactive-processor-kafka - Spring Cloud Stream Reactive Processor Kafka + reactive-processor-rabbit + Spring Cloud Stream Reactive Processor Rabbit org.springframework.boot diff --git a/processor-samples/sensor-average-reactive/.jdk8 b/processor-samples/sensor-average-reactive-kafka/.jdk8 similarity index 100% rename from processor-samples/sensor-average-reactive/.jdk8 rename to processor-samples/sensor-average-reactive-kafka/.jdk8 diff --git a/processor-samples/sensor-average-reactive/.mvn b/processor-samples/sensor-average-reactive-kafka/.mvn similarity index 100% rename from processor-samples/sensor-average-reactive/.mvn rename to processor-samples/sensor-average-reactive-kafka/.mvn diff --git a/processor-samples/sensor-average-reactive/README.adoc b/processor-samples/sensor-average-reactive-kafka/README.adoc similarity index 74% rename from processor-samples/sensor-average-reactive/README.adoc rename to processor-samples/sensor-average-reactive-kafka/README.adoc index 1a72489..f9ff72a 100644 --- a/processor-samples/sensor-average-reactive/README.adoc +++ b/processor-samples/sensor-average-reactive-kafka/README.adoc @@ -11,10 +11,8 @@ To run this sample, you will need to have installed: ## Running the application -The following instructions assume that you are running Kafka as a Docker image. - * Go to the application root (not the repository root, but this application) -* `docker-compose up -d` +* `docker-compose up -d` (Skip this section if you have Kafka running elsewhere) * `./mvnw clean package` @@ -43,14 +41,3 @@ Data received: {"id":100200,"average":90.0} * `docker-compose down` -## Running the application using Rabbit binder - -All the instructions above apply here also, but instead of running the default `docker-compose.yml`, use the command below to start a Rabbitmq cluser. - -* `docker-compose -f docker-compose-rabbit.yml up -d` - -* `./mvnw clean package -P rabbit-binder` - -* `java -jar target/sensor-average-reactive-0.0.1-SNAPSHOT.jar` - -Once you are done testing: `docker-compose -f docker-compose-rabbit.yml down` diff --git a/processor-samples/sensor-average-reactive/docker-compose-rabbit.yml b/processor-samples/sensor-average-reactive-kafka/docker-compose-rabbit.yml similarity index 100% rename from processor-samples/sensor-average-reactive/docker-compose-rabbit.yml rename to processor-samples/sensor-average-reactive-kafka/docker-compose-rabbit.yml diff --git a/processor-samples/sensor-average-reactive/docker-compose.yml b/processor-samples/sensor-average-reactive-kafka/docker-compose.yml similarity index 100% rename from processor-samples/sensor-average-reactive/docker-compose.yml rename to processor-samples/sensor-average-reactive-kafka/docker-compose.yml diff --git a/processor-samples/sensor-average-reactive/mvnw b/processor-samples/sensor-average-reactive-kafka/mvnw similarity index 100% rename from processor-samples/sensor-average-reactive/mvnw rename to processor-samples/sensor-average-reactive-kafka/mvnw diff --git a/processor-samples/sensor-average-reactive/mvnw.cmd b/processor-samples/sensor-average-reactive-kafka/mvnw.cmd similarity index 100% rename from processor-samples/sensor-average-reactive/mvnw.cmd rename to processor-samples/sensor-average-reactive-kafka/mvnw.cmd diff --git a/processor-samples/sensor-average-reactive-kafka/pom.xml b/processor-samples/sensor-average-reactive-kafka/pom.xml new file mode 100644 index 0000000..75969f3 --- /dev/null +++ b/processor-samples/sensor-average-reactive-kafka/pom.xml @@ -0,0 +1,124 @@ + + + 4.0.0 + + sensor-average-reactive-kafka + 0.0.1-SNAPSHOT + jar + sensor-average-reactive-kafka + Spring Cloud Stream Reactive Processor for Sensor Average Kafka + + + org.springframework.boot + spring-boot-starter-parent + 2.2.0.RELEASE + + + + + Hoxton.BUILD-SNAPSHOT + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + + + + org.springframework.cloud + spring-cloud-stream-binder-kafka + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.kafka + spring-kafka-test + test + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-web + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/libs-release-local + + false + + + + + diff --git a/processor-samples/sensor-average-reactive/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java b/processor-samples/sensor-average-reactive-kafka/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java similarity index 71% rename from processor-samples/sensor-average-reactive/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java rename to processor-samples/sensor-average-reactive-kafka/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java index 8fa0e33..2fe8082 100644 --- a/processor-samples/sensor-average-reactive/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java +++ b/processor-samples/sensor-average-reactive-kafka/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java @@ -1,41 +1,31 @@ package sample.sensor.average; +import java.time.Duration; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.Output; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.messaging.Processor; import org.springframework.context.annotation.Bean; -import org.springframework.integration.annotation.InboundChannelAdapter; -import org.springframework.integration.annotation.Poller; -import org.springframework.integration.core.MessageSource; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.SubscribableChannel; -import org.springframework.messaging.support.GenericMessage; import reactor.core.publisher.Flux; import reactor.core.publisher.GroupedFlux; import reactor.core.publisher.Mono; -import java.time.Duration; -import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; - @SpringBootApplication -@EnableBinding(Processor.class) public class SensorAverageProcessorApplication { public static void main(String[] args) { SpringApplication.run(SensorAverageProcessorApplication.class, args); } - @StreamListener - @Output(Processor.OUTPUT) - public Flux calculateAverage(@Input(Processor.INPUT) Flux data) { - return data.window(Duration.ofSeconds(3)).flatMap( + @Bean + public Function, Flux> calculateAverage() { + return data -> data.window(Duration.ofSeconds(3)).flatMap( window -> window.groupBy(Sensor::getId).flatMap(this::calculateAverage)); } @@ -165,7 +155,6 @@ public class SensorAverageProcessorApplication { //Test source will send data to the same destination where the processor receives data //Test sink will consume data from the same destination where the processor produces data - @EnableBinding(Source.class) static class TestSource { private AtomicBoolean semaphore = new AtomicBoolean(true); @@ -173,8 +162,7 @@ public class SensorAverageProcessorApplication { private int[] ids = new int[]{100100, 100200, 100300}; @Bean - @InboundChannelAdapter(channel = "test-source", poller = @Poller(fixedDelay = "100")) - public MessageSource sendTestData() { + public Supplier sendTestData() { return () -> { int id = ids[random.nextInt(3)]; @@ -182,29 +170,18 @@ public class SensorAverageProcessorApplication { Sensor sensor = new Sensor(); sensor.setId(id); sensor.setTemperature(temperature); - return new GenericMessage<>(sensor); + return sensor; }; } } - @EnableBinding(Sink.class) static class TestSink { private final Log logger = LogFactory.getLog(getClass()); - @StreamListener("test-sink") - public void receive(String payload) { - logger.info("Data received: " + payload); + @Bean + public Consumer receive() { + return payload -> logger.info("Data received: " + payload); } } - - public interface Sink { - @Input("test-sink") - SubscribableChannel sampleSink(); - } - - public interface Source { - @Output("test-source") - MessageChannel sampleSource(); - } } diff --git a/processor-samples/sensor-average-reactive-kafka/src/main/resources/application.yml b/processor-samples/sensor-average-reactive-kafka/src/main/resources/application.yml new file mode 100644 index 0000000..66e31b5 --- /dev/null +++ b/processor-samples/sensor-average-reactive-kafka/src/main/resources/application.yml @@ -0,0 +1,14 @@ +spring: + cloud: + stream: + function: + definition: calculateAverage;receive;sendTestData + bindings: + calculateAverage-out-0: + destination: average + receive-in-0: + destination: average + calculateAverage-in-0: + destination: sensor + sendTestData-out-0: + destination: sensor diff --git a/processor-samples/sensor-average-reactive-rabbit/.jdk8 b/processor-samples/sensor-average-reactive-rabbit/.jdk8 new file mode 100644 index 0000000..e69de29 diff --git a/processor-samples/sensor-average-reactive-rabbit/.mvn b/processor-samples/sensor-average-reactive-rabbit/.mvn new file mode 120000 index 0000000..d21aa17 --- /dev/null +++ b/processor-samples/sensor-average-reactive-rabbit/.mvn @@ -0,0 +1 @@ +../../.mvn \ No newline at end of file diff --git a/processor-samples/sensor-average-reactive-rabbit/README.adoc b/processor-samples/sensor-average-reactive-rabbit/README.adoc new file mode 100644 index 0000000..103a3f6 --- /dev/null +++ b/processor-samples/sensor-average-reactive-rabbit/README.adoc @@ -0,0 +1,42 @@ +Spring Cloud Stream Reactive Processor Sensor Average Sample +============================================================= + +This is a Spring Cloud Stream reactive processor sample that showcase a sensor temperature average calculator. + +## Requirements + +To run this sample, you will need to have installed: + +* Java 8 or Above + +## Running the application + +* Go to the application root (not the repository root, but this application) +* `docker-compose up -d` (Skip this section if you have Rabbitmq running elsewhere) + +* `./mvnw clean package` + +* `java -jar target/sensor-average-reactive-0.0.1-SNAPSHOT.jar` + +The main application contains the reactive processor that receives the sensor data for a duration (3 seconds) and averages them. +It then sends the average data (per sensor id) through the outbound destination of the processor. + +The application also provides a source and sink for testing. +Test source will generate some sensor data every 100 milliseconds and the test sink will verify that the processor has calculated the average. +Test source is bound to the same broker destination where the processor is listening from. +Similarly test sink is bound to the same broker destination where the processor is producing to. + +You will see output similar to the following on the console every 3 seconds. + +``` +Data received: {"id":100100,"average":89.0} +Data received: {"id":100200,"average":84.0} +Data received: {"id":100300,"average":88.0} +Data received: {"id":100100,"average":85.0} +Data received: {"id":100200,"average":85.0} +Data received: {"id":100300,"average":83.0} +Data received: {"id":100100,"average":80.0} +Data received: {"id":100200,"average":90.0} +``` + +* `docker-compose down` \ No newline at end of file diff --git a/processor-samples/sensor-average-reactive-rabbit/docker-compose.yml b/processor-samples/sensor-average-reactive-rabbit/docker-compose.yml new file mode 100644 index 0000000..7c3da92 --- /dev/null +++ b/processor-samples/sensor-average-reactive-rabbit/docker-compose.yml @@ -0,0 +1,7 @@ +version: '3' +services: + rabbitmq: + image: rabbitmq:management + ports: + - 5672:5672 + - 15672:15672 \ No newline at end of file diff --git a/processor-samples/sensor-average-reactive-rabbit/mvnw b/processor-samples/sensor-average-reactive-rabbit/mvnw new file mode 100755 index 0000000..0ce08e9 --- /dev/null +++ b/processor-samples/sensor-average-reactive-rabbit/mvnw @@ -0,0 +1,226 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven2 Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Migwn, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" + # TODO classpath? +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +echo $MAVEN_PROJECTBASEDIR +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +"$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" + diff --git a/processor-samples/sensor-average-reactive-rabbit/mvnw.cmd b/processor-samples/sensor-average-reactive-rabbit/mvnw.cmd new file mode 100644 index 0000000..7ecd01d --- /dev/null +++ b/processor-samples/sensor-average-reactive-rabbit/mvnw.cmd @@ -0,0 +1,145 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM https://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven2 Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +set MAVEN_CMD_LINE_ARGS=%* + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" + +set WRAPPER_JAR="".\.mvn\wrapper\maven-wrapper.jar"" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CMD_LINE_ARGS% +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% diff --git a/processor-samples/sensor-average-reactive-rabbit/pom.xml b/processor-samples/sensor-average-reactive-rabbit/pom.xml new file mode 100644 index 0000000..0d59c67 --- /dev/null +++ b/processor-samples/sensor-average-reactive-rabbit/pom.xml @@ -0,0 +1,119 @@ + + + 4.0.0 + + sensor-average-reactive-rabbit + 0.0.1-SNAPSHOT + jar + sensor-average-reactive-rabbit + Spring Cloud Stream Reactive Processor for Sensor Average Rabbitmq + + + org.springframework.boot + spring-boot-starter-parent + 2.2.0.RELEASE + + + + + Hoxton.BUILD-SNAPSHOT + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + + + + org.springframework.cloud + spring-cloud-stream-binder-rabbit + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-web + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + https://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + https://repo.spring.io/libs-release-local + + false + + + + + diff --git a/processor-samples/sensor-average-reactive-rabbit/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java b/processor-samples/sensor-average-reactive-rabbit/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java new file mode 100644 index 0000000..2fe8082 --- /dev/null +++ b/processor-samples/sensor-average-reactive-rabbit/src/main/java/sample/sensor/average/SensorAverageProcessorApplication.java @@ -0,0 +1,187 @@ +package sample.sensor.average; + +import java.time.Duration; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import reactor.core.publisher.Flux; +import reactor.core.publisher.GroupedFlux; +import reactor.core.publisher.Mono; + +@SpringBootApplication +public class SensorAverageProcessorApplication { + + public static void main(String[] args) { + SpringApplication.run(SensorAverageProcessorApplication.class, args); + } + + @Bean + public Function, Flux> calculateAverage() { + return data -> data.window(Duration.ofSeconds(3)).flatMap( + window -> window.groupBy(Sensor::getId).flatMap(this::calculateAverage)); + } + + private Mono calculateAverage(GroupedFlux group) { + return group + .reduce(new Accumulator(0, 0), + (a, d) -> new Accumulator(a.getCount() + 1, a.getTotalValue() + d.getTemperature())) + .map(accumulator -> new Average(group.key(), (accumulator.getTotalValue()) / accumulator.getCount())); + } + + static class Accumulator { + + private int count; + + private int totalValue; + + public Accumulator(int count, int totalValue) { + this.count = count; + this.totalValue = totalValue; + } + + /** + * @return the count + */ + public int getCount() { + return count; + } + + /** + * @param count the count to set + */ + public void setCount(int count) { + this.count = count; + } + + /** + * @return the totalValue + */ + public int getTotalValue() { + return totalValue; + } + + /** + * @param totalValue the totalValue to set + */ + public void setTotalValue(int totalValue) { + this.totalValue = totalValue; + } + } + + static class Average { + + private int id; + + private double average; + + public Average(int id, double average) { + this.id = id; + this.average = average; + } + + /** + * @return the id + */ + public int getId() { + return id; + } + + /** + * @param id the id to set + */ + public void setId(int id) { + this.id = id; + } + + /** + * @return the average + */ + public double getAverage() { + return average; + } + + /** + * @param average the average to set + */ + public void setAverage(double average) { + this.average = average; + } + } + + static class Sensor { + + private int id; + + private int temperature; + + /** + * @return the id + */ + public int getId() { + return id; + } + + /** + * @param id the id to set + */ + public void setId(int id) { + this.id = id; + } + + /** + * @return the temperature + */ + public int getTemperature() { + return temperature; + } + + /** + * @param temperature the temperature to set + */ + public void setTemperature(int temperature) { + this.temperature = temperature; + } + } + + //Following source and sinks are used for testing only. + //Test source will send data to the same destination where the processor receives data + //Test sink will consume data from the same destination where the processor produces data + + static class TestSource { + + private AtomicBoolean semaphore = new AtomicBoolean(true); + private Random random = new Random(); + private int[] ids = new int[]{100100, 100200, 100300}; + + @Bean + public Supplier sendTestData() { + + return () -> { + int id = ids[random.nextInt(3)]; + int temperature = random.nextInt((102 - 65) + 1) + 65; + Sensor sensor = new Sensor(); + sensor.setId(id); + sensor.setTemperature(temperature); + return sensor; + }; + } + } + + static class TestSink { + + private final Log logger = LogFactory.getLog(getClass()); + + @Bean + public Consumer receive() { + return payload -> logger.info("Data received: " + payload); + } + } +} diff --git a/processor-samples/sensor-average-reactive-rabbit/src/main/resources/application.yml b/processor-samples/sensor-average-reactive-rabbit/src/main/resources/application.yml new file mode 100644 index 0000000..66e31b5 --- /dev/null +++ b/processor-samples/sensor-average-reactive-rabbit/src/main/resources/application.yml @@ -0,0 +1,14 @@ +spring: + cloud: + stream: + function: + definition: calculateAverage;receive;sendTestData + bindings: + calculateAverage-out-0: + destination: average + receive-in-0: + destination: average + calculateAverage-in-0: + destination: sensor + sendTestData-out-0: + destination: sensor diff --git a/processor-samples/sensor-average-reactive/pom.xml b/processor-samples/sensor-average-reactive/pom.xml deleted file mode 100644 index b06c488..0000000 --- a/processor-samples/sensor-average-reactive/pom.xml +++ /dev/null @@ -1,86 +0,0 @@ - - - 4.0.0 - - sensor-average-reactive - 0.0.1-SNAPSHOT - jar - sensor-average-reactive - Spring Cloud Stream Reactive Processor for Sensor Average - - - io.spring.cloud.stream.sample - spring-cloud-stream-samples-parent - 0.0.1-SNAPSHOT - ../.. - - - - - org.springframework.cloud - spring-cloud-stream-reactive - - - org.springframework.boot - spring-boot-starter-test - test - - - - - - kafka-binder - - true - - - - org.springframework.cloud - spring-cloud-stream-binder-kafka - - - - - - org.springframework.boot - spring-boot-maven-plugin - - kafka - - - - - - - rabbit-binder - - - org.springframework.cloud - spring-cloud-stream-binder-rabbit - - - - - - org.springframework.boot - spring-boot-maven-plugin - - rabbit - - - - - - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - - - diff --git a/processor-samples/sensor-average-reactive/src/main/resources/application.yml b/processor-samples/sensor-average-reactive/src/main/resources/application.yml deleted file mode 100644 index bce2b3a..0000000 --- a/processor-samples/sensor-average-reactive/src/main/resources/application.yml +++ /dev/null @@ -1,12 +0,0 @@ -spring: - cloud: - stream: - bindings: - output: - destination: average - test-sink: - destination: average - input: - destination: sensor - test-source: - destination: sensor