Compare commits

..

3 Commits

Author SHA1 Message Date
Spring Operator
56b628a162 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://repo.spring.io/libs-milestone-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-milestone-local ([https](https://repo.spring.io/libs-milestone-local) result 302).
* [ ] http://repo.spring.io/libs-snapshot-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-snapshot-local ([https](https://repo.spring.io/libs-snapshot-local) result 302).
* [ ] http://repo.spring.io/release with 1 occurrences migrated to:
  https://repo.spring.io/release ([https](https://repo.spring.io/release) result 302).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 20 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 10 occurrences
2019-04-24 12:52:50 -04:00
Spring Operator
3400e51d17 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* http://maven.apache.org/xsd/maven-4.0.0.xsd with 10 occurrences migrated to:
  https://maven.apache.org/xsd/maven-4.0.0.xsd ([https](https://maven.apache.org/xsd/maven-4.0.0.xsd) result 200).
* http://www.apache.org/licenses/LICENSE-2.0 with 2 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
* http://www.spring.io with 1 occurrences migrated to:
  https://www.spring.io ([https](https://www.spring.io) result 301).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 20 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 10 occurrences
2019-03-21 15:20:53 -04:00
Spring Operator
0e9b625b0d URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://www.apache.org/licenses/ with 1 occurrences migrated to:
  https://www.apache.org/licenses/ ([https](https://www.apache.org/licenses/) result 200).
* [ ] http://www.apache.org/licenses/LICENSE-2.0 with 39 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
2019-03-21 15:18:44 -04:00
43 changed files with 221 additions and 920 deletions

1
.gitignore vendored
View File

@@ -13,7 +13,6 @@ _site/
.settings
.springBeans
.DS_Store
*/.idea
*.sw*
*.iml
*.ipr

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -35,6 +35,10 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
<build>

View File

@@ -29,10 +29,10 @@ import org.springframework.context.ConfigurableApplicationContext;
public class DoubleApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder(DoubleApplication.class, args)
.from(SourceApplication.class).args("--fixedDelay=5000")
new AggregateApplicationBuilder().
from(SourceApplication.class).args("--fixedDelay=5000")
.via(ProcessorApplication.class)
.to(SinkApplication.class).args("--debug=true").run();
.to(SinkApplication.class).args("--debug=true").run("--spring.application.name=aggregate-test");
}
}

View File

@@ -1,43 +0,0 @@
Spring Cloud Stream Source Sample
=============================
In this *Spring Cloud Stream* sample, a source application publishes messages to dynamically created destinations.
## Requirements
To run this sample, you will need to have installed:
* Java 8 or Above
This example requires RabbitMQ to be running on localhost.
## Code Tour
The class `SourceWithDynamicDestination` is a REST controller that registers the 'POST' request mapping for '/'.
When a payload is sent to 'http://localhost:8080/' by a POST request (port 8080 is the default), this application
then uses `BinderAwareChannelResolver` to resolve the destination dynamically at runtime. Currently, this resolver uses
`payload` as the SpEL expression to resolve the destination name. Hence, if a payload `testing` is sent to the app, then
this source application sends the message `testing` into the Rabbit exchange `testing`. This exchange or topic (in case
of Kafka if Kafka binder is used) is created dynamically and bound to send the payload.
Upon starting the application on the default port 8080, if the following data are sent:
curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' http://localhost:8080
curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-2","bill-pay":"150"}' http://localhost:8080
The destinations 'customerId-1' and 'customerId-2' are created at the broker (for example: exchange in case of Rabbit or topic in case of Kafka with the names 'customerId-1' and 'customerId-2') and the data are published to the appropriate destinations dynamically.
## Building with Maven
Build the sample by executing:
source>$ mvn clean package
## Running the Sample
To start the source module execute the following:
source>$ java -jar target/spring-cloud-stream-sample-dynamic-source-1.1.0.BUILD-SNAPSHOT-exec.jar

View File

@@ -1,62 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-dynamic-source</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-sample-dynamic-source</name>
<description>Demo project for source module</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<start-class>demo.SourceApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-codec</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,29 +0,0 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SourceApplication {
public static void main(String[] args) {
SpringApplication.run(SourceApplication.class, args);
}
}

View File

