Compare commits

...

10 Commits

Author SHA1 Message Date
Spring Operator
65563dd4af URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://repo.spring.io/libs-milestone-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-milestone-local ([https](https://repo.spring.io/libs-milestone-local) result 302).
* [ ] http://repo.spring.io/libs-snapshot-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-snapshot-local ([https](https://repo.spring.io/libs-snapshot-local) result 302).
* [ ] http://repo.spring.io/release with 1 occurrences migrated to:
  https://repo.spring.io/release ([https](https://repo.spring.io/release) result 302).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 26 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 13 occurrences
2019-04-24 12:52:36 -04:00
Spring Operator
95d61b1c8e URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* http://maven.apache.org/xsd/maven-4.0.0.xsd with 13 occurrences migrated to:
  https://maven.apache.org/xsd/maven-4.0.0.xsd ([https](https://maven.apache.org/xsd/maven-4.0.0.xsd) result 200).
* http://www.apache.org/licenses/LICENSE-2.0 with 2 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
* http://www.spring.io with 1 occurrences migrated to:
  https://www.spring.io ([https](https://www.spring.io) result 301).
* http://repo.spring.io/libs-milestone-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-milestone-local ([https](https://repo.spring.io/libs-milestone-local) result 302).
* http://repo.spring.io/libs-release-local with 1 occurrences migrated to:
  https://repo.spring.io/libs-release-local ([https](https://repo.spring.io/libs-release-local) result 302).
* http://repo.spring.io/libs-snapshot-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-snapshot-local ([https](https://repo.spring.io/libs-snapshot-local) result 302).
* http://repo.spring.io/release with 1 occurrences migrated to:
  https://repo.spring.io/release ([https](https://repo.spring.io/release) result 302).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 26 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 13 occurrences
2019-03-21 15:20:40 -04:00
Spring Operator
4ca25898e4 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://www.apache.org/licenses/ with 1 occurrences migrated to:
  https://www.apache.org/licenses/ ([https](https://www.apache.org/licenses/) result 200).
* [ ] http://www.apache.org/licenses/LICENSE-2.0 with 49 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
2019-03-21 15:18:21 -04:00
Marius Bogoevici
c7896ceca9 Remove unnecessary binder dependency 2017-04-20 20:15:26 -04:00
Ilayaperumal Gopinathan
aa17d3c50b Add non self-contained aggregate application
Resolves #30
2017-03-20 11:38:34 +00:00
Marius Bogoevici
00664fe229 Update dependency versions 2017-03-06 22:35:49 -05:00
Marius Bogoevici
e41b7e1db7 Updated RxJava transformer to 1.1.0 2017-03-06 22:35:49 -05:00
Ilayaperumal Gopinathan
d55da10ffc Sample to showcase dynamic destination binding
Resolves #28
2017-02-23 14:01:03 +00:00
Jannik Weichert
32b6cfe93e Create nice bullet points for available properties 2017-02-04 17:41:14 +01:00
Marius Bogoevici
0b00a2361d Update to Spring Cloud Stream Brooklyn.BUILD-SNAPSHOT 2016-08-25 22:57:38 -04:00
77 changed files with 977 additions and 278 deletions

1
.gitignore vendored
View File

@@ -13,6 +13,7 @@ _site/
.settings
.springBeans
.DS_Store
*/.idea
*.sw*
*.iml
*.ipr

View File

@@ -21,7 +21,7 @@
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>http://repo.spring.io/libs-snapshot-local</url>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
@@ -29,7 +29,7 @@
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone-local</url>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
@@ -37,7 +37,7 @@
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>http://repo.spring.io/release</url>
<url>https://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
@@ -47,7 +47,7 @@
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>http://repo.spring.io/libs-snapshot-local</url>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
@@ -55,7 +55,7 @@
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone-local</url>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>

View File

@@ -1,6 +1,6 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
https://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
@@ -192,7 +192,7 @@
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-double</artifactId>
@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -35,10 +35,6 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
<build>

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -29,10 +29,10 @@ import org.springframework.context.ConfigurableApplicationContext;
public class DoubleApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder().
from(SourceApplication.class).args("--fixedDelay=5000")
new AggregateApplicationBuilder(DoubleApplication.class, args)
.from(SourceApplication.class).args("--fixedDelay=5000")
.via(ProcessorApplication.class)
.to(SinkApplication.class).args("--debug=true").run("--spring.application.name=aggregate-test");
.to(SinkApplication.class).args("--debug=true").run();
}
}

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

