From a5eb9eabc25182bb9928b005f0e57deb52f7aa0b Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 14 Aug 2018 14:23:32 -0400 Subject: [PATCH] Changes to accommodate upstream boot changes in sring-cloud-stream core --- .gitignore | 2 +- .../streams/dlq/sample/KafkaStreamsDlqSample.java | 7 +++++-- .../KafkaStreamsProductTrackerApplication.java | 7 +++++-- .../multibinder-two-kafka-clusters/pom.xml | 4 ++-- .../demo/{ => producer}/ModuleApplicationTests.java | 13 ++++--------- samples-acceptance-tests/pom.xml | 2 +- 6 files changed, 18 insertions(+), 17 deletions(-) rename partitioning-samples/partitioning-producer/src/test/java/demo/{ => producer}/ModuleApplicationTests.java (69%) diff --git a/.gitignore b/.gitignore index 2c23ba4..b47989f 100644 --- a/.gitignore +++ b/.gitignore @@ -18,7 +18,7 @@ _site/ *.iml *.ipr *.iws -.idea/* +.idea/ */.idea .factorypath spring-xd-samples/*/xd diff --git a/kafka-streams-samples/kafka-streams-dlq-sample/src/main/java/kafka/streams/dlq/sample/KafkaStreamsDlqSample.java b/kafka-streams-samples/kafka-streams-dlq-sample/src/main/java/kafka/streams/dlq/sample/KafkaStreamsDlqSample.java index 98de5ba..8a57164 100644 --- a/kafka-streams-samples/kafka-streams-dlq-sample/src/main/java/kafka/streams/dlq/sample/KafkaStreamsDlqSample.java +++ b/kafka-streams-samples/kafka-streams-dlq-sample/src/main/java/kafka/streams/dlq/sample/KafkaStreamsDlqSample.java @@ -19,6 +19,8 @@ package kafka.streams.dlq.sample; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; @@ -51,8 +53,9 @@ public class KafkaStreamsDlqSample { return input .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .map((key, value) -> new KeyValue<>(value, value)) - .groupByKey(Serdes.String(), Serdes.String()) - .count(timeWindows, "WordCounts-1") + .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) + .windowedBy(timeWindows) + .count(Materialized.as("WordCounts-1")) .toStream() .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end())))); } diff --git a/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java b/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java index f67449d..006d9e9 100644 --- a/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java +++ b/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java @@ -18,6 +18,8 @@ package kafka.streams.product.tracker; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; @@ -60,8 +62,9 @@ public class KafkaStreamsProductTrackerApplication { return input .filter((key, product) -> productIds().contains(product.getId())) .map((key, value) -> new KeyValue<>(value, value)) - .groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)) - .count(timeWindows, "product-counts") + .groupByKey(Serialized.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))) + .windowedBy(timeWindows) + .count(Materialized.as("product-counts")) .toStream() .map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id, value, Instant.ofEpochMilli(key.window().start()).atZone(ZoneId.systemDefault()).toLocalTime(), diff --git a/multibinder-samples/multibinder-two-kafka-clusters/pom.xml b/multibinder-samples/multibinder-two-kafka-clusters/pom.xml index fd134ee..e510ce8 100644 --- a/multibinder-samples/multibinder-two-kafka-clusters/pom.xml +++ b/multibinder-samples/multibinder-two-kafka-clusters/pom.xml @@ -19,12 +19,12 @@ org.springframework.cloud spring-cloud-stream-binder-kafka - 2.0.0.BUILD-SNAPSHOT + 2.1.0.BUILD-SNAPSHOT org.springframework.cloud spring-cloud-stream-binder-kafka-core - 2.0.0.BUILD-SNAPSHOT + 2.1.0.BUILD-SNAPSHOT org.springframework.boot diff --git a/partitioning-samples/partitioning-producer/src/test/java/demo/ModuleApplicationTests.java b/partitioning-samples/partitioning-producer/src/test/java/demo/producer/ModuleApplicationTests.java similarity index 69% rename from partitioning-samples/partitioning-producer/src/test/java/demo/ModuleApplicationTests.java rename to partitioning-samples/partitioning-producer/src/test/java/demo/producer/ModuleApplicationTests.java index b1c9072..4860a34 100644 --- a/partitioning-samples/partitioning-producer/src/test/java/demo/ModuleApplicationTests.java +++ b/partitioning-samples/partitioning-producer/src/test/java/demo/producer/ModuleApplicationTests.java @@ -14,20 +14,15 @@ * limitations under the License. */ -package demo; +package demo.producer; -import demo.producer.Producer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import org.springframework.test.context.web.WebAppConfiguration; +import org.springframework.test.context.junit4.SpringRunner; -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = Producer.class) -@WebAppConfiguration -@DirtiesContext +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) public class ModuleApplicationTests { @Test diff --git a/samples-acceptance-tests/pom.xml b/samples-acceptance-tests/pom.xml index 0e6541d..4f1c4f7 100644 --- a/samples-acceptance-tests/pom.xml +++ b/samples-acceptance-tests/pom.xml @@ -11,7 +11,7 @@ org.springframework.cloud spring-cloud-build - 2.0.0.BUILD-SNAPSHOT + 2.1.0.BUILD-SNAPSHOT