diff --git a/drools/pom.xml b/drools/pom.xml
index 17b1e1129d..5f228802fa 100644
--- a/drools/pom.xml
+++ b/drools/pom.xml
@@ -1,68 +1,87 @@
- 4.0.0
-
- drools
-
-
- com.baeldung
- parent-modules
- 1.0.0-SNAPSHOT
-
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ 4.0.0
+ drools
+
+ com.baeldung
+ parent-modules
+ 1.0.0-SNAPSHOT
+
4.4.6
7.4.1.Final
3.13
-
-
- org.apache.httpcomponents
- httpcore
- ${http-component-version}
-
-
-
- org.kie
- kie-ci
- ${drools-version}
-
-
- org.drools
- drools-decisiontables
- ${drools-version}
-
+
+
+ org.apache.httpcomponents
+ httpcore
+ ${http-component-version}
+
+
+
+ org.kie
+ kie-ci
+ ${drools-version}
+
+
+ org.drools
+ drools-decisiontables
+ ${drools-version}
+
-
- org.drools
- drools-core
- ${drools-version}
-
-
- org.drools
- drools-compiler
- ${drools-version}
-
-
- org.apache.poi
- poi
- ${apache-poi-version}
-
+
+ org.drools
+ drools-core
+ ${drools-version}
+
+
+ org.drools
+ drools-compiler
+ ${drools-version}
+
+
+ org.apache.poi
+ poi
+ ${apache-poi-version}
+
-
- org.apache.poi
- poi-ooxml
- ${apache-poi-version}
-
+
+ org.apache.poi
+ poi-ooxml
+ ${apache-poi-version}
+
-
- org.springframework
- spring-core
- 4.3.6.RELEASE
-
+
+ org.springframework
+ spring-core
+ 4.3.6.RELEASE
+
-
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${maven-surefire-plugin.version}
+
+ 3
+ true
+
+ **/*IntegrationTest.java
+ **/*LongRunningUnitTest.java
+ **/*ManualTest.java
+ **/JdbcTest.java
+ **/*LiveTest.java
+
+
+
+
+
diff --git a/spring-cloud/pom.xml b/spring-cloud/pom.xml
index 93bf6ea74b..fd023a5ea5 100644
--- a/spring-cloud/pom.xml
+++ b/spring-cloud/pom.xml
@@ -1,7 +1,7 @@
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.baeldung.spring.cloud
@@ -15,8 +15,9 @@
spring-cloud-ribbon-client
spring-cloud-rest
spring-cloud-zookeeper
- spring-cloud-gateway
- spring-cloud-connectors-heroku
+ spring-cloud-gateway
+ spring-cloud-stream
+ spring-cloud-connectors-heroku
pom
@@ -38,6 +39,7 @@
1.2.3.RELEASE
1.2.3.RELEASE
1.2.3.RELEASE
+ 1.3.0.RELEASE
1.4.2.RELEASE
3.6.0
1.4.2.RELEASE
diff --git a/spring-cloud/spring-cloud-stream/pom.xml b/spring-cloud/spring-cloud-stream/pom.xml
new file mode 100644
index 0000000000..5ec24268d9
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/pom.xml
@@ -0,0 +1,71 @@
+
+
+ 4.0.0
+
+ org.baeldung
+ spring-cloud-stream
+ pom
+
+ spring-cloud-stream
+
+
+ com.baeldung.spring.cloud
+ spring-cloud
+ 1.0.0-SNAPSHOT
+
+
+
+ spring-cloud-stream-rabbit
+
+
+
+ UTF-8
+ 3.6.0
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-starter-stream-rabbit
+ ${spring-cloud-stream.version}
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream
+ ${spring-cloud-stream.version}
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream-test-support
+ ${spring-cloud-stream.version}
+ test
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ ${maven-compiler-plugin.version}
+
+ 1.8
+ 1.8
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ ${spring-boot-maven-plugin.version}
+
+
+
+
+
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/pom.xml b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/pom.xml
new file mode 100644
index 0000000000..a954a7035e
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/pom.xml
@@ -0,0 +1,32 @@
+
+
+ 4.0.0
+
+ spring-cloud-stream-rabbit
+ jar
+
+ spring-cloud-stream-rabbit
+ Simple Spring Cloud Stream
+
+
+ org.baeldung
+ spring-cloud-stream
+ 1.0.0-SNAPSHOT
+ ..
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-starter-stream-rabbit
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream-test-support
+ test
+
+
+
+
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplication.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplication.java
new file mode 100644
index 0000000000..375494dfac
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplication.java
@@ -0,0 +1,38 @@
+package com.baeldung.spring.cloud.stream.rabbit;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+
+import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor;
+
+@SpringBootApplication
+@EnableBinding(MyProcessor.class)
+public class MultipleOutputsServiceApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(MultipleOutputsServiceApplication.class, args);
+ }
+
+ @Autowired
+ private MyProcessor processor;
+
+ @StreamListener(MyProcessor.INPUT)
+ public void routeValues(Integer val) {
+ if (val < 10) {
+ processor.anOutput()
+ .send(message(val));
+ } else {
+ processor.anotherOutput()
+ .send(message(val));
+ }
+ }
+
+ private static final Message message(T val) {
+ return MessageBuilder.withPayload(val)
+ .build();
+ }
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplication.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplication.java
new file mode 100644
index 0000000000..4729e418b6
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceApplication.java
@@ -0,0 +1,39 @@
+package com.baeldung.spring.cloud.stream.rabbit;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+
+import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor;
+
+@SpringBootApplication
+@EnableBinding(MyProcessor.class)
+public class MultipleOutputsWithConditionsServiceApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(MultipleOutputsWithConditionsServiceApplication.class, args);
+ }
+
+ @Autowired
+ private MyProcessor processor;
+
+ @StreamListener(target = MyProcessor.INPUT, condition = "payload < 10")
+ public void routeValuesToAnOutput(Integer val) {
+ processor.anOutput()
+ .send(message(val));
+ }
+
+ @StreamListener(target = MyProcessor.INPUT, condition = "payload >= 10")
+ public void routeValuesToAnotherOutput(Integer val) {
+ processor.anotherOutput()
+ .send(message(val));
+ }
+
+ private static final Message message(T val) {
+ return MessageBuilder.withPayload(val)
+ .build();
+ }
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerServiceApplication.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerServiceApplication.java
new file mode 100644
index 0000000000..aac551e544
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerServiceApplication.java
@@ -0,0 +1,32 @@
+package com.baeldung.spring.cloud.stream.rabbit;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.messaging.Processor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.handler.annotation.SendTo;
+
+import com.baeldung.spring.cloud.stream.rabbit.messages.TextPlainMessageConverter;
+import com.baeldung.spring.cloud.stream.rabbit.model.LogMessage;
+
+@SpringBootApplication
+@EnableBinding(Processor.class)
+public class MyLoggerServiceApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(MyLoggerServiceApplication.class, args);
+ }
+
+ @Bean
+ public MessageConverter providesTextPlainMessageConverter() {
+ return new TextPlainMessageConverter();
+ }
+
+ @StreamListener(Processor.INPUT)
+ @SendTo(Processor.OUTPUT)
+ public LogMessage enrichLogMessage(LogMessage log) {
+ return new LogMessage(String.format("[1]: %s", log.getMessage()));
+ }
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/messages/TextPlainMessageConverter.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/messages/TextPlainMessageConverter.java
new file mode 100644
index 0000000000..d690ee38a9
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/messages/TextPlainMessageConverter.java
@@ -0,0 +1,26 @@
+package com.baeldung.spring.cloud.stream.rabbit.messages;
+
+import org.springframework.messaging.Message;
+import org.springframework.messaging.converter.AbstractMessageConverter;
+import org.springframework.util.MimeType;
+
+import com.baeldung.spring.cloud.stream.rabbit.model.LogMessage;
+
+public class TextPlainMessageConverter extends AbstractMessageConverter {
+
+ public TextPlainMessageConverter() {
+ super(new MimeType("text", "plain"));
+ }
+
+ @Override
+ protected boolean supports(Class> clazz) {
+ return (LogMessage.class == clazz);
+ }
+
+ @Override
+ protected Object convertFromInternal(Message> message, Class> targetClass, Object conversionHint) {
+ Object payload = message.getPayload();
+ String text = payload instanceof String ? (String) payload : new String((byte[]) payload);
+ return new LogMessage(text);
+ }
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/model/LogMessage.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/model/LogMessage.java
new file mode 100644
index 0000000000..44a6ca4d4e
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/model/LogMessage.java
@@ -0,0 +1,32 @@
+package com.baeldung.spring.cloud.stream.rabbit.model;
+
+import java.io.Serializable;
+
+public class LogMessage implements Serializable {
+
+ private static final long serialVersionUID = -5857383701708275796L;
+
+ private String message;
+
+ public LogMessage() {
+
+ }
+
+ public LogMessage(String message) {
+ this.message = message;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public String toString() {
+ return message;
+ }
+
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/processor/MyProcessor.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/processor/MyProcessor.java
new file mode 100644
index 0000000000..563ce06b6f
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/java/com/baeldung/spring/cloud/stream/rabbit/processor/MyProcessor.java
@@ -0,0 +1,19 @@
+package com.baeldung.spring.cloud.stream.rabbit.processor;
+
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.SubscribableChannel;
+
+public interface MyProcessor {
+ String INPUT = "myInput";
+
+ @Input
+ SubscribableChannel myInput();
+
+ @Output("myOutput")
+ MessageChannel anOutput();
+
+ @Output
+ MessageChannel anotherOutput();
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/resources/application.yml b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/resources/application.yml
new file mode 100644
index 0000000000..3d9d97a736
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/main/resources/application.yml
@@ -0,0 +1,28 @@
+spring:
+ cloud:
+ stream:
+ bindings:
+ input:
+ destination: queue.log.messages
+ binder: local_rabbit
+ group: logMessageConsumers
+ output:
+ destination: queue.pretty.log.messages
+ binder: local_rabbit
+ binders:
+ local_rabbit:
+ type: rabbit
+ environment:
+ spring:
+ rabbitmq:
+ host: localhost
+ port: 5672
+ username: guest
+ password: guest
+ virtual-host: /
+server:
+ port: 0
+management:
+ health:
+ binders:
+ enabled: true
\ No newline at end of file
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplicationIntegrationTest.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplicationIntegrationTest.java
new file mode 100644
index 0000000000..898d06897f
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsServiceApplicationIntegrationTest.java
@@ -0,0 +1,52 @@
+package com.baeldung.spring.cloud.stream.rabbit;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = MultipleOutputsServiceApplication.class)
+@DirtiesContext
+public class MultipleOutputsServiceApplicationIntegrationTest {
+
+ @Autowired
+ private MyProcessor pipe;
+
+ @Autowired
+ private MessageCollector messageCollector;
+
+ @Test
+ public void whenSendMessage_thenResponseIsInAOutput() {
+ whenSendMessage(1);
+ thenPayloadInChannelIs(pipe.anOutput(), 1);
+ }
+
+ @Test
+ public void whenSendMessage_thenResponseIsInAnotherOutput() {
+ whenSendMessage(11);
+ thenPayloadInChannelIs(pipe.anotherOutput(), 11);
+ }
+
+ private void whenSendMessage(Integer val) {
+ pipe.myInput()
+ .send(MessageBuilder.withPayload(val)
+ .build());
+ }
+
+ private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) {
+ Object payload = messageCollector.forChannel(channel)
+ .poll()
+ .getPayload();
+ assertEquals(expectedValue, payload);
+ }
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceIntegrationTest.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceIntegrationTest.java
new file mode 100644
index 0000000000..c3bf5a1205
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MultipleOutputsWithConditionsServiceIntegrationTest.java
@@ -0,0 +1,52 @@
+package com.baeldung.spring.cloud.stream.rabbit;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.baeldung.spring.cloud.stream.rabbit.processor.MyProcessor;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = MultipleOutputsWithConditionsServiceApplication.class)
+@DirtiesContext
+public class MultipleOutputsWithConditionsServiceIntegrationTest {
+
+ @Autowired
+ private MyProcessor pipe;
+
+ @Autowired
+ private MessageCollector messageCollector;
+
+ @Test
+ public void whenSendMessage_thenResponseIsInAOutput() {
+ whenSendMessage(1);
+ thenPayloadInChannelIs(pipe.anOutput(), 1);
+ }
+
+ @Test
+ public void whenSendMessage_thenResponseIsInAnotherOutput() {
+ whenSendMessage(11);
+ thenPayloadInChannelIs(pipe.anotherOutput(), 11);
+ }
+
+ private void whenSendMessage(Integer val) {
+ pipe.myInput()
+ .send(MessageBuilder.withPayload(val)
+ .build());
+ }
+
+ private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) {
+ Object payload = messageCollector.forChannel(channel)
+ .poll()
+ .getPayload();
+ assertEquals(expectedValue, payload);
+ }
+}
diff --git a/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerApplicationIntegrationTest.java b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerApplicationIntegrationTest.java
new file mode 100644
index 0000000000..21d84e79e0
--- /dev/null
+++ b/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit/src/test/java/com/baeldung/spring/cloud/stream/rabbit/MyLoggerApplicationIntegrationTest.java
@@ -0,0 +1,40 @@
+package com.baeldung.spring.cloud.stream.rabbit;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.stream.messaging.Processor;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.baeldung.spring.cloud.stream.rabbit.model.LogMessage;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = MyLoggerServiceApplication.class)
+@DirtiesContext
+public class MyLoggerApplicationIntegrationTest {
+
+ @Autowired
+ private Processor pipe;
+
+ @Autowired
+ private MessageCollector messageCollector;
+
+ @Test
+ public void whenSendMessage_thenResponseShouldUpdateText() {
+ pipe.input()
+ .send(MessageBuilder.withPayload(new LogMessage("This is my message"))
+ .build());
+
+ Object payload = messageCollector.forChannel(pipe.output())
+ .poll()
+ .getPayload();
+
+ assertEquals("[1]: This is my message", payload.toString());
+ }
+}