@@ -1,87 +0,0 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import java.util.Collections;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.support.Function;
import org.springframework.integration.dsl.support.FunctionExpression;
import org.springframework.integration.router.AbstractMappingMessageRouter;
import org.springframework.integration.router.ExpressionEvaluatingRouter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import static org.springframework.web.bind.annotation.RequestMethod.POST;
/**
* @author Ilayaperumal Gopinathan
*/
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@Autowired
@Qualifier("sourceChannel")
private MessageChannel localChannel;
@RequestMapping(path = "/", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, contentType);
}
private void sendMessage(Object body, Object contentType) {
localChannel.send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
@Bean(name = "sourceChannel")
public MessageChannel localChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "sourceChannel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));
router.setDefaultOutputChannelName("default-output");
router.setChannelResolver(resolver);
return router;
}
}

View File

@@ -1,37 +0,0 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = SourceApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {
@Test
public void contextLoads() {
}
}

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -12,7 +12,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -42,14 +42,14 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<artifactId>kafka_2.10</artifactId>
<classifier>test</classifier>
<version>0.9.0.1</version>
<version>0.8.2.1</version>
<scope>test</scope>
<exclusions>
<exclusion>

View File

@@ -16,6 +16,10 @@
package multibinder;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import java.util.List;
import java.util.UUID;
import org.hamcrest.CoreMatchers;
@@ -28,9 +32,7 @@ import org.junit.runner.RunWith;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -38,10 +40,12 @@ import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.test.junit.kafka.KafkaTestSupport;
import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@@ -49,66 +53,64 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = MultibinderApplication.class)
@SpringApplicationConfiguration(classes = MultibinderApplication.class)
@WebAppConfiguration
@DirtiesContext
public class TwoKafkaBindersApplicationTest {
@ClassRule
public static KafkaEmbedded kafkaTestSupport1 = new KafkaEmbedded(1);
public static KafkaTestSupport kafkaTestSupport1 = new KafkaTestSupport(true);
@ClassRule
public static KafkaEmbedded kafkaTestSupport2 = new KafkaEmbedded(1);
public static KafkaTestSupport kafkaTestSupport2 = new KafkaTestSupport(true);
@ClassRule
public static RedisTestSupport redisTestSupport = new RedisTestSupport();
@BeforeClass
public static void setupEnvironment() {
System.setProperty("kafkaBroker1", kafkaTestSupport1.getBrokersAsString());
System.setProperty("zk1", kafkaTestSupport1.getZookeeperConnectionString());
System.setProperty("kafkaBroker2", kafkaTestSupport2.getBrokersAsString());
System.setProperty("zk2", kafkaTestSupport2.getZookeeperConnectionString());
System.setProperty("kafkaBroker1", kafkaTestSupport1.getBrokerAddress());
System.setProperty("zk1", kafkaTestSupport1.getZkConnectString());
System.setProperty("kafkaBroker2", kafkaTestSupport2.getBrokerAddress());
System.setProperty("zk2", kafkaTestSupport2.getZkConnectString());
}
@Autowired
private BinderFactory binderFactory;
private BinderFactory<MessageChannel> binderFactory;
@Test
public void contextLoads() {
Binder<MessageChannel, ?, ?> binder1 = binderFactory.getBinder("kafka1", MessageChannel.class);
Binder<MessageChannel, ?, ?> binder1 = binderFactory.getBinder("kafka1");
KafkaMessageChannelBinder kafka1 = (KafkaMessageChannelBinder) binder1;
DirectFieldAccessor directFieldAccessor1 = new DirectFieldAccessor(kafka1);
KafkaBinderConfigurationProperties configuration1 =
(KafkaBinderConfigurationProperties) directFieldAccessor1.getPropertyValue("configurationProperties");
Assert.assertThat(configuration1.getBrokers(), arrayWithSize(1));
Assert.assertThat(configuration1.getBrokers()[0], equalTo(kafkaTestSupport1.getBrokersAsString()));
Binder<MessageChannel, ?, ?> binder2 = binderFactory.getBinder("kafka2", MessageChannel.class);
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(kafka1.getConnectionFactory());
Configuration configuration = (Configuration) directFieldAccessor.getPropertyValue("configuration");
List<BrokerAddress> brokerAddresses = configuration.getBrokerAddresses();
Assert.assertThat(brokerAddresses, hasSize(1));
Assert.assertThat(brokerAddresses, contains(BrokerAddress.fromAddress(kafkaTestSupport1.getBrokerAddress())));
Binder<MessageChannel, ?, ?> binder2 = binderFactory.getBinder("kafka2");
KafkaMessageChannelBinder kafka2 = (KafkaMessageChannelBinder) binder2;
DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2);
KafkaBinderConfigurationProperties configuration2 =
(KafkaBinderConfigurationProperties) directFieldAccessor2.getPropertyValue("configurationProperties");
Assert.assertThat(configuration2.getBrokers(), arrayWithSize(1));
Assert.assertThat(configuration2.getBrokers()[0], equalTo(kafkaTestSupport2.getBrokersAsString()));
DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2.getConnectionFactory());
Configuration configuration2 = (Configuration) directFieldAccessor2.getPropertyValue("configuration");
List<BrokerAddress> brokerAddresses2 = configuration2.getBrokerAddresses();
Assert.assertThat(brokerAddresses2, hasSize(1));
Assert.assertThat(brokerAddresses2, contains(BrokerAddress.fromAddress(kafkaTestSupport2.getBrokerAddress())));
}
@Test
public void messagingWorks() {
DirectChannel dataProducer = new DirectChannel();
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka1", MessageChannel.class))
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka1"))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
QueueChannel dataConsumer = new QueueChannel();
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka2", MessageChannel.class)).bindConsumer("dataOut", UUID.randomUUID().toString(),
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka2")).bindConsumer("dataOut", UUID.randomUUID().toString(),
dataConsumer, new ExtendedConsumerProperties<>(new KafkaConsumerProperties()));
String testPayload = "testFoo" + UUID.randomUUID().toString();
dataProducer.send(MessageBuilder.withPayload(testPayload).build());
Message<?> receive = dataConsumer.receive(60_000);
Message<?> receive = dataConsumer.receive(2000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload));
}

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -34,23 +34,23 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-redis</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit-test-support</artifactId>
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<scope>test</scope>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -6,7 +6,7 @@ spring:
bindings:
input:
destination: dataIn
binder: kafka
binder: redis
output:
destination: dataOut
binder: rabbit

View File

@@ -24,38 +24,47 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.boot.SpringApplication;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
import org.springframework.cloud.stream.binder.test.junit.rabbit.RabbitTestSupport;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder;
import org.springframework.cloud.stream.test.junit.rabbit.RabbitTestSupport;
import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
/**
* @author Marius Bogoevici
* @author Gary Russell
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = MultibinderApplication.class)
@WebAppConfiguration
@DirtiesContext
public class RabbitAndKafkaBinderApplicationTests {
public class RabbitAndRedisBinderApplicationTests {
@ClassRule
public static RabbitTestSupport rabbitTestSupport = new RabbitTestSupport();
@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "test");
public static RedisTestSupport redisTestSupport = new RedisTestSupport();
@Autowired
private BinderFactory<MessageChannel> binderFactory;
private final String randomGroup = UUID.randomUUID().toString();
@@ -68,39 +77,25 @@ public class RabbitAndKafkaBinderApplicationTests {
}
@Test
public void contextLoads() throws Exception {
// passing connection arguments arguments to the embedded Kafka instance
ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class,
"--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString());
context.close();
public void contextLoads() {
}
@Test
public void messagingWorks() throws Exception {
// passing connection arguments arguments to the embedded Kafka instance
ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class,
"--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString(),
"--spring.cloud.stream.bindings.output.producer.requiredGroups=" + this.randomGroup);
public void messagingWorks() {
DirectChannel dataProducer = new DirectChannel();
BinderFactory binderFactory = context.getBean(BinderFactory.class);
((RedisMessageChannelBinder)binderFactory.getBinder("redis"))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new ProducerProperties()));
QueueChannel dataConsumer = new QueueChannel();
((RabbitMessageChannelBinder) binderFactory.getBinder("rabbit", MessageChannel.class)).bindConsumer("dataOut", this.randomGroup,
((RabbitMessageChannelBinder)binderFactory.getBinder("rabbit")).bindConsumer("dataOut", this.randomGroup,
dataConsumer, new ExtendedConsumerProperties<>(new RabbitConsumerProperties()));
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka", MessageChannel.class))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
String testPayload = "testFoo" + this.randomGroup;
dataProducer.send(MessageBuilder.withPayload(testPayload).build());
Message<?> receive = dataConsumer.receive(60_000);
Message<?> receive = dataConsumer.receive(2000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload));
context.close();
}
}

View File

@@ -1,30 +0,0 @@
Spring Cloud Stream - Non self-contained Aggregate application sample
=============================
In this *Spring Cloud Stream* sample, the application shows how to write a non self-contained aggregate application.
A non self-contained application is the one that has its applications directly bound but either or both the input and output of the application is bound to the external broker.
## Requirements
To run this sample, you will need to have installed:
* Java 8 or Above
## Code Tour
* NonSelfContainedAggregateApplication - the Spring Boot Main Aggregate Application that directly binds `Source` and `Processor` application while the processor application's output is bound to RabbitMQ.
* ProcessorModuleDefinition - the processor application configuration
* SourceModuleDefinition - the source application configuration
## Building with Maven
Build the sample by executing:
>$ mvn clean package
## Running the Sample
To start the non self-contained aggregate application execute the following:
>$ java -jar target/spring-cloud-stream-sample-non-self-contained-aggregate-app-<version>-exec.jar

View File

@@ -1,56 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-non-self-contained-aggregate-app</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-sample-non-self-contained-aggregate-app</name>
<description>Demo project for non self contained Aggregate Application</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<start-class>demo.NonSelfContainedAggregateApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,27 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config.processor;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Marius Bogoevici
*/
@SpringBootApplication
public class ProcessorApplication {
}

