Compare commits

..

4 Commits

Author SHA1 Message Date
Spring Operator
65563dd4af URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://repo.spring.io/libs-milestone-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-milestone-local ([https](https://repo.spring.io/libs-milestone-local) result 302).
* [ ] http://repo.spring.io/libs-snapshot-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-snapshot-local ([https](https://repo.spring.io/libs-snapshot-local) result 302).
* [ ] http://repo.spring.io/release with 1 occurrences migrated to:
  https://repo.spring.io/release ([https](https://repo.spring.io/release) result 302).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 26 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 13 occurrences
2019-04-24 12:52:36 -04:00
Spring Operator
95d61b1c8e URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* http://maven.apache.org/xsd/maven-4.0.0.xsd with 13 occurrences migrated to:
  https://maven.apache.org/xsd/maven-4.0.0.xsd ([https](https://maven.apache.org/xsd/maven-4.0.0.xsd) result 200).
* http://www.apache.org/licenses/LICENSE-2.0 with 2 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
* http://www.spring.io with 1 occurrences migrated to:
  https://www.spring.io ([https](https://www.spring.io) result 301).
* http://repo.spring.io/libs-milestone-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-milestone-local ([https](https://repo.spring.io/libs-milestone-local) result 302).
* http://repo.spring.io/libs-release-local with 1 occurrences migrated to:
  https://repo.spring.io/libs-release-local ([https](https://repo.spring.io/libs-release-local) result 302).
* http://repo.spring.io/libs-snapshot-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-snapshot-local ([https](https://repo.spring.io/libs-snapshot-local) result 302).
* http://repo.spring.io/release with 1 occurrences migrated to:
  https://repo.spring.io/release ([https](https://repo.spring.io/release) result 302).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 26 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 13 occurrences
2019-03-21 15:20:40 -04:00
Spring Operator
4ca25898e4 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://www.apache.org/licenses/ with 1 occurrences migrated to:
  https://www.apache.org/licenses/ ([https](https://www.apache.org/licenses/) result 200).
* [ ] http://www.apache.org/licenses/LICENSE-2.0 with 49 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
2019-03-21 15:18:21 -04:00
Marius Bogoevici
c7896ceca9 Remove unnecessary binder dependency 2017-04-20 20:15:26 -04:00
81 changed files with 345 additions and 2946 deletions

1
.gitignore vendored
View File

