Compare commits


24 Commits
1.0 ... Ditmars

Author SHA1 Message Date
Spring Operator
8416f4d1f1 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (e.g. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://repo.spring.io/libs-milestone-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-milestone-local ([https](https://repo.spring.io/libs-milestone-local) result 302).
* [ ] http://repo.spring.io/libs-snapshot-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-snapshot-local ([https](https://repo.spring.io/libs-snapshot-local) result 302).
* [ ] http://repo.spring.io/release with 1 occurrence migrated to:
  https://repo.spring.io/release ([https](https://repo.spring.io/release) result 302).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 38 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 19 occurrences
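The check behind these reports can be illustrated with a short sketch (a hypothetical helper, not this repository's tooling): each URL is requested once with redirect-following disabled, so intentionally shortened URLs are never expanded, and the https status is compared against the plain-http one as described above.

```
import java.net.HttpURLConnection;
import java.net.URL;

public class HttpsMigrationCheck {

    // Return the status code without following redirects, so shortened
    // URLs are not accidentally expanded.
    static int statusWithoutRedirects(String url) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("HEAD");
        return conn.getResponseCode();
    }

    public static void main(String[] args) throws Exception {
        String http = "http://repo.spring.io/release";
        String https = http.replaceFirst("^http:", "https:");
        int httpsStatus = statusWithoutRedirects(https);
        // Migrate when https answers with 2xx, or when it behaves the same
        // as plain http (the "Fixed But Review Recommended" case).
        boolean migrate = (httpsStatus >= 200 && httpsStatus < 300)
                || httpsStatus == statusWithoutRedirects(http);
        System.out.println(https + " -> " + httpsStatus + (migrate ? " (migrate)" : " (review)"));
    }
}
```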
2019-04-24 12:52:22 -04:00
Spring Operator
5bfe5f3df7 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (e.g. if using a URL shortener).

# Fixed URLs

## Fixed But Review Recommended
These URLs were fixed, but the https status was not OK. However, the https status was the same as the http request or http redirected to an https URL, so they were migrated. Your review is recommended.

* [ ] http://docs.spring.io/spring-kafka/reference/htmlsingle/ (301) with 1 occurrence migrated to:
  https://docs.spring.io/spring-kafka/reference/htmlsingle/ ([https](https://docs.spring.io/spring-kafka/reference/htmlsingle/) result 404).

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/ with 1 occurrence migrated to:
  https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/ ([https](https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/) result 200).

# Ignored
These URLs were intentionally ignored.

* http://localhost:8080 with 2 occurrences
* http://localhost:8080/ with 1 occurrence
2019-04-24 12:51:38 -04:00
Spring Operator
6068a20f29 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (e.g. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* http://maven.apache.org/xsd/maven-4.0.0.xsd with 19 occurrences migrated to:
  https://maven.apache.org/xsd/maven-4.0.0.xsd ([https](https://maven.apache.org/xsd/maven-4.0.0.xsd) result 200).
* http://www.apache.org/licenses/LICENSE-2.0 with 6 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
* http://www.spring.io with 1 occurrence migrated to:
  https://www.spring.io ([https](https://www.spring.io) result 301).
* http://repo.spring.io/libs-milestone-local with 5 occurrences migrated to:
  https://repo.spring.io/libs-milestone-local ([https](https://repo.spring.io/libs-milestone-local) result 302).
* http://repo.spring.io/libs-release-local with 1 occurrence migrated to:
  https://repo.spring.io/libs-release-local ([https](https://repo.spring.io/libs-release-local) result 302).
* http://repo.spring.io/libs-snapshot-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-snapshot-local ([https](https://repo.spring.io/libs-snapshot-local) result 302).
* http://repo.spring.io/release with 1 occurrence migrated to:
  https://repo.spring.io/release ([https](https://repo.spring.io/release) result 302).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 38 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 19 occurrences
2019-03-21 15:20:21 -04:00
Spring Operator
7cd6a6697e URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (e.g. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://www.apache.org/licenses/ with 1 occurrence migrated to:
  https://www.apache.org/licenses/ ([https](https://www.apache.org/licenses/) result 200).
* [ ] http://www.apache.org/licenses/LICENSE-2.0 with 62 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
2019-03-21 15:17:39 -04:00
Soby Chacko
6feafa8e4d Temporarily ignoring a test 2018-01-08 15:58:48 -05:00
Soby Chacko
d2a0f659c7 Cleaning up tests/dependencies 2018-01-08 14:55:44 -05:00
Soby Chacko
c81c69587a Removing Kinesis samples from spring-cloud-stream-samples Ditmars 2018-01-08 14:24:51 -05:00
Artem Bilan
1204d0fcb4 Some improvements for the testing sample
* Add JavaDocs
* Demonstrate more test utilities usage and testing approaches
2017-10-16 13:38:20 -04:00
Artem Bilan
16beaa71e9 Add testing sample
Rename artifactId to spring-cloud-stream-sample-testing
2017-10-13 11:22:03 -04:00
Soby Chacko
a2fea3a188 Adding KStream sample applications
Add sample for the canonical WordCount
Showing how to track a group of products
Showing how to implement interactive queries

Fix #41

Update scst versions
Enhance kstream samples
2017-09-14 09:49:59 +05:30
Peter.Oates
30a0c1d734 Add spring-cloud-stream-binder-aws-kinesis
Resolves spring-cloud/spring-cloud-stream-binder-aws-kinesis#3

Code style polishing
2017-09-13 14:02:52 -04:00
Vinicius Carvalho
4861ae9225 Removing XD mentions, polishing samples listing 2017-08-17 15:03:51 -03:00
Gary Russell
b29fc5491a Add test-embedded-kafka Sample
Also some general README and pom cleanup.

README Polishing

Resolves #37

Polishing - Use Boot Config for Test Consumer

Possible now that https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/137 is merged.
2017-08-17 13:33:20 -04:00
Marius Bogoevici
6fec783413 Update Spring Cloud Build to 1.3.3.RELEASE 2017-07-19 15:06:48 -04:00
Marius Bogoevici
cb96b31009 Remove unnecessary Rabbit binder from 'double' example 2017-06-10 01:59:58 -04:00
Ilayaperumal Gopinathan
d7ce9cb43d Update README.md for multi-io 2017-05-24 16:49:51 +05:30
Marius Bogoevici
d637b73fc7 Fix some failing tests
- Use Chelsea.BUILD-SNAPSHOT to get latest updates
- Use consumer groups for multibinder so that
  behaviour is deterministic
2017-04-07 13:37:47 -04:00
Ilayaperumal Gopinathan
a1f78b74bd Update Chelsea RELEASE
- Some polishing
2017-04-06 11:01:21 +05:30
Ilayaperumal Gopinathan
aa17d3c50b Add non self-contained aggregate application
Resolves #30
2017-03-20 11:38:34 +00:00
Marius Bogoevici
00664fe229 Update dependency versions 2017-03-06 22:35:49 -05:00
Marius Bogoevici
e41b7e1db7 Updated RxJava transformer to 1.1.0 2017-03-06 22:35:49 -05:00
Ilayaperumal Gopinathan
d55da10ffc Sample to showcase dynamic destination binding
Resolves #28
2017-02-23 14:01:03 +00:00
Jannik Weichert
32b6cfe93e Create nice bullet points for available properties 2017-02-04 17:41:14 +01:00
Marius Bogoevici
0b00a2361d Update to Spring Cloud Stream Brooklyn.BUILD-SNAPSHOT 2016-08-25 22:57:38 -04:00
108 changed files with 3740 additions and 440 deletions

2
.gitignore vendored

@@ -13,11 +13,13 @@ _site/
.settings
.springBeans
.DS_Store
*/.idea
*.sw*
*.iml
*.ipr
*.iws
.idea/*
*/.idea
.factorypath
spring-xd-samples/*/xd
dump.rdb


@@ -1,6 +1,6 @@
== Samples
There are several samples, all running on the RabbitMQ transport (so you need RabbitMQ running locally to test them).
There are several samples, most running on the RabbitMQ transport (so you need RabbitMQ running locally to test them).
To build the samples do:
@@ -8,19 +8,37 @@ To build the samples do:
./mvnw clean install
```
NOTE: The main set of samples are "vanilla" in the sense that they are not deployable as XD modules by the current generation (1.x) of XD. You can still interact with an XD system using the appropriate naming convention for input and output channel names (`<stream>.<index>` format).
* `source` is a Java config version of the classic "timer" module from Spring XD. It has a "fixedDelay" option (in milliseconds) for the period between emitting messages.
* `double` is an example of an aggregate application, the Source and Sink are combined into one single application.
* `sink` is a Java config version of the classic "log" module from Spring XD. It has no options (but some could easily be added), and just logs incoming messages at INFO level.
* `dynamic-source` publishes messages to dynamically created destinations.
* `transform` is a simple pass through logging transformer (just logs the incoming message and passes it on).
* `kinesis-produce-consume` is an example application using spring-cloud-stream-binder-aws-kinesis. It presents a web endpoint for sending Orders; these are placed on a Kinesis stream and then consumed by the application from that stream.
* `double` is a combination of 2 modules defined locally (a source and a sink, so the whole app is self contained).
* `multibinder` is a sample application that shows how an application could use multiple binders. In this case, the processor's input/output channels connect to different brokers using their own binder configurations.
* `multi-io` shows how to configure multiple input/output channels inside a single application.
* `multibinder-differentsystems` shows how an application could use the same binder implementation but different configurations for its channels. In this case, a processor's input/output channels connect to the same binder implementation but with two separate broker configurations.
If you run the source and the sink and point them at the same redis instance (e.g. do nothing to get the one on localhost, or the one they are both bound to as a service on Cloud Foundry) then they will form a "stream" and start talking to each other. All the samples have friendly JMX and Actuator endpoints for inspecting what is going on in the system.
* `multibinder` shows how an application could use multiple binders. In this case, the processor's input/output channels connect to different brokers using their own binder configurations.
* `non-self-contained-aggregate-app` shows how to write a non self-contained aggregate application.
* `reactive-processor-kafka` shows how to create a reactive Apache Kafka processor application.
* `rxjava-processor` shows how to create an RxJava processor application.
* `sink` is a simple sink that logs the incoming payload. It has no options (but some could easily be added), and just logs incoming messages at INFO level.
* `source` is a simple time source example. It has a "fixedDelay" option (in milliseconds) for the period between emitting messages.
* `stream-listener` shows how to use StreamListener support to enable message mapping and automatic type conversion.
* `test-embedded-kafka` is a sample that shows how to test with an embedded Apache Kafka broker.
We generally recommend testing with the https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_testing[TestSupportBinder], but if you need to test with an embedded broker, you can use the techniques in this sample (a TestSupportBinder-style sketch follows this list).
* `transform` is a simple pass through logging transformer (just logs the incoming message and passes it on).
* `kstream` is a collection of applications that demonstrate the capabilities of the Spring Cloud Stream support for Kafka Streams.
* `testing` is a set of applications, with accompanying tests, that demonstrate the testing capabilities for Spring Cloud Stream applications.
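As a companion to the `testing` and `test-embedded-kafka` entries above, here is a minimal sketch of a TestSupportBinder-style test (the processor and class names are hypothetical; it assumes `spring-cloud-stream-test-support` is on the test classpath):

```
import static org.junit.Assert.assertEquals;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = UppercaseProcessorTests.TestProcessor.class)
public class UppercaseProcessorTests {

    @Autowired
    private Processor processor;

    // Provided by the TestSupportBinder; it records outbound messages in
    // memory instead of sending them to a broker.
    @Autowired
    private MessageCollector collector;

    @Test
    public void transformsPayload() {
        processor.input().send(MessageBuilder.withPayload("foo").build());
        Message<?> out = collector.forChannel(processor.output()).poll();
        assertEquals("FOO", out.getPayload());
    }

    @SpringBootApplication
    @EnableBinding(Processor.class)
    public static class TestProcessor {

        @StreamListener(Processor.INPUT)
        @SendTo(Processor.OUTPUT)
        public String handle(String payload) {
            return payload.toUpperCase();
        }
    }
}
```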


@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -34,11 +34,6 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
<build>


@@ -29,10 +29,10 @@ import org.springframework.context.ConfigurableApplicationContext;
public class DoubleApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder().
from(SourceApplication.class).args("--fixedDelay=5000")
new AggregateApplicationBuilder(DoubleApplication.class, args)
.from(SourceApplication.class).args("--fixedDelay=5000")
.via(ProcessorApplication.class)
.to(SinkApplication.class).args("--debug=true").run("--spring.application.name=aggregate-test");
.to(SinkApplication.class).args("--debug=true").run();
}
}


@@ -18,13 +18,14 @@ package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = DoubleApplication.class)
@SpringBootTest(classes = DoubleApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {

43
dynamic-source/README.md Normal file

@@ -0,0 +1,43 @@
Spring Cloud Stream Source Sample
=============================
In this *Spring Cloud Stream* sample, a source application publishes messages to dynamically created destinations.
## Requirements
To run this sample, you will need to have installed:
* Java 8 or above
This example requires RabbitMQ to be running on localhost.
## Code Tour
The class `SourceWithDynamicDestination` is a REST controller that registers the 'POST' request mapping for '/'.
When a payload is sent to 'http://localhost:8080/' by a POST request (port 8080 is the default), this application
then uses `BinderAwareChannelResolver` to resolve the destination dynamically at runtime. An `ExpressionEvaluatingRouter`
evaluates the SpEL expression `payload.id` to resolve the destination name, so the `id` field of the posted JSON payload
becomes the destination. This exchange (or topic, if the Kafka binder is used) is created dynamically and bound so that
the payload can be delivered.
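In condensed form, the pattern is (a sketch; the full `SourceWithDynamicDestination` class, which wires an `ExpressionEvaluatingRouter`, appears later in this diff):

```
// 'resolver' is the autowired BinderAwareChannelResolver: it creates and binds
// the destination on first use, then hands back a channel pointing at it.
MessageChannel channel = resolver.resolveDestination("customerId-1");
channel.send(MessageBuilder.withPayload("{\"id\":\"customerId-1\",\"bill-pay\":\"100\"}").build());
```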
Upon starting the application on the default port 8080, if the following data are sent:
curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' http://localhost:8080
curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-2","bill-pay":"150"}' http://localhost:8080
The destinations 'customerId-1' and 'customerId-2' are created at the broker (for example, an exchange in the case of Rabbit or a topic in the case of Kafka, named 'customerId-1' and 'customerId-2') and the data are published to the appropriate destinations dynamically.
## Building with Maven
Build the sample by executing:
source>$ mvn clean package
## Running the Sample
To start the source module execute the following:
source>$ java -jar target/spring-cloud-stream-sample-dynamic-source-1.1.0.BUILD-SNAPSHOT-exec.jar

62
dynamic-source/pom.xml Normal file

@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-dynamic-source</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-sample-dynamic-source</name>
<description>Demo project for source module</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<start-class>demo.SourceApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-codec</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>


@@ -0,0 +1,29 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SourceApplication {
public static void main(String[] args) {
SpringApplication.run(SourceApplication.class, args);
}
}


@@ -0,0 +1,87 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import java.util.Collections;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.support.Function;
import org.springframework.integration.dsl.support.FunctionExpression;
import org.springframework.integration.router.AbstractMappingMessageRouter;
import org.springframework.integration.router.ExpressionEvaluatingRouter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import static org.springframework.web.bind.annotation.RequestMethod.POST;
/**
* @author Ilayaperumal Gopinathan
*/
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@Autowired
@Qualifier("sourceChannel")
private MessageChannel localChannel;
@RequestMapping(path = "/", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, contentType);
}
private void sendMessage(Object body, Object contentType) {
localChannel.send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
@Bean(name = "sourceChannel")
public MessageChannel localChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "sourceChannel")
public ExpressionEvaluatingRouter router() {
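// Evaluate the SpEL expression 'payload.id' against each message to pick the
// destination name; the BinderAwareChannelResolver set below creates and binds
// that destination on demand.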
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));
router.setDefaultOutputChannelName("default-output");
router.setChannelResolver(resolver);
return router;
}
}


@@ -0,0 +1,37 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = SourceApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {
@Test
public void contextLoads() {
}
}


Binary file not shown.


@@ -0,0 +1 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip


@@ -0,0 +1,46 @@
== What is this app?
This is an example of a Spring Cloud Stream processor using Kafka Streams support.
The example is based on a contrived use case of tracking products by interactively querying their status.
The program accepts product IDs and tracks their running counts by interactively querying the underlying store.
This sample uses lambda expressions and thus requires Java 8+.
==== Starting Kafka in a docker container
* Skip steps 1-3 if you already have a non-Docker Kafka environment.
1. Go to the docker directory in this repo and invoke the command `docker-compose up -d`.
2. Ensure that you are in the docker directory and then invoke the script `start-kafka-shell.sh`
3. cd $KAFKA_HOME
4. Start the console producer: +
This assumes that you are running Kafka in a Docker container on Mac OS X; change the IP address accordingly otherwise. +
`bin/kafka-console-producer.sh --broker-list 192.168.99.100:9092 --topic products`
=== Running the app:
Go to the root of the repository and do:
`./mvnw clean package`
`java -jar target/kstream-interactive-query-0.0.1-SNAPSHOT.jar --kstream.product.tracker.productIds=123,124,125`
The above command will track products with IDs 123, 124, and 125 and print their counts seen so far every 30 seconds.
* By default, the Docker container IP (Mac OS X specific) is used in `application.yml` for the Kafka broker and ZooKeeper.
Change it in `application.yml` (which requires a rebuild) or pass the values as runtime arguments, as below.
`spring.cloud.stream.kstream.binder.brokers=<Broker IP Address>` +
`spring.cloud.stream.kstream.binder.zkNodes=<Zookeeper IP Address>`
Enter the following in the console producer (one line at a time) and watch the output on the console (or IDE) where the application is running.
```
{"id":"123"}
{"id":"124"}
{"id":"125"}
{"id":"123"}
{"id":"123"}
{"id":"123"}
```
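With the input above, the report that the application prints every 30 seconds (see `printProductCounts` in the application code later in this diff) would look roughly like:

```
Product ID: 123 Count: 4
Product ID: 124 Count: 1
Product ID: 125 Count: 1
```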


@@ -0,0 +1,14 @@
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181


@@ -0,0 +1,2 @@
#!/bin/bash
docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash

225
kstream/kstream-interactive-query/mvnw vendored Executable file

@@ -0,0 +1,225 @@
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Maven2 Start Up Batch script
#
# Required ENV vars:
# ------------------
# JAVA_HOME - location of a JDK home dir
#
# Optional ENV vars
# -----------------
# M2_HOME - location of maven2's installed home dir
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
# e.g. to debug Maven itself, use
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
# ----------------------------------------------------------------------------
if [ -z "$MAVEN_SKIP_RC" ] ; then
if [ -f /etc/mavenrc ] ; then
. /etc/mavenrc
fi
if [ -f "$HOME/.mavenrc" ] ; then
. "$HOME/.mavenrc"
fi
fi
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "`uname`" in
CYGWIN*) cygwin=true ;;
MINGW*) mingw=true;;
Darwin*) darwin=true
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
if [ -z "$JAVA_HOME" ]; then
if [ -x "/usr/libexec/java_home" ]; then
export JAVA_HOME="`/usr/libexec/java_home`"
else
export JAVA_HOME="/Library/Java/Home"
fi
fi
;;
esac
if [ -z "$JAVA_HOME" ] ; then
if [ -r /etc/gentoo-release ] ; then
JAVA_HOME=`java-config --jre-home`
fi
fi
if [ -z "$M2_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
M2_HOME=`dirname "$PRG"`/..
# make it fully qualified
M2_HOME=`cd "$M2_HOME" && pwd`
cd "$saveddir"
# echo Using m2 at $M2_HOME
fi
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --unix "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
fi
# For MinGW, ensure paths are in UNIX format before anything is touched
if $mingw ; then
[ -n "$M2_HOME" ] &&
M2_HOME="`(cd "$M2_HOME"; pwd)`"
[ -n "$JAVA_HOME" ] &&
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
# TODO classpath?
fi
if [ -z "$JAVA_HOME" ]; then
javaExecutable="`which javac`"
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
# readlink(1) is not available as standard on Solaris 10.
readLink=`which readlink`
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
if $darwin ; then
javaHome="`dirname \"$javaExecutable\"`"
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
else
javaExecutable="`readlink -f \"$javaExecutable\"`"
fi
javaHome="`dirname \"$javaExecutable\"`"
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
JAVA_HOME="$javaHome"
export JAVA_HOME
fi
fi
fi
if [ -z "$JAVACMD" ] ; then
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
else
JAVACMD="`which java`"
fi
fi
if [ ! -x "$JAVACMD" ] ; then
echo "Error: JAVA_HOME is not defined correctly." >&2
echo " We cannot execute $JAVACMD" >&2
exit 1
fi
if [ -z "$JAVA_HOME" ] ; then
echo "Warning: JAVA_HOME environment variable is not set."
fi
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
# traverses directory structure from process work directory to filesystem root
# first directory with .mvn subdirectory is considered project base directory
find_maven_basedir() {
if [ -z "$1" ]
then
echo "Path not specified to find_maven_basedir"
return 1
fi
basedir="$1"
wdir="$1"
while [ "$wdir" != '/' ] ; do
if [ -d "$wdir"/.mvn ] ; then
basedir=$wdir
break
fi
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
if [ -d "${wdir}" ]; then
wdir=`cd "$wdir/.."; pwd`
fi
# end of workaround
done
echo "${basedir}"
}
# concatenates all lines of a file
concat_lines() {
if [ -f "$1" ]; then
echo "$(tr -s '\n' ' ' < "$1")"
fi
}
BASE_DIR=`find_maven_basedir "$(pwd)"`
if [ -z "$BASE_DIR" ]; then
exit 1;
fi
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
echo $MAVEN_PROJECTBASEDIR
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --path --windows "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
fi
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
exec "$JAVACMD" \
$MAVEN_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"


@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kstream.product.tracker</groupId>
<artifactId>kstream-interactive-query</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kstream-interactive-query</name>
<description>Spring Cloud Stream sample for KStream interactive queries</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-kstream-parent</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>


@@ -0,0 +1,127 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kstream.word.count.kstreamwordcount;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.kafka.core.KStreamBuilderFactoryBean;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.StringUtils;
@SpringBootApplication
public class KStreamInteractiveQueryApplication {
public static void main(String[] args) {
SpringApplication.run(KStreamInteractiveQueryApplication.class, args);
}
@EnableBinding(KStreamProcessor.class)
@EnableAutoConfiguration
@EnableConfigurationProperties(ProductTrackerProperties.class)
@EnableScheduling
public static class InteractiveProductCountApplication {
private static final String STORE_NAME = "prod-id-count-store";
@Autowired
private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;
@Autowired
ProductTrackerProperties productTrackerProperties;
ReadOnlyKeyValueStore<Object, Object> keyValueStore;
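// Count occurrences of the tracked product IDs; the running counts are
// materialized in the queryable 'prod-id-count-store' state store.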
@StreamListener("input")
@SendTo("output")
public KStream<Integer, Long> process(KStream<Object, Product> input) {
return input
.filter((key, product) -> productIds().contains(product.getId()))
.map((key, value) -> new KeyValue<>(value.id, value))
.groupByKey(new Serdes.IntegerSerde(), new JsonSerde<>(Product.class))
.count(STORE_NAME)
.toStream();
}
private Set<Integer> productIds() {
return StringUtils.commaDelimitedListToSet(productTrackerProperties.getProductIds())
.stream().map(Integer::parseInt).collect(Collectors.toSet());
}
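// Interactively query the local state store for the current counts. The store
// handle is obtained lazily because it only becomes queryable once the Kafka
// Streams topology is up and running.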
@Scheduled(fixedRate = 30000, initialDelay = 5000)
public void printProductCounts() {
if (keyValueStore == null) {
KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
keyValueStore = streams.store(STORE_NAME, QueryableStoreTypes.keyValueStore());
}
for (Integer id : productIds()) {
System.out.println("Product ID: " + id + " Count: " + keyValueStore.get(id));
}
}
}
@ConfigurationProperties(prefix = "kstream.product.tracker")
static class ProductTrackerProperties {
private String productIds;
public String getProductIds() {
return productIds;
}
public void setProductIds(String productIds) {
this.productIds = productIds;
}
}
static class Product {
Integer id;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
}
}


@@ -0,0 +1,21 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kstream.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kstream:
  binder.configuration:
    key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  bindings.output.producer:
    keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
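# The output producer below uses headerMode: raw (no embedded headers on the
# wire) and useNativeEncoding: true, so outbound records are written with the
# Kafka serdes configured above instead of the binder's own message conversion.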
spring.cloud.stream.bindings.output:
  destination: product-counts
  producer:
    headerMode: raw
    useNativeEncoding: true
spring.cloud.stream.bindings.input:
  destination: products
  consumer:
    headerMode: raw
spring.cloud.stream.kstream.binder:
  brokers: 192.168.99.100
  zkNodes: 192.168.99.100


@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
</configuration>


Binary file not shown.


@@ -0,0 +1 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip


@@ -0,0 +1,58 @@
== What is this app?
This is an example of a Spring Cloud Stream processor using Kafka Streams support.
The example is based on a contrived use case of tracking products.
Although contrived, this type of use case is quite common in the industry, where organizations want to track statistics of certain entities over a time window.
In essence, the application receives product information from an input topic, counts the tracked products in a configurable time window, and reports the counts to an output topic.
This sample uses lambda expressions and thus requires Java 8+.
==== Starting Kafka in a docker container
* Skip steps 1-3 if you already have a non-Docker Kafka environment.
1. Go to the docker directory in this repo and invoke the command `docker-compose up -d`.
2. Ensure that you are in the docker directory and then invoke the script `start-kafka-shell.sh`
3. cd $KAFKA_HOME
4. Start the console producer: +
This assumes that you are running Kafka in a Docker container on Mac OS X; change the IP address accordingly otherwise. +
`bin/kafka-console-producer.sh --broker-list 192.168.99.100:9092 --topic products`
5. Start the console consumer: +
This assumes that you are running Kafka in a Docker container on Mac OS X; change the IP address accordingly otherwise. +
`bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --key-deserializer org.apache.kafka.common.serialization.IntegerDeserializer --property print.key=true --topic product-counts`
=== Running the app:
Go to the root of the repository and do:
`./mvnw clean package`
`java -jar target/kstream-product-tracker-0.0.1-SNAPSHOT.jar --kstream.product.tracker.productIds=123,124,125 --spring.cloud.stream.kstream.timeWindow.length=60000 --spring.cloud.stream.kstream.timeWindow.advanceBy=30000 --spring.cloud.stream.bindings.input.destination=products`
The above command will track products with IDs 123, 124, and 125, reporting every 30 seconds the counts from the last minute.
In other words, every 30 seconds a new one-minute window is started.
* By default, the Docker container IP (Mac OS X specific) is used in `application.yml` for the Kafka broker and ZooKeeper.
Change it in `application.yml` (which requires a rebuild) or pass the values as runtime arguments, as below.
`spring.cloud.stream.kstream.binder.brokers=<Broker IP Address>` +
`spring.cloud.stream.kstream.binder.zkNodes=<Zookeeper IP Address>`
Enter the following in the console producer (one line at a time) and watch the output on the console consumer:
```
{"id":"123"}
{"id":"124"}
{"id":"125"}
{"id":"123"}
{"id":"123"}
{"id":"123"}
```
The default time window is configured for 30 seconds and you can change that using the following property.
`kstream.word.count.windowLength` (value is expressed in milliseconds)
In order to switch to a hopping window, you can use the `kstream.word.count.advanceBy` property (value in milliseconds).
This will create overlapping hopping windows, depending on the value you provide.
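For reference, the window length and advance settings correspond to a Kafka Streams `TimeWindows` value along these lines (a sketch against the millisecond-based `TimeWindows` API of this era; the binder builds the actual bean from the `timeWindow` properties):

```
import org.apache.kafka.streams.kstream.TimeWindows;

class WindowConfigSketch {

    // Tumbling window: consecutive 60-second windows, no overlap.
    static final TimeWindows TUMBLING = TimeWindows.of(60000L);

    // Hopping window: 60-second windows advancing every 30 seconds, so they overlap.
    static final TimeWindows HOPPING = TimeWindows.of(60000L).advanceBy(30000L);
}
```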


@@ -0,0 +1,14 @@
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181


@@ -0,0 +1,2 @@
#!/bin/bash
docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash

225
kstream/kstream-product-tracker/mvnw vendored Executable file

@@ -0,0 +1,225 @@
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Maven2 Start Up Batch script
#
# Required ENV vars:
# ------------------
# JAVA_HOME - location of a JDK home dir
#
# Optional ENV vars
# -----------------
# M2_HOME - location of maven2's installed home dir
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
# e.g. to debug Maven itself, use
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
# ----------------------------------------------------------------------------
if [ -z "$MAVEN_SKIP_RC" ] ; then
if [ -f /etc/mavenrc ] ; then
. /etc/mavenrc
fi
if [ -f "$HOME/.mavenrc" ] ; then
. "$HOME/.mavenrc"
fi
fi
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "`uname`" in
CYGWIN*) cygwin=true ;;
MINGW*) mingw=true;;
Darwin*) darwin=true
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
if [ -z "$JAVA_HOME" ]; then
if [ -x "/usr/libexec/java_home" ]; then
export JAVA_HOME="`/usr/libexec/java_home`"
else
export JAVA_HOME="/Library/Java/Home"
fi
fi
;;
esac
if [ -z "$JAVA_HOME" ] ; then
if [ -r /etc/gentoo-release ] ; then
JAVA_HOME=`java-config --jre-home`
fi
fi
if [ -z "$M2_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
M2_HOME=`dirname "$PRG"`/..
# make it fully qualified
M2_HOME=`cd "$M2_HOME" && pwd`
cd "$saveddir"
# echo Using m2 at $M2_HOME
fi
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --unix "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
fi
# For MinGW, ensure paths are in UNIX format before anything is touched
if $mingw ; then
[ -n "$M2_HOME" ] &&
M2_HOME="`(cd "$M2_HOME"; pwd)`"
[ -n "$JAVA_HOME" ] &&
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
# TODO classpath?
fi
if [ -z "$JAVA_HOME" ]; then
javaExecutable="`which javac`"
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
# readlink(1) is not available as standard on Solaris 10.
readLink=`which readlink`
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
if $darwin ; then
javaHome="`dirname \"$javaExecutable\"`"
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
else
javaExecutable="`readlink -f \"$javaExecutable\"`"
fi
javaHome="`dirname \"$javaExecutable\"`"
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
JAVA_HOME="$javaHome"
export JAVA_HOME
fi
fi
fi
if [ -z "$JAVACMD" ] ; then
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
else
JAVACMD="`which java`"
fi
fi
if [ ! -x "$JAVACMD" ] ; then
echo "Error: JAVA_HOME is not defined correctly." >&2
echo " We cannot execute $JAVACMD" >&2
exit 1
fi
if [ -z "$JAVA_HOME" ] ; then
echo "Warning: JAVA_HOME environment variable is not set."
fi
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
# traverses directory structure from process work directory to filesystem root
# first directory with .mvn subdirectory is considered project base directory
find_maven_basedir() {
if [ -z "$1" ]
then
echo "Path not specified to find_maven_basedir"
return 1
fi
basedir="$1"
wdir="$1"
while [ "$wdir" != '/' ] ; do
if [ -d "$wdir"/.mvn ] ; then
basedir=$wdir
break
fi
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
if [ -d "${wdir}" ]; then
wdir=`cd "$wdir/.."; pwd`
fi
# end of workaround
done
echo "${basedir}"
}
# concatenates all lines of a file
concat_lines() {
if [ -f "$1" ]; then
echo "$(tr -s '\n' ' ' < "$1")"
fi
}
BASE_DIR=`find_maven_basedir "$(pwd)"`
if [ -z "$BASE_DIR" ]; then
exit 1;
fi
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
echo $MAVEN_PROJECTBASEDIR
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --path --windows "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
fi
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
exec "$JAVACMD" \
$MAVEN_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"


@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kstream.product.tracker</groupId>
<artifactId>kstream-product-tracker</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kstream-product-tracker</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-kstream-parent</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>


@@ -0,0 +1,155 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kstream.word.count.kstreamwordcount;
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.util.StringUtils;
@SpringBootApplication
public class KStreamProductTrackerApplication {
public static void main(String[] args) {
SpringApplication.run(KStreamProductTrackerApplication.class, args);
}
@EnableBinding(KStreamProcessor.class)
@EnableAutoConfiguration
@EnableConfigurationProperties(ProductTrackerProperties.class)
public static class ProductCountApplication {
@Autowired
ProductTrackerProperties productTrackerProperties;
@Autowired
TimeWindows timeWindows;
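// Count the tracked products per (possibly hopping) time window; each windowed
// count is re-keyed by product id and emitted as a ProductStatus carrying the
// window boundaries.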
@StreamListener("input")
@SendTo("output")
public KStream<Integer, ProductStatus> process(KStream<Object, Product> input) {
return input
.filter((key, product) -> productIds().contains(product.getId()))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
.count(timeWindows, "product-counts")
.toStream()
.map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id,
value, Instant.ofEpochMilli(key.window().start()).atZone(ZoneId.systemDefault()).toLocalTime(),
Instant.ofEpochMilli(key.window().end()).atZone(ZoneId.systemDefault()).toLocalTime())));
}
private Set<Integer> productIds() {
return StringUtils.commaDelimitedListToSet(productTrackerProperties.getProductIds())
.stream().map(Integer::parseInt).collect(Collectors.toSet());
}
}
@ConfigurationProperties(prefix = "kstream.product.tracker")
static class ProductTrackerProperties {
private String productIds;
public String getProductIds() {
return productIds;
}
public void setProductIds(String productIds) {
this.productIds = productIds;
}
}
static class Product {
Integer id;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
}
static class ProductStatus {
private Integer id;
private long count;
private LocalTime windowStart;
private LocalTime windowEnd;
public ProductStatus(Integer id, long count, LocalTime windowStart, LocalTime windowEnd) {
this.id = id;
this.count = count;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
public LocalTime getWindowStart() {
return windowStart;
}
public void setWindowStart(LocalTime windowStart) {
this.windowStart = windowStart;
}
public LocalTime getWindowEnd() {
return windowEnd;
}
public void setWindowEnd(LocalTime windowEnd) {
this.windowEnd = windowEnd;
}
}
}


@@ -0,0 +1,20 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kstream.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kstream:
  binder.configuration:
    key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  bindings.output.producer:
    keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.cloud.stream.bindings.output:
  destination: product-counts
  producer:
    headerMode: raw
    useNativeEncoding: true
spring.cloud.stream.bindings.input:
  destination: products
  consumer:
    headerMode: raw
spring.cloud.stream.kstream.binder:
  brokers: 192.168.99.100
  zkNodes: 192.168.99.100


@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
</configuration>

24
kstream/kstream-word-count/.gitignore vendored Normal file

@@ -0,0 +1,24 @@
target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
nbproject/private/
build/
nbbuild/
dist/
nbdist/
.nb-gradle/


Binary file not shown.


@@ -0,0 +1 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip


@@ -0,0 +1,43 @@
== What is this app?
This is an example of a Spring Cloud Stream processor using Kafka Streams support.
The example is based on the word count application from the https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java[reference documentation].
It uses a single input and a single output.
In essence, the application receives text messages from an input topic, computes word occurrence counts in a configurable time window, and reports the counts to an output topic.
This sample uses lambda expressions and thus requires Java 8+.
==== Starting Kafka in a docker container
* Skip steps 1-3 if you already have a non-Docker Kafka environment.
1. Go to the docker directory in this repo and invoke the command `docker-compose up -d`.
2. Ensure that you are in the docker directory and then invoke the script `start-kafka-shell.sh`
3. cd $KAFKA_HOME
4. Start the console producer: +
This assumes that you are running Kafka in a Docker container on Mac OS X; change the IP address accordingly otherwise. +
`bin/kafka-console-producer.sh --broker-list 192.168.99.100:9092 --topic words`
5. Start the console consumer: +
This assumes that you are running Kafka in a Docker container on Mac OS X; change the IP address accordingly otherwise. +
`bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.100:9092 --topic counts`
=== Running the app:
Go to the root of the repository and do: `./mvnw clean package`
`java -jar target/kstream-word-count-0.0.1-SNAPSHOT.jar --spring.cloud.stream.kstream.timeWindow.length=5000`
* By default we use the docker container IP (mac osx specific) in the `application.yml` for Kafka broker and zookeeper.
Change it in `application.yml` (which requires a rebuild) or pass them as runtime arguments as below.
`spring.cloud.stream.kstream.binder.brokers=<Broker IP Address>` +
`spring.cloud.stream.kstream.binder.zkNodes=<Zookeeper IP Address>`
Enter some text in the console producer and watch the output in the console consumer.
In order to switch to a hopping window, you can use the `spring.cloud.stream.kstream.timeWindow.advanceBy` (value in milliseconds).
This will create an overlapped hopping windows depending on the value you provide.
Here is an example with 2 overlapping windows (window length of 10 seconds and a hop (advance) by 5 seconds:
`java -jar target/kstream-word-count-0.0.1-SNAPSHOT.jar --spring.cloud.stream.kstream.timeWindow.length=10000 --spring.cloud.stream.kstream.timeWindow.advanceBy=5000`

View File

@@ -0,0 +1,14 @@
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

View File

@@ -0,0 +1,2 @@
#!/bin/bash
docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -e HOST_IP=$1 -e ZK=$2 -i -t wurstmeister/kafka /bin/bash

225
kstream/kstream-word-count/mvnw vendored Executable file
View File

@@ -0,0 +1,225 @@
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Maven2 Start Up Batch script
#
# Required ENV vars:
# ------------------
# JAVA_HOME - location of a JDK home dir
#
# Optional ENV vars
# -----------------
# M2_HOME - location of maven2's installed home dir
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
# e.g. to debug Maven itself, use
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
# ----------------------------------------------------------------------------
if [ -z "$MAVEN_SKIP_RC" ] ; then
if [ -f /etc/mavenrc ] ; then
. /etc/mavenrc
fi
if [ -f "$HOME/.mavenrc" ] ; then
. "$HOME/.mavenrc"
fi
fi
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "`uname`" in
CYGWIN*) cygwin=true ;;
MINGW*) mingw=true;;
Darwin*) darwin=true
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
if [ -z "$JAVA_HOME" ]; then
if [ -x "/usr/libexec/java_home" ]; then
export JAVA_HOME="`/usr/libexec/java_home`"
else
export JAVA_HOME="/Library/Java/Home"
fi
fi
;;
esac
if [ -z "$JAVA_HOME" ] ; then
if [ -r /etc/gentoo-release ] ; then
JAVA_HOME=`java-config --jre-home`
fi
fi
if [ -z "$M2_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
M2_HOME=`dirname "$PRG"`/..
# make it fully qualified
M2_HOME=`cd "$M2_HOME" && pwd`
cd "$saveddir"
# echo Using m2 at $M2_HOME
fi
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --unix "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
fi
# For Migwn, ensure paths are in UNIX format before anything is touched
if $mingw ; then
[ -n "$M2_HOME" ] &&
M2_HOME="`(cd "$M2_HOME"; pwd)`"
[ -n "$JAVA_HOME" ] &&
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
# TODO classpath?
fi
if [ -z "$JAVA_HOME" ]; then
javaExecutable="`which javac`"
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
# readlink(1) is not available as standard on Solaris 10.
readLink=`which readlink`
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
if $darwin ; then
javaHome="`dirname \"$javaExecutable\"`"
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
else
javaExecutable="`readlink -f \"$javaExecutable\"`"
fi
javaHome="`dirname \"$javaExecutable\"`"
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
JAVA_HOME="$javaHome"
export JAVA_HOME
fi
fi
fi
if [ -z "$JAVACMD" ] ; then
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
else
JAVACMD="`which java`"
fi
fi
if [ ! -x "$JAVACMD" ] ; then
echo "Error: JAVA_HOME is not defined correctly." >&2
echo " We cannot execute $JAVACMD" >&2
exit 1
fi
if [ -z "$JAVA_HOME" ] ; then
echo "Warning: JAVA_HOME environment variable is not set."
fi
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
# traverses directory structure from process work directory to filesystem root
# first directory with .mvn subdirectory is considered project base directory
find_maven_basedir() {
if [ -z "$1" ]
then
echo "Path not specified to find_maven_basedir"
return 1
fi
basedir="$1"
wdir="$1"
while [ "$wdir" != '/' ] ; do
if [ -d "$wdir"/.mvn ] ; then
basedir=$wdir
break
fi
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
if [ -d "${wdir}" ]; then
wdir=`cd "$wdir/.."; pwd`
fi
# end of workaround
done
echo "${basedir}"
}
# concatenates all lines of a file
concat_lines() {
if [ -f "$1" ]; then
echo "$(tr -s '\n' ' ' < "$1")"
fi
}
BASE_DIR=`find_maven_basedir "$(pwd)"`
if [ -z "$BASE_DIR" ]; then
exit 1;
fi
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
echo $MAVEN_PROJECTBASEDIR
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --path --windows "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
fi
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
exec "$JAVACMD" \
$MAVEN_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"

143
kstream/kstream-word-count/mvnw.cmd vendored Normal file
View File

@@ -0,0 +1,143 @@
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM https://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Maven2 Start Up Batch script
@REM
@REM Required ENV vars:
@REM JAVA_HOME - location of a JDK home dir
@REM
@REM Optional ENV vars
@REM M2_HOME - location of maven2's installed home dir
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
@REM e.g. to debug Maven itself, use
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
@REM ----------------------------------------------------------------------------
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
@echo off
@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
@REM set %HOME% to equivalent of $HOME
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
@REM Execute a user defined script before this one
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
:skipRcPre
@setlocal
set ERROR_CODE=0
@REM To isolate internal variables from possible post scripts, we use another setlocal
@setlocal
@REM ==== START VALIDATION ====
if not "%JAVA_HOME%" == "" goto OkJHome
echo.
echo Error: JAVA_HOME not found in your environment. >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
:OkJHome
if exist "%JAVA_HOME%\bin\java.exe" goto init
echo.
echo Error: JAVA_HOME is set to an invalid directory. >&2
echo JAVA_HOME = "%JAVA_HOME%" >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
@REM ==== END VALIDATION ====
:init
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
@REM Fallback to current working directory if not found.
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
set EXEC_DIR=%CD%
set WDIR=%EXEC_DIR%
:findBaseDir
IF EXIST "%WDIR%"\.mvn goto baseDirFound
cd ..
IF "%WDIR%"=="%CD%" goto baseDirNotFound
set WDIR=%CD%
goto findBaseDir
:baseDirFound
set MAVEN_PROJECTBASEDIR=%WDIR%
cd "%EXEC_DIR%"
goto endDetectBaseDir
:baseDirNotFound
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
cd "%EXEC_DIR%"
:endDetectBaseDir
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
@setlocal EnableExtensions EnableDelayedExpansion
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
:endReadAdditionalConfig
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
if ERRORLEVEL 1 goto error
goto end
:error
set ERROR_CODE=1
:end
@endlocal & set ERROR_CODE=%ERROR_CODE%
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
@REM check for post script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
:skipRcPost
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
if "%MAVEN_BATCH_PAUSE%" == "on" pause
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
exit /B %ERROR_CODE%

View File

@@ -0,0 +1,53 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kstream.word.count</groupId>
<artifactId>kstream-word-count</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kstream-word-count</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-kstream-parent</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>

View File

@@ -0,0 +1,114 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kstream.word.count.kstreamwordcount;
import java.util.Arrays;
import java.util.Date;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.messaging.handler.annotation.SendTo;
@SpringBootApplication
public class KStreamWordCountApplication {
public static void main(String[] args) {
SpringApplication.run(KStreamWordCountApplication.class, args);
}
@EnableBinding(KStreamProcessor.class)
@EnableAutoConfiguration
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serdes.String(), Serdes.String())
.count(timeWindows, "WordCounts")
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
}
static class WordCount {
private String word;
private long count;
private Date start;
private Date end;
WordCount(String word, long count, Date start, Date end) {
this.word = word;
this.count = count;
this.start = start;
this.end = end;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
public Date getStart() {
return start;
}
public void setStart(Date start) {
this.start = start;
}
public Date getEnd() {
return end;
}
public void setEnd(Date end) {
this.end = end;
}
}
}

View File

@@ -0,0 +1,17 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kstream.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kstream.binder.configuration:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.output:
destination: counts
producer:
headerMode: raw
useNativeEncoding: true
spring.cloud.stream.bindings.input:
destination: words
consumer:
headerMode: raw
spring.cloud.stream.kstream.binder:
brokers: 192.168.99.100
zkNodes: 192.168.99.100

View File

@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
</configuration>

22
kstream/pom.xml Normal file
View File

@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-kstream-parent</artifactId>
<packaging>pom</packaging>
<name>spring-cloud-stream-sample-kstream-parent</name>
<description>Parent Project for KStream Samples</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<modules>
<module>kstream-word-count</module>
<module>kstream-product-tracker</module>
<module>kstream-interactive-query</module>
</modules>
</project>

View File

@@ -21,8 +21,8 @@ multiple input/output channels.
* SampleSource - the app that configures two output channels (output1 and output2).
* SampleSink - the app that configures two input channels (input1 and input2).
The channels output1 and input1 connect to the same destination (test1) on the broker (Redis) and the channels output2 and
input2 connect to the same destination (test2) on redis.
The channels output1 and input1 connect to the same destination (test1) on the broker (Rabbit) and the channels output2 and
input2 connect to the same destination (test2) on Rabbit.
For demo purpose, the apps `SampleSource` and `SampleSink` are bundled together. In practice they are separate applications
unless bundled together by the `AggregateApplicationBuilder`.

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -18,13 +18,14 @@ package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = MultipleIOChannelsApplication.class)
@SpringBootTest(classes = MultipleIOChannelsApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {

View File

@@ -12,7 +12,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -40,24 +40,6 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<classifier>test</classifier>
<version>0.8.2.1</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
@@ -81,6 +63,11 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@@ -17,8 +17,9 @@
package multibinder;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.SendTo;
/**
* @author Marius Bogoevici
@@ -26,7 +27,8 @@ import org.springframework.integration.annotation.ServiceActivator;
@EnableBinding(Processor.class)
public class BridgeTransformer {
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object transform(Object payload) {
return payload;
}

View File

@@ -7,6 +7,7 @@ spring:
input:
destination: dataIn
binder: kafka1
group: testGroup
output:
destination: dataOut
binder: kafka2

View File

@@ -16,12 +16,6 @@
package multibinder;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import java.util.List;
import java.util.UUID;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
@@ -29,23 +23,20 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties;
import org.springframework.cloud.stream.test.junit.kafka.KafkaTestSupport;
import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.Configuration;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@@ -53,64 +44,67 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import java.util.UUID;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = MultibinderApplication.class)
@SpringBootTest(classes = MultibinderApplication.class)
@WebAppConfiguration
@DirtiesContext
public class TwoKafkaBindersApplicationTest {
@ClassRule
public static KafkaTestSupport kafkaTestSupport1 = new KafkaTestSupport(true);
public static KafkaEmbedded kafkaTestSupport1 = new KafkaEmbedded(1);
@ClassRule
public static KafkaTestSupport kafkaTestSupport2 = new KafkaTestSupport(true);
public static KafkaEmbedded kafkaTestSupport2 = new KafkaEmbedded(1);
@ClassRule
public static RedisTestSupport redisTestSupport = new RedisTestSupport();
@BeforeClass
public static void setupEnvironment() {
System.setProperty("kafkaBroker1", kafkaTestSupport1.getBrokerAddress());
System.setProperty("zk1", kafkaTestSupport1.getZkConnectString());
System.setProperty("kafkaBroker2", kafkaTestSupport2.getBrokerAddress());
System.setProperty("zk2", kafkaTestSupport2.getZkConnectString());
System.setProperty("kafkaBroker1", kafkaTestSupport1.getBrokersAsString());
System.setProperty("zk1", kafkaTestSupport1.getZookeeperConnectionString());
System.setProperty("kafkaBroker2", kafkaTestSupport2.getBrokersAsString());
System.setProperty("zk2", kafkaTestSupport2.getZookeeperConnectionString());
}
@Autowired
private BinderFactory<MessageChannel> binderFactory;
private BinderFactory binderFactory;
@Test
public void contextLoads() {
Binder<MessageChannel, ?, ?> binder1 = binderFactory.getBinder("kafka1");
Binder<MessageChannel, ?, ?> binder1 = binderFactory.getBinder("kafka1", MessageChannel.class);
KafkaMessageChannelBinder kafka1 = (KafkaMessageChannelBinder) binder1;
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(kafka1.getConnectionFactory());
Configuration configuration = (Configuration) directFieldAccessor.getPropertyValue("configuration");
List<BrokerAddress> brokerAddresses = configuration.getBrokerAddresses();
Assert.assertThat(brokerAddresses, hasSize(1));
Assert.assertThat(brokerAddresses, contains(BrokerAddress.fromAddress(kafkaTestSupport1.getBrokerAddress())));
Binder<MessageChannel, ?, ?> binder2 = binderFactory.getBinder("kafka2");
DirectFieldAccessor directFieldAccessor1 = new DirectFieldAccessor(kafka1);
KafkaBinderConfigurationProperties configuration1 =
(KafkaBinderConfigurationProperties) directFieldAccessor1.getPropertyValue("configurationProperties");
Assert.assertThat(configuration1.getBrokers(), arrayWithSize(1));
Assert.assertThat(configuration1.getBrokers()[0], equalTo(kafkaTestSupport1.getBrokersAsString()));
Binder<MessageChannel, ?, ?> binder2 = binderFactory.getBinder("kafka2", MessageChannel.class);
KafkaMessageChannelBinder kafka2 = (KafkaMessageChannelBinder) binder2;
DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2.getConnectionFactory());
Configuration configuration2 = (Configuration) directFieldAccessor2.getPropertyValue("configuration");
List<BrokerAddress> brokerAddresses2 = configuration2.getBrokerAddresses();
Assert.assertThat(brokerAddresses2, hasSize(1));
Assert.assertThat(brokerAddresses2, contains(BrokerAddress.fromAddress(kafkaTestSupport2.getBrokerAddress())));
DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2);
KafkaBinderConfigurationProperties configuration2 =
(KafkaBinderConfigurationProperties) directFieldAccessor2.getPropertyValue("configurationProperties");
Assert.assertThat(configuration2.getBrokers(), arrayWithSize(1));
Assert.assertThat(configuration2.getBrokers()[0], equalTo(kafkaTestSupport2.getBrokersAsString()));
}
@Test
public void messagingWorks() {
DirectChannel dataProducer = new DirectChannel();
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka1"))
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka1", MessageChannel.class))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
QueueChannel dataConsumer = new QueueChannel();
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka2")).bindConsumer("dataOut", UUID.randomUUID().toString(),
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka2", MessageChannel.class)).bindConsumer("dataOut", UUID.randomUUID().toString(),
dataConsumer, new ExtendedConsumerProperties<>(new KafkaConsumerProperties()));
String testPayload = "testFoo" + UUID.randomUUID().toString();
dataProducer.send(MessageBuilder.withPayload(testPayload).build());
Message<?> receive = dataConsumer.receive(2000);
Message<?> receive = dataConsumer.receive(60_000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload));
}

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -22,35 +22,18 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-source</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-transform</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-redis</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<scope>test</scope>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -63,6 +46,11 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@@ -17,8 +17,10 @@
package multibinder;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.SendTo;
/**
* @author Marius Bogoevici
@@ -26,7 +28,8 @@ import org.springframework.integration.annotation.ServiceActivator;
@EnableBinding(Processor.class)
public class BridgeTransformer {
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object transform(Object payload) {
return payload;
}

View File

@@ -6,7 +6,7 @@ spring:
bindings:
input:
destination: dataIn
binder: redis
binder: kafka
output:
destination: dataOut
binder: rabbit

View File

@@ -1,101 +1,106 @@
/*
* Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package multibinder;
import java.util.UUID;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
import org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder;
import org.springframework.cloud.stream.test.junit.rabbit.RabbitTestSupport;
import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
/**
* @author Marius Bogoevici
* @author Gary Russell
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = MultibinderApplication.class)
@WebAppConfiguration
@DirtiesContext
public class RabbitAndRedisBinderApplicationTests {
@ClassRule
public static RabbitTestSupport rabbitTestSupport = new RabbitTestSupport();
@ClassRule
public static RedisTestSupport redisTestSupport = new RedisTestSupport();
@Autowired
private BinderFactory<MessageChannel> binderFactory;
private final String randomGroup = UUID.randomUUID().toString();
@After
public void cleanUp() {
RabbitAdmin admin = new RabbitAdmin(rabbitTestSupport.getResource());
admin.deleteQueue("binder.dataOut.default");
admin.deleteQueue("binder.dataOut." + this.randomGroup);
admin.deleteExchange("binder.dataOut");
}
@Test
public void contextLoads() {
}
@Test
public void messagingWorks() {
DirectChannel dataProducer = new DirectChannel();
((RedisMessageChannelBinder)binderFactory.getBinder("redis"))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new ProducerProperties()));
QueueChannel dataConsumer = new QueueChannel();
((RabbitMessageChannelBinder)binderFactory.getBinder("rabbit")).bindConsumer("dataOut", this.randomGroup,
dataConsumer, new ExtendedConsumerProperties<>(new RabbitConsumerProperties()));
String testPayload = "testFoo" + this.randomGroup;
dataProducer.send(MessageBuilder.withPayload(testPayload).build());
Message<?> receive = dataConsumer.receive(2000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload));
}
}
/*
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package multibinder;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.boot.SpringApplication;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
import org.springframework.cloud.stream.binder.rabbit.properties.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.test.junit.rabbit.RabbitTestSupport;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import java.util.UUID;
/**
* @author Marius Bogoevici
* @author Gary Russell
*/
@DirtiesContext
public class RabbitAndKafkaBinderApplicationTests {
@ClassRule
public static RabbitTestSupport rabbitTestSupport = new RabbitTestSupport();
@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "test");
private final String randomGroup = UUID.randomUUID().toString();
@After
public void cleanUp() {
RabbitAdmin admin = new RabbitAdmin(rabbitTestSupport.getResource());
admin.deleteQueue("binder.dataOut.default");
admin.deleteQueue("binder.dataOut." + this.randomGroup);
admin.deleteExchange("binder.dataOut");
}
@Test
public void contextLoads() throws Exception {
// passing connection arguments arguments to the embedded Kafka instance
ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class,
"--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString());
context.close();
}
@Test
public void messagingWorks() throws Exception {
// passing connection arguments arguments to the embedded Kafka instance
ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class,
"--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString(),
"--spring.cloud.stream.bindings.input.group=testGroup",
"--spring.cloud.stream.bindings.output.producer.requiredGroups=" + this.randomGroup);
DirectChannel dataProducer = new DirectChannel();
BinderFactory binderFactory = context.getBean(BinderFactory.class);
QueueChannel dataConsumer = new QueueChannel();
((RabbitMessageChannelBinder) binderFactory.getBinder("rabbit", MessageChannel.class)).bindConsumer("dataOut", this.randomGroup,
dataConsumer, new ExtendedConsumerProperties<>(new RabbitConsumerProperties()));
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka", MessageChannel.class))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
String testPayload = "testFoo" + this.randomGroup;
dataProducer.send(MessageBuilder.withPayload(testPayload).build());
Message<?> receive = dataConsumer.receive(60_000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload));
context.close();
}
}

View File

@@ -0,0 +1,30 @@
Spring Cloud Stream - Non self-contained Aggregate application sample
=============================
In this *Spring Cloud Stream* sample, the application shows how to write a non self-contained aggregate application.
A non self-contained application is the one that has its applications directly bound but either or both the input and output of the application is bound to the external broker.
## Requirements
To run this sample, you will need to have installed:
* Java 8 or Above
## Code Tour
* NonSelfContainedAggregateApplication - the Spring Boot Main Aggregate Application that directly binds `Source` and `Processor` application while the processor application's output is bound to RabbitMQ.
* ProcessorModuleDefinition - the processor application configuration
* SourceModuleDefinition - the source application configuration
## Building with Maven
Build the sample by executing:
>$ mvn clean package
## Running the Sample
To start the non self-contained aggregate application execute the following:
>$ java -jar target/spring-cloud-stream-sample-non-self-contained-aggregate-app-<version>-exec.jar

View File

@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-non-self-contained-aggregate-app</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-sample-non-self-contained-aggregate-app</name>
<description>Demo project for non self contained Aggregate Application</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<start-class>demo.NonSelfContainedAggregateApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,27 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config.processor;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Marius Bogoevici
*/
@SpringBootApplication
public class ProcessorApplication {
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config.processor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
/**
* @author Marius Bogoevici
*/
@EnableBinding(Processor.class)
public class ProcessorModuleDefinition {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Message<?> transform(Message<?> inbound) {
return inbound;
}
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config.source;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Marius Bogoevici
*/
@SpringBootApplication
public class SourceApplication {
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config.source;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
/**
* @author Dave Syer
* @author Marius Bogoevici
*/
@EnableBinding(Source.class)
public class SourceModuleDefinition {
private String format = "yyyy-MM-dd HH:mm:ss";
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource() {
return () -> new GenericMessage<>(new SimpleDateFormat(this.format).format(new Date()));
}
}

View File

@@ -0,0 +1,37 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import config.processor.ProcessorApplication;
import config.source.SourceApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;
/**
* @author Ilayaperumal Gopinathan
*/
@SpringBootApplication
public class NonSelfContainedAggregateApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder(NonSelfContainedAggregateApplication.class)
.from(SourceApplication.class).args("--fixedDelay=5000")
.via(ProcessorApplication.class).namespace("a").run("--spring.cloud.stream.bindings.output.destination=processor-output");
}
}

View File

@@ -0,0 +1 @@
fixedDelay: 1000

View File

@@ -0,0 +1,37 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = NonSelfContainedAggregateApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {
@Test
public void contextLoads() {
}
}

226
pom.xml
View File

@@ -1,78 +1,148 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<packaging>pom</packaging>
<url>https://github.com/spring-cloud/spring-cloud-stream-samples</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>https://www.spring.io</url>
</organization>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.1.1.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-cloud-stream.version>1.0.2.RELEASE</spring-cloud-stream.version>
<java.version>1.8</java.version>
</properties>
<modules>
<module>source</module>
<module>sink</module>
<module>transform</module>
<module>double</module>
<module>multibinder</module>
<module>multibinder-differentsystems</module>
<module>rxjava-processor</module>
<module>multi-io</module>
<module>stream-listener</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>${spring-cloud-stream.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-source</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-transform</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<!--skip deploy (this is just a test module) -->
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
<packaging>pom</packaging>
<url>https://github.com/spring-cloud/spring-cloud-stream-samples</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>https://www.spring.io</url>
</organization>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.3.5.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-cloud-stream.version>Ditmars.BUILD-SNAPSHOT</spring-cloud-stream.version>
<java.version>1.8</java.version>
<spring-kafka.version>1.1.6.RELEASE</spring-kafka.version>
</properties>
<modules>
<module>source</module>
<module>dynamic-source</module>
<module>sink</module>
<module>transform</module>
<module>double</module>
<module>non-self-contained-aggregate-app</module>
<module>multibinder</module>
<module>multibinder-differentsystems</module>
<module>rxjava-processor</module>
<module>multi-io</module>
<module>stream-listener</module>
<module>reactive-processor-kafka</module>
<module>test-embedded-kafka</module>
<module>kstream</module>
<module>testing</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>${spring-cloud-stream.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-source</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-transform</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
<version>${spring-kafka.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<!--skip deploy (this is just a test module) -->
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>spring</id>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</profile>
</profiles>
</project>


@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-stream-samples</artifactId>
<groupId>org.springframework.cloud</groupId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-reactive-processor-kafka</artifactId>
<properties>
<java.version>1.8</java.version>
<start-class>reactive.kafka.ReactiveKafkaProcessorApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>


@@ -0,0 +1,32 @@
package reactive.kafka;
import java.time.Duration;
import reactor.core.publisher.Flux;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
@SpringBootApplication
@EnableBinding(Processor.class)
public class ReactiveKafkaProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(ReactiveKafkaProcessorApplication.class, args);
}
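// Note: despite the method name below, this processor does not upper-case its
// input; it collects the strings seen in each overlapping 10-second window (a
// new window opens every 5 seconds) and emits their concatenation.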
@StreamListener
@Output(Processor.OUTPUT)
public Flux<String> toUpperCase(@Input(Processor.INPUT) Flux<String> inbound) {
return inbound.log()
.window(Duration.ofSeconds(10), Duration.ofSeconds(5))
.flatMap(w -> w.reduce("", (s1,s2)->s1+s2))
.log();
}
}


@@ -0,0 +1,10 @@
server:
port: 8082
spring:
cloud:
stream:
bindings:
output:
destination: transformed
input:
destination: testtock


@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -24,15 +24,24 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-rxjava</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.5.RELEASE</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>


@@ -20,19 +20,22 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import org.springframework.cloud.stream.annotation.rxjava.EnableRxJavaProcessor;
import org.springframework.cloud.stream.annotation.rxjava.RxJavaProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
@EnableRxJavaProcessor
@EnableBinding(Processor.class)
public class RxJavaTransformer {
private static Logger logger = LoggerFactory.getLogger(RxJavaTransformer.class);
@Bean
public RxJavaProcessor<String,String> processor() {
return inputStream -> inputStream.map(data -> {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Observable<String> processor(Observable<String> inputStream) {
return inputStream.map(data -> {
logger.info("Got data = " + data);
return data;
}).buffer(5).map(data -> String.valueOf(avg(data)));


@@ -16,20 +16,22 @@
package demo;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = RxJavaApplication.class)
@SpringBootTest(classes = RxJavaApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {
@Test
@Ignore
public void contextLoads() {
}


@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>


@@ -5,7 +5,7 @@ spring:
stream:
bindings:
input:
destination: testtock
destination: transformed
# uncomment below to have this module consume from a specific partition
# in the range of 0 to N-1, where N is the upstream module's partitionCount
#consumerProperties:


@@ -16,12 +16,11 @@
package demo;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.Bindings;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Sink;
@@ -31,8 +30,10 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import static org.junit.Assert.assertNotNull;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = SinkApplication.class)
@SpringBootTest(classes = SinkApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {


@@ -18,11 +18,16 @@ This sample is a Spring Boot application that uses Spring Cloud Stream to publis
* SourceApplication - the Spring Boot Main Application
* TimeSource - the module that will generate the timestamp and post the message to the stream
* TimeSourceOptionsMetadata - defines the configurations that are available to setup the TimeSource
* format - how to render the current time, using SimpleDateFormat
* fixedDelay - time delay between messages
* initialDelay - delay before the first message
* timeUnit - the time unit for the fixed and initial delays
* maxMessages - the maximum messages per poll; -1 for unlimited
* format - how to render the current time, using SimpleDateFormat
* fixedDelay - time delay between messages
* initialDelay - delay before the first message
* timeUnit - the time unit for the fixed and initial delays
* maxMessages - the maximum messages per poll; -1 for unlimited
## Building with Maven


@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -31,10 +31,6 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>


@@ -18,13 +18,14 @@ package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = SourceApplication.class)
@SpringBootTest(classes = SourceApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {


@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>


@@ -0,0 +1,33 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
/**
* @author Marius Bogoevici
*/
public class Bar {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}


@@ -1,101 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.cloud.stream.converter.AbstractFromMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.MimeType;
/**
* @author Ilayaperumal Gopinathan
*/
@Configuration
public class Converters {
//Register custom converter
@Bean
public AbstractFromMessageConverter fooConverter() {
return new FooToBarConverter();
}
public static class Foo {
private String value = "foo";
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class Bar {
private String value = "init";
public Bar(String value) {
this.value = value;
}
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class FooToBarConverter extends AbstractFromMessageConverter {
public FooToBarConverter() {
super(MimeType.valueOf("test/bar"));
}
@Override
protected Class<?>[] supportedTargetTypes() {
return new Class[] {Bar.class};
}
@Override
protected Class<?>[] supportedPayloadTypes() {
return new Class<?>[] {Foo.class};
}
@Override
public Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object result = null;
try {
if (message.getPayload() instanceof Foo) {
Foo fooPayload = (Foo) message.getPayload();
result = new Bar(fooPayload.getValue());
}
}
catch (Exception e) {
logger.error(e.getMessage(), e);
throw new MessageConversionException(e.getMessage());
}
return result;
}
}
}


@@ -0,0 +1,33 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
/**
* @author Marius Bogoevici
*/
public class Foo {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}


@@ -29,11 +29,11 @@ public class SampleSink {
// Sink application definition
@StreamListener(Sink.SAMPLE)
public void receive(Converters.Bar barMessage) {
public void receive(Foo fooMessage) {
System.out.println("******************");
System.out.println("At the Sink");
System.out.println("******************");
System.out.println("Received transformed message " + barMessage.getValue() + " of type " + barMessage.getClass());
System.out.println("Received transformed message " + fooMessage.getValue() + " of type " + fooMessage.getClass());
}
public interface Sink {


@@ -24,7 +24,9 @@ import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
/**
* @author Ilayaperumal Gopinathan
@@ -40,10 +42,9 @@ public class SampleSource {
System.out.println("******************");
System.out.println("At the Source");
System.out.println("******************");
Converters.Foo foo = new Converters.Foo();
foo.setValue("hi");
System.out.println("Sending value: " + foo.getValue() + " of type " + foo.getClass());
return new GenericMessage(foo);
String value = "{\"value\":\"hi\"}";
System.out.println("Sending value: " + value);
return MessageBuilder.withPayload(value).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build();
}
};
}


@@ -33,7 +33,7 @@ public class SampleTransformer {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Converters.Bar receive(Converters.Bar barMessage) {
public Bar receive(Bar barMessage) {
System.out.println("******************");
System.out.println("At the transformer");
System.out.println("******************");


@@ -10,5 +10,6 @@ spring:
destination: testtock
output:
destination: xformed
content-type: application/json
sample-sink:
destination: xformed


@@ -18,13 +18,14 @@ package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = TypeConversionApplication.class)
@SpringBootTest(classes = TypeConversionApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {

test-embedded-kafka/.gitignore

@@ -0,0 +1,24 @@
target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
nbproject/private/
build/
nbbuild/
dist/
nbdist/
.nb-gradle/


@@ -0,0 +1,37 @@
Spring Cloud Stream Testing with Embedded Kafka Broker Sample
=============================
In this *Spring Cloud Stream* sample, an embedded Kafka broker is used for testing.
## Requirements
To run this sample, you will need to have installed:
* Java 8 or above
## Code Tour
This sample is a Spring Boot application that uses Spring Cloud Stream to receive a `String` message and forward an upper case version of that String; it uses the Kafka binder.
* EmbeddedKafkaApplication - the Spring Boot Main Application
* EmbeddedKafkaApplicationTests - the test case
The `spring-kafka-test` dependency added to the `pom.xml` puts the `KafkaEmbedded` JUnit `@Rule` on the class path.
Refer to the [Spring for Apache Kafka Reference Manual](https://docs.spring.io/spring-kafka/reference/htmlsingle/#testing) for more information about this.
Notice how the `@BeforeClass` method sets up the Boot and binder properties to locate the servers.
The test method starts by using the Spring Boot auto-configured `KafkaTemplate` to send a message to the input destination.
It then creates a consumer for the output destination, retrieves the output message, and asserts that it is an upper-case version of the sent message.
See the `application.properties` file for the properties that are required for this test case and application.
In particular, `raw` header mode is used.
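For reference, the header-mode settings that sentence refers to look like this (they match the `application.properties` shown later in this diff):
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw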
## Building with Maven
Build the sample and run the test by executing:
$ mvn clean package
or run the test in your favorite IDE.


@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-test-embedded-kafka</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-sample-test-embedded-kafka</name>
<description>Demo project for Testing with an embedded Kafka broker</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<start-class>demo.EmbeddedKafkaApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>


@@ -0,0 +1,44 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
/**
* @author Gary Russell
*
*/
@SpringBootApplication
@EnableBinding(Processor.class)
public class EmbeddedKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(EmbeddedKafkaApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public byte[] handle(byte[] in) {
return new String(in).toUpperCase().getBytes();
}
}


@@ -0,0 +1,12 @@
# Test consumer properties
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=testEmbeddedKafkaApplication
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
# Binding properties
spring.cloud.stream.bindings.output.destination=testEmbeddedOut
spring.cloud.stream.bindings.input.destination=testEmbeddedIn
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.group=embeddedKafkaApplication


@@ -0,0 +1,81 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.test.context.junit4.SpringRunner;
/**
* Test class demonstrating how to use an embedded kafka service with the
* kafka binder.
*
* @author Gary Russell
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class EmbeddedKafkaApplicationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);
@Autowired
private KafkaTemplate<byte[], byte[]> template;
@Autowired
private DefaultKafkaConsumerFactory<byte[], byte[]> consumerFactory;
@Value("${spring.cloud.stream.bindings.input.destination}")
private String inputDestination;
@Value("${spring.cloud.stream.bindings.output.destination}")
private String outputDestination;
@BeforeClass
public static void setup() {
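// point Spring Boot's Kafka auto-configuration and the Kafka binder at the
// embedded broker started by the @ClassRule above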
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}
@Test
public void testSendReceive() {
template.send(this.inputDestination, "foo".getBytes());
Consumer<byte[], byte[]> consumer = this.consumerFactory.createConsumer();
consumer.subscribe(Collections.singleton(this.outputDestination));
ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
consumer.commitSync();
assertThat(records.count()).isEqualTo(1);
assertThat(new String(records.iterator().next().value())).isEqualTo("FOO");
}
}

testing/README.adoc

@@ -0,0 +1,3 @@
== How to test Spring Cloud Stream applications?
This project contains a set of tests for simple Spring Cloud Stream applications, demonstrating what can be used to test these kinds of microservices, and how and when to use it.
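As a minimal sketch of the approach (assuming the `spring-cloud-stream-test-support` test binder declared in the `pom.xml` below, and the `ToUpperCaseProcessor` shown later in this diff; the test class itself is illustrative):

package org.springframework.cloud.stream.testing.processor;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ToUpperCaseProcessor.class)
public class ToUpperCaseProcessorTests {
    @Autowired
    private Processor processor;
    // the test binder records everything sent to an output channel
    @Autowired
    private MessageCollector messageCollector;
    @Test
    public void testMessageIsUpperCased() {
        // send a message into the processor's input channel
        this.processor.input().send(new GenericMessage<>("hello"));
        // fetch what the processor emitted on its output channel
        Message<?> received = this.messageCollector.forChannel(this.processor.output()).poll();
        assertEquals("HELLO", received.getPayload());
    }
}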

testing/pom.xml

@@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-testing</artifactId>
<packaging>jar</packaging>
<name>testing</name>
<description>Demonstrating how to test spring cloud stream apps</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.2.0.BUILD-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>


@@ -0,0 +1,47 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.testing.processor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
/**
* The Spring Cloud Stream Processor application,
which converts an incoming String to its upper-case representation.
*
* @author Artem Bilan
*
*/
@SpringBootApplication
@EnableBinding(Processor.class)
public class ToUpperCaseProcessor {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String transform(String payload) {
return payload.toUpperCase();
}
public static void main(String[] args) {
SpringApplication.run(ToUpperCaseProcessor.class, args);
}
}


@@ -0,0 +1,51 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.testing.sink;
import javax.sql.DataSource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.messaging.MessageHandler;
/**
* The Spring Cloud Stream Sink application,
which inserts the payloads of incoming messages into the FOOBAR table in the RDBMS.
*
* @author Artem Bilan
*
*/
@SpringBootApplication
@EnableBinding(Sink.class)
public class JdbcSink {
@Bean
@ServiceActivator(inputChannel = Sink.INPUT)
public MessageHandler jdbcHandler(DataSource dataSource) {
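// ':payload' is a named parameter that JdbcMessageHandler resolves from the incoming message's payload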
return new JdbcMessageHandler(dataSource, "INSERT INTO foobar (value) VALUES (:payload)");
}
public static void main(String[] args) {
SpringApplication.run(JdbcSink.class, args);
}
}


@@ -0,0 +1,55 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.testing.source;
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
/**
* The Spring Cloud Stream Source application,
which generates a "foo" or "bar" string every 100 milliseconds, in a round-robin manner.
*
* @author Artem Bilan
*
*/
@SpringBootApplication
@EnableBinding(Source.class)
public class FooBarSource {
private AtomicBoolean semaphore = new AtomicBoolean(true);
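// the flag flips on each poll, so the payload alternates between "foo" and "bar"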
@Bean
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedDelay = "100"))
public MessageSource<String> fooBarStrings() {
return () ->
new GenericMessage<>(this.semaphore.getAndSet(!this.semaphore.get()) ? "foo" : "bar");
}
public static void main(String[] args) {
SpringApplication.run(FooBarSource.class, args);
}
}
