diff --git a/multibinder-samples/multibinder-kafka-streams/.mvn b/multibinder-samples/multibinder-kafka-streams/.mvn new file mode 100644 index 0000000..19172e1 --- /dev/null +++ b/multibinder-samples/multibinder-kafka-streams/.mvn @@ -0,0 +1 @@ +../.mvn \ No newline at end of file diff --git a/multibinder-samples/multibinder-kafka-streams/README.adoc b/multibinder-samples/multibinder-kafka-streams/README.adoc new file mode 100644 index 0000000..f71499b --- /dev/null +++ b/multibinder-samples/multibinder-kafka-streams/README.adoc @@ -0,0 +1,43 @@ +== Spring Cloud Stream Multibinder Application with Two Kafka Clusters (Kafka Streams) + + +This example shows how to run a Spring Cloud Stream application with the same binder type configured for two separate Kafka clusters. + + +=== Running the application + +The following instructions assume that you are running Kafka in Docker containers. + +* Go to the application root +* `docker-compose up -d` + +This brings up two Kafka clusters in Docker containers. +The local ports mapped for Kafka are 9092 and 9093 (the local ports mapped for Zookeeper are 2181 and 2182). + +* `./mvnw clean package` + +The sample comes with a convenient test producer and consumer to see the processor in action. +After running the program, watch your console: every second, data is sent to Kafka cluster 1 and received through Kafka cluster 2. + +To run the example, command-line parameters for the Zookeeper ensembles and Kafka clusters must be provided, as in the following example: +``` +java -jar target/multibinder-kafka-streams-0.0.1-SNAPSHOT.jar --kafkaBroker1=localhost:9092 --zk1=localhost:2181 --kafkaBroker2=localhost:9093 --zk2=localhost:2182 +``` + +Alternatively, you can provide the default values of `localhost:9092` and `localhost:2181` for both clusters. + +Assume that you are running the two dockerized Kafka clusters as described above. + +Issue the following commands: + +`docker exec -it kafka-multibinder-1 /opt/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic dataIn` + +On another terminal: + +`docker exec -it kafka-multibinder-2 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 --topic dataOut` + +Enter some text in the first terminal, and the same text appears in the second. + +=== Running the Kafka Streams processor + +Run the stand-alone `Producers` application a few times to generate some data.
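+ +`Producers` is a plain Java class with a `main` method, so the simplest way is to run it from your IDE. Alternatively, as a rough sketch (this assumes the `exec-maven-plugin`, which is not explicitly configured in this sample's build), a command along the following lines can be used: + +``` +# assumes the exec-maven-plugin can be resolved (it is not configured in this sample's pom.xml) +./mvnw compile exec:java -Dexec.mainClass=multibinder.Producers +```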
+ +Then go to the URL: http://localhost:8080/events \ No newline at end of file diff --git a/multibinder-samples/multibinder-kafka-streams/docker-compose.yml b/multibinder-samples/multibinder-kafka-streams/docker-compose.yml new file mode 100644 index 0000000..e7082eb --- /dev/null +++ b/multibinder-samples/multibinder-kafka-streams/docker-compose.yml @@ -0,0 +1,36 @@ +version: '3' +services: + kafka1: + image: wurstmeister/kafka + container_name: kafka-multibinder-1 + ports: + - "9092:9092" + environment: + - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 + - KAFKA_ADVERTISED_PORT=9092 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181 + depends_on: + - zookeeper1 + zookeeper1: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + environment: + - KAFKA_ADVERTISED_HOST_NAME=zookeeper1 + kafka2: + image: wurstmeister/kafka + container_name: kafka-multibinder-2 + ports: + - "9093:9092" + environment: + - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 + - KAFKA_ADVERTISED_PORT=9092 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper2:2181 + depends_on: + - zookeeper2 + zookeeper2: + image: wurstmeister/zookeeper + ports: + - "2182:2181" + environment: + - KAFKA_ADVERTISED_HOST_NAME=zookeeper2 \ No newline at end of file diff --git a/multibinder-samples/multibinder-kafka-streams/mvnw b/multibinder-samples/multibinder-kafka-streams/mvnw new file mode 100755 index 0000000..6efc7bd --- /dev/null +++ b/multibinder-samples/multibinder-kafka-streams/mvnw @@ -0,0 +1,226 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven2 Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. 
+cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Migwn, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" + # TODO classpath? +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." 
+fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +echo $MAVEN_PROJECTBASEDIR +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +"$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" + diff --git a/multibinder-samples/multibinder-kafka-streams/mvnw.cmd b/multibinder-samples/multibinder-kafka-streams/mvnw.cmd new file mode 100644 index 0000000..b0dc0e7 --- /dev/null +++ b/multibinder-samples/multibinder-kafka-streams/mvnw.cmd @@ -0,0 +1,145 @@ +@REM ---------------------------------------------------------------------------- +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM ---------------------------------------------------------------------------- + +@REM ---------------------------------------------------------------------------- +@REM Maven2 Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. 
to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +set MAVEN_CMD_LINE_ARGS=%* + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! 
%%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" + +set WRAPPER_JAR="".\.mvn\wrapper\maven-wrapper.jar"" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CMD_LINE_ARGS% +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% diff --git a/multibinder-samples/multibinder-kafka-streams/pom.xml b/multibinder-samples/multibinder-kafka-streams/pom.xml new file mode 100644 index 0000000..77939b3 --- /dev/null +++ b/multibinder-samples/multibinder-kafka-streams/pom.xml @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>multibinder-kafka-streams</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>jar</packaging> + <name>multibinder-kafka-streams</name> + <description>Spring Cloud Stream Multibinder Two Kafka Clusters (Kafka Streams) Sample</description> + + <parent> + <groupId>io.spring.cloud.stream.sample</groupId> + <artifactId>spring-cloud-stream-samples-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + <relativePath>../..</relativePath> + </parent> + + <dependencies> + <dependency> + <groupId>org.springframework.cloud</groupId> + <artifactId>spring-cloud-stream-binder-kafka</artifactId> + <version>2.1.0.BUILD-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.springframework.cloud</groupId> + <artifactId>spring-cloud-stream-binder-kafka-core</artifactId> + <version>2.1.0.BUILD-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.springframework.cloud</groupId> + <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId> + <version>2.1.0.BUILD-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework.kafka</groupId> + <artifactId>spring-kafka-test</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> diff --git a/multibinder-samples/multibinder-kafka-streams/src/main/java/multibinder/BridgeTransformer.java b/multibinder-samples/multibinder-kafka-streams/src/main/java/multibinder/BridgeTransformer.java new file mode 100644 index 0000000..c9d21a0 --- /dev/null +++ b/multibinder-samples/multibinder-kafka-streams/src/main/java/multibinder/BridgeTransformer.java @@ -0,0 +1,153 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package multibinder; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; +import org.springframework.integration.core.MessageSource; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.handler.annotation.SendTo; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author Marius Bogoevici + * @author Soby Chacko + */ +@EnableBinding(Processor.class) +public class BridgeTransformer { + + @Autowired + private InteractiveQueryService interactiveQueryService; + + @StreamListener(Processor.INPUT) + @SendTo(Processor.OUTPUT) + public Object transform(Object payload) { + return payload; + } + + //Following source is used as test producer. + @EnableBinding(TestSource.class) + static class TestProducer { + + private AtomicBoolean semaphore = new AtomicBoolean(true); + + @Bean + @InboundChannelAdapter(channel = TestSource.OUTPUT, poller = @Poller(fixedDelay = "1000")) + public MessageSource sendTestData() { + return () -> + new GenericMessage<>(this.semaphore.getAndSet(!this.semaphore.get()) ? "foo" : "bar"); + + } + } + + //Following sink is used as test consumer for the above processor. It logs the data received through the processor. + @EnableBinding(TestSink.class) + static class TestConsumer { + + private final Log logger = LogFactory.getLog(getClass()); + + @StreamListener(TestSink.INPUT) + public void receive(String data) { + logger.info("Data received..." 
+ data); + } + } + + @EnableBinding(KafkaStreamsProcessorX.class) + static class KafkaStreamsAggregateSampleApplication { + + @StreamListener("input2") + public void process(KStream<Object, DomainEvent> input) { + ObjectMapper mapper = new ObjectMapper(); + Serde<DomainEvent> domainEventSerde = new JsonSerde<>( DomainEvent.class, mapper ); + + input + .groupBy( + (s, domainEvent) -> domainEvent.boardUuid, + Serialized.with(null, domainEventSerde)) + .aggregate( + String::new, + (s, domainEvent, board) -> board.concat(domainEvent.eventType), + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots").withKeySerde(Serdes.String()). + withValueSerde(Serdes.String()) + ); + } + } + + @RestController + public class FooController { + + @RequestMapping("/events") + public String events() { + + final ReadOnlyKeyValueStore<String, String> topFiveStore = + interactiveQueryService.getQueryableStore("test-events-snapshots", QueryableStoreTypes.keyValueStore()); + return topFiveStore.get("12345"); + } + } + + interface TestSink { + + String INPUT = "input1"; + + @Input(INPUT) + SubscribableChannel input1(); + + } + + interface TestSource { + + String OUTPUT = "output1"; + + @Output(TestSource.OUTPUT) + MessageChannel output(); + + } + + interface KafkaStreamsProcessorX { + + @Input("input2") + KStream<?, ?> input2(); + } + +} diff --git a/multibinder-samples/multibinder-kafka-streams/src/main/java/multibinder/DomainEvent.java b/multibinder-samples/multibinder-kafka-streams/src/main/java/multibinder/DomainEvent.java new file mode 100644 index 0000000..7c6d778 --- /dev/null +++ b/multibinder-samples/multibinder-kafka-streams/src/main/java/multibinder/DomainEvent.java @@ -0,0 +1,43 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package multibinder; + +/** + * @author Soby Chacko + */ +public class DomainEvent { + + String eventType; + + String boardUuid; + + public String getEventType() { + return eventType; + } + + public void setEventType(String eventType) { + this.eventType = eventType; + } + + public String getBoardUuid() { + return boardUuid; + } + + public void setBoardUuid(String boardUuid) { + this.boardUuid = boardUuid; + } +} diff --git a/multibinder-samples/multibinder-kafka-streams/src/main/java/multibinder/MultibinderApplication.java b/multibinder-samples/multibinder-kafka-streams/src/main/java/multibinder/MultibinderApplication.java new file mode 100644 index 0000000..7aba5f6 --- /dev/null +++ b/multibinder-samples/multibinder-kafka-streams/src/main/java/multibinder/MultibinderApplication.java @@ -0,0 +1,29 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package multibinder; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class MultibinderApplication { + + public static void main(String[] args) { + SpringApplication.run(MultibinderApplication.class, args); + } + +} diff --git a/multibinder-samples/multibinder-kafka-streams/src/main/java/multibinder/Producers.java b/multibinder-samples/multibinder-kafka-streams/src/main/java/multibinder/Producers.java new file mode 100644 index 0000000..22edd38 --- /dev/null +++ b/multibinder-samples/multibinder-kafka-streams/src/main/java/multibinder/Producers.java @@ -0,0 +1,60 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package multibinder; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author Soby Chacko + */ +public class Producers { + + public static void main(String... 
args) { + + ObjectMapper mapper = new ObjectMapper(); + Serde domainEventSerde = new JsonSerde<>(DomainEvent.class, mapper); + + + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); + props.put(ProducerConfig.RETRIES_CONFIG, 0); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + props.put(ProducerConfig.LINGER_MS_CONFIG, 1); + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, domainEventSerde.serializer().getClass()); + + DomainEvent ddEvent = new DomainEvent(); + ddEvent.setBoardUuid("12345"); + ddEvent.setEventType("thisisanevent"); + + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(props); + KafkaTemplate template = new KafkaTemplate<>(pf, true); + template.setDefaultTopic("foobar"); + + template.sendDefault("", ddEvent); + } +} diff --git a/multibinder-samples/multibinder-kafka-streams/src/main/resources/application.yml b/multibinder-samples/multibinder-kafka-streams/src/main/resources/application.yml new file mode 100644 index 0000000..219ae70 --- /dev/null +++ b/multibinder-samples/multibinder-kafka-streams/src/main/resources/application.yml @@ -0,0 +1,55 @@ +spring: + cloud: + stream: + kafka: + streams: + binder: + brokers: ${kafkaBroker2} + applicationId: multi-binder-kafka-streams + configuration: + default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde + default.value.serde: org.apache.kafka.common.serialization.Serdes$BytesSerde + commit.interval.ms: 1000 + bindings: + input: + destination: dataIn + binder: kafka1 + group: testGroup + output: + destination: dataOut + binder: kafka2 + #Test source binding (used for testing) + output1: + destination: dataIn + binder: kafka1 + #Test sink binding (used for testing) + input1: + destination: dataOut + binder: kafka2 + input2: + destination: foobar + binder: kafka3 + binders: + kafka1: + type: kafka + environment: + spring: + cloud: + stream: + kafka: + binder: + brokers: ${kafkaBroker1} + kafka2: + type: kafka + environment: + spring: + cloud: + stream: + kafka: + binder: + brokers: ${kafkaBroker2} + kafka3: + type: kstream + + + diff --git a/multibinder-samples/multibinder-kafka-streams/src/test/java/multibinder/TwoKafkaBindersApplicationTest.java b/multibinder-samples/multibinder-kafka-streams/src/test/java/multibinder/TwoKafkaBindersApplicationTest.java new file mode 100644 index 0000000..9a42378 --- /dev/null +++ b/multibinder-samples/multibinder-kafka-streams/src/test/java/multibinder/TwoKafkaBindersApplicationTest.java @@ -0,0 +1,102 @@ +/* + * Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package multibinder; + +import org.hamcrest.CoreMatchers; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.DirectFieldAccessor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.binder.Binder; +import org.springframework.cloud.stream.binder.BinderFactory; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder; +import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.kafka.test.rule.KafkaEmbedded; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.UUID; + +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; + +@RunWith(SpringRunner.class) +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.NONE) +@DirtiesContext +public class TwoKafkaBindersApplicationTest { + + @ClassRule + public static KafkaEmbedded kafkaTestSupport1 = new KafkaEmbedded(1); + + @ClassRule + public static KafkaEmbedded kafkaTestSupport2 = new KafkaEmbedded(1); + + + @BeforeClass + public static void setupEnvironment() { + System.setProperty("kafkaBroker1", kafkaTestSupport1.getBrokersAsString()); + System.setProperty("zk1", kafkaTestSupport1.getZookeeperConnectionString()); + System.setProperty("kafkaBroker2", kafkaTestSupport2.getBrokersAsString()); + System.setProperty("zk2", kafkaTestSupport2.getZookeeperConnectionString()); + } + + @Autowired + private BinderFactory binderFactory; + + @Test + public void contextLoads() { + Binder binder1 = binderFactory.getBinder("kafka1", MessageChannel.class); + KafkaMessageChannelBinder kafka1 = (KafkaMessageChannelBinder) binder1; + DirectFieldAccessor directFieldAccessor1 = new DirectFieldAccessor(kafka1); + KafkaBinderConfigurationProperties configuration1 = + (KafkaBinderConfigurationProperties) directFieldAccessor1.getPropertyValue("configurationProperties"); + Assert.assertThat(configuration1.getBrokers(), arrayWithSize(1)); + Assert.assertThat(configuration1.getBrokers()[0], equalTo(kafkaTestSupport1.getBrokersAsString())); + + Binder binder2 = binderFactory.getBinder("kafka2", MessageChannel.class); + KafkaMessageChannelBinder kafka2 = (KafkaMessageChannelBinder) binder2; + DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2); + KafkaBinderConfigurationProperties configuration2 = + (KafkaBinderConfigurationProperties) directFieldAccessor2.getPropertyValue("configurationProperties"); + Assert.assertThat(configuration2.getBrokers(), arrayWithSize(1)); + Assert.assertThat(configuration2.getBrokers()[0], equalTo(kafkaTestSupport2.getBrokersAsString())); + } + + @Test + public void messagingWorks() { + QueueChannel dataConsumer = new QueueChannel(); + ((KafkaMessageChannelBinder) binderFactory.getBinder("kafka2", MessageChannel.class)).bindConsumer("dataOut", UUID.randomUUID().toString(), + dataConsumer, new 
ExtendedConsumerProperties<>(new KafkaConsumerProperties())); + + //receiving test message sent by the test producer in the application + Message<?> receive = dataConsumer.receive(60_000); + Assert.assertThat(receive, Matchers.notNullValue()); + Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo("foo".getBytes())); + } + +} diff --git a/multibinder-samples/pom.xml b/multibinder-samples/pom.xml index a72b096..0d98d62 100644 --- a/multibinder-samples/pom.xml +++ b/multibinder-samples/pom.xml @@ -12,6 +12,7 @@ <module>multibinder-kafka-rabbit</module> <module>multibinder-two-kafka-clusters</module> <module>kafka-multibinder-jaas</module> + <module>multibinder-kafka-streams</module>