@@ -19,7 +19,6 @@ _site/
*.ipr
*.iws
.idea/*
*/.idea
.factorypath
spring-xd-samples/*/xd
dump.rdb

View File

@@ -1,6 +1,6 @@
== Samples
There are several samples, most running on the RabbitMQ transport (so you need RabbitMQ running locally to test them).
There are several samples, all running on the RabbitMQ transport (so you need RabbitMQ running locally to test them).
To build the samples do:
@@ -8,37 +8,19 @@ To build the samples do:
./mvnw clean build
```
NOTE: The main set of samples are "vanilla" in the sense that they are not deployable as XD modules by the current generation (1.x) of XD. You can still interact with an XD system using the appropriate naming convention for input and output channel names (`<stream>.<index>` format).
* `double` is an example of an aggregate application, the Source and Sink are combined into one single application.
* `source` is a Java config version of the classic "timer" module from Spring XD. It has a "fixedDelay" option (in milliseconds) for the period between emitting messages.
* `dynamic-source` publishes messages to dynamically created destinations.
* `kinesis-produce-consume` An example application using spring-cloud-stream-binder-aws-kinesis. Presents a web endpoint to send Orders, these are placed on a Kinesis stream and then consumed by the application from that stream.
* `multi-io` shows how to use configure multiple input/output channels inside a single application.
* `multibinder-differentsystems` shows how an application could use same binder implementation but different configurations for its channels. In this case, a processor's input/output channels connect to same binder implementation but with two separate broker configurations.
* `multibinder` shows how an application could use multiple binders. In this case, the processor's input/output channels connect to different brokers using their own binder configurations.
* `non-self-contained-aggregate-app` shows how to write a non self-contained aggregate application.
* `reactive-processor-kafka` shows how to create a reactive Apache Kafka processor application.
* `rxjava-processor` shows how to create an RxJava processor application.
* `sink` A simple sink that logs the incoming payload. It has no options (but some could easily be added), and just logs incoming messages at INFO level.
* `source` A simple time source example. It has a "fixedDelay" option (in milliseconds) for the period between emitting messages.
* `stream-listener` shows how to use StreamListener support to enable message mapping and automatic type conversion.
* `test-embedded-kafka` is a sample that shows how to test with an embedded Apache Kafka broker.
We generally recommend testing with the https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_testing[TestSupportBinder] but if you have a need for testing with an embedded broker, you can use the techniques in this sample.
* `sink` is a Java config version of the classic "log" module from Spring XD. It has no options (but some could easily be added), and just logs incoming messages at INFO level.
* `transform` is a simple pass through logging transformer (just logs the incoming message and passes it on).
* `kstream` is a collection of applications that demonstrate the capabilities of the Spring Cloud Stream support for Kafka Streams
* `double` is a combination of 2 modules defined locally (a source and a sink, so the whole app is self contained).
* `testing` is a bunch of applications and tests for them to demonstrate the capabilities of testing for the the Spring Cloud Stream applications.
* `multibinder` is a sample application that shows how an application could use multiple binders. In this case, the processor's input/output channels connect to different brokers using their own binder configurations.
* `multibinder-differentsystems` shows how an application could use same binder implementation but different configurations for its channels. In this case, a processor's input/output channels connect to same binder implementation but with two separate broker configurations.
If you run the source and the sink and point them at the same redis instance (e.g. do nothing to get the one on localhost, or the one they are both bound to as a service on Cloud Foundry) then they will form a "stream" and start talking to each other. All the samples have friendly JMX and Actuator endpoints for inspecting what is going on in the system.

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -34,6 +34,7 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@@ -18,14 +18,13 @@ package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = DoubleApplication.class)
@SpringApplicationConfiguration(classes = DoubleApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -1 +0,0 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip

View File

@@ -1,46 +0,0 @@
== What is this app?
This is an example of a Spring Cloud Stream processor using Kafka Streams support.
The example is based on a contrived use case of tracking products by interactively querying their status.
The program accepts product ID's and track their counts hitherto by interactively querying the underlying store. \
This sample uses lambda expressions and thus requires Java 8+.
==== Starting Kafka in a docker container
* Skip steps 1-3 if you already have a non-Docker Kafka environment.
1. Go to the docker directory in this repo and invoke the command `docker-compose up -d`.
2. Ensure that in the docker directory and then invoke the script `start-kafka-shell.sh`
3. cd $KAFKA_HOME
4. Start the console producer: +
Assuming that you are running kafka on a docker container on mac osx. Change the zookeeper IP address accordingly otherwise. +
`bin/kafka-console-producer.sh --broker-list 192.168.99.100:9092 --topic products`
=== Running the app:
Go to the root of the repository and do:
`./mvnw clean package`
`java -jar target/kstream-interactive-query-0.0.1-SNAPSHOT.jar --kstream.product.tracker.productIds=123,124,125`
The above command will track products with ID's 123,124 and 125 and print their counts seen so far every 30 seconds.
* By default we use the docker container IP (mac osx specific) in the `application.yml` for Kafka broker and zookeeper.
Change it in `application.yml` (which requires a rebuild) or pass them as runtime arguments as below.
`spring.cloud.stream.kstream.binder.brokers=<Broker IP Address>` +
`spring.cloud.stream.kstream.binder.zkNodes=<Zookeeper IP Address>`
Enter the following in the console producer (one line at a time) and watch the output on the console (or IDE) where the application is running.
```
{"id":"123"}
{"id":"124"}
{"id":"125"}
{"id":"123"}
{"id":"123"}
{"id":"123"}
```

View File

@@ -1,14 +0,0 @@
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

View File

@@ -1,2 +0,0 @@
#!/bin/bash
docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash

View File

@@ -1,225 +0,0 @@
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Maven2 Start Up Batch script
#
# Required ENV vars:
# ------------------
# JAVA_HOME - location of a JDK home dir
#
# Optional ENV vars
# -----------------
# M2_HOME - location of maven2's installed home dir
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
# e.g. to debug Maven itself, use
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
# ----------------------------------------------------------------------------
if [ -z "$MAVEN_SKIP_RC" ] ; then
if [ -f /etc/mavenrc ] ; then
. /etc/mavenrc
fi
if [ -f "$HOME/.mavenrc" ] ; then
. "$HOME/.mavenrc"
fi
fi
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "`uname`" in
CYGWIN*) cygwin=true ;;
MINGW*) mingw=true;;
Darwin*) darwin=true
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
if [ -z "$JAVA_HOME" ]; then
if [ -x "/usr/libexec/java_home" ]; then
export JAVA_HOME="`/usr/libexec/java_home`"
else
export JAVA_HOME="/Library/Java/Home"
fi
fi
;;
esac
if [ -z "$JAVA_HOME" ] ; then
if [ -r /etc/gentoo-release ] ; then
JAVA_HOME=`java-config --jre-home`
fi
fi
if [ -z "$M2_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
M2_HOME=`dirname "$PRG"`/..
# make it fully qualified
M2_HOME=`cd "$M2_HOME" && pwd`
cd "$saveddir"
# echo Using m2 at $M2_HOME
fi
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --unix "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
fi
# For Migwn, ensure paths are in UNIX format before anything is touched
if $mingw ; then
[ -n "$M2_HOME" ] &&
M2_HOME="`(cd "$M2_HOME"; pwd)`"
[ -n "$JAVA_HOME" ] &&
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
# TODO classpath?
fi
if [ -z "$JAVA_HOME" ]; then
javaExecutable="`which javac`"
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
# readlink(1) is not available as standard on Solaris 10.
readLink=`which readlink`
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
if $darwin ; then
javaHome="`dirname \"$javaExecutable\"`"
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
else
javaExecutable="`readlink -f \"$javaExecutable\"`"
fi
javaHome="`dirname \"$javaExecutable\"`"
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
JAVA_HOME="$javaHome"
export JAVA_HOME
fi
fi
fi
if [ -z "$JAVACMD" ] ; then
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
else
JAVACMD="`which java`"
fi
fi
if [ ! -x "$JAVACMD" ] ; then
echo "Error: JAVA_HOME is not defined correctly." >&2
echo " We cannot execute $JAVACMD" >&2
exit 1
fi
if [ -z "$JAVA_HOME" ] ; then
echo "Warning: JAVA_HOME environment variable is not set."
fi
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
# traverses directory structure from process work directory to filesystem root
# first directory with .mvn subdirectory is considered project base directory
find_maven_basedir() {
if [ -z "$1" ]
then
echo "Path not specified to find_maven_basedir"
return 1
fi
basedir="$1"
wdir="$1"
while [ "$wdir" != '/' ] ; do
if [ -d "$wdir"/.mvn ] ; then
basedir=$wdir
break
fi
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
if [ -d "${wdir}" ]; then
wdir=`cd "$wdir/.."; pwd`
fi
# end of workaround
done
echo "${basedir}"
}
# concatenates all lines of a file
concat_lines() {
if [ -f "$1" ]; then
echo "$(tr -s '\n' ' ' < "$1")"
fi
}
BASE_DIR=`find_maven_basedir "$(pwd)"`
if [ -z "$BASE_DIR" ]; then
exit 1;
fi
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
echo $MAVEN_PROJECTBASEDIR
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --path --windows "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
fi
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
exec "$JAVACMD" \
$MAVEN_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"

View File

@@ -1,53 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kstream.product.tracker</groupId>
<artifactId>kstream-interactive-query</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kstream-interactive-query</name>
<description>Spring Cloud Stream sample for KStream interactive queries</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-kstream-parent</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>

View File

@@ -1,127 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kstream.word.count.kstreamwordcount;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.kafka.core.KStreamBuilderFactoryBean;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.StringUtils;
@SpringBootApplication
public class KStreamInteractiveQueryApplication {
public static void main(String[] args) {
SpringApplication.run(KStreamInteractiveQueryApplication.class, args);
}
@EnableBinding(KStreamProcessor.class)
@EnableAutoConfiguration
@EnableConfigurationProperties(ProductTrackerProperties.class)
@EnableScheduling
public static class InteractiveProductCountApplication {
private static final String STORE_NAME = "prod-id-count-store";
@Autowired
private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;
@Autowired
ProductTrackerProperties productTrackerProperties;
ReadOnlyKeyValueStore<Object, Object> keyValueStore;
@StreamListener("input")
@SendTo("output")
public KStream<Integer, Long> process(KStream<Object, Product> input) {
return input
.filter((key, product) -> productIds().contains(product.getId()))
.map((key, value) -> new KeyValue<>(value.id, value))
.groupByKey(new Serdes.IntegerSerde(), new JsonSerde<>(Product.class))
.count(STORE_NAME)
.toStream();
}
private Set<Integer> productIds() {
return StringUtils.commaDelimitedListToSet(productTrackerProperties.getProductIds())
.stream().map(Integer::parseInt).collect(Collectors.toSet());
}
@Scheduled(fixedRate = 30000, initialDelay = 5000)
public void printProductCounts() {
if (keyValueStore == null) {
KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
keyValueStore = streams.store(STORE_NAME, QueryableStoreTypes.keyValueStore());
}
for (Integer id : productIds()) {
System.out.println("Product ID: " + id + " Count: " + keyValueStore.get(id));
}
}
}
@ConfigurationProperties(prefix = "kstream.product.tracker")
static class ProductTrackerProperties {
private String productIds;
public String getProductIds() {
return productIds;
}
public void setProductIds(String productIds) {
this.productIds = productIds;
}
}
static class Product {
Integer id;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
}
}

View File

@@ -1,21 +0,0 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kstream.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kstream:
binder.configuration:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings.output.producer:
keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.output:
destination: product-counts
producer:
headerMode: raw
useNativeEncoding: true
spring.cloud.stream.bindings.input:
destination: products
consumer:
headerMode: raw
spring.cloud.stream.kstream.binder:
brokers: 192.168.99.100
zkNodes: 192.168.99.100

View File

@@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
</configuration>

View File

@@ -1 +0,0 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip

View File

@@ -1,58 +0,0 @@
== What is this app?
This is an example of a Spring Cloud Stream processor using Kafka Streams support.
The example is based on a contrived use case of tracking products.
Although contrived, this type of use cases are pretty common in the industry where some organizations want to track the statistics of some entities over a time window.
In essence, the application receives product information from input topic and count the interested products in a configurable time window and report that in an output topic.
This sample uses lambda expressions and thus requires Java 8+.
==== Starting Kafka in a docker container
* Skip steps 1-3 if you already have a non-Docker Kafka environment.
1. Go to the docker directory in this repo and invoke the command `docker-compose up -d`.
2. Ensure that in the docker directory and then invoke the script `start-kafka-shell.sh`
3. cd $KAFKA_HOME
4. Start the console producer: +
Assuming that you are running kafka on a docker container on mac osx. Change the zookeeper IP address accordingly otherwise. +
`bin/kafka-console-producer.sh --broker-list 192.168.99.100:9092 --topic products`
5. Start the console consumer: +
Assuming that you are running kafka on a docker container on mac osx. Change the zookeeper IP address accordingly otherwise. +
`bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --key-deserializer org.apache.kafka.common.serialization.IntegerDeserializer --property print.key=true --topic product-counts`
=== Running the app:
Go to the root of the repository and do:
`./mvnw clean package`
`java -jar target/kstream-product-tracker-0.0.1-SNAPSHOT.jar --kstream.product.tracker.productIds=123,124,125 --spring.cloud.stream.kstream.timeWindow.length=60000 --spring.cloud.stream.kstream.timeWindow.advanceBy=30000 --spring.cloud.stream.bindings.input.destination=products`
The above command will track products with ID's 123,124 and 125 every 30 seconds with the counts from the last minute.
In other words, every 30 seconds a new 1 minute window is started.
* By default we use the docker container IP (mac osx specific) in the `application.yml` for Kafka broker and zookeeper.
Change it in `application.yml` (which requires a rebuild) or pass them as runtime arguments as below.
`spring.cloud.stream.kstream.binder.brokers=<Broker IP Address>` +
`spring.cloud.stream.kstream.binder.zkNodes=<Zookeeper IP Address>`
Enter the following in the console producer (one line at a time) and watch the output on the console consumer:
```
{"id":"123"}
{"id":"124"}
{"id":"125"}
{"id":"123"}
{"id":"123"}
{"id":"123"}
```
The default time window is configured for 30 seconds and you can change that using the following property.
`kstream.word.count.windowLength` (value is expressed in milliseconds)
In order to switch to a hopping window, you can use the `kstream.word.count.advanceBy` (value in milliseconds).
This will create an overlapped hopping windows depending on the value you provide.

View File

@@ -1,14 +0,0 @@
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

View File

@@ -1,2 +0,0 @@
#!/bin/bash
docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash

View File

@@ -1,225 +0,0 @@
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Maven2 Start Up Batch script
#
# Required ENV vars:
# ------------------
# JAVA_HOME - location of a JDK home dir
#
# Optional ENV vars
# -----------------
# M2_HOME - location of maven2's installed home dir
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
# e.g. to debug Maven itself, use
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
# ----------------------------------------------------------------------------
if [ -z "$MAVEN_SKIP_RC" ] ; then
if [ -f /etc/mavenrc ] ; then
. /etc/mavenrc
fi
if [ -f "$HOME/.mavenrc" ] ; then
. "$HOME/.mavenrc"
fi
fi
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "`uname`" in
CYGWIN*) cygwin=true ;;
MINGW*) mingw=true;;
Darwin*) darwin=true
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
if [ -z "$JAVA_HOME" ]; then
if [ -x "/usr/libexec/java_home" ]; then
export JAVA_HOME="`/usr/libexec/java_home`"
else
export JAVA_HOME="/Library/Java/Home"
fi
fi
;;
esac
if [ -z "$JAVA_HOME" ] ; then
if [ -r /etc/gentoo-release ] ; then
JAVA_HOME=`java-config --jre-home`
fi
fi
if [ -z "$M2_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
M2_HOME=`dirname "$PRG"`/..
# make it fully qualified
M2_HOME=`cd "$M2_HOME" && pwd`
cd "$saveddir"
# echo Using m2 at $M2_HOME
fi
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --unix "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
fi
# For Migwn, ensure paths are in UNIX format before anything is touched
if $mingw ; then
[ -n "$M2_HOME" ] &&
M2_HOME="`(cd "$M2_HOME"; pwd)`"
[ -n "$JAVA_HOME" ] &&
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
# TODO classpath?
fi
if [ -z "$JAVA_HOME" ]; then
javaExecutable="`which javac`"
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
# readlink(1) is not available as standard on Solaris 10.
readLink=`which readlink`
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
if $darwin ; then
javaHome="`dirname \"$javaExecutable\"`"
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
else
javaExecutable="`readlink -f \"$javaExecutable\"`"
fi
javaHome="`dirname \"$javaExecutable\"`"
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
JAVA_HOME="$javaHome"
export JAVA_HOME
fi
fi
fi
if [ -z "$JAVACMD" ] ; then
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
else
JAVACMD="`which java`"
fi
fi
if [ ! -x "$JAVACMD" ] ; then
echo "Error: JAVA_HOME is not defined correctly." >&2
echo " We cannot execute $JAVACMD" >&2
exit 1
fi
if [ -z "$JAVA_HOME" ] ; then
echo "Warning: JAVA_HOME environment variable is not set."
fi
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
# traverses directory structure from process work directory to filesystem root
# first directory with .mvn subdirectory is considered project base directory
find_maven_basedir() {
if [ -z "$1" ]
then
echo "Path not specified to find_maven_basedir"
return 1
fi
basedir="$1"
wdir="$1"
while [ "$wdir" != '/' ] ; do
if [ -d "$wdir"/.mvn ] ; then
basedir=$wdir
break
fi
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
if [ -d "${wdir}" ]; then
wdir=`cd "$wdir/.."; pwd`
fi
# end of workaround
done
echo "${basedir}"
}
# concatenates all lines of a file
concat_lines() {
if [ -f "$1" ]; then
echo "$(tr -s '\n' ' ' < "$1")"
fi
}
BASE_DIR=`find_maven_basedir "$(pwd)"`
if [ -z "$BASE_DIR" ]; then
exit 1;
fi
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
echo $MAVEN_PROJECTBASEDIR
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --path --windows "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
fi
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
exec "$JAVACMD" \
$MAVEN_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"

View File

@@ -1,53 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kstream.product.tracker</groupId>
<artifactId>kstream-product-tracker</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kstream-product-tracker</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-kstream-parent</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>

View File

@@ -1,155 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kstream.word.count.kstreamwordcount;
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.util.StringUtils;
@SpringBootApplication
public class KStreamProductTrackerApplication {
public static void main(String[] args) {
SpringApplication.run(KStreamProductTrackerApplication.class, args);
}
@EnableBinding(KStreamProcessor.class)
@EnableAutoConfiguration
@EnableConfigurationProperties(ProductTrackerProperties.class)
public static class ProductCountApplication {
@Autowired
ProductTrackerProperties productTrackerProperties;
@Autowired
TimeWindows timeWindows;
@StreamListener("input")
@SendTo("output")
public KStream<Integer, ProductStatus> process(KStream<Object, Product> input) {
return input
.filter((key, product) -> productIds().contains(product.getId()))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
.count(timeWindows, "product-counts")
.toStream()
.map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id,
value, Instant.ofEpochMilli(key.window().start()).atZone(ZoneId.systemDefault()).toLocalTime(),
Instant.ofEpochMilli(key.window().end()).atZone(ZoneId.systemDefault()).toLocalTime())));
}
private Set<Integer> productIds() {
return StringUtils.commaDelimitedListToSet(productTrackerProperties.getProductIds())
.stream().map(Integer::parseInt).collect(Collectors.toSet());
}
}
@ConfigurationProperties(prefix = "kstream.product.tracker")
static class ProductTrackerProperties {
private String productIds;
public String getProductIds() {
return productIds;
}
public void setProductIds(String productIds) {
this.productIds = productIds;
}
}
static class Product {
Integer id;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
}
static class ProductStatus {
private Integer id;
private long count;
private LocalTime windowStart;
private LocalTime windowEnd;
public ProductStatus(Integer id, long count, LocalTime windowStart, LocalTime windowEnd) {
this.id = id;
this.count = count;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
public LocalTime getWindowStart() {
return windowStart;
}
public void setWindowStart(LocalTime windowStart) {
this.windowStart = windowStart;
}
public LocalTime getWindowEnd() {
return windowEnd;
}
public void setWindowEnd(LocalTime windowEnd) {
this.windowEnd = windowEnd;
}
}
}

View File

@@ -1,20 +0,0 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kstream.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kstream:
binder.configuration:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings.output.producer:
keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.cloud.stream.bindings.output:
destination: product-counts
producer:
headerMode: raw
useNativeEncoding: true
spring.cloud.stream.bindings.input:
destination: products
consumer:
headerMode: raw
spring.cloud.stream.kstream.binder:
brokers: 192.168.99.100
zkNodes: 192.168.99.100

View File

@@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
</configuration>

View File

@@ -1,24 +0,0 @@
target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
nbproject/private/
build/
nbbuild/
dist/
nbdist/
.nb-gradle/

View File

@@ -1 +0,0 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip

View File

@@ -1,43 +0,0 @@
== What is this app?
This is an example of a Spring Cloud Stream processor using Kafka Streams support.
The example is based on the word count application from the https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java[reference documentation].
It uses a single input and a single output.
In essence, the application receives text messages from an input topic and computes word occurrence counts in a configurable time window and report that in an output topic.
This sample uses lambda expressions and thus requires Java 8+.
==== Starting Kafka in a docker container
* Skip steps 1-3 if you already have a non-Docker Kafka environment.
1. Go to the docker directory in this repo and invoke the command `docker-compose up -d`.
2. Ensure that in the docker directory and then invoke the script `start-kafka-shell.sh`
3. cd $KAFKA_HOME
4. Start the console producer: +
Assuming that you are running kafka on a docker container on mac osx. Change the zookeeper IP address accordingly otherwise. +
`bin/kafka-console-producer.sh --broker-list 192.168.99.100:9092 --topic words`
5. Start the console consumer: +
Assuming that you are running kafka on a docker container on mac osx. Change the zookeeper IP address accordingly otherwise. +
`bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --topic counts`
=== Running the app:
Go to the root of the repository and do: `./mvnw clean package`
`java -jar target/kstream-word-count-0.0.1-SNAPSHOT.jar --spring.cloud.stream.kstream.timeWindow.length=5000`
* By default we use the docker container IP (mac osx specific) in the `application.yml` for Kafka broker and zookeeper.
Change it in `application.yml` (which requires a rebuild) or pass them as runtime arguments as below.
`spring.cloud.stream.kstream.binder.brokers=<Broker IP Address>` +
`spring.cloud.stream.kstream.binder.zkNodes=<Zookeeper IP Address>`
Enter some text in the console producer and watch the output in the console consumer.
In order to switch to a hopping window, you can use the `spring.cloud.stream.kstream.timeWindow.advanceBy` (value in milliseconds).
This will create an overlapped hopping windows depending on the value you provide.
Here is an example with 2 overlapping windows (window length of 10 seconds and a hop (advance) by 5 seconds:
`java -jar target/kstream-word-count-0.0.1-SNAPSHOT.jar --spring.cloud.stream.kstream.timeWindow.length=10000 --spring.cloud.stream.kstream.timeWindow.advanceBy=5000`

View File

@@ -1,14 +0,0 @@
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

View File

@@ -1,2 +0,0 @@
#!/bin/bash
docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash

View File

@@ -1,225 +0,0 @@
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Maven2 Start Up Batch script
#
# Required ENV vars:
# ------------------
# JAVA_HOME - location of a JDK home dir
#
# Optional ENV vars
# -----------------
# M2_HOME - location of maven2's installed home dir
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
# e.g. to debug Maven itself, use
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
# ----------------------------------------------------------------------------
if [ -z "$MAVEN_SKIP_RC" ] ; then
if [ -f /etc/mavenrc ] ; then
. /etc/mavenrc
fi
if [ -f "$HOME/.mavenrc" ] ; then
. "$HOME/.mavenrc"
fi
fi
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "`uname`" in
CYGWIN*) cygwin=true ;;
MINGW*) mingw=true;;
Darwin*) darwin=true
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
if [ -z "$JAVA_HOME" ]; then
if [ -x "/usr/libexec/java_home" ]; then
export JAVA_HOME="`/usr/libexec/java_home`"
else
export JAVA_HOME="/Library/Java/Home"
fi
fi
;;
esac
if [ -z "$JAVA_HOME" ] ; then
if [ -r /etc/gentoo-release ] ; then
JAVA_HOME=`java-config --jre-home`
fi
fi
if [ -z "$M2_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
M2_HOME=`dirname "$PRG"`/..
# make it fully qualified
M2_HOME=`cd "$M2_HOME" && pwd`
cd "$saveddir"
# echo Using m2 at $M2_HOME
fi
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --unix "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
fi
# For Migwn, ensure paths are in UNIX format before anything is touched
if $mingw ; then
[ -n "$M2_HOME" ] &&
M2_HOME="`(cd "$M2_HOME"; pwd)`"
[ -n "$JAVA_HOME" ] &&
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
# TODO classpath?
fi
if [ -z "$JAVA_HOME" ]; then
javaExecutable="`which javac`"
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
# readlink(1) is not available as standard on Solaris 10.
readLink=`which readlink`
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
if $darwin ; then
javaHome="`dirname \"$javaExecutable\"`"
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
else
javaExecutable="`readlink -f \"$javaExecutable\"`"
fi
javaHome="`dirname \"$javaExecutable\"`"
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
JAVA_HOME="$javaHome"
export JAVA_HOME
fi
fi
fi
if [ -z "$JAVACMD" ] ; then
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
else
JAVACMD="`which java`"
fi
fi
if [ ! -x "$JAVACMD" ] ; then
echo "Error: JAVA_HOME is not defined correctly." >&2
echo " We cannot execute $JAVACMD" >&2
exit 1
fi
if [ -z "$JAVA_HOME" ] ; then
echo "Warning: JAVA_HOME environment variable is not set."
fi
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
# traverses directory structure from process work directory to filesystem root
# first directory with .mvn subdirectory is considered project base directory
find_maven_basedir() {
if [ -z "$1" ]
then
echo "Path not specified to find_maven_basedir"
return 1
fi
basedir="$1"
wdir="$1"
while [ "$wdir" != '/' ] ; do
if [ -d "$wdir"/.mvn ] ; then
basedir=$wdir
break
fi
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
if [ -d "${wdir}" ]; then
wdir=`cd "$wdir/.."; pwd`
fi
# end of workaround
done
echo "${basedir}"
}
# concatenates all lines of a file
concat_lines() {
if [ -f "$1" ]; then
echo "$(tr -s '\n' ' ' < "$1")"
fi
}
BASE_DIR=`find_maven_basedir "$(pwd)"`
if [ -z "$BASE_DIR" ]; then
exit 1;
fi
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
echo $MAVEN_PROJECTBASEDIR
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --path --windows "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
fi
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
exec "$JAVACMD" \
$MAVEN_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"

View File

@@ -1,143 +0,0 @@
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM https://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Maven2 Start Up Batch script
@REM
@REM Required ENV vars:
@REM JAVA_HOME - location of a JDK home dir
@REM
@REM Optional ENV vars
@REM M2_HOME - location of maven2's installed home dir
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
@REM e.g. to debug Maven itself, use
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
@REM ----------------------------------------------------------------------------
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
@echo off
@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
@REM set %HOME% to equivalent of $HOME
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
@REM Execute a user defined script before this one
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
:skipRcPre
@setlocal
set ERROR_CODE=0
@REM To isolate internal variables from possible post scripts, we use another setlocal
@setlocal
@REM ==== START VALIDATION ====
if not "%JAVA_HOME%" == "" goto OkJHome
echo.
echo Error: JAVA_HOME not found in your environment. >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
:OkJHome
if exist "%JAVA_HOME%\bin\java.exe" goto init
echo.
echo Error: JAVA_HOME is set to an invalid directory. >&2
echo JAVA_HOME = "%JAVA_HOME%" >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
@REM ==== END VALIDATION ====
:init
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
@REM Fallback to current working directory if not found.
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
set EXEC_DIR=%CD%
set WDIR=%EXEC_DIR%
:findBaseDir
IF EXIST "%WDIR%"\.mvn goto baseDirFound
cd ..
IF "%WDIR%"=="%CD%" goto baseDirNotFound
set WDIR=%CD%
goto findBaseDir
:baseDirFound
set MAVEN_PROJECTBASEDIR=%WDIR%
cd "%EXEC_DIR%"
goto endDetectBaseDir
:baseDirNotFound
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
cd "%EXEC_DIR%"
:endDetectBaseDir
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
@setlocal EnableExtensions EnableDelayedExpansion
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
:endReadAdditionalConfig
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
if ERRORLEVEL 1 goto error
goto end
:error
set ERROR_CODE=1
:end
@endlocal & set ERROR_CODE=%ERROR_CODE%
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
@REM check for post script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
:skipRcPost
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
if "%MAVEN_BATCH_PAUSE%" == "on" pause
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
exit /B %ERROR_CODE%

View File

@@ -1,53 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kstream.word.count</groupId>
<artifactId>kstream-word-count</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kstream-word-count</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-kstream-parent</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>

View File

@@ -1,114 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kstream.word.count.kstreamwordcount;
import java.util.Arrays;
import java.util.Date;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.messaging.handler.annotation.SendTo;
@SpringBootApplication
public class KStreamWordCountApplication {
public static void main(String[] args) {
SpringApplication.run(KStreamWordCountApplication.class, args);
}
@EnableBinding(KStreamProcessor.class)
@EnableAutoConfiguration
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serdes.String(), Serdes.String())
.count(timeWindows, "WordCounts")
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
}
static class WordCount {
private String word;
private long count;
private Date start;
private Date end;
WordCount(String word, long count, Date start, Date end) {
this.word = word;
this.count = count;
this.start = start;
this.end = end;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
public Date getStart() {
return start;
}
public void setStart(Date start) {
this.start = start;
}
public Date getEnd() {
return end;
}
public void setEnd(Date end) {
this.end = end;
}
}
}

View File

@@ -1,17 +0,0 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kstream.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kstream.binder.configuration:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.output:
destination: counts
producer:
headerMode: raw
useNativeEncoding: true
spring.cloud.stream.bindings.input:
destination: words
consumer:
headerMode: raw
spring.cloud.stream.kstream.binder:
brokers: 192.168.99.100
zkNodes: 192.168.99.100

View File

@@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
</configuration>

View File

@@ -1,22 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-kstream-parent</artifactId>
<packaging>pom</packaging>
<name>spring-cloud-stream-sample-kstream-parent</name>
<description>Parent Project for KStream Samples</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<modules>
<module>kstream-word-count</module>
<module>kstream-product-tracker</module>
<module>kstream-interactive-query</module>
</modules>
</project>

View File

@@ -21,8 +21,8 @@ multiple input/output channels.
* SampleSource - the app that configures two output channels (output1 and output2).
* SampleSink - the app that configures two input channels (input1 and input2).
The channels output1 and input1 connect to the same destination (test1) on the broker (Rabbit) and the channels output2 and
input2 connect to the same destination (test2) on Rabbit.
The channels output1 and input1 connect to the same destination (test1) on the broker (Redis) and the channels output2 and
input2 connect to the same destination (test2) on redis.
For demo purpose, the apps `SampleSource` and `SampleSink` are bundled together. In practice they are separate applications
unless bundled together by the `AggregateApplicationBuilder`.

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -18,14 +18,13 @@ package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = MultipleIOChannelsApplication.class)
@SpringApplicationConfiguration(classes = MultipleIOChannelsApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {

View File

@@ -12,7 +12,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -40,6 +40,24 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<classifier>test</classifier>
<version>0.9.0.1</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
@@ -63,11 +81,6 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@@ -17,9 +17,8 @@
package multibinder;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.integration.annotation.ServiceActivator;
/**
* @author Marius Bogoevici
@@ -27,8 +26,7 @@ import org.springframework.messaging.handler.annotation.SendTo;
@EnableBinding(Processor.class)
public class BridgeTransformer {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(Object payload) {
return payload;
}

View File

@@ -7,7 +7,6 @@ spring:
input:
destination: dataIn
binder: kafka1
group: testGroup
output:
destination: dataOut
binder: kafka2

View File

@@ -16,6 +16,8 @@
package multibinder;
import java.util.UUID;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
@@ -23,17 +25,20 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.test.rule.KafkaEmbedded;
@@ -44,8 +49,6 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import java.util.UUID;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
@@ -104,6 +107,7 @@ public class TwoKafkaBindersApplicationTest {
String testPayload = "testFoo" + UUID.randomUUID().toString();
dataProducer.send(MessageBuilder.withPayload(testPayload).build());
Message<?> receive = dataConsumer.receive(60_000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload));

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -22,6 +22,18 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-source</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-transform</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
@@ -35,6 +47,11 @@
<artifactId>spring-cloud-stream-binder-rabbit-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
@@ -46,11 +63,6 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@@ -17,10 +17,8 @@
package multibinder;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.SendTo;
/**
* @author Marius Bogoevici
@@ -28,8 +26,7 @@ import org.springframework.messaging.handler.annotation.SendTo;
@EnableBinding(Processor.class)
public class BridgeTransformer {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(Object payload) {
return payload;
}

View File

@@ -1,106 +1,106 @@
/*
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package multibinder;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.boot.SpringApplication;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.test.junit.rabbit.RabbitTestSupport;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import java.util.UUID;
/**
* @author Marius Bogoevici
* @author Gary Russell
*/
@DirtiesContext
public class RabbitAndKafkaBinderApplicationTests {
@ClassRule
public static RabbitTestSupport rabbitTestSupport = new RabbitTestSupport();
@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "test");
private final String randomGroup = UUID.randomUUID().toString();
@After
public void cleanUp() {
RabbitAdmin admin = new RabbitAdmin(rabbitTestSupport.getResource());
admin.deleteQueue("binder.dataOut.default");
admin.deleteQueue("binder.dataOut." + this.randomGroup);
admin.deleteExchange("binder.dataOut");
}
@Test
public void contextLoads() throws Exception {
// passing connection arguments arguments to the embedded Kafka instance
ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class,
"--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString());
context.close();
}
@Test
public void messagingWorks() throws Exception {
// passing connection arguments arguments to the embedded Kafka instance
ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class,
"--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString(),
"--spring.cloud.stream.bindings.input.group=testGroup",
"--spring.cloud.stream.bindings.output.producer.requiredGroups=" + this.randomGroup);
DirectChannel dataProducer = new DirectChannel();
BinderFactory binderFactory = context.getBean(BinderFactory.class);
QueueChannel dataConsumer = new QueueChannel();
((RabbitMessageChannelBinder) binderFactory.getBinder("rabbit", MessageChannel.class)).bindConsumer("dataOut", this.randomGroup,
dataConsumer, new ExtendedConsumerProperties<>(new RabbitConsumerProperties()));
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka", MessageChannel.class))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
String testPayload = "testFoo" + this.randomGroup;
dataProducer.send(MessageBuilder.withPayload(testPayload).build());
Message<?> receive = dataConsumer.receive(60_000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload));
context.close();
}
}
/*
* Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package multibinder;
import java.util.UUID;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.boot.SpringApplication;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
import org.springframework.cloud.stream.binder.test.junit.rabbit.RabbitTestSupport;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
/**
* @author Marius Bogoevici
* @author Gary Russell
*/
@DirtiesContext
public class RabbitAndKafkaBinderApplicationTests {
@ClassRule
public static RabbitTestSupport rabbitTestSupport = new RabbitTestSupport();
@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "test");
private final String randomGroup = UUID.randomUUID().toString();
@After
public void cleanUp() {
RabbitAdmin admin = new RabbitAdmin(rabbitTestSupport.getResource());
admin.deleteQueue("binder.dataOut.default");
admin.deleteQueue("binder.dataOut." + this.randomGroup);
admin.deleteExchange("binder.dataOut");
}
@Test
public void contextLoads() throws Exception {
// passing connection arguments arguments to the embedded Kafka instance
ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class,
"--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString());
context.close();
}
@Test
public void messagingWorks() throws Exception {
// passing connection arguments arguments to the embedded Kafka instance
ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class,
"--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString(),
"--spring.cloud.stream.bindings.output.producer.requiredGroups=" + this.randomGroup);
DirectChannel dataProducer = new DirectChannel();
BinderFactory binderFactory = context.getBean(BinderFactory.class);
QueueChannel dataConsumer = new QueueChannel();
((RabbitMessageChannelBinder) binderFactory.getBinder("rabbit", MessageChannel.class)).bindConsumer("dataOut", this.randomGroup,
dataConsumer, new ExtendedConsumerProperties<>(new RabbitConsumerProperties()));
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka", MessageChannel.class))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
String testPayload = "testFoo" + this.randomGroup;
dataProducer.send(MessageBuilder.withPayload(testPayload).build());
Message<?> receive = dataConsumer.receive(60_000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload));
context.close();
}
}

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -28,7 +28,7 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