43
dynamic-source/README.md Normal file
View File

@@ -0,0 +1,43 @@
Spring Cloud Stream Source Sample
=============================
In this *Spring Cloud Stream* sample, a source application publishes messages to dynamically created destinations.
## Requirements
To run this sample, you will need to have installed:
* Java 8 or Above
This example requires RabbitMQ to be running on localhost.
## Code Tour
The class `SourceWithDynamicDestination` is a REST controller that registers the 'POST' request mapping for '/'.
When a payload is sent to 'http://localhost:8080/' by a POST request (port 8080 is the default), this application
then uses `BinderAwareChannelResolver` to resolve the destination dynamically at runtime. Currently, this resolver uses
`payload` as the SpEL expression to resolve the destination name. Hence, if a payload `testing` is sent to the app, then
this source application sends the message `testing` into the Rabbit exchange `testing`. This exchange or topic (in case
of Kafka if Kafka binder is used) is created dynamically and bound to send the payload.
Upon starting the application on the default port 8080, if the following data are sent:
curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' http://localhost:8080
curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-2","bill-pay":"150"}' http://localhost:8080
The destinations 'customerId-1' and 'customerId-2' are created at the broker (for example: exchange in case of Rabbit or topic in case of Kafka with the names 'customerId-1' and 'customerId-2') and the data are published to the appropriate destinations dynamically.
## Building with Maven
Build the sample by executing:
source>$ mvn clean package
## Running the Sample
To start the source module execute the following:
source>$ java -jar target/spring-cloud-stream-sample-dynamic-source-1.1.0.BUILD-SNAPSHOT-exec.jar

62
dynamic-source/pom.xml Normal file
View File

@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-dynamic-source</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-sample-dynamic-source</name>
<description>Demo project for source module</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<start-class>demo.SourceApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-codec</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,29 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SourceApplication {
public static void main(String[] args) {
SpringApplication.run(SourceApplication.class, args);
}
}

View File

@@ -0,0 +1,87 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import java.util.Collections;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.support.Function;
import org.springframework.integration.dsl.support.FunctionExpression;
import org.springframework.integration.router.AbstractMappingMessageRouter;
import org.springframework.integration.router.ExpressionEvaluatingRouter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import static org.springframework.web.bind.annotation.RequestMethod.POST;
/**
* @author Ilayaperumal Gopinathan
*/
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@Autowired
@Qualifier("sourceChannel")
private MessageChannel localChannel;
@RequestMapping(path = "/", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, contentType);
}
private void sendMessage(Object body, Object contentType) {
localChannel.send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
@Bean(name = "sourceChannel")
public MessageChannel localChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "sourceChannel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));
router.setDefaultOutputChannelName("default-output");
router.setChannelResolver(resolver);
return router;
}
}

View File

