diff --git a/double/pom.xml b/double/pom.xml index 52e04ed..5d22fcb 100644 --- a/double/pom.xml +++ b/double/pom.xml @@ -11,7 +11,7 @@ org.springframework.cloud spring-cloud-stream-samples - 1.0.0.BUILD-SNAPSHOT + 1.1.0.BUILD-SNAPSHOT diff --git a/double/src/main/java/demo/DoubleApplication.java b/double/src/main/java/demo/DoubleApplication.java index 4e1e458..8a900db 100644 --- a/double/src/main/java/demo/DoubleApplication.java +++ b/double/src/main/java/demo/DoubleApplication.java @@ -29,10 +29,10 @@ import org.springframework.context.ConfigurableApplicationContext; public class DoubleApplication { public static void main(String[] args) { - new AggregateApplicationBuilder(). - from(SourceApplication.class).args("--fixedDelay=5000") + new AggregateApplicationBuilder(DoubleApplication.class, args) + .from(SourceApplication.class).args("--fixedDelay=5000") .via(ProcessorApplication.class) - .to(SinkApplication.class).args("--debug=true").run("--spring.application.name=aggregate-test"); + .to(SinkApplication.class).args("--debug=true").run(); } } diff --git a/multi-io/pom.xml b/multi-io/pom.xml index cdc4285..21d99d8 100644 --- a/multi-io/pom.xml +++ b/multi-io/pom.xml @@ -11,7 +11,7 @@ org.springframework.cloud spring-cloud-stream-samples - 1.0.0.BUILD-SNAPSHOT + 1.1.0.BUILD-SNAPSHOT diff --git a/multibinder-differentsystems/pom.xml b/multibinder-differentsystems/pom.xml index c5aafbf..c8fbc49 100644 --- a/multibinder-differentsystems/pom.xml +++ b/multibinder-differentsystems/pom.xml @@ -12,7 +12,7 @@ org.springframework.cloud spring-cloud-stream-samples - 1.0.0.BUILD-SNAPSHOT + 1.1.0.BUILD-SNAPSHOT @@ -42,14 +42,14 @@ org.springframework.cloud - spring-cloud-stream-test-support-internal + spring-cloud-stream-binder-kafka-test-support test org.apache.kafka - kafka_2.10 + kafka_2.11 test - 0.8.2.1 + 0.9.0.1 test diff --git a/multibinder-differentsystems/src/test/java/multibinder/TwoKafkaBindersApplicationTest.java b/multibinder-differentsystems/src/test/java/multibinder/TwoKafkaBindersApplicationTest.java index 06c4c6e..1b98d48 100644 --- a/multibinder-differentsystems/src/test/java/multibinder/TwoKafkaBindersApplicationTest.java +++ b/multibinder-differentsystems/src/test/java/multibinder/TwoKafkaBindersApplicationTest.java @@ -16,10 +16,6 @@ package multibinder; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.hasSize; - -import java.util.List; import java.util.UUID; import org.hamcrest.CoreMatchers; @@ -32,7 +28,9 @@ import org.junit.runner.RunWith; import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.test.SpringApplicationConfiguration; +import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.binder.BinderFactory; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; @@ -40,12 +38,10 @@ import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.kafka.KafkaConsumerProperties; import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder; import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties; -import org.springframework.cloud.stream.test.junit.kafka.KafkaTestSupport; -import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport; +import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.kafka.core.BrokerAddress; -import org.springframework.integration.kafka.core.Configuration; +import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; @@ -53,27 +49,28 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.context.web.WebAppConfiguration; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; + @RunWith(SpringJUnit4ClassRunner.class) -@SpringApplicationConfiguration(classes = MultibinderApplication.class) +@SpringBootTest(classes = MultibinderApplication.class) @WebAppConfiguration @DirtiesContext public class TwoKafkaBindersApplicationTest { @ClassRule - public static KafkaTestSupport kafkaTestSupport1 = new KafkaTestSupport(true); + public static KafkaEmbedded kafkaTestSupport1 = new KafkaEmbedded(1); @ClassRule - public static KafkaTestSupport kafkaTestSupport2 = new KafkaTestSupport(true); + public static KafkaEmbedded kafkaTestSupport2 = new KafkaEmbedded(1); - @ClassRule - public static RedisTestSupport redisTestSupport = new RedisTestSupport(); @BeforeClass public static void setupEnvironment() { - System.setProperty("kafkaBroker1", kafkaTestSupport1.getBrokerAddress()); - System.setProperty("zk1", kafkaTestSupport1.getZkConnectString()); - System.setProperty("kafkaBroker2", kafkaTestSupport2.getBrokerAddress()); - System.setProperty("zk2", kafkaTestSupport2.getZkConnectString()); + System.setProperty("kafkaBroker1", kafkaTestSupport1.getBrokersAsString()); + System.setProperty("zk1", kafkaTestSupport1.getZookeeperConnectionString()); + System.setProperty("kafkaBroker2", kafkaTestSupport2.getBrokersAsString()); + System.setProperty("zk2", kafkaTestSupport2.getZookeeperConnectionString()); } @Autowired @@ -83,18 +80,19 @@ public class TwoKafkaBindersApplicationTest { public void contextLoads() { Binder binder1 = binderFactory.getBinder("kafka1"); KafkaMessageChannelBinder kafka1 = (KafkaMessageChannelBinder) binder1; - DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(kafka1.getConnectionFactory()); - Configuration configuration = (Configuration) directFieldAccessor.getPropertyValue("configuration"); - List brokerAddresses = configuration.getBrokerAddresses(); - Assert.assertThat(brokerAddresses, hasSize(1)); - Assert.assertThat(brokerAddresses, contains(BrokerAddress.fromAddress(kafkaTestSupport1.getBrokerAddress()))); + DirectFieldAccessor directFieldAccessor1 = new DirectFieldAccessor(kafka1); + KafkaBinderConfigurationProperties configuration1 = + (KafkaBinderConfigurationProperties) directFieldAccessor1.getPropertyValue("configurationProperties"); + Assert.assertThat(configuration1.getBrokers(), arrayWithSize(1)); + Assert.assertThat(configuration1.getBrokers()[0], equalTo(kafkaTestSupport1.getBrokersAsString())); + Binder binder2 = binderFactory.getBinder("kafka2"); KafkaMessageChannelBinder kafka2 = (KafkaMessageChannelBinder) binder2; - DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2.getConnectionFactory()); - Configuration configuration2 = (Configuration) directFieldAccessor2.getPropertyValue("configuration"); - List brokerAddresses2 = configuration2.getBrokerAddresses(); - Assert.assertThat(brokerAddresses2, hasSize(1)); - Assert.assertThat(brokerAddresses2, contains(BrokerAddress.fromAddress(kafkaTestSupport2.getBrokerAddress()))); + DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2); + KafkaBinderConfigurationProperties configuration2 = + (KafkaBinderConfigurationProperties) directFieldAccessor2.getPropertyValue("configurationProperties"); + Assert.assertThat(configuration2.getBrokers(), arrayWithSize(1)); + Assert.assertThat(configuration2.getBrokers()[0], equalTo(kafkaTestSupport2.getBrokersAsString())); } @Test @@ -110,7 +108,7 @@ public class TwoKafkaBindersApplicationTest { String testPayload = "testFoo" + UUID.randomUUID().toString(); dataProducer.send(MessageBuilder.withPayload(testPayload).build()); - Message receive = dataConsumer.receive(2000); + Message receive = dataConsumer.receive(5000); Assert.assertThat(receive, Matchers.notNullValue()); Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload)); } diff --git a/multibinder/pom.xml b/multibinder/pom.xml index f48fe37..f0c0afd 100644 --- a/multibinder/pom.xml +++ b/multibinder/pom.xml @@ -10,7 +10,7 @@ org.springframework.cloud spring-cloud-stream-samples - 1.0.0.BUILD-SNAPSHOT + 1.1.0.BUILD-SNAPSHOT @@ -34,23 +34,23 @@ org.springframework.cloud spring-cloud-stream-sample-sink - - org.springframework.cloud - spring-cloud-stream-binder-redis - 1.0.0.RELEASE - org.springframework.cloud spring-cloud-stream-binder-rabbit org.springframework.cloud - spring-cloud-stream-test-support-internal + spring-cloud-stream-binder-kafka + + + org.springframework.cloud + spring-cloud-stream-binder-rabbit-test-support test - org.springframework.boot - spring-boot-starter-redis + org.springframework.cloud + spring-cloud-stream-binder-kafka-test-support + test org.springframework.boot diff --git a/multibinder/src/main/resources/application.yml b/multibinder/src/main/resources/application.yml index 9c3cb21..9dcd1b1 100644 --- a/multibinder/src/main/resources/application.yml +++ b/multibinder/src/main/resources/application.yml @@ -6,7 +6,7 @@ spring: bindings: input: destination: dataIn - binder: redis + binder: kafka output: destination: dataOut binder: rabbit diff --git a/multibinder/src/test/java/multibinder/RabbitAndRedisBinderApplicationTests.java b/multibinder/src/test/java/multibinder/RabbitAndKafkaBinderApplicationTests.java similarity index 57% rename from multibinder/src/test/java/multibinder/RabbitAndRedisBinderApplicationTests.java rename to multibinder/src/test/java/multibinder/RabbitAndKafkaBinderApplicationTests.java index 991a404..112bf83 100644 --- a/multibinder/src/test/java/multibinder/RabbitAndRedisBinderApplicationTests.java +++ b/multibinder/src/test/java/multibinder/RabbitAndKafkaBinderApplicationTests.java @@ -24,47 +24,37 @@ import org.junit.After; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; -import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitAdmin; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.SpringApplicationConfiguration; +import org.springframework.boot.SpringApplication; import org.springframework.cloud.stream.binder.BinderFactory; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; -import org.springframework.cloud.stream.binder.ProducerProperties; +import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder; +import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties; import org.springframework.cloud.stream.binder.rabbit.RabbitConsumerProperties; import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder; -import org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder; -import org.springframework.cloud.stream.test.junit.rabbit.RabbitTestSupport; -import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport; +import org.springframework.cloud.stream.binder.test.junit.rabbit.RabbitTestSupport; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; +import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import org.springframework.test.context.web.WebAppConfiguration; /** * @author Marius Bogoevici * @author Gary Russell */ -@RunWith(SpringJUnit4ClassRunner.class) -@SpringApplicationConfiguration(classes = MultibinderApplication.class) -@WebAppConfiguration @DirtiesContext -public class RabbitAndRedisBinderApplicationTests { +public class RabbitAndKafkaBinderApplicationTests { @ClassRule public static RabbitTestSupport rabbitTestSupport = new RabbitTestSupport(); @ClassRule - public static RedisTestSupport redisTestSupport = new RedisTestSupport(); - - @Autowired - private BinderFactory binderFactory; + public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "test"); private final String randomGroup = UUID.randomUUID().toString(); @@ -77,25 +67,39 @@ public class RabbitAndRedisBinderApplicationTests { } @Test - public void contextLoads() { + public void contextLoads() throws Exception { + // passing connection arguments arguments to the embedded Kafka instance + ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class, + "--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(), + "--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString()); + context.close(); } @Test - public void messagingWorks() { + public void messagingWorks() throws Exception { + // passing connection arguments arguments to the embedded Kafka instance + ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class, + "--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(), + "--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString(), + "--spring.cloud.stream.bindings.output.producer.requiredGroups=" + this.randomGroup); DirectChannel dataProducer = new DirectChannel(); - ((RedisMessageChannelBinder)binderFactory.getBinder("redis")) - .bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new ProducerProperties())); + BinderFactory binderFactory = context.getBean(BinderFactory.class); QueueChannel dataConsumer = new QueueChannel(); - ((RabbitMessageChannelBinder)binderFactory.getBinder("rabbit")).bindConsumer("dataOut", this.randomGroup, + + ((RabbitMessageChannelBinder) binderFactory.getBinder("rabbit")).bindConsumer("dataOut", this.randomGroup, dataConsumer, new ExtendedConsumerProperties<>(new RabbitConsumerProperties())); + ((KafkaMessageChannelBinder) binderFactory.getBinder("kafka")) + .bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new KafkaProducerProperties())); + String testPayload = "testFoo" + this.randomGroup; dataProducer.send(MessageBuilder.withPayload(testPayload).build()); - Message receive = dataConsumer.receive(2000); + Message receive = dataConsumer.receive(10000); Assert.assertThat(receive, Matchers.notNullValue()); Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload)); + context.close(); } } diff --git a/pom.xml b/pom.xml index 129d2cb..17972ca 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.springframework.cloud spring-cloud-stream-samples - 1.0.0.BUILD-SNAPSHOT + 1.1.0.BUILD-SNAPSHOT pom https://github.com/spring-cloud/spring-cloud-stream-samples @@ -13,11 +13,11 @@ org.springframework.cloud spring-cloud-build - 1.1.1.RELEASE + 1.2.0.RELEASE UTF-8 - 1.0.2.RELEASE + Brooklyn.BUILD-SNAPSHOT 1.8 @@ -30,6 +30,7 @@ rxjava-processor multi-io stream-listener + reactive-processor-kafka @@ -43,22 +44,17 @@ org.springframework.cloud spring-cloud-stream-sample-source - 1.0.0.BUILD-SNAPSHOT + 1.1.0.BUILD-SNAPSHOT org.springframework.cloud spring-cloud-stream-sample-sink - 1.0.0.BUILD-SNAPSHOT + 1.1.0.BUILD-SNAPSHOT org.springframework.cloud spring-cloud-stream-sample-transform - 1.0.0.BUILD-SNAPSHOT - - - org.springframework.cloud - spring-cloud-stream-binder-rabbit - ${spring-cloud-stream.version} + 1.1.0.BUILD-SNAPSHOT @@ -75,4 +71,67 @@ + + + spring + + + spring-snapshots + Spring Snapshots + http://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + http://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + http://repo.spring.io/release + + false + + + + + + spring-snapshots + Spring Snapshots + http://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + http://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + http://repo.spring.io/libs-release-local + + false + + + + + diff --git a/reactive-processor-kafka/.jdk8 b/reactive-processor-kafka/.jdk8 new file mode 100644 index 0000000..e69de29 diff --git a/reactive-processor-kafka/pom.xml b/reactive-processor-kafka/pom.xml new file mode 100644 index 0000000..4dfaa69 --- /dev/null +++ b/reactive-processor-kafka/pom.xml @@ -0,0 +1,62 @@ + + + + spring-cloud-stream-samples + org.springframework.cloud + 1.1.0.BUILD-SNAPSHOT + + 4.0.0 + + reactive-processor-kafka + + + 1.8 + reactive.kafka.ReactiveKafkaProcessorApplication + + + + + org.springframework.cloud + spring-cloud-stream + + + org.springframework.cloud + spring-cloud-stream-binder-kafka + + + org.springframework.cloud + spring-cloud-stream-reactive + + + io.projectreactor + reactor-core + 3.0.1.BUILD-SNAPSHOT + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + exec + + + + + + \ No newline at end of file diff --git a/reactive-processor-kafka/src/main/java/reactive/kafka/ReactiveKafkaProcessorApplication.java b/reactive-processor-kafka/src/main/java/reactive/kafka/ReactiveKafkaProcessorApplication.java new file mode 100644 index 0000000..33fc799 --- /dev/null +++ b/reactive-processor-kafka/src/main/java/reactive/kafka/ReactiveKafkaProcessorApplication.java @@ -0,0 +1,32 @@ +package reactive.kafka; + +import java.time.Duration; + +import reactor.core.publisher.Flux; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Processor; + +@SpringBootApplication +@EnableBinding(Processor.class) +public class ReactiveKafkaProcessorApplication { + + public static void main(String[] args) { + SpringApplication.run(ReactiveKafkaProcessorApplication.class, args); + } + + @StreamListener + @Output(Processor.OUTPUT) + public Flux toUpperCase(@Input(Processor.INPUT) Flux inbound) { + return inbound. + log() + .window(Duration.ofSeconds(10), Duration.ofSeconds(5)) + .flatMap(w -> w.reduce("", (s1,s2)->s1+s2)) + .log(); + } +} diff --git a/reactive-processor-kafka/src/main/resources/application.yml b/reactive-processor-kafka/src/main/resources/application.yml new file mode 100644 index 0000000..3db06da --- /dev/null +++ b/reactive-processor-kafka/src/main/resources/application.yml @@ -0,0 +1,10 @@ +server: + port: 8082 +spring: + cloud: + stream: + bindings: + output: + destination: transformed + input: + destination: testtock diff --git a/rxjava-processor/pom.xml b/rxjava-processor/pom.xml index 4073b1b..3a98b14 100644 --- a/rxjava-processor/pom.xml +++ b/rxjava-processor/pom.xml @@ -11,7 +11,7 @@ org.springframework.cloud spring-cloud-stream-samples - 1.0.0.BUILD-SNAPSHOT + 1.1.0.BUILD-SNAPSHOT @@ -24,7 +24,6 @@ org.springframework.cloud spring-cloud-stream-rxjava - ${spring-cloud-stream.version} org.springframework.cloud diff --git a/sink/pom.xml b/sink/pom.xml index 85f2ad8..165b5ad 100644 --- a/sink/pom.xml +++ b/sink/pom.xml @@ -10,7 +10,7 @@ org.springframework.cloud spring-cloud-stream-samples - 1.0.0.BUILD-SNAPSHOT + 1.1.0.BUILD-SNAPSHOT diff --git a/sink/src/main/resources/application.yml b/sink/src/main/resources/application.yml index 682bd5a..6d40252 100644 --- a/sink/src/main/resources/application.yml +++ b/sink/src/main/resources/application.yml @@ -5,7 +5,7 @@ spring: stream: bindings: input: - destination: testtock + destination: transformed # uncomment below to have this module consume from a specific partition # in the range of 0 to N-1, where N is the upstream module's partitionCount #consumerProperties: diff --git a/source/pom.xml b/source/pom.xml index d6020c6..4992a3e 100644 --- a/source/pom.xml +++ b/source/pom.xml @@ -10,7 +10,7 @@ org.springframework.cloud spring-cloud-stream-samples - 1.0.0.BUILD-SNAPSHOT + 1.1.0.BUILD-SNAPSHOT @@ -31,10 +31,6 @@ spring-boot-configuration-processor true - - org.springframework.boot - spring-boot-starter-redis - org.springframework.boot spring-boot-starter-test diff --git a/stream-listener/pom.xml b/stream-listener/pom.xml index c9f058a..c2f7b1a 100644 --- a/stream-listener/pom.xml +++ b/stream-listener/pom.xml @@ -11,7 +11,7 @@ org.springframework.cloud spring-cloud-stream-samples - 1.0.0.BUILD-SNAPSHOT + 1.1.0.BUILD-SNAPSHOT diff --git a/stream-listener/src/main/java/demo/Bar.java b/stream-listener/src/main/java/demo/Bar.java new file mode 100644 index 0000000..38a2c66 --- /dev/null +++ b/stream-listener/src/main/java/demo/Bar.java @@ -0,0 +1,33 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package demo; + +/** + * @author Marius Bogoevici + */ +public class Bar { + + private String value; + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } +} diff --git a/stream-listener/src/main/java/demo/Converters.java b/stream-listener/src/main/java/demo/Converters.java deleted file mode 100644 index 0f09f67..0000000 --- a/stream-listener/src/main/java/demo/Converters.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright 2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package demo; - -import org.springframework.cloud.stream.converter.AbstractFromMessageConverter; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.messaging.Message; -import org.springframework.messaging.converter.MessageConversionException; -import org.springframework.util.MimeType; - -/** - * @author Ilayaperumal Gopinathan - */ -@Configuration -public class Converters { - - //Register custom converter - @Bean - public AbstractFromMessageConverter fooConverter() { - return new FooToBarConverter(); - } - - public static class Foo { - - private String value = "foo"; - - public String getValue() { - return this.value; - } - - public void setValue(String value) { - this.value = value; - } - } - - public static class Bar { - - private String value = "init"; - - public Bar(String value) { - this.value = value; - } - - public String getValue() { - return this.value; - } - - public void setValue(String value) { - this.value = value; - } - - } - - public static class FooToBarConverter extends AbstractFromMessageConverter { - - public FooToBarConverter() { - super(MimeType.valueOf("test/bar")); - } - - @Override - protected Class[] supportedTargetTypes() { - return new Class[] {Bar.class}; - } - - @Override - protected Class[] supportedPayloadTypes() { - return new Class[] {Foo.class}; - } - - @Override - public Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { - Object result = null; - try { - if (message.getPayload() instanceof Foo) { - Foo fooPayload = (Foo) message.getPayload(); - result = new Bar(fooPayload.getValue()); - } - } - catch (Exception e) { - logger.error(e.getMessage(), e); - throw new MessageConversionException(e.getMessage()); - } - return result; - } - } -} diff --git a/stream-listener/src/main/java/demo/Foo.java b/stream-listener/src/main/java/demo/Foo.java new file mode 100644 index 0000000..5b0e849 --- /dev/null +++ b/stream-listener/src/main/java/demo/Foo.java @@ -0,0 +1,33 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package demo; + +/** + * @author Marius Bogoevici + */ +public class Foo { + + private String value; + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } +} diff --git a/stream-listener/src/main/java/demo/SampleSink.java b/stream-listener/src/main/java/demo/SampleSink.java index e4c652e..9b0e638 100644 --- a/stream-listener/src/main/java/demo/SampleSink.java +++ b/stream-listener/src/main/java/demo/SampleSink.java @@ -29,11 +29,11 @@ public class SampleSink { // Sink application definition @StreamListener(Sink.SAMPLE) - public void receive(Converters.Bar barMessage) { + public void receive(Foo fooMessage) { System.out.println("******************"); System.out.println("At the Sink"); System.out.println("******************"); - System.out.println("Received transformed message " + barMessage.getValue() + " of type " + barMessage.getClass()); + System.out.println("Received transformed message " + fooMessage.getValue() + " of type " + fooMessage.getClass()); } public interface Sink { diff --git a/stream-listener/src/main/java/demo/SampleSource.java b/stream-listener/src/main/java/demo/SampleSource.java index cf5276b..30fd7f0 100644 --- a/stream-listener/src/main/java/demo/SampleSource.java +++ b/stream-listener/src/main/java/demo/SampleSource.java @@ -24,7 +24,9 @@ import org.springframework.integration.annotation.Poller; import org.springframework.integration.core.MessageSource; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; +import org.springframework.messaging.support.MessageBuilder; /** * @author Ilayaperumal Gopinathan @@ -40,10 +42,9 @@ public class SampleSource { System.out.println("******************"); System.out.println("At the Source"); System.out.println("******************"); - Converters.Foo foo = new Converters.Foo(); - foo.setValue("hi"); - System.out.println("Sending value: " + foo.getValue() + " of type " + foo.getClass()); - return new GenericMessage(foo); + String value = "{\"value\":\"hi\"}"; + System.out.println("Sending value: " + value); + return MessageBuilder.withPayload(value).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build(); } }; } diff --git a/stream-listener/src/main/java/demo/SampleTransformer.java b/stream-listener/src/main/java/demo/SampleTransformer.java index faab174..aa12b17 100644 --- a/stream-listener/src/main/java/demo/SampleTransformer.java +++ b/stream-listener/src/main/java/demo/SampleTransformer.java @@ -33,7 +33,7 @@ public class SampleTransformer { @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) - public Converters.Bar receive(Converters.Bar barMessage) { + public Bar receive(Bar barMessage) { System.out.println("******************"); System.out.println("At the transformer"); System.out.println("******************"); diff --git a/stream-listener/src/main/resources/application.yml b/stream-listener/src/main/resources/application.yml index 9f6e482..98eda29 100644 --- a/stream-listener/src/main/resources/application.yml +++ b/stream-listener/src/main/resources/application.yml @@ -10,5 +10,6 @@ spring: destination: testtock output: destination: xformed + content-type: application/json sample-sink: destination: xformed diff --git a/transform/pom.xml b/transform/pom.xml index c72d39f..a136a71 100644 --- a/transform/pom.xml +++ b/transform/pom.xml @@ -11,7 +11,7 @@ org.springframework.cloud spring-cloud-stream-samples - 1.0.0.BUILD-SNAPSHOT + 1.1.0.BUILD-SNAPSHOT