diff --git a/multi-io/README.md b/multi-io/README.md new file mode 100644 index 0000000..2d89262 --- /dev/null +++ b/multi-io/README.md @@ -0,0 +1,40 @@ +Spring Cloud Stream Stream Listener Sample +============================= + +In this *Spring Cloud Stream* sample, the application shows how to use configure multiple input/output channels inside +a single application. + +## Requirements + +To run this sample, you will need to have installed: + +* Java 8 or Above + +This example requires Redis to be running on localhost. + +## Code Tour + +This sample is a Spring Boot application that bundles multiple application together to showcase how to configure +multiple input/output channels. + +* MultipleIOChannelsApplication - the Spring Boot Main Application +* SampleSource - the app that configures two output channels (output1 and output2). +* SampleSink - the app that configures two input channels (input1 and input2). + +The channels output1 and input1 connect to the same destination (test1) on the broker (Redis) and the channels output2 and +input2 connect to the same destination (test2) on redis. +For demo purpose, the apps `SampleSource` and `SampleSink` are bundled together. In practice they are separate applications +unless bundled together by the `AggregateApplicationBuilder`. + +## Building with Maven + +Build the sample by executing: + + >$ mvn clean package + +## Running the Sample + +To start the source module execute the following: + + >$ java -jar target/spring-cloud-stream-sample-multi-io-1.0.0.BUILD-SNAPSHOT-exec.jar + diff --git a/multi-io/pom.xml b/multi-io/pom.xml new file mode 100644 index 0000000..b10a3b0 --- /dev/null +++ b/multi-io/pom.xml @@ -0,0 +1,59 @@ + + + 4.0.0 + + spring-cloud-stream-sample-multi-io + jar + + spring-cloud-stream-sample-multi-io + Demo project for multiple input/output channels binding + + + org.springframework.cloud + spring-cloud-stream-samples + 1.0.0.BUILD-SNAPSHOT + + + + demo.MultipleIOChannelsApplication + + + + + org.springframework.cloud + spring-cloud-stream + + + org.springframework.cloud + spring-cloud-stream-binder-redis + + + org.springframework.boot + spring-boot-starter-redis + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + exec + + + + + + diff --git a/multi-io/src/main/java/demo/MultipleIOChannelsApplication.java b/multi-io/src/main/java/demo/MultipleIOChannelsApplication.java new file mode 100644 index 0000000..bee0236 --- /dev/null +++ b/multi-io/src/main/java/demo/MultipleIOChannelsApplication.java @@ -0,0 +1,29 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package demo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class MultipleIOChannelsApplication { + + public static void main(String[] args) { + SpringApplication.run(MultipleIOChannelsApplication.class, args); + } + +} diff --git a/multi-io/src/main/java/demo/SampleSink.java b/multi-io/src/main/java/demo/SampleSink.java new file mode 100644 index 0000000..b61b5ee --- /dev/null +++ b/multi-io/src/main/java/demo/SampleSink.java @@ -0,0 +1,57 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package demo; + +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.SubscribableChannel; + +/** + * @author Ilayaperumal Gopinathan + */ +@EnableBinding(SampleSink.MultiInputSink.class) +public class SampleSink { + + @StreamListener(MultiInputSink.INPUT1) + public synchronized void receive1(String message) { + System.out.println("******************"); + System.out.println("At Sink1"); + System.out.println("******************"); + System.out.println("Received message " + message); + } + + @StreamListener(MultiInputSink.INPUT2) + public synchronized void receive2(String message) { + System.out.println("******************"); + System.out.println("At Sink2"); + System.out.println("******************"); + System.out.println("Received message " + message); + } + + public interface MultiInputSink { + String INPUT1 = "input1"; + + String INPUT2 = "input2"; + + @Input(INPUT1) + SubscribableChannel input1(); + + @Input(INPUT2) + SubscribableChannel input2(); + } +} diff --git a/multi-io/src/main/java/demo/SampleSource.java b/multi-io/src/main/java/demo/SampleSource.java new file mode 100644 index 0000000..238d02e --- /dev/null +++ b/multi-io/src/main/java/demo/SampleSource.java @@ -0,0 +1,76 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package demo; + +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; +import org.springframework.integration.core.MessageSource; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; + +/** + * @author Ilayaperumal Gopinathan + */ +@EnableBinding(SampleSource.MultiOutputSource.class) +public class SampleSource { + + @Bean + @InboundChannelAdapter(value = MultiOutputSource.OUTPUT1, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) + public synchronized MessageSource messageSource1() { + return new MessageSource() { + public Message receive() { + String message = "FromSource1"; + System.out.println("******************"); + System.out.println("From Source1"); + System.out.println("******************"); + System.out.println("Sending value: " + message); + return new GenericMessage(message); + } + }; + } + + @Bean + @InboundChannelAdapter(value = MultiOutputSource.OUTPUT2, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) + public synchronized MessageSource timerMessageSource() { + return new MessageSource() { + public Message receive() { + String message = "FromSource2"; + System.out.println("******************"); + System.out.println("From Source2"); + System.out.println("******************"); + System.out.println("Sending value: " + message); + return new GenericMessage(message); + } + }; + } + + public interface MultiOutputSource { + String OUTPUT1 = "output1"; + + String OUTPUT2 = "output2"; + + @Output(OUTPUT1) + MessageChannel output1(); + + @Output(OUTPUT2) + MessageChannel output2(); + } +} diff --git a/multi-io/src/main/resources/application.yml b/multi-io/src/main/resources/application.yml new file mode 100644 index 0000000..7c0c973 --- /dev/null +++ b/multi-io/src/main/resources/application.yml @@ -0,0 +1,14 @@ +server: + port: 8082 +spring: + cloud: + stream: + bindings: + output1: + destination: test1 + output2: + destination: test2 + input1: + destination: test1 + input2: + destination: test2 diff --git a/multi-io/src/test/java/demo/ModuleApplicationTests.java b/multi-io/src/test/java/demo/ModuleApplicationTests.java new file mode 100644 index 0000000..463a2ab --- /dev/null +++ b/multi-io/src/test/java/demo/ModuleApplicationTests.java @@ -0,0 +1,36 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package demo; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.web.WebAppConfiguration; +import org.springframework.boot.test.SpringApplicationConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +@RunWith(SpringJUnit4ClassRunner.class) +@SpringApplicationConfiguration(classes = MultipleIOChannelsApplication.class) +@WebAppConfiguration +@DirtiesContext +public class ModuleApplicationTests { + + @Test + public void contextLoads() { + } + +} diff --git a/pom.xml b/pom.xml index d585b4e..11da0fe 100644 --- a/pom.xml +++ b/pom.xml @@ -1,7 +1,6 @@ 4.0.0 - spring-cloud-stream-samples pom https://github.com/spring-cloud/spring-cloud-stream-samples @@ -26,6 +25,7 @@ multibinder multibinder-differentsystems rxjava-processor + multi-io