Changes to accommodate upstream boot changes in sring-cloud-stream core

This commit is contained in:
Soby Chacko
2018-08-14 14:23:32 -04:00
parent 2460ff21f7
commit a5eb9eabc2
6 changed files with 18 additions and 17 deletions

2
.gitignore vendored
View File

@@ -18,7 +18,7 @@ _site/
*.iml
*.ipr
*.iws
.idea/*
.idea/
*/.idea
.factorypath
spring-xd-samples/*/xd

View File

@@ -19,6 +19,8 @@ package kafka.streams.dlq.sample;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
@@ -51,8 +53,9 @@ public class KafkaStreamsDlqSample {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serdes.String(), Serdes.String())
.count(timeWindows, "WordCounts-1")
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(timeWindows)
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}

View File

@@ -18,6 +18,8 @@ package kafka.streams.product.tracker;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
@@ -60,8 +62,9 @@ public class KafkaStreamsProductTrackerApplication {
return input
.filter((key, product) -> productIds().contains(product.getId()))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
.count(timeWindows, "product-counts")
.groupByKey(Serialized.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)))
.windowedBy(timeWindows)
.count(Materialized.as("product-counts"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id,
value, Instant.ofEpochMilli(key.window().start()).atZone(ZoneId.systemDefault()).toLocalTime(),

View File

@@ -19,12 +19,12 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<version>2.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<version>2.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -14,20 +14,15 @@
* limitations under the License.
*/
package demo;
package demo.producer;
import demo.producer.Producer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Producer.class)
@WebAppConfiguration
@DirtiesContext
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class ModuleApplicationTests {
@Test

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<version>2.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>