286
pom.xml
View File

@@ -1,148 +1,138 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<packaging>pom</packaging>
<url>https://github.com/spring-cloud/spring-cloud-stream-samples</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>https://www.spring.io</url>
</organization>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.3.5.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-cloud-stream.version>Ditmars.BUILD-SNAPSHOT</spring-cloud-stream.version>
<java.version>1.8</java.version>
<spring-kafka.version>1.1.6.RELEASE</spring-kafka.version>
</properties>
<modules>
<module>source</module>
<module>dynamic-source</module>
<module>sink</module>
<module>transform</module>
<module>double</module>
<module>non-self-contained-aggregate-app</module>
<module>multibinder</module>
<module>multibinder-differentsystems</module>
<module>rxjava-processor</module>
<module>multi-io</module>
<module>stream-listener</module>
<module>reactive-processor-kafka</module>
<module>test-embedded-kafka</module>
<module>kstream</module>
<module>testing</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>${spring-cloud-stream.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-source</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-transform</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
<version>${spring-kafka.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<!--skip deploy (this is just a test module) -->
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>spring</id>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</profile>
</profiles>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<packaging>pom</packaging>
<url>https://github.com/spring-cloud/spring-cloud-stream-samples</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>https://www.spring.io</url>
</organization>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.2.0.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-cloud-stream.version>Brooklyn.SR3</spring-cloud-stream.version>
<java.version>1.8</java.version>
</properties>
<modules>
<module>source</module>
<module>sink</module>
<module>transform</module>
<module>double</module>
<module>non-self-contained-aggregate-app</module>
<module>multibinder</module>
<module>multibinder-differentsystems</module>
<module>rxjava-processor</module>
<module>multi-io</module>
<module>stream-listener</module>
<module>reactive-processor-kafka</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>${spring-cloud-stream.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-source</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-transform</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<!--skip deploy (this is just a test module) -->
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>spring</id>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</profile>
</profiles>
</project>

View File

@@ -5,11 +5,11 @@
<parent>
<artifactId>spring-cloud-stream-samples</artifactId>
<groupId>org.springframework.cloud</groupId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-reactive-processor-kafka</artifactId>
<artifactId>reactive-processor-kafka</artifactId>
<properties>
<java.version>1.8</java.version>
@@ -59,4 +59,4 @@
</plugins>
</build>
</project>
</project>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -43,6 +43,10 @@
<artifactId>rxjava</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>

View File

@@ -16,22 +16,20 @@
package demo;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = RxJavaApplication.class)
@SpringApplicationConfiguration(classes = RxJavaApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {
@Test
@Ignore
public void contextLoads() {
}

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -16,11 +16,12 @@
package demo;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.cloud.stream.annotation.Bindings;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Sink;
@@ -30,10 +31,8 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import static org.junit.Assert.assertNotNull;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = SinkApplication.class)
@SpringApplicationConfiguration(classes = SinkApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -18,14 +18,13 @@ package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = SourceApplication.class)
@SpringApplicationConfiguration(classes = SourceApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -18,14 +18,13 @@ package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = TypeConversionApplication.class)
@SpringApplicationConfiguration(classes = TypeConversionApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {

View File

@@ -1,24 +0,0 @@
target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
nbproject/private/
build/
nbbuild/
dist/
nbdist/
.nb-gradle/

View File

@@ -1,37 +0,0 @@
Spring Cloud Stream Testing with Embedded Kafka Broker Sample
=============================
In this *Spring Cloud Stream* sample, an embedded kafka broker is used for testing.
## Requirements
To run this sample, you will need to have installed:
* Java 8 or Above
## Code Tour
This sample is a Spring Boot application that uses Spring Cloud Stream to receive a `String` message and forward an upper case version of that String; it uses the Kafka binder.
* EmbeddedKafkaApplication - the Spring Boot Main Application
* EmbeddedKafkaApplicationTests - the test case
The `spring-kafka-test` dependency added to the `pom.xml` puts the `KafkaEmbedded` JUnit `@Rule` on the class path.
Refer to the [Spring for Apache Kafka Reference Manual](https://docs.spring.io/spring-kafka/reference/htmlsingle/#testing) for more information about this.
Notice how the `@BeforeClass` method sets up the Boot and binder properties to locate the servers.
The test method starts by using the Spring Boot auto-configured `KafkaTemplate` to send a message to the input destination.
It then creates a consumer to consume from the output destination; gets the output message and asserts that it's an upper case version of the sent message.
See the `application.properties` file for the properties that are required for this test case and application.
In particular, `raw` header mode is used.
## Building with Maven
Build the sample and run the test by executing:
source>$ mvn clean package
or run the test in your favorite IDE.

View File

@@ -1,56 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-test-embedded-kafka</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-sample-test-embedded-kafka</name>
<description>Demo project for Testing with an embedded Kafka broker</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<start-class>demo.EmbeddedKafkaApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,44 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
/**
* @author Gary Russell
*
*/
@SpringBootApplication
@EnableBinding(Processor.class)
public class EmbeddedKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(EmbeddedKafkaApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public byte[] handle(byte[] in){
return new String(in).toUpperCase().getBytes();
}
}

View File

@@ -1,12 +0,0 @@
# Test consumer properties
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=testEmbeddedKafkaApplication
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
# Binding properties
spring.cloud.stream.bindings.output.destination=testEmbeddedOut
spring.cloud.stream.bindings.input.destination=testEmbeddedIn
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.group=embeddedKafkaApplication

View File

@@ -1,81 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.test.context.junit4.SpringRunner;
/**
* Test class demonstrating how to use an embedded kafka service with the
* kafka binder.
*
* @author Gary Russell
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class EmbeddedKafkaApplicationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);
@Autowired
private KafkaTemplate<byte[], byte[]> template;
@Autowired
private DefaultKafkaConsumerFactory<byte[], byte[]> consumerFactory;
@Value("${spring.cloud.stream.bindings.input.destination}")
private String inputDestination;
@Value("${spring.cloud.stream.bindings.output.destination}")
private String outputDestination;
@BeforeClass
public static void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}
@Test
public void testSendReceive() {
template.send(this.inputDestination, "foo".getBytes());
Consumer<byte[], byte[]> consumer = this.consumerFactory.createConsumer();
consumer.subscribe(Collections.singleton(this.outputDestination));
ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
consumer.commitSync();
assertThat(records.count()).isEqualTo(1);
assertThat(new String(records.iterator().next().value())).isEqualTo("FOO");
}
}

View File

@@ -1,3 +0,0 @@
== How to test Spring Cloud Stream applications?
This project contains a set of tests for simple Spring Cloud Stream applications to demonstrate what, how and when we can use to test this kind of microservices.

View File

@@ -1,55 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-testing</artifactId>
<packaging>jar</packaging>
<name>testing</name>
<description>Demonstrating how to test spring cloud stream apps</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,47 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.testing.processor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
/**
* The Spring Cloud Stream Processor application,
* which convert incoming String to its upper case representation.
*
* @author Artem Bilan
*
*/
@SpringBootApplication
@EnableBinding(Processor.class)
public class ToUpperCaseProcessor {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String transform(String payload) {
return payload.toUpperCase();
}
public static void main(String[] args) {
SpringApplication.run(ToUpperCaseProcessor.class, args);
}
}

View File

@@ -1,51 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.testing.sink;
import javax.sql.DataSource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.messaging.MessageHandler;
/**
* The Spring Cloud Stream Sink application,
* which insert payloads of incoming messages to the FOOBAR table in the RDBMS.
*
* @author Artem Bilan
*
*/
@SpringBootApplication
@EnableBinding(Sink.class)
public class JdbcSink {
@Bean
@ServiceActivator(inputChannel = Sink.INPUT)
public MessageHandler jdbcHandler(DataSource dataSource) {
return new JdbcMessageHandler(dataSource, "INSERT INTO foobar (value) VALUES (:payload)");
}
public static void main(String[] args) {
SpringApplication.run(JdbcSink.class, args);
}
}

View File

@@ -1,55 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.testing.source;
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
/**
* The Spring Cloud Stream Source application,
* which generates each 100 milliseconds "foo" or "bar" string in round-robin manner.
*
* @author Artem Bilan
*
*/
@SpringBootApplication
@EnableBinding(Source.class)
public class FooBarSource {
private AtomicBoolean semaphore = new AtomicBoolean(true);
@Bean
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedDelay = "100"))
public MessageSource<String> fooBarStrings() {
return () ->
new GenericMessage<>(this.semaphore.getAndSet(!this.semaphore.get()) ? "foo" : "bar");
}
public static void main(String[] args) {
SpringApplication.run(FooBarSource.class, args);
}
}

View File

@@ -1,37 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.testing.processor;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
/**
* The simple test-case to demonstrate how it is easy to apply unit testing assertion
* for te Spring Cloud Stream applications.
*
* @author Artem Bilan
*
*/
public class NaiveToUpperCaseTests {
@Test
public void testUpperCase() {
assertEquals("FOO", new ToUpperCaseProcessor().transform("foo"));
}
}

View File

@@ -1,106 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.testing.processor;
import org.hamcrest.Matcher;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.BlockingQueue;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.springframework.cloud.stream.test.matcher.MessageQueueMatcher.receivesMessageThat;
import static org.springframework.cloud.stream.test.matcher.MessageQueueMatcher.receivesPayloadThat;
import static org.springframework.integration.test.matcher.PayloadAndHeaderMatcher.sameExceptIgnorableHeaders;
/**
* The Spring Boot-base test-case to demonstrate how can we test Spring Cloud Stream applications
* with available testing tools.
*
* @author Artem Bilan
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext
public class ToUpperCaseProcessorTests {
@Autowired
private Processor channels;
@Autowired
private MessageCollector collector;
@SpyBean
private ToUpperCaseProcessor toUpperCaseProcessor;
@Test
@SuppressWarnings("unchecked")
@Ignore
public void testMessages() {
SubscribableChannel input = this.channels.input();
input.send(new GenericMessage<>("foo"));
input.send(new GenericMessage<>("bar"));
input.send(new GenericMessage<>("foo meets bar"));
input.send(new GenericMessage<>("nothing but the best test"));
BlockingQueue<Message<?>> messages = this.collector.forChannel(channels.output());
assertThat(messages, receivesPayloadThat(is("FOO")));
assertThat(messages, receivesPayloadThat(is("BAR")));
assertThat(messages, receivesPayloadThat(is("FOO MEETS BAR")));
assertThat(messages, receivesPayloadThat(not("nothing but the best test")));
Message<String> testMessage =
MessageBuilder.withPayload("headers")
.setHeader("foo", "bar")
.build();
input.send(testMessage);
Message<String> expected =
MessageBuilder.withPayload("HEADERS")
.copyHeaders(testMessage.getHeaders())
.build();
Matcher<Message<Object>> sameExceptIgnorableHeaders =
(Matcher<Message<Object>>) (Matcher<?>) sameExceptIgnorableHeaders(expected);
assertThat(messages, receivesMessageThat(sameExceptIgnorableHeaders));
verify(this.toUpperCaseProcessor, times(5)).transform(anyString());
}
}

View File

@@ -1,94 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.testing.processor.integration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.testing.processor.ToUpperCaseProcessor;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Iterator;
import static org.assertj.core.api.Assertions.assertThat;
/**
* A Spring Boot integration test for the Spring Cloud Stream Processor application
* based on the Embedded Kafka.
*
* @author Artem Bilan
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(
properties = {
"spring.autoconfigure.exclude=org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration",
"spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer",
"spring.cloud.stream.bindings.output.producer.headerMode=raw",
"spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"spring.cloud.stream.bindings.input.group=embeddedKafkaApplication",
"spring.kafka.consumer.group-id=EmbeddedKafkaIntTest"
},
classes = ToUpperCaseProcessor.class,
webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext
@Ignore
public class ToUpperCaseProcessorIntTests {
@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "output");
@Autowired
private KafkaTemplate<byte[], byte[]> template;
@Autowired
private DefaultKafkaConsumerFactory<byte[], String> consumerFactory;
@BeforeClass
public static void setup() {
System.setProperty("spring.kafka.bootstrap-servers", kafkaEmbedded.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", kafkaEmbedded.getZookeeperConnectionString());
}
@Test
public void testMessagesOverKafka() throws Exception {
this.template.send("input", "bar".getBytes());
Consumer<byte[], String> consumer = this.consumerFactory.createConsumer();
kafkaEmbedded.consumeFromAnEmbeddedTopic(consumer, "output");
ConsumerRecords<byte[], String> replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isEqualTo(1);
Iterator<ConsumerRecord<byte[], String>> iterator = replies.iterator();
assertThat(iterator.next().value()).isEqualTo("BAR");
}
}

View File

@@ -1,111 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.testing.sink;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
/**
* The Spring Boot-base test-case to demonstrate how can we test Spring Cloud Stream applications
* with available testing tools.
*
* @author Artem Bilan
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext
public class JdbcSinkTests {
@Autowired
private Sink channels;
@Autowired
private JdbcTemplate jdbcTemplate;
@SpyBean(name = "jdbcHandler")
private MessageHandler jdbcMessageHandler;
@Test
@SuppressWarnings("unchecked")
public void testMessages() {
AbstractMessageChannel input = (AbstractMessageChannel) this.channels.input();
final AtomicReference<Message<?>> messageAtomicReference = new AtomicReference<>();
ChannelInterceptorAdapter assertionInterceptor = new ChannelInterceptorAdapter() {
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
messageAtomicReference.set(message);
super.afterSendCompletion(message, channel, sent, ex);
}
};
input.addInterceptor(assertionInterceptor);
input.send(new GenericMessage<>("foo"));
input.removeInterceptor(assertionInterceptor);
input.send(new GenericMessage<>("bar"));
List<Map<String, Object>> data = this.jdbcTemplate.queryForList("SELECT * FROM foobar");
assertThat(data.size()).isEqualTo(2);
assertThat(data.get(0).get("value")).isEqualTo("foo");
assertThat(data.get(1).get("value")).isEqualTo("bar");
Message<?> message1 = messageAtomicReference.get();
assertThat(message1).isNotNull();
assertThat(message1).hasFieldOrPropertyWithValue("payload", "foo");
ArgumentCaptor<Message<?>> messageArgumentCaptor =
(ArgumentCaptor<Message<?>>) (ArgumentCaptor<?>) ArgumentCaptor.forClass(Message.class);
verify(this.jdbcMessageHandler, times(2)).handleMessage(messageArgumentCaptor.capture());
Message<?> message = messageArgumentCaptor.getValue();
assertThat(message).hasFieldOrPropertyWithValue("payload", "bar");
}
}

View File

@@ -1,65 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.testing.source;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.springframework.cloud.stream.test.matcher.MessageQueueMatcher.receivesPayloadThat;
import java.util.concurrent.BlockingQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
/**
* The Spring Boot-base test-case to demonstrate how can we test Spring Cloud Stream applications
* with available testing tools.
*
* @author Artem Bilan
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext
public class FooBarSourceTests {
@Autowired
private Source channels;
@Autowired
private MessageCollector collector;
@Test
public void testMessages() {
BlockingQueue<Message<?>> messages = this.collector.forChannel(channels.output());
assertThat(messages, receivesPayloadThat(is("foo")));
assertThat(messages, receivesPayloadThat(is("bar")));
assertThat(messages, receivesPayloadThat(is("foo")));
assertThat(messages, receivesPayloadThat(is("bar")));
}
}

View File

@@ -1,3 +0,0 @@
CREATE TABLE FOOBAR (
value VARCHAR(256),
);

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -18,14 +18,13 @@ package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = TransformApplication.class)
@SpringApplicationConfiguration(classes = TransformApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {