diff --git a/.gitignore b/.gitignore
index 2c23ba4..b47989f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,7 +18,7 @@ _site/
*.iml
*.ipr
*.iws
-.idea/*
+.idea/
*/.idea
.factorypath
spring-xd-samples/*/xd
diff --git a/kafka-streams-samples/kafka-streams-dlq-sample/src/main/java/kafka/streams/dlq/sample/KafkaStreamsDlqSample.java b/kafka-streams-samples/kafka-streams-dlq-sample/src/main/java/kafka/streams/dlq/sample/KafkaStreamsDlqSample.java
index 98de5ba..8a57164 100644
--- a/kafka-streams-samples/kafka-streams-dlq-sample/src/main/java/kafka/streams/dlq/sample/KafkaStreamsDlqSample.java
+++ b/kafka-streams-samples/kafka-streams-dlq-sample/src/main/java/kafka/streams/dlq/sample/KafkaStreamsDlqSample.java
@@ -19,6 +19,8 @@ package kafka.streams.dlq.sample;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
@@ -51,8 +53,9 @@ public class KafkaStreamsDlqSample {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
- .groupByKey(Serdes.String(), Serdes.String())
- .count(timeWindows, "WordCounts-1")
+ .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ .windowedBy(timeWindows)
+ .count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
diff --git a/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java b/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java
index f67449d..006d9e9 100644
--- a/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java
+++ b/kafka-streams-samples/kafka-streams-product-tracker/src/main/java/kafka/streams/product/tracker/KafkaStreamsProductTrackerApplication.java
@@ -18,6 +18,8 @@ package kafka.streams.product.tracker;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
@@ -60,8 +62,9 @@ public class KafkaStreamsProductTrackerApplication {
return input
.filter((key, product) -> productIds().contains(product.getId()))
.map((key, value) -> new KeyValue<>(value, value))
- .groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
- .count(timeWindows, "product-counts")
+ .groupByKey(Serialized.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)))
+ .windowedBy(timeWindows)
+ .count(Materialized.as("product-counts"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id,
value, Instant.ofEpochMilli(key.window().start()).atZone(ZoneId.systemDefault()).toLocalTime(),
diff --git a/multibinder-samples/multibinder-two-kafka-clusters/pom.xml b/multibinder-samples/multibinder-two-kafka-clusters/pom.xml
index fd134ee..e510ce8 100644
--- a/multibinder-samples/multibinder-two-kafka-clusters/pom.xml
+++ b/multibinder-samples/multibinder-two-kafka-clusters/pom.xml
@@ -19,12 +19,12 @@
org.springframework.cloud
spring-cloud-stream-binder-kafka
- 2.0.0.BUILD-SNAPSHOT
+ 2.1.0.BUILD-SNAPSHOT
org.springframework.cloud
spring-cloud-stream-binder-kafka-core
- 2.0.0.BUILD-SNAPSHOT
+ 2.1.0.BUILD-SNAPSHOT
org.springframework.boot
diff --git a/partitioning-samples/partitioning-producer/src/test/java/demo/ModuleApplicationTests.java b/partitioning-samples/partitioning-producer/src/test/java/demo/producer/ModuleApplicationTests.java
similarity index 69%
rename from partitioning-samples/partitioning-producer/src/test/java/demo/ModuleApplicationTests.java
rename to partitioning-samples/partitioning-producer/src/test/java/demo/producer/ModuleApplicationTests.java
index b1c9072..4860a34 100644
--- a/partitioning-samples/partitioning-producer/src/test/java/demo/ModuleApplicationTests.java
+++ b/partitioning-samples/partitioning-producer/src/test/java/demo/producer/ModuleApplicationTests.java
@@ -14,20 +14,15 @@
* limitations under the License.
*/
-package demo;
+package demo.producer;
-import demo.producer.Producer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-import org.springframework.test.context.web.WebAppConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
-@RunWith(SpringJUnit4ClassRunner.class)
-@SpringBootTest(classes = Producer.class)
-@WebAppConfiguration
-@DirtiesContext
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class ModuleApplicationTests {
@Test
diff --git a/samples-acceptance-tests/pom.xml b/samples-acceptance-tests/pom.xml
index 0e6541d..4f1c4f7 100644
--- a/samples-acceptance-tests/pom.xml
+++ b/samples-acceptance-tests/pom.xml
@@ -11,7 +11,7 @@
org.springframework.cloud
spring-cloud-build
- 2.0.0.BUILD-SNAPSHOT
+ 2.1.0.BUILD-SNAPSHOT