@@ -0,0 +1,37 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = SourceApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {
@Test
public void contextLoads() {
}
}

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-multi-io</artifactId>
@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-multibinder-differentsystems</artifactId>
@@ -12,7 +12,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -42,14 +42,14 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<artifactId>kafka_2.11</artifactId>
<classifier>test</classifier>
<version>0.8.2.1</version>
<version>0.9.0.1</version>
<scope>test</scope>
<exclusions>
<exclusion>

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,10 +16,6 @@
package multibinder;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import java.util.List;
import java.util.UUID;
import org.hamcrest.CoreMatchers;
@@ -32,7 +28,9 @@ import org.junit.runner.RunWith;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -40,12 +38,10 @@ import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties;
import org.springframework.cloud.stream.test.junit.kafka.KafkaTestSupport;
import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.Configuration;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@@ -53,64 +49,66 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = MultibinderApplication.class)
@SpringBootTest(classes = MultibinderApplication.class)
@WebAppConfiguration
@DirtiesContext
public class TwoKafkaBindersApplicationTest {
@ClassRule
public static KafkaTestSupport kafkaTestSupport1 = new KafkaTestSupport(true);
public static KafkaEmbedded kafkaTestSupport1 = new KafkaEmbedded(1);
@ClassRule
public static KafkaTestSupport kafkaTestSupport2 = new KafkaTestSupport(true);
public static KafkaEmbedded kafkaTestSupport2 = new KafkaEmbedded(1);
@ClassRule
public static RedisTestSupport redisTestSupport = new RedisTestSupport();
@BeforeClass
public static void setupEnvironment() {
System.setProperty("kafkaBroker1", kafkaTestSupport1.getBrokerAddress());
System.setProperty("zk1", kafkaTestSupport1.getZkConnectString());
System.setProperty("kafkaBroker2", kafkaTestSupport2.getBrokerAddress());
System.setProperty("zk2", kafkaTestSupport2.getZkConnectString());
System.setProperty("kafkaBroker1", kafkaTestSupport1.getBrokersAsString());
System.setProperty("zk1", kafkaTestSupport1.getZookeeperConnectionString());
System.setProperty("kafkaBroker2", kafkaTestSupport2.getBrokersAsString());
System.setProperty("zk2", kafkaTestSupport2.getZookeeperConnectionString());
}
@Autowired
private BinderFactory<MessageChannel> binderFactory;
private BinderFactory binderFactory;
@Test
public void contextLoads() {
Binder<MessageChannel, ?, ?> binder1 = binderFactory.getBinder("kafka1");
Binder<MessageChannel, ?, ?> binder1 = binderFactory.getBinder("kafka1", MessageChannel.class);
KafkaMessageChannelBinder kafka1 = (KafkaMessageChannelBinder) binder1;
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(kafka1.getConnectionFactory());
Configuration configuration = (Configuration) directFieldAccessor.getPropertyValue("configuration");
List<BrokerAddress> brokerAddresses = configuration.getBrokerAddresses();
Assert.assertThat(brokerAddresses, hasSize(1));
Assert.assertThat(brokerAddresses, contains(BrokerAddress.fromAddress(kafkaTestSupport1.getBrokerAddress())));
Binder<MessageChannel, ?, ?> binder2 = binderFactory.getBinder("kafka2");
DirectFieldAccessor directFieldAccessor1 = new DirectFieldAccessor(kafka1);
KafkaBinderConfigurationProperties configuration1 =
(KafkaBinderConfigurationProperties) directFieldAccessor1.getPropertyValue("configurationProperties");
Assert.assertThat(configuration1.getBrokers(), arrayWithSize(1));
Assert.assertThat(configuration1.getBrokers()[0], equalTo(kafkaTestSupport1.getBrokersAsString()));
Binder<MessageChannel, ?, ?> binder2 = binderFactory.getBinder("kafka2", MessageChannel.class);
KafkaMessageChannelBinder kafka2 = (KafkaMessageChannelBinder) binder2;
DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2.getConnectionFactory());
Configuration configuration2 = (Configuration) directFieldAccessor2.getPropertyValue("configuration");
List<BrokerAddress> brokerAddresses2 = configuration2.getBrokerAddresses();
Assert.assertThat(brokerAddresses2, hasSize(1));
Assert.assertThat(brokerAddresses2, contains(BrokerAddress.fromAddress(kafkaTestSupport2.getBrokerAddress())));
DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2);
KafkaBinderConfigurationProperties configuration2 =
(KafkaBinderConfigurationProperties) directFieldAccessor2.getPropertyValue("configurationProperties");
Assert.assertThat(configuration2.getBrokers(), arrayWithSize(1));
Assert.assertThat(configuration2.getBrokers()[0], equalTo(kafkaTestSupport2.getBrokersAsString()));
}
@Test
public void messagingWorks() {
DirectChannel dataProducer = new DirectChannel();
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka1"))
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka1", MessageChannel.class))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
QueueChannel dataConsumer = new QueueChannel();
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka2")).bindConsumer("dataOut", UUID.randomUUID().toString(),
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka2", MessageChannel.class)).bindConsumer("dataOut", UUID.randomUUID().toString(),
dataConsumer, new ExtendedConsumerProperties<>(new KafkaConsumerProperties()));
String testPayload = "testFoo" + UUID.randomUUID().toString();
dataProducer.send(MessageBuilder.withPayload(testPayload).build());
Message<?> receive = dataConsumer.receive(2000);
Message<?> receive = dataConsumer.receive(60_000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload));
}

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-multibinder</artifactId>
@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -34,23 +34,23 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-redis</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -6,7 +6,7 @@ spring:
bindings:
input:
destination: dataIn
binder: redis
binder: kafka
output:
destination: dataOut
binder: rabbit

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,47 +24,38 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
import org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder;
import org.springframework.cloud.stream.test.junit.rabbit.RabbitTestSupport;
import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport;
import org.springframework.cloud.stream.binder.test.junit.rabbit.RabbitTestSupport;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
/**
* @author Marius Bogoevici
* @author Gary Russell
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = MultibinderApplication.class)
@WebAppConfiguration
@DirtiesContext
public class RabbitAndRedisBinderApplicationTests {
public class RabbitAndKafkaBinderApplicationTests {
@ClassRule
public static RabbitTestSupport rabbitTestSupport = new RabbitTestSupport();
@ClassRule
public static RedisTestSupport redisTestSupport = new RedisTestSupport();
@Autowired
private BinderFactory<MessageChannel> binderFactory;
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "test");
private final String randomGroup = UUID.randomUUID().toString();
@@ -77,25 +68,39 @@ public class RabbitAndRedisBinderApplicationTests {
}
@Test
public void contextLoads() {
public void contextLoads() throws Exception {
// passing connection arguments arguments to the embedded Kafka instance
ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class,
"--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString());
context.close();
}
@Test
public void messagingWorks() {
public void messagingWorks() throws Exception {
// passing connection arguments arguments to the embedded Kafka instance
ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class,
"--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString(),
"--spring.cloud.stream.bindings.output.producer.requiredGroups=" + this.randomGroup);
DirectChannel dataProducer = new DirectChannel();
((RedisMessageChannelBinder)binderFactory.getBinder("redis"))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new ProducerProperties()));
BinderFactory binderFactory = context.getBean(BinderFactory.class);
QueueChannel dataConsumer = new QueueChannel();
((RabbitMessageChannelBinder)binderFactory.getBinder("rabbit")).bindConsumer("dataOut", this.randomGroup,
((RabbitMessageChannelBinder) binderFactory.getBinder("rabbit", MessageChannel.class)).bindConsumer("dataOut", this.randomGroup,
dataConsumer, new ExtendedConsumerProperties<>(new RabbitConsumerProperties()));
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka", MessageChannel.class))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
String testPayload = "testFoo" + this.randomGroup;
dataProducer.send(MessageBuilder.withPayload(testPayload).build());
Message<?> receive = dataConsumer.receive(2000);
Message<?> receive = dataConsumer.receive(60_000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload));
context.close();
}
}

2
mvnw vendored
View File

@@ -8,7 +8,7 @@
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an

2
mvnw.cmd vendored
View File

@@ -7,7 +7,7 @@
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM http://www.apache.org/licenses/LICENSE-2.0
@REM https://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an

View File

@@ -0,0 +1,30 @@
Spring Cloud Stream - Non self-contained Aggregate application sample
=============================
In this *Spring Cloud Stream* sample, the application shows how to write a non self-contained aggregate application.
A non self-contained application is the one that has its applications directly bound but either or both the input and output of the application is bound to the external broker.
## Requirements
To run this sample, you will need to have installed:
* Java 8 or Above
## Code Tour
* NonSelfContainedAggregateApplication - the Spring Boot Main Aggregate Application that directly binds `Source` and `Processor` application while the processor application's output is bound to RabbitMQ.
* ProcessorModuleDefinition - the processor application configuration
* SourceModuleDefinition - the source application configuration
## Building with Maven
Build the sample by executing:
>$ mvn clean package
## Running the Sample
To start the non self-contained aggregate application execute the following:
>$ java -jar target/spring-cloud-stream-sample-non-self-contained-aggregate-app-<version>-exec.jar

View File

@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-non-self-contained-aggregate-app</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-sample-non-self-contained-aggregate-app</name>
<description>Demo project for non self contained Aggregate Application</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<start-class>demo.NonSelfContainedAggregateApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,27 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config.processor;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Marius Bogoevici
*/
@SpringBootApplication
public class ProcessorApplication {
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config.processor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
/**
* @author Marius Bogoevici
*/
@EnableBinding(Processor.class)
public class ProcessorModuleDefinition {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Message<?> transform(Message<?> inbound) {
return inbound;
}
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config.source;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Marius Bogoevici
*/
@SpringBootApplication
public class SourceApplication {
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config.source;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
/**
* @author Dave Syer
* @author Marius Bogoevici
*/
@EnableBinding(Source.class)
public class SourceModuleDefinition {
private String format = "yyyy-MM-dd HH:mm:ss";
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource() {
return () -> new GenericMessage<>(new SimpleDateFormat(this.format).format(new Date()));
}
}

View File

@@ -0,0 +1,37 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import config.processor.ProcessorApplication;
import config.source.SourceApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;
/**
* @author Ilayaperumal Gopinathan
*/
@SpringBootApplication
public class NonSelfContainedAggregateApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder(NonSelfContainedAggregateApplication.class)
.from(SourceApplication.class).args("--fixedDelay=5000")
.via(ProcessorApplication.class).namespace("a").run("--spring.cloud.stream.bindings.output.destination=processor-output");
}
}

View File

@@ -0,0 +1 @@
fixedDelay: 1000

View File

@@ -0,0 +1,37 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = NonSelfContainedAggregateApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {
@Test
public void contextLoads() {
}
}

86
pom.xml
View File

@@ -1,23 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
<packaging>pom</packaging>
<url>https://github.com/spring-cloud/spring-cloud-stream-samples</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>http://www.spring.io</url>
<url>https://www.spring.io</url>
</organization>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.1.1.RELEASE</version>
<version>1.2.0.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-cloud-stream.version>1.0.2.RELEASE</spring-cloud-stream.version>
<spring-cloud-stream.version>Brooklyn.SR3</spring-cloud-stream.version>
<java.version>1.8</java.version>
</properties>
<modules>
@@ -25,11 +25,13 @@
<module>sink</module>
<module>transform</module>
<module>double</module>
<module>non-self-contained-aggregate-app</module>
<module>multibinder</module>
<module>multibinder-differentsystems</module>
<module>rxjava-processor</module>
<module>multi-io</module>
<module>stream-listener</module>
<module>reactive-processor-kafka</module>
</modules>
<dependencyManagement>
<dependencies>
@@ -43,22 +45,17 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-source</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-transform</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>${spring-cloud-stream.version}</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
@@ -75,4 +72,67 @@
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>spring</id>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</profile>
</profiles>
</project>

View File

View File

@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-stream-samples</artifactId>
<groupId>org.springframework.cloud</groupId>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>reactive-processor-kafka</artifactId>
<properties>
<java.version>1.8</java.version>
<start-class>reactive.kafka.ReactiveKafkaProcessorApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,32 @@
package reactive.kafka;
import java.time.Duration;
import reactor.core.publisher.Flux;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
@SpringBootApplication
@EnableBinding(Processor.class)
public class ReactiveKafkaProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(ReactiveKafkaProcessorApplication.class, args);
}
@StreamListener
@Output(Processor.OUTPUT)
public Flux<String> toUpperCase(@Input(Processor.INPUT) Flux<String> inbound) {
return inbound.
log()
.window(Duration.ofSeconds(10), Duration.ofSeconds(5))
.flatMap(w -> w.reduce("", (s1,s2)->s1+s2))
.log();
}
}

View File

@@ -0,0 +1,10 @@
server:
port: 8082
spring:
cloud:
stream:
bindings:
output:
destination: transformed
input:
destination: testtock

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-rxjava</artifactId>
@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -24,12 +24,25 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-rxjava</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.5.RELEASE</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,19 +20,22 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import org.springframework.cloud.stream.annotation.rxjava.EnableRxJavaProcessor;
import org.springframework.cloud.stream.annotation.rxjava.RxJavaProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
@EnableRxJavaProcessor
@EnableBinding(Processor.class)
public class RxJavaTransformer {
private static Logger logger = LoggerFactory.getLogger(RxJavaTransformer.class);
@Bean
public RxJavaProcessor<String,String> processor() {
return inputStream -> inputStream.map(data -> {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Observable<String> processor(Observable<String> inputStream) {
return inputStream.map(data -> {
logger.info("Got data = " + data);
return data;
}).buffer(5).map(data -> String.valueOf(avg(data)));

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@ spring:
stream:
bindings:
input:
destination: testtock
destination: transformed
# uncomment below to have this module consume from a specific partition
# in the range of 0 to N-1, where N is the upstream module's partitionCount
#consumerProperties:

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -18,11 +18,16 @@ This sample is a Spring Boot application that uses Spring Cloud Stream to publis
* SourceApplication - the Spring Boot Main Application
* TimeSource - the module that will generate the timestamp and post the message to the stream
* TimeSourceOptionsMetadata - defines the configurations that are available to setup the TimeSource
* format - how to render the current time, using SimpleDateFormat
* fixedDelay - time delay between messages
* initialDelay - delay before the first message
* timeUnit - the time unit for the fixed and initial delays
* maxMessages - the maximum messages per poll; -1 for unlimited
* format - how to render the current time, using SimpleDateFormat
* fixedDelay - time delay between messages
* initialDelay - delay before the first message
* timeUnit - the time unit for the fixed and initial delays
* maxMessages - the maximum messages per poll; -1 for unlimited
## Building with Maven

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-source</artifactId>
@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -31,10 +31,6 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-stream-listener</artifactId>
@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -0,0 +1,33 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
/**
* @author Marius Bogoevici
*/
public class Bar {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

View File

@@ -1,101 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.cloud.stream.converter.AbstractFromMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.MimeType;
/**
* @author Ilayaperumal Gopinathan
*/
@Configuration
public class Converters {
//Register custom converter
@Bean
public AbstractFromMessageConverter fooConverter() {
return new FooToBarConverter();
}
public static class Foo {
private String value = "foo";
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class Bar {
private String value = "init";
public Bar(String value) {
this.value = value;
}
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class FooToBarConverter extends AbstractFromMessageConverter {
public FooToBarConverter() {
super(MimeType.valueOf("test/bar"));
}
@Override
protected Class<?>[] supportedTargetTypes() {
return new Class[] {Bar.class};
}
@Override
protected Class<?>[] supportedPayloadTypes() {
return new Class<?>[] {Foo.class};
}
@Override
public Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object result = null;
try {
if (message.getPayload() instanceof Foo) {
Foo fooPayload = (Foo) message.getPayload();
result = new Bar(fooPayload.getValue());
}
}
catch (Exception e) {
logger.error(e.getMessage(), e);
throw new MessageConversionException(e.getMessage());
}
return result;
}
}
}

View File

@@ -0,0 +1,33 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
/**
* @author Marius Bogoevici
*/
public class Foo {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -29,11 +29,11 @@ public class SampleSink {
// Sink application definition
@StreamListener(Sink.SAMPLE)
public void receive(Converters.Bar barMessage) {
public void receive(Foo fooMessage) {
System.out.println("******************");
System.out.println("At the Sink");
System.out.println("******************");
System.out.println("Received transformed message " + barMessage.getValue() + " of type " + barMessage.getClass());
System.out.println("Received transformed message " + fooMessage.getValue() + " of type " + fooMessage.getClass());
}
public interface Sink {

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,7 +24,9 @@ import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
/**
* @author Ilayaperumal Gopinathan
@@ -40,10 +42,9 @@ public class SampleSource {
System.out.println("******************");
System.out.println("At the Source");
System.out.println("******************");
Converters.Foo foo = new Converters.Foo();
foo.setValue("hi");
System.out.println("Sending value: " + foo.getValue() + " of type " + foo.getClass());
return new GenericMessage(foo);
String value = "{\"value\":\"hi\"}";
System.out.println("Sending value: " + value);
return MessageBuilder.withPayload(value).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build();
}
};
}

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -33,7 +33,7 @@ public class SampleTransformer {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Converters.Bar receive(Converters.Bar barMessage) {
public Bar receive(Bar barMessage) {
System.out.println("******************");
System.out.println("At the transformer");
System.out.println("******************");

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -10,5 +10,6 @@ spring:
destination: testtock
output:
destination: xformed
content-type: application/json
sample-sink:
destination: xformed

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-transform</artifactId>
@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,