View File

@@ -1,34 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config.processor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
/**
* @author Marius Bogoevici
*/
@EnableBinding(Processor.class)
public class ProcessorModuleDefinition {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Message<?> transform(Message<?> inbound) {
return inbound;
}
}

View File

@@ -1,26 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config.source;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Marius Bogoevici
*/
@SpringBootApplication
public class SourceApplication {
}

View File

@@ -1,45 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config.source;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
/**
* @author Dave Syer
* @author Marius Bogoevici
*/
@EnableBinding(Source.class)
public class SourceModuleDefinition {
private String format = "yyyy-MM-dd HH:mm:ss";
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource() {
return () -> new GenericMessage<>(new SimpleDateFormat(this.format).format(new Date()));
}
}

View File

@@ -1,37 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import config.processor.ProcessorApplication;
import config.source.SourceApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;
/**
* @author Ilayaperumal Gopinathan
*/
@SpringBootApplication
public class NonSelfContainedAggregateApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder(NonSelfContainedAggregateApplication.class)
.from(SourceApplication.class).args("--fixedDelay=5000")
.via(ProcessorApplication.class).namespace("a").run("--spring.cloud.stream.bindings.output.destination=processor-output");
}
}

View File

@@ -1 +0,0 @@
fixedDelay: 1000

View File

@@ -1,37 +0,0 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = NonSelfContainedAggregateApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {
@Test
public void contextLoads() {
}
}

82
pom.xml
View File

@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
<packaging>pom</packaging>
<url>https://github.com/spring-cloud/spring-cloud-stream-samples</url>
<organization>
@@ -13,11 +13,11 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.2.0.RELEASE</version>
<version>1.1.1.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-cloud-stream.version>Brooklyn.SR3</spring-cloud-stream.version>
<spring-cloud-stream.version>1.0.2.RELEASE</spring-cloud-stream.version>
<java.version>1.8</java.version>
</properties>
<modules>
@@ -25,13 +25,11 @@
<module>sink</module>
<module>transform</module>
<module>double</module>
<module>non-self-contained-aggregate-app</module>
<module>multibinder</module>
<module>multibinder-differentsystems</module>
<module>rxjava-processor</module>
<module>multi-io</module>
<module>stream-listener</module>
<module>reactive-processor-kafka</module>
</modules>
<dependencyManagement>
<dependencies>
@@ -45,17 +43,22 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-source</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-transform</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
@@ -72,67 +75,4 @@
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>spring</id>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</profile>
</profiles>
</project>

View File

@@ -1,62 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-stream-samples</artifactId>
<groupId>org.springframework.cloud</groupId>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>reactive-processor-kafka</artifactId>
<properties>
<java.version>1.8</java.version>
<start-class>reactive.kafka.ReactiveKafkaProcessorApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,32 +0,0 @@
package reactive.kafka;
import java.time.Duration;
import reactor.core.publisher.Flux;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
@SpringBootApplication
@EnableBinding(Processor.class)
public class ReactiveKafkaProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(ReactiveKafkaProcessorApplication.class, args);
}
@StreamListener
@Output(Processor.OUTPUT)
public Flux<String> toUpperCase(@Input(Processor.INPUT) Flux<String> inbound) {
return inbound.
log()
.window(Duration.ofSeconds(10), Duration.ofSeconds(5))
.flatMap(w -> w.reduce("", (s1,s2)->s1+s2))
.log();
}
}

View File

@@ -1,10 +0,0 @@
server:
port: 8082
spring:
cloud:
stream:
bindings:
output:
destination: transformed
input:
destination: testtock

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -24,25 +24,12 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-rxjava</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.5.RELEASE</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>

View File

@@ -20,22 +20,19 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.cloud.stream.annotation.rxjava.EnableRxJavaProcessor;
import org.springframework.cloud.stream.annotation.rxjava.RxJavaProcessor;
import org.springframework.context.annotation.Bean;
@EnableBinding(Processor.class)
@EnableRxJavaProcessor
public class RxJavaTransformer {
private static Logger logger = LoggerFactory.getLogger(RxJavaTransformer.class);
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Observable<String> processor(Observable<String> inputStream) {
return inputStream.map(data -> {
@Bean
public RxJavaProcessor<String,String> processor() {
return inputStream -> inputStream.map(data -> {
logger.info("Got data = " + data);
return data;
}).buffer(5).map(data -> String.valueOf(avg(data)));

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -5,7 +5,7 @@ spring:
stream:
bindings:
input:
destination: transformed
destination: testtock
# uncomment below to have this module consume from a specific partition
# in the range of 0 to N-1, where N is the upstream module's partitionCount
#consumerProperties:

View File

@@ -18,16 +18,11 @@ This sample is a Spring Boot application that uses Spring Cloud Stream to publis
* SourceApplication - the Spring Boot Main Application
* TimeSource - the module that will generate the timestamp and post the message to the stream
* TimeSourceOptionsMetadata - defines the configurations that are available to setup the TimeSource
* format - how to render the current time, using SimpleDateFormat
* fixedDelay - time delay between messages
* initialDelay - delay before the first message
* timeUnit - the time unit for the fixed and initial delays
* maxMessages - the maximum messages per poll; -1 for unlimited
* format - how to render the current time, using SimpleDateFormat
* fixedDelay - time delay between messages
* initialDelay - delay before the first message
* timeUnit - the time unit for the fixed and initial delays
* maxMessages - the maximum messages per poll; -1 for unlimited
## Building with Maven

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -31,6 +31,10 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -1,33 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
/**
* @author Marius Bogoevici
*/
public class Bar {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

View File

@@ -0,0 +1,101 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.cloud.stream.converter.AbstractFromMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.MimeType;
/**
* @author Ilayaperumal Gopinathan
*/
@Configuration
public class Converters {
//Register custom converter
@Bean
public AbstractFromMessageConverter fooConverter() {
return new FooToBarConverter();
}
public static class Foo {
private String value = "foo";
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class Bar {
private String value = "init";
public Bar(String value) {
this.value = value;
}
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class FooToBarConverter extends AbstractFromMessageConverter {
public FooToBarConverter() {
super(MimeType.valueOf("test/bar"));
}
@Override
protected Class<?>[] supportedTargetTypes() {
return new Class[] {Bar.class};
}
@Override
protected Class<?>[] supportedPayloadTypes() {
return new Class<?>[] {Foo.class};
}
@Override
public Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object result = null;
try {
if (message.getPayload() instanceof Foo) {
Foo fooPayload = (Foo) message.getPayload();
result = new Bar(fooPayload.getValue());
}
}
catch (Exception e) {
logger.error(e.getMessage(), e);
throw new MessageConversionException(e.getMessage());
}
return result;
}
}
}

View File

@@ -1,33 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
/**
* @author Marius Bogoevici
*/
public class Foo {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

View File

@@ -29,11 +29,11 @@ public class SampleSink {
// Sink application definition
@StreamListener(Sink.SAMPLE)
public void receive(Foo fooMessage) {
public void receive(Converters.Bar barMessage) {
System.out.println("******************");
System.out.println("At the Sink");
System.out.println("******************");
System.out.println("Received transformed message " + fooMessage.getValue() + " of type " + fooMessage.getClass());
System.out.println("Received transformed message " + barMessage.getValue() + " of type " + barMessage.getClass());
}
public interface Sink {

View File

@@ -24,9 +24,7 @@ import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
/**
* @author Ilayaperumal Gopinathan
@@ -42,9 +40,10 @@ public class SampleSource {
System.out.println("******************");
System.out.println("At the Source");
System.out.println("******************");
String value = "{\"value\":\"hi\"}";
System.out.println("Sending value: " + value);
return MessageBuilder.withPayload(value).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build();
Converters.Foo foo = new Converters.Foo();
foo.setValue("hi");
System.out.println("Sending value: " + foo.getValue() + " of type " + foo.getClass());
return new GenericMessage(foo);
}
};
}

View File

@@ -33,7 +33,7 @@ public class SampleTransformer {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Bar receive(Bar barMessage) {
public Converters.Bar receive(Converters.Bar barMessage) {
System.out.println("******************");
System.out.println("At the transformer");
System.out.println("******************");

View File

@@ -10,6 +10,5 @@ spring:
destination: testtock
output:
destination: xformed
content-type: application/json
sample-sink:
destination: xformed

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<properties>