Compare commits
64 Commits
Elmhurst
...
Germantown
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1afdac7eb6 | ||
|
|
ce0e8e1e18 | ||
|
|
de71d4bffd | ||
|
|
f8c5cf942c | ||
|
|
6a2fb044ab | ||
|
|
0469a003ee | ||
|
|
dd06860b57 | ||
|
|
dd41ea5bac | ||
|
|
18a7b2453e | ||
|
|
5eebba3e9d | ||
|
|
e6cf1085a6 | ||
|
|
03976c7c22 | ||
|
|
c518e38aef | ||
|
|
92803928ab | ||
|
|
5a21f2b687 | ||
|
|
80723fc491 | ||
|
|
d73bddc095 | ||
|
|
50b855243a | ||
|
|
1ce7d3d51b | ||
|
|
417abb9acd | ||
|
|
697e5fd64a | ||
|
|
d0b9cf52aa | ||
|
|
1a67f1af23 | ||
|
|
52c4c876a1 | ||
|
|
9beb9b7352 | ||
|
|
0ee844bcd9 | ||
|
|
451eef29b8 | ||
|
|
57c980cfa2 | ||
|
|
12649abfeb | ||
|
|
ab59cb73c6 | ||
|
|
c8ccdfc9f6 | ||
|
|
765ada3559 | ||
|
|
2adec97cd0 | ||
|
|
1c38e8b553 | ||
|
|
b10a329fe1 | ||
|
|
e2111652fd | ||
|
|
1dabe82fc9 | ||
|
|
f34fd120e9 | ||
|
|
49428f2bb6 | ||
|
|
1b3807888c | ||
|
|
cf8a2e1765 | ||
|
|
8bd99c2b74 | ||
|
|
21d5bc8529 | ||
|
|
267b58d7b4 | ||
|
|
a5eb9eabc2 | ||
|
|
2460ff21f7 | ||
|
|
e927b01bec | ||
|
|
cb0ed01152 | ||
|
|
1b963e58fd | ||
|
|
b156cc8c22 | ||
|
|
042330a51d | ||
|
|
c4165dd7f8 | ||
|
|
2019300fe8 | ||
|
|
c6bdca132b | ||
|
|
a1bd8929b6 | ||
|
|
c9f2a3b982 | ||
|
|
d7fcde4754 | ||
|
|
68ff597cd2 | ||
|
|
783c61fbf4 | ||
|
|
4875f68368 | ||
|
|
90b0b72db7 | ||
|
|
107e758733 | ||
|
|
a2056507ec | ||
|
|
4616e4c2f9 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -18,7 +18,7 @@ _site/
|
||||
*.iml
|
||||
*.ipr
|
||||
*.iws
|
||||
.idea/*
|
||||
.idea/
|
||||
*/.idea
|
||||
.factorypath
|
||||
spring-xd-samples/*/xd
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
<repository>
|
||||
<id>spring-snapshots</id>
|
||||
<name>Spring Snapshots</name>
|
||||
<url>http://repo.spring.io/libs-snapshot-local</url>
|
||||
<url>https://repo.spring.io/libs-snapshot-local</url>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
@@ -29,7 +29,7 @@
|
||||
<repository>
|
||||
<id>spring-milestones</id>
|
||||
<name>Spring Milestones</name>
|
||||
<url>http://repo.spring.io/libs-milestone-local</url>
|
||||
<url>https://repo.spring.io/libs-milestone-local</url>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
@@ -37,7 +37,7 @@
|
||||
<repository>
|
||||
<id>spring-releases</id>
|
||||
<name>Spring Releases</name>
|
||||
<url>http://repo.spring.io/release</url>
|
||||
<url>https://repo.spring.io/release</url>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
@@ -47,7 +47,7 @@
|
||||
<pluginRepository>
|
||||
<id>spring-snapshots</id>
|
||||
<name>Spring Snapshots</name>
|
||||
<url>http://repo.spring.io/libs-snapshot-local</url>
|
||||
<url>https://repo.spring.io/libs-snapshot-local</url>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
@@ -55,7 +55,7 @@
|
||||
<pluginRepository>
|
||||
<id>spring-milestones</id>
|
||||
<name>Spring Milestones</name>
|
||||
<url>http://repo.spring.io/libs-milestone-local</url>
|
||||
<url>https://repo.spring.io/libs-milestone-local</url>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
|
||||
4
LICENSE
4
LICENSE
@@ -1,6 +1,6 @@
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
https://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
@@ -192,7 +192,7 @@
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
https://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM http://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM https://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>kafka-streams-aggregate</artifactId>
|
||||
@@ -11,7 +11,7 @@
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<parent>
|
||||
<groupId>spring.cloud.stream.samples</groupId>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>spring-cloud-stream-samples-parent</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../..</relativePath>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
spring.application.name: kafka-streams-aggregate-sample
|
||||
spring.cloud.stream.bindings.input:
|
||||
destination: foobar
|
||||
consumer:
|
||||
headerMode: raw
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
brokers: localhost #192.168.99.100
|
||||
zkNodes: localhost #192.168.99.100
|
||||
configuration:
|
||||
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
default.value.serde: org.apache.kafka.common.serialization.Serdes$BytesSerde
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM http://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM https://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>kafka-streams-branching</artifactId>
|
||||
@@ -11,7 +11,7 @@
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<parent>
|
||||
<groupId>spring.cloud.stream.samples</groupId>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>spring-cloud-stream-samples-parent</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../..</relativePath>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -7,21 +7,13 @@ spring.cloud.stream.kafka.streams.binder.configuration:
|
||||
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
spring.cloud.stream.bindings.output1:
|
||||
destination: english-counts
|
||||
producer:
|
||||
headerMode: raw
|
||||
spring.cloud.stream.bindings.output2:
|
||||
destination: french-counts
|
||||
producer:
|
||||
headerMode: raw
|
||||
spring.cloud.stream.bindings.output3:
|
||||
destination: spanish-counts
|
||||
producer:
|
||||
headerMode: raw
|
||||
spring.cloud.stream.bindings.input:
|
||||
destination: words
|
||||
group: group1
|
||||
consumer:
|
||||
headerMode: raw
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
brokers: localhost #192.168.99.100 #localhost
|
||||
zkNodes: localhost #192.168.99.100 #localhost
|
||||
spring.application.name: kafka-streams-branching-sample
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -15,7 +15,7 @@ This sample uses lambda expressions and thus requires Java 8+.
|
||||
|
||||
Go to the root of the repository and do: `./mvnw clean package`
|
||||
|
||||
`java -jar target/kstream-word-count-0.0.1-SNAPSHOT.jar`
|
||||
`java -jar target/kafka-streams-dlq-sample-0.0.1-SNAPSHOT.jar`
|
||||
|
||||
The default application.yml file demonstrates native decoding by Kafka.
|
||||
The default value serializer is set to IntegerSerde to force a deserialization errors.
|
||||
@@ -38,4 +38,4 @@ You will not see any messages coming to the regular destination counts.
|
||||
|
||||
There is another yaml file provided (by-framework-decoding.yml).
|
||||
Use that as application.yml to see how it works when the deserialization done by the framework.
|
||||
In this case also, the messages on error appear in the DLQ topic.
|
||||
In this case also, the messages on error appear in the DLQ topic.
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM http://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM https://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>kafka-streams-dlq-sample</artifactId>
|
||||
@@ -11,7 +11,7 @@
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<parent>
|
||||
<groupId>spring.cloud.stream.samples</groupId>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>spring-cloud-stream-samples-parent</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../..</relativePath>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -19,6 +19,8 @@ package kafka.streams.dlq.sample;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.Materialized;
|
||||
import org.apache.kafka.streams.kstream.Serialized;
|
||||
import org.apache.kafka.streams.kstream.TimeWindows;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
@@ -41,9 +43,6 @@ public class KafkaStreamsDlqSample {
|
||||
@EnableBinding(KafkaStreamsProcessor.class)
|
||||
public static class WordCountProcessorApplication {
|
||||
|
||||
@Autowired
|
||||
private TimeWindows timeWindows;
|
||||
|
||||
@StreamListener("input")
|
||||
@SendTo("output")
|
||||
public KStream<?, WordCount> process(KStream<Object, String> input) {
|
||||
@@ -51,8 +50,9 @@ public class KafkaStreamsDlqSample {
|
||||
return input
|
||||
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
|
||||
.map((key, value) -> new KeyValue<>(value, value))
|
||||
.groupByKey(Serdes.String(), Serdes.String())
|
||||
.count(timeWindows, "WordCounts-1")
|
||||
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
|
||||
.windowedBy(TimeWindows.of(5000))
|
||||
.count(Materialized.as("WordCounts-1"))
|
||||
.toStream()
|
||||
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
|
||||
}
|
||||
|
||||
@@ -1,25 +1,20 @@
|
||||
spring.cloud.stream.bindings.output.contentType: application/json
|
||||
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
|
||||
spring.cloud.stream.kafka.streams.binder.configuration:
|
||||
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
|
||||
application.id: default
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
configuration:
|
||||
commit.interval.ms: 1000
|
||||
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
|
||||
application.id: default
|
||||
brokers: localhost #192.168.99.100
|
||||
serdeError: sendToDlq
|
||||
spring.cloud.stream.bindings.output:
|
||||
destination: counts
|
||||
producer:
|
||||
headerMode: raw
|
||||
#useNativeEncoding: true
|
||||
spring.cloud.stream.bindings.input:
|
||||
destination: words
|
||||
group: group1
|
||||
consumer:
|
||||
headerMode: raw
|
||||
useNativeDecoding: true
|
||||
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: words-count-dlq
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
brokers: localhost #192.168.99.100
|
||||
zkNodes: localhost #192.168.99.100
|
||||
serdeError: sendToDlq
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
spring.cloud.stream.bindings.output.contentType: application/json
|
||||
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
|
||||
spring.cloud.stream.kafka.streams.binder.configuration:
|
||||
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
application.id: default
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
brokers: localhost #192.168.99.100
|
||||
serdeError: sendToDlq
|
||||
configuration:
|
||||
commit.interval.ms: 1000
|
||||
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
application.id: default
|
||||
spring.cloud.stream.bindings.output:
|
||||
destination: counts
|
||||
producer:
|
||||
@@ -17,9 +20,9 @@ spring.cloud.stream.bindings.input:
|
||||
headerMode: raw
|
||||
useNativeDecoding: false
|
||||
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: words-count-dlq
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
brokers: localhost #192.168.99.100
|
||||
zkNodes: localhost #192.168.99.100
|
||||
serdeError: sendToDlq
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
1
kafka-streams-samples/kafka-streams-global-table-join/.mvn/wrapper/maven-wrapper.properties
vendored
Normal file
1
kafka-streams-samples/kafka-streams-global-table-join/.mvn/wrapper/maven-wrapper.properties
vendored
Normal file
@@ -0,0 +1 @@
|
||||
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip
|
||||
@@ -0,0 +1,25 @@
|
||||
== What is this app?
|
||||
|
||||
This is an example of a Spring Cloud Stream processor using Kafka Streams support.
|
||||
|
||||
The application uses two inputs - one KStream for user-clicks and a GlobalKTable for user-regions.
|
||||
Then it joins the information from stream to table to find out total clicks per region. You could compare the this with ktable join sample.
|
||||
|
||||
=== Running the app:
|
||||
|
||||
Go to the root of the repository.
|
||||
|
||||
`docker-compose up -d`
|
||||
|
||||
`./mvnw clean package`
|
||||
|
||||
`java -jar target/kafka-streams-global-table-join-0.0.1-SNAPSHOT.jar`
|
||||
|
||||
`docker exec -it kafka-join /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic user-regions`
|
||||
|
||||
`docker exec -it kafka-join /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic user-regions --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.StringDeserializer --property print.key=true --property key.separator="-"`
|
||||
|
||||
`docker exec -it kafka-join /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic output-topic --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true --property key.separator="-"`
|
||||
|
||||
|
||||
Run the stand-alone `Producers` application to generate some data and watch the output on the console consumer above.
|
||||
@@ -0,0 +1,19 @@
|
||||
version: '3'
|
||||
services:
|
||||
kafka:
|
||||
image: wurstmeister/kafka
|
||||
container_name: kafka-join
|
||||
ports:
|
||||
- "9092:9092"
|
||||
environment:
|
||||
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
|
||||
- KAFKA_ADVERTISED_PORT=9092
|
||||
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
|
||||
depends_on:
|
||||
- zookeeper
|
||||
zookeeper:
|
||||
image: wurstmeister/zookeeper
|
||||
ports:
|
||||
- "2181:2181"
|
||||
environment:
|
||||
- KAFKA_ADVERTISED_HOST_NAME=zookeeper
|
||||
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
@@ -7,7 +7,7 @@
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM http://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM https://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
@@ -0,0 +1,42 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>kafka-streams-global-table-join</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>kafka-streams-global-table-join</name>
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<parent>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>spring-cloud-stream-samples-parent</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../..</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,96 @@
|
||||
/*
|
||||
* Copyright 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.streams.globalktable.join;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.kstream.GlobalKTable;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.Serialized;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
|
||||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
|
||||
/**
|
||||
* This is the PR that added this sample:
|
||||
* https://github.com/spring-cloud/spring-cloud-stream-samples/pull/112
|
||||
*/
|
||||
@SpringBootApplication
|
||||
public class KafkaStreamsGlobalKTableJoin {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(KafkaStreamsGlobalKTableJoin.class, args);
|
||||
}
|
||||
|
||||
@EnableBinding(KStreamProcessorX.class)
|
||||
public static class KStreamToTableJoinApplication {
|
||||
|
||||
|
||||
@StreamListener
|
||||
@SendTo("output")
|
||||
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
|
||||
@Input("inputTable") GlobalKTable<String, String> userRegionsTable) {
|
||||
|
||||
return userClicksStream
|
||||
.leftJoin(userRegionsTable,
|
||||
(name,value) -> name,
|
||||
(clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks)
|
||||
)
|
||||
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
|
||||
.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
|
||||
.reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
|
||||
.toStream();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
interface KStreamProcessorX extends KafkaStreamsProcessor {
|
||||
|
||||
@Input("inputTable")
|
||||
GlobalKTable<?, ?> inputKTable();
|
||||
}
|
||||
|
||||
private static final class RegionWithClicks {
|
||||
|
||||
private final String region;
|
||||
private final long clicks;
|
||||
|
||||
public RegionWithClicks(String region, long clicks) {
|
||||
if (region == null || region.isEmpty()) {
|
||||
throw new IllegalArgumentException("region must be set");
|
||||
}
|
||||
if (clicks < 0) {
|
||||
throw new IllegalArgumentException("clicks must not be negative");
|
||||
}
|
||||
this.region = region;
|
||||
this.clicks = clicks;
|
||||
}
|
||||
|
||||
public String getRegion() {
|
||||
return region;
|
||||
}
|
||||
|
||||
public long getClicks() {
|
||||
return clicks;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,89 @@
|
||||
/*
|
||||
* Copyright 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.streams.globalktable.join;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.LongSerializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class Producers {
|
||||
|
||||
public static void main(String... args) {
|
||||
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ProducerConfig.RETRIES_CONFIG, 0);
|
||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
|
||||
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
|
||||
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
|
||||
|
||||
List<KeyValue<String, Long>> userClicks = Arrays.asList(
|
||||
new KeyValue<>("alice", 13L),
|
||||
new KeyValue<>("bob", 4L),
|
||||
new KeyValue<>("chao", 25L),
|
||||
new KeyValue<>("bob", 19L),
|
||||
new KeyValue<>("dave", 56L),
|
||||
new KeyValue<>("eve", 78L),
|
||||
new KeyValue<>("alice", 40L),
|
||||
new KeyValue<>("fang", 99L)
|
||||
);
|
||||
|
||||
DefaultKafkaProducerFactory<String, Long> pf = new DefaultKafkaProducerFactory<>(props);
|
||||
KafkaTemplate<String, Long> template = new KafkaTemplate<>(pf, true);
|
||||
template.setDefaultTopic("user-clicks3");
|
||||
|
||||
for (KeyValue<String,Long> keyValue : userClicks) {
|
||||
template.sendDefault(keyValue.key, keyValue.value);
|
||||
}
|
||||
|
||||
List<KeyValue<String, String>> userRegions = Arrays.asList(
|
||||
new KeyValue<>("alice", "asia"), /* Alice lived in Asia originally... */
|
||||
new KeyValue<>("bob", "americas"),
|
||||
new KeyValue<>("chao", "asia"),
|
||||
new KeyValue<>("dave", "europe"),
|
||||
new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. */
|
||||
new KeyValue<>("eve", "americas"),
|
||||
new KeyValue<>("fang", "asia")
|
||||
);
|
||||
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
|
||||
DefaultKafkaProducerFactory<String, String> pf1 = new DefaultKafkaProducerFactory<>(props);
|
||||
KafkaTemplate<String, String> template1 = new KafkaTemplate<>(pf1, true);
|
||||
template1.setDefaultTopic("user-regions");
|
||||
|
||||
for (KeyValue<String,String> keyValue : userRegions) {
|
||||
template1.sendDefault(keyValue.key, keyValue.value);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
spring.application.name: stream-global-table-sample
|
||||
spring.cloud.stream.bindings.input:
|
||||
destination: user-clicks3
|
||||
consumer:
|
||||
useNativeDecoding: true
|
||||
spring.cloud.stream.bindings.inputTable:
|
||||
destination: user-regions
|
||||
contentType: application/avro
|
||||
consumer:
|
||||
useNativeDecoding: true
|
||||
spring.cloud.stream.bindings.output:
|
||||
destination: output-topic
|
||||
producer:
|
||||
useNativeEncoding: true
|
||||
spring.cloud.stream.kafka.streams.bindings.input:
|
||||
consumer:
|
||||
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
|
||||
spring.cloud.stream.kafka.streams.bindings.inputTable:
|
||||
consumer:
|
||||
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
materializedAs: all-regions
|
||||
spring.cloud.stream.kafka.streams.bindings.output:
|
||||
producer:
|
||||
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
brokers: localhost #192.168.99.100
|
||||
configuration:
|
||||
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
commit.interval.ms: 1000
|
||||
@@ -0,0 +1,12 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
<root level="INFO">
|
||||
<appender-ref ref="stdout"/>
|
||||
</root>
|
||||
<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
|
||||
</configuration>
|
||||
@@ -0,0 +1,18 @@
|
||||
package kafka.streams.globalktable.join;
|
||||
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
public class KafkaStreamsGlobalKTableJoinTests {
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void contextLoads() {
|
||||
}
|
||||
|
||||
}
|
||||
@@ -9,18 +9,33 @@ There is a REST service provided as part of the application that can be used to
|
||||
|
||||
=== Running the app:
|
||||
|
||||
We will run 2 instances of the processor application to demonstrate that regardless of which instance hosts the keys, the REST endpoint will serve the requests.
|
||||
For more information on how this is done, please take a look at the application code.
|
||||
|
||||
1. `docker-compose up -d`
|
||||
|
||||
2. Start the confluent schema registry: The following command is based on the confluent platform.
|
||||
2. Start the confluent schema registry: The following command is based on the confluent platform and assume that you are at the root of the confluent platform's root directory.
|
||||
|
||||
`./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties`
|
||||
|
||||
3. Go to the root of the repository and do: `./mvnw clean package`
|
||||
|
||||
4. `java -jar target/kafka-streams-interactive-query-0.0.1-SNAPSHOT.jar`
|
||||
4. `java -jar target/kafka-streams-interactive-query-advanced-0.0.1-SNAPSHOT.jar`
|
||||
|
||||
5. On another terminal session:
|
||||
|
||||
`java -jar target/kafka-streams-interactive-query-advanced-0.0.1-SNAPSHOT.jar --server.port=8082 --spring.cloud.stream.kafka.streams.binder.configuration.application.server=localhost:8082'
|
||||
|
||||
5. Run the stand-alone `Producers` application to generate data and start the processing.
|
||||
Keep it running for a while.
|
||||
|
||||
6. Go to the URL: http://localhost:8080/charts/top-five?genre=Punk
|
||||
keep refreshing the URL and you will see the song play count information changes.
|
||||
|
||||
Take a look at the console sessions for the applications and you will see that it may not be the processor started on 8080 that serves this request.
|
||||
|
||||
7. Go to the URL: http://localhost:8082/charts/top-five?genre=Punk
|
||||
|
||||
Take a look at the console sessions for the applications and you will see that it may not be the processor started on 8082 that serves this request.
|
||||
|
||||
8. Once you are done with running the sample, stop the docker containers and the schema registry.
|
||||
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM http://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM https://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>kafka-streams-interactive-query-advanced</artifactId>
|
||||
@@ -11,7 +11,7 @@
|
||||
<description>Spring Cloud Stream sample for KStream interactive queries</description>
|
||||
|
||||
<parent>
|
||||
<groupId>spring.cloud.stream.samples</groupId>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>spring-cloud-stream-samples-parent</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../..</relativePath>
|
||||
@@ -47,6 +47,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
|
||||
<version>2.1.0.BUILD-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
@@ -86,7 +87,7 @@
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>confluent</id>
|
||||
<url>http://packages.confluent.io/maven/</url>
|
||||
<url>https://packages.confluent.io/maven/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -21,13 +21,13 @@ import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
|
||||
import kafka.streams.interactive.query.avro.PlayEvent;
|
||||
import kafka.streams.interactive.query.avro.Song;
|
||||
import kafka.streams.interactive.query.avro.SongPlayCount;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.serialization.Serde;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.common.serialization.*;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.kstream.*;
|
||||
import org.apache.kafka.streams.state.HostInfo;
|
||||
import org.apache.kafka.streams.state.KeyValueStore;
|
||||
import org.apache.kafka.streams.state.QueryableStoreTypes;
|
||||
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
|
||||
@@ -37,10 +37,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.*;
|
||||
@@ -54,7 +55,7 @@ public class KafkaStreamsInteractiveQuerySample {
|
||||
static final String ALL_SONGS = "all-songs";
|
||||
|
||||
@Autowired
|
||||
private QueryableStoreRegistry queryableStoreRegistry;
|
||||
private InteractiveQueryService interactiveQueryService;
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(KafkaStreamsInteractiveQuerySample.class, args);
|
||||
@@ -169,16 +170,45 @@ public class KafkaStreamsInteractiveQuerySample {
|
||||
@RestController
|
||||
public class FooController {
|
||||
|
||||
private final Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
@RequestMapping("/song/idx")
|
||||
public SongBean song(@RequestParam(value="id") Long id) {
|
||||
final ReadOnlyKeyValueStore<Long, Song> songStore =
|
||||
interactiveQueryService.getQueryableStore(KafkaStreamsInteractiveQuerySample.ALL_SONGS, QueryableStoreTypes.<Long, Song>keyValueStore());
|
||||
|
||||
final Song song = songStore.get(id);
|
||||
if (song == null) {
|
||||
throw new IllegalArgumentException("hi");
|
||||
}
|
||||
return new SongBean(song.getArtist(), song.getAlbum(), song.getName());
|
||||
}
|
||||
|
||||
@RequestMapping("/charts/top-five")
|
||||
public List<SongPlayCountBean> greeting(@RequestParam(value="genre") String genre) {
|
||||
return topFiveSongs(KafkaStreamsInteractiveQuerySample.TOP_FIVE_KEY, KafkaStreamsInteractiveQuerySample.TOP_FIVE_SONGS_STORE);
|
||||
@SuppressWarnings("unchecked")
|
||||
public List<SongPlayCountBean> topFive(@RequestParam(value="genre") String genre) {
|
||||
|
||||
HostInfo hostInfo = interactiveQueryService.getHostInfo(KafkaStreamsInteractiveQuerySample.TOP_FIVE_SONGS_STORE,
|
||||
KafkaStreamsInteractiveQuerySample.TOP_FIVE_KEY, new StringSerializer());
|
||||
|
||||
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
|
||||
logger.info("Top Five songs request served from same host: " + hostInfo);
|
||||
return topFiveSongs(KafkaStreamsInteractiveQuerySample.TOP_FIVE_KEY, KafkaStreamsInteractiveQuerySample.TOP_FIVE_SONGS_STORE);
|
||||
}
|
||||
else {
|
||||
//find the store from the proper instance.
|
||||
logger.info("Top Five songs request served from different host: " + hostInfo);
|
||||
RestTemplate restTemplate = new RestTemplate();
|
||||
return restTemplate.postForObject(
|
||||
String.format("https://%s:%d/%s", hostInfo.host(),
|
||||
hostInfo.port(), "charts/top-five?genre=Punk"), "punk", List.class);
|
||||
}
|
||||
}
|
||||
|
||||
private List<SongPlayCountBean> topFiveSongs(final String key,
|
||||
final String storeName) {
|
||||
|
||||
final ReadOnlyKeyValueStore<String, TopFiveSongs> topFiveStore =
|
||||
queryableStoreRegistry.getQueryableStoreType(storeName, QueryableStoreTypes.<String, TopFiveSongs>keyValueStore());
|
||||
interactiveQueryService.getQueryableStore(storeName, QueryableStoreTypes.<String, TopFiveSongs>keyValueStore());
|
||||
|
||||
// Get the value from the store
|
||||
final TopFiveSongs value = topFiveStore.get(key);
|
||||
@@ -187,12 +217,31 @@ public class KafkaStreamsInteractiveQuerySample {
|
||||
}
|
||||
final List<SongPlayCountBean> results = new ArrayList<>();
|
||||
value.forEach(songPlayCount -> {
|
||||
final ReadOnlyKeyValueStore<Long, Song> songStore =
|
||||
queryableStoreRegistry.getQueryableStoreType(KafkaStreamsInteractiveQuerySample.ALL_SONGS, QueryableStoreTypes.<Long, Song>keyValueStore());
|
||||
|
||||
final Song song = songStore.get(songPlayCount.getSongId());
|
||||
results.add(new SongPlayCountBean(song.getArtist(),song.getAlbum(), song.getName(),
|
||||
songPlayCount.getPlays()));
|
||||
HostInfo hostInfo = interactiveQueryService.getHostInfo(KafkaStreamsInteractiveQuerySample.ALL_SONGS,
|
||||
songPlayCount.getSongId(), new LongSerializer());
|
||||
|
||||
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
|
||||
logger.info("Song info request served from same host: " + hostInfo);
|
||||
|
||||
final ReadOnlyKeyValueStore<Long, Song> songStore =
|
||||
interactiveQueryService.getQueryableStore(KafkaStreamsInteractiveQuerySample.ALL_SONGS, QueryableStoreTypes.<Long, Song>keyValueStore());
|
||||
|
||||
final Song song = songStore.get(songPlayCount.getSongId());
|
||||
results.add(new SongPlayCountBean(song.getArtist(),song.getAlbum(), song.getName(),
|
||||
songPlayCount.getPlays()));
|
||||
}
|
||||
else {
|
||||
logger.info("Song info request served from different host: " + hostInfo);
|
||||
RestTemplate restTemplate = new RestTemplate();
|
||||
SongBean song = restTemplate.postForObject(
|
||||
String.format("https://%s:%d/%s", hostInfo.host(),
|
||||
hostInfo.port(), "song/idx?id=" + songPlayCount.getSongId()), "id", SongBean.class);
|
||||
results.add(new SongPlayCountBean(song.getArtist(),song.getAlbum(), song.getName(),
|
||||
songPlayCount.getPlays()));
|
||||
}
|
||||
|
||||
|
||||
});
|
||||
return results;
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -0,0 +1,75 @@
|
||||
package kafka.streams.interactive.query;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class SongBean {
|
||||
|
||||
private String artist;
|
||||
private String album;
|
||||
private String name;
|
||||
|
||||
public SongBean() {}
|
||||
|
||||
public SongBean(final String artist, final String album, final String name) {
|
||||
|
||||
this.artist = artist;
|
||||
this.album = album;
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getArtist() {
|
||||
return artist;
|
||||
}
|
||||
|
||||
public void setArtist(final String artist) {
|
||||
this.artist = artist;
|
||||
}
|
||||
|
||||
public String getAlbum() {
|
||||
return album;
|
||||
}
|
||||
|
||||
public void setAlbum(final String album) {
|
||||
this.album = album;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(final String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SongBean{" +
|
||||
"artist='" + artist + '\'' +
|
||||
", album='" + album + '\'' +
|
||||
", name='" + name + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
final SongBean that = (SongBean) o;
|
||||
return Objects.equals(artist, that.artist) &&
|
||||
Objects.equals(album, that.album) &&
|
||||
Objects.equals(name, that.name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(artist, album, name);
|
||||
}
|
||||
}
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -2,12 +2,10 @@ spring.cloud.stream.bindings.input:
|
||||
destination: play-events
|
||||
consumer:
|
||||
useNativeDecoding: true
|
||||
headerMode: raw
|
||||
spring.cloud.stream.bindings.inputX:
|
||||
destination: song-feed
|
||||
consumer:
|
||||
useNativeDecoding: true
|
||||
headerMode: raw
|
||||
spring.cloud.stream.kafka.streams.bindings.input:
|
||||
consumer:
|
||||
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
@@ -19,7 +17,10 @@ spring.cloud.stream.kafka.streams.bindings.inputX:
|
||||
materializedAs: all-songs
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
brokers: localhost #192.168.99.100
|
||||
zkNodes: localhost #192.168.99.100
|
||||
configuration:
|
||||
schema.registry.url: http://localhost:8081
|
||||
commit.interval.ms: 1000
|
||||
commit.interval.ms: 1000
|
||||
spring.cloud.stream.kafka.streams.binder.autoAddPartitions: true
|
||||
spring.cloud.stream.kafka.streams.binder.minPartitionCount: 4
|
||||
spring.cloud.stream.kafka.streams.binder.configuration.application.server: localhost:8080
|
||||
spring.applicaiton.name: kafka-streams-iq-advanced-sample
|
||||
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>kafka-streams-interactive-query-basic</artifactId>
|
||||
@@ -11,7 +11,7 @@
|
||||
<description>Spring Cloud Stream sample for KStream interactive queries</description>
|
||||
|
||||
<parent>
|
||||
<groupId>spring.cloud.stream.samples</groupId>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>spring-cloud-stream-samples-parent</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../..</relativePath>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -17,8 +17,12 @@
|
||||
package kafka.streams.product.tracker;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.Materialized;
|
||||
import org.apache.kafka.streams.kstream.Serialized;
|
||||
import org.apache.kafka.streams.state.KeyValueStore;
|
||||
import org.apache.kafka.streams.state.QueryableStoreTypes;
|
||||
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -28,7 +32,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
|
||||
import org.springframework.kafka.support.serializer.JsonSerde;
|
||||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
@@ -54,7 +58,8 @@ public class KafkaStreamsInteractiveQueryApplication {
|
||||
private static final String STORE_NAME = "prod-id-count-store";
|
||||
|
||||
@Autowired
|
||||
private QueryableStoreRegistry queryableStoreRegistry;
|
||||
private InteractiveQueryService queryService;
|
||||
|
||||
|
||||
@Autowired
|
||||
ProductTrackerProperties productTrackerProperties;
|
||||
@@ -68,8 +73,10 @@ public class KafkaStreamsInteractiveQueryApplication {
|
||||
return input
|
||||
.filter((key, product) -> productIds().contains(product.getId()))
|
||||
.map((key, value) -> new KeyValue<>(value.id, value))
|
||||
.groupByKey(new Serdes.IntegerSerde(), new JsonSerde<>(Product.class))
|
||||
.count(STORE_NAME)
|
||||
.groupByKey(Serialized.with(Serdes.Integer(), new JsonSerde<>(Product.class)))
|
||||
.count(Materialized.<Integer, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
|
||||
.withKeySerde(Serdes.Integer())
|
||||
.withValueSerde(Serdes.Long()))
|
||||
.toStream();
|
||||
}
|
||||
|
||||
@@ -82,7 +89,7 @@ public class KafkaStreamsInteractiveQueryApplication {
|
||||
@Scheduled(fixedRate = 30000, initialDelay = 5000)
|
||||
public void printProductCounts() {
|
||||
if (keyValueStore == null) {
|
||||
keyValueStore = queryableStoreRegistry.getQueryableStoreType(STORE_NAME, QueryableStoreTypes.keyValueStore());
|
||||
keyValueStore = queryService.getQueryableStore(STORE_NAME, QueryableStoreTypes.keyValueStore());
|
||||
}
|
||||
|
||||
for (Integer id : productIds()) {
|
||||
|
||||
@@ -10,12 +10,10 @@ spring.cloud.stream.kafka.streams:
|
||||
spring.cloud.stream.bindings.output:
|
||||
destination: product-counts
|
||||
producer:
|
||||
headerMode: raw
|
||||
useNativeEncoding: true
|
||||
spring.cloud.stream.bindings.input:
|
||||
destination: products
|
||||
consumer:
|
||||
headerMode: raw
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
brokers: localhost #192.168.99.100
|
||||
zkNodes: localhost #192.168.99.100
|
||||
spring.application.name: kafka-streams-iq-basic-sample
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM http://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM https://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>kafka-streams-message-channel</artifactId>
|
||||
@@ -11,7 +11,7 @@
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<parent>
|
||||
<groupId>spring.cloud.stream.samples</groupId>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>spring-cloud-stream-samples-parent</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../..</relativePath>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -5,20 +5,12 @@ spring.cloud.stream.kafka.streams.binder.configuration:
|
||||
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
spring.cloud.stream.bindings.singleOutput:
|
||||
destination: counts
|
||||
producer:
|
||||
headerMode: raw
|
||||
#useNativeEncoding: true
|
||||
spring.cloud.stream.bindings.binding2:
|
||||
destination: words
|
||||
consumer:
|
||||
headerMode: raw
|
||||
spring.cloud.stream.bindings.binding1:
|
||||
destination: words
|
||||
consumer:
|
||||
headerMode: raw
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
brokers: localhost #192.168.99.100
|
||||
zkNodes: localhost #192.168.99.100
|
||||
|
||||
spring.applicaiton.name: kafka-streams-message-channel-sample
|
||||
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>kafka-streams-product-tracker</artifactId>
|
||||
@@ -11,7 +11,7 @@
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<parent>
|
||||
<groupId>spring.cloud.stream.samples</groupId>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>spring-cloud-stream-samples-parent</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../..</relativePath>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -18,6 +18,8 @@ package kafka.streams.product.tracker;
|
||||
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.Materialized;
|
||||
import org.apache.kafka.streams.kstream.Serialized;
|
||||
import org.apache.kafka.streams.kstream.TimeWindows;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
@@ -60,8 +62,9 @@ public class KafkaStreamsProductTrackerApplication {
|
||||
return input
|
||||
.filter((key, product) -> productIds().contains(product.getId()))
|
||||
.map((key, value) -> new KeyValue<>(value, value))
|
||||
.groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
|
||||
.count(timeWindows, "product-counts")
|
||||
.groupByKey(Serialized.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)))
|
||||
.windowedBy(timeWindows)
|
||||
.count(Materialized.as("product-counts"))
|
||||
.toStream()
|
||||
.map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id,
|
||||
value, Instant.ofEpochMilli(key.window().start()).atZone(ZoneId.systemDefault()).toLocalTime(),
|
||||
|
||||
@@ -8,13 +8,8 @@ spring.cloud.stream.kafka.streams:
|
||||
keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
|
||||
spring.cloud.stream.bindings.output:
|
||||
destination: product-counts
|
||||
producer:
|
||||
headerMode: raw
|
||||
#useNativeEncoding: true
|
||||
spring.cloud.stream.bindings.input:
|
||||
destination: products
|
||||
consumer:
|
||||
headerMode: raw
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
brokers: localhost #192.168.99.100
|
||||
zkNodes: localhost #192.168.99.100
|
||||
spring.applicaiton.name: kafka-streams-product-tracker-sample
|
||||
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM http://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM https://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>kafka-streams-table-join</artifactId>
|
||||
@@ -11,7 +11,7 @@
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<parent>
|
||||
<groupId>spring.cloud.stream.samples</groupId>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>spring-cloud-stream-samples-parent</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../..</relativePath>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -44,7 +44,7 @@ public class KafkaStreamsTableJoin {
|
||||
@StreamListener
|
||||
@SendTo("output")
|
||||
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
|
||||
@Input("inputX") KTable<String, String> userRegionsTable) {
|
||||
@Input("inputTable") KTable<String, String> userRegionsTable) {
|
||||
|
||||
return userClicksStream
|
||||
.leftJoin(userRegionsTable,
|
||||
@@ -60,8 +60,8 @@ public class KafkaStreamsTableJoin {
|
||||
|
||||
interface KStreamProcessorX extends KafkaStreamsProcessor {
|
||||
|
||||
@Input("inputX")
|
||||
KTable<?, ?> inputX();
|
||||
@Input("inputTable")
|
||||
KTable<?, ?> inputKTable();
|
||||
}
|
||||
|
||||
private static final class RegionWithClicks {
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -1,23 +1,22 @@
|
||||
spring.application.name: stream-table-sample
|
||||
spring.cloud.stream.bindings.input:
|
||||
destination: user-clicks3
|
||||
consumer:
|
||||
useNativeDecoding: true
|
||||
headerMode: raw
|
||||
spring.cloud.stream.bindings.inputX:
|
||||
spring.cloud.stream.bindings.inputTable:
|
||||
destination: user-regions
|
||||
contentType: application/avro
|
||||
consumer:
|
||||
useNativeDecoding: true
|
||||
headerMode: raw
|
||||
spring.cloud.stream.bindings.output:
|
||||
destination: output-topic
|
||||
producer:
|
||||
useNativeEncoding: true
|
||||
headerMode: raw
|
||||
spring.cloud.stream.kafka.streams.bindings.input:
|
||||
consumer:
|
||||
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
|
||||
spring.cloud.stream.kafka.streams.bindings.inputX:
|
||||
spring.cloud.stream.kafka.streams.bindings.inputTable:
|
||||
consumer:
|
||||
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
@@ -27,7 +26,6 @@ spring.cloud.stream.kafka.streams.bindings.output:
|
||||
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
brokers: localhost #192.168.99.100
|
||||
zkNodes: localhost #192.168.99.100
|
||||
configuration:
|
||||
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
|
||||
24
kafka-streams-samples/kafka-streams-to-rabbitmq-message-channel/.gitignore
vendored
Normal file
24
kafka-streams-samples/kafka-streams-to-rabbitmq-message-channel/.gitignore
vendored
Normal file
@@ -0,0 +1,24 @@
|
||||
target/
|
||||
!.mvn/wrapper/maven-wrapper.jar
|
||||
|
||||
### STS ###
|
||||
.apt_generated
|
||||
.classpath
|
||||
.factorypath
|
||||
.project
|
||||
.settings
|
||||
.springBeans
|
||||
|
||||
### IntelliJ IDEA ###
|
||||
.idea
|
||||
*.iws
|
||||
*.iml
|
||||
*.ipr
|
||||
|
||||
### NetBeans ###
|
||||
nbproject/private/
|
||||
build/
|
||||
nbbuild/
|
||||
dist/
|
||||
nbdist/
|
||||
.nb-gradle/
|
||||
BIN
kafka-streams-samples/kafka-streams-to-rabbitmq-message-channel/.mvn/wrapper/maven-wrapper.jar
vendored
Normal file
BIN
kafka-streams-samples/kafka-streams-to-rabbitmq-message-channel/.mvn/wrapper/maven-wrapper.jar
vendored
Normal file
Binary file not shown.
@@ -0,0 +1 @@
|
||||
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip
|
||||
@@ -0,0 +1,32 @@
|
||||
== What is this app?
|
||||
|
||||
This application contains two processors, a regular Kafka Streams processor and another one that consumes data from Kafka and produces into Rabbitmq.
|
||||
|
||||
The example is based on the word count application from the https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java[reference documentation].
|
||||
The application receives from Kafka through KStream and output to Kafka through KStream.
|
||||
Then another processor listens from the same topic where the kafka streams processor output data and then it sends data to a Rabbit exchange.
|
||||
There is a convenient test processor provided as part of the application that logs messages from the Rabbit destination.
|
||||
|
||||
=== Running the app:
|
||||
|
||||
Go to the root of the repository and do:
|
||||
|
||||
`docker-compose up -d`
|
||||
(This starts both Kafka and Rabbitmq in docker containers)
|
||||
|
||||
`./mvnw clean package`
|
||||
|
||||
`java -jar target/kafka-streams-to-rabbitmq-message-channel-0.0.1-SNAPSHOT.jar`
|
||||
|
||||
Issue the following commands:
|
||||
|
||||
`docker exec -it kafka-streams-multibinder /opt/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic words`
|
||||
|
||||
On another terminal:
|
||||
|
||||
`docker exec -it kafka-streams-multibinder /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic counts`
|
||||
|
||||
Enter some text in the console producer (the first terminal above) and watch the output in the console consumer (second terminal).
|
||||
|
||||
Also watch the console for logging statements from the test consumer that listens from the Rabbit exchange.
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
version: '3'
|
||||
services:
|
||||
kafka:
|
||||
image: wurstmeister/kafka
|
||||
container_name: kafka-streams-multibinder
|
||||
ports:
|
||||
- "9092:9092"
|
||||
environment:
|
||||
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
|
||||
- KAFKA_ADVERTISED_PORT=9092
|
||||
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
|
||||
depends_on:
|
||||
- zookeeper
|
||||
zookeeper:
|
||||
image: wurstmeister/zookeeper
|
||||
ports:
|
||||
- "2181:2181"
|
||||
environment:
|
||||
- KAFKA_ADVERTISED_HOST_NAME=zookeeper
|
||||
rabbitmq:
|
||||
image: rabbitmq:management
|
||||
container_name: rabbit-multibinder-1
|
||||
ports:
|
||||
- 5672:5672
|
||||
- 15672:15672
|
||||
225
kafka-streams-samples/kafka-streams-to-rabbitmq-message-channel/mvnw
vendored
Executable file
225
kafka-streams-samples/kafka-streams-to-rabbitmq-message-channel/mvnw
vendored
Executable file
@@ -0,0 +1,225 @@
|
||||
#!/bin/sh
|
||||
# ----------------------------------------------------------------------------
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
# ----------------------------------------------------------------------------
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# Maven2 Start Up Batch script
|
||||
#
|
||||
# Required ENV vars:
|
||||
# ------------------
|
||||
# JAVA_HOME - location of a JDK home dir
|
||||
#
|
||||
# Optional ENV vars
|
||||
# -----------------
|
||||
# M2_HOME - location of maven2's installed home dir
|
||||
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
|
||||
# e.g. to debug Maven itself, use
|
||||
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
|
||||
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
|
||||
# ----------------------------------------------------------------------------
|
||||
|
||||
if [ -z "$MAVEN_SKIP_RC" ] ; then
|
||||
|
||||
if [ -f /etc/mavenrc ] ; then
|
||||
. /etc/mavenrc
|
||||
fi
|
||||
|
||||
if [ -f "$HOME/.mavenrc" ] ; then
|
||||
. "$HOME/.mavenrc"
|
||||
fi
|
||||
|
||||
fi
|
||||
|
||||
# OS specific support. $var _must_ be set to either true or false.
|
||||
cygwin=false;
|
||||
darwin=false;
|
||||
mingw=false
|
||||
case "`uname`" in
|
||||
CYGWIN*) cygwin=true ;;
|
||||
MINGW*) mingw=true;;
|
||||
Darwin*) darwin=true
|
||||
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
|
||||
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
|
||||
if [ -z "$JAVA_HOME" ]; then
|
||||
if [ -x "/usr/libexec/java_home" ]; then
|
||||
export JAVA_HOME="`/usr/libexec/java_home`"
|
||||
else
|
||||
export JAVA_HOME="/Library/Java/Home"
|
||||
fi
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
|
||||
if [ -z "$JAVA_HOME" ] ; then
|
||||
if [ -r /etc/gentoo-release ] ; then
|
||||
JAVA_HOME=`java-config --jre-home`
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -z "$M2_HOME" ] ; then
|
||||
## resolve links - $0 may be a link to maven's home
|
||||
PRG="$0"
|
||||
|
||||
# need this for relative symlinks
|
||||
while [ -h "$PRG" ] ; do
|
||||
ls=`ls -ld "$PRG"`
|
||||
link=`expr "$ls" : '.*-> \(.*\)$'`
|
||||
if expr "$link" : '/.*' > /dev/null; then
|
||||
PRG="$link"
|
||||
else
|
||||
PRG="`dirname "$PRG"`/$link"
|
||||
fi
|
||||
done
|
||||
|
||||
saveddir=`pwd`
|
||||
|
||||
M2_HOME=`dirname "$PRG"`/..
|
||||
|
||||
# make it fully qualified
|
||||
M2_HOME=`cd "$M2_HOME" && pwd`
|
||||
|
||||
cd "$saveddir"
|
||||
# echo Using m2 at $M2_HOME
|
||||
fi
|
||||
|
||||
# For Cygwin, ensure paths are in UNIX format before anything is touched
|
||||
if $cygwin ; then
|
||||
[ -n "$M2_HOME" ] &&
|
||||
M2_HOME=`cygpath --unix "$M2_HOME"`
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
|
||||
[ -n "$CLASSPATH" ] &&
|
||||
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
|
||||
fi
|
||||
|
||||
# For Migwn, ensure paths are in UNIX format before anything is touched
|
||||
if $mingw ; then
|
||||
[ -n "$M2_HOME" ] &&
|
||||
M2_HOME="`(cd "$M2_HOME"; pwd)`"
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
|
||||
# TODO classpath?
|
||||
fi
|
||||
|
||||
if [ -z "$JAVA_HOME" ]; then
|
||||
javaExecutable="`which javac`"
|
||||
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
|
||||
# readlink(1) is not available as standard on Solaris 10.
|
||||
readLink=`which readlink`
|
||||
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
|
||||
if $darwin ; then
|
||||
javaHome="`dirname \"$javaExecutable\"`"
|
||||
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
|
||||
else
|
||||
javaExecutable="`readlink -f \"$javaExecutable\"`"
|
||||
fi
|
||||
javaHome="`dirname \"$javaExecutable\"`"
|
||||
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
|
||||
JAVA_HOME="$javaHome"
|
||||
export JAVA_HOME
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -z "$JAVACMD" ] ; then
|
||||
if [ -n "$JAVA_HOME" ] ; then
|
||||
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
|
||||
# IBM's JDK on AIX uses strange locations for the executables
|
||||
JAVACMD="$JAVA_HOME/jre/sh/java"
|
||||
else
|
||||
JAVACMD="$JAVA_HOME/bin/java"
|
||||
fi
|
||||
else
|
||||
JAVACMD="`which java`"
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ ! -x "$JAVACMD" ] ; then
|
||||
echo "Error: JAVA_HOME is not defined correctly." >&2
|
||||
echo " We cannot execute $JAVACMD" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ -z "$JAVA_HOME" ] ; then
|
||||
echo "Warning: JAVA_HOME environment variable is not set."
|
||||
fi
|
||||
|
||||
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
|
||||
|
||||
# traverses directory structure from process work directory to filesystem root
|
||||
# first directory with .mvn subdirectory is considered project base directory
|
||||
find_maven_basedir() {
|
||||
|
||||
if [ -z "$1" ]
|
||||
then
|
||||
echo "Path not specified to find_maven_basedir"
|
||||
return 1
|
||||
fi
|
||||
|
||||
basedir="$1"
|
||||
wdir="$1"
|
||||
while [ "$wdir" != '/' ] ; do
|
||||
if [ -d "$wdir"/.mvn ] ; then
|
||||
basedir=$wdir
|
||||
break
|
||||
fi
|
||||
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
|
||||
if [ -d "${wdir}" ]; then
|
||||
wdir=`cd "$wdir/.."; pwd`
|
||||
fi
|
||||
# end of workaround
|
||||
done
|
||||
echo "${basedir}"
|
||||
}
|
||||
|
||||
# concatenates all lines of a file
|
||||
concat_lines() {
|
||||
if [ -f "$1" ]; then
|
||||
echo "$(tr -s '\n' ' ' < "$1")"
|
||||
fi
|
||||
}
|
||||
|
||||
BASE_DIR=`find_maven_basedir "$(pwd)"`
|
||||
if [ -z "$BASE_DIR" ]; then
|
||||
exit 1;
|
||||
fi
|
||||
|
||||
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
|
||||
echo $MAVEN_PROJECTBASEDIR
|
||||
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
|
||||
|
||||
# For Cygwin, switch paths to Windows format before running java
|
||||
if $cygwin; then
|
||||
[ -n "$M2_HOME" ] &&
|
||||
M2_HOME=`cygpath --path --windows "$M2_HOME"`
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
|
||||
[ -n "$CLASSPATH" ] &&
|
||||
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
|
||||
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
|
||||
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
|
||||
fi
|
||||
|
||||
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
|
||||
exec "$JAVACMD" \
|
||||
$MAVEN_OPTS \
|
||||
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
|
||||
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
|
||||
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
|
||||
143
kafka-streams-samples/kafka-streams-to-rabbitmq-message-channel/mvnw.cmd
vendored
Normal file
143
kafka-streams-samples/kafka-streams-to-rabbitmq-message-channel/mvnw.cmd
vendored
Normal file
@@ -0,0 +1,143 @@
|
||||
@REM ----------------------------------------------------------------------------
|
||||
@REM Licensed to the Apache Software Foundation (ASF) under one
|
||||
@REM or more contributor license agreements. See the NOTICE file
|
||||
@REM distributed with this work for additional information
|
||||
@REM regarding copyright ownership. The ASF licenses this file
|
||||
@REM to you under the Apache License, Version 2.0 (the
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM https://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
@REM KIND, either express or implied. See the License for the
|
||||
@REM specific language governing permissions and limitations
|
||||
@REM under the License.
|
||||
@REM ----------------------------------------------------------------------------
|
||||
|
||||
@REM ----------------------------------------------------------------------------
|
||||
@REM Maven2 Start Up Batch script
|
||||
@REM
|
||||
@REM Required ENV vars:
|
||||
@REM JAVA_HOME - location of a JDK home dir
|
||||
@REM
|
||||
@REM Optional ENV vars
|
||||
@REM M2_HOME - location of maven2's installed home dir
|
||||
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
|
||||
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
|
||||
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
|
||||
@REM e.g. to debug Maven itself, use
|
||||
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
|
||||
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
|
||||
@REM ----------------------------------------------------------------------------
|
||||
|
||||
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
|
||||
@echo off
|
||||
@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
|
||||
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
|
||||
|
||||
@REM set %HOME% to equivalent of $HOME
|
||||
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
|
||||
|
||||
@REM Execute a user defined script before this one
|
||||
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
|
||||
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
|
||||
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
|
||||
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
|
||||
:skipRcPre
|
||||
|
||||
@setlocal
|
||||
|
||||
set ERROR_CODE=0
|
||||
|
||||
@REM To isolate internal variables from possible post scripts, we use another setlocal
|
||||
@setlocal
|
||||
|
||||
@REM ==== START VALIDATION ====
|
||||
if not "%JAVA_HOME%" == "" goto OkJHome
|
||||
|
||||
echo.
|
||||
echo Error: JAVA_HOME not found in your environment. >&2
|
||||
echo Please set the JAVA_HOME variable in your environment to match the >&2
|
||||
echo location of your Java installation. >&2
|
||||
echo.
|
||||
goto error
|
||||
|
||||
:OkJHome
|
||||
if exist "%JAVA_HOME%\bin\java.exe" goto init
|
||||
|
||||
echo.
|
||||
echo Error: JAVA_HOME is set to an invalid directory. >&2
|
||||
echo JAVA_HOME = "%JAVA_HOME%" >&2
|
||||
echo Please set the JAVA_HOME variable in your environment to match the >&2
|
||||
echo location of your Java installation. >&2
|
||||
echo.
|
||||
goto error
|
||||
|
||||
@REM ==== END VALIDATION ====
|
||||
|
||||
:init
|
||||
|
||||
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
|
||||
@REM Fallback to current working directory if not found.
|
||||
|
||||
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
|
||||
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
|
||||
|
||||
set EXEC_DIR=%CD%
|
||||
set WDIR=%EXEC_DIR%
|
||||
:findBaseDir
|
||||
IF EXIST "%WDIR%"\.mvn goto baseDirFound
|
||||
cd ..
|
||||
IF "%WDIR%"=="%CD%" goto baseDirNotFound
|
||||
set WDIR=%CD%
|
||||
goto findBaseDir
|
||||
|
||||
:baseDirFound
|
||||
set MAVEN_PROJECTBASEDIR=%WDIR%
|
||||
cd "%EXEC_DIR%"
|
||||
goto endDetectBaseDir
|
||||
|
||||
:baseDirNotFound
|
||||
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
|
||||
cd "%EXEC_DIR%"
|
||||
|
||||
:endDetectBaseDir
|
||||
|
||||
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
|
||||
|
||||
@setlocal EnableExtensions EnableDelayedExpansion
|
||||
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
|
||||
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
|
||||
|
||||
:endReadAdditionalConfig
|
||||
|
||||
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
|
||||
|
||||
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
|
||||
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
|
||||
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
|
||||
if ERRORLEVEL 1 goto error
|
||||
goto end
|
||||
|
||||
:error
|
||||
set ERROR_CODE=1
|
||||
|
||||
:end
|
||||
@endlocal & set ERROR_CODE=%ERROR_CODE%
|
||||
|
||||
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
|
||||
@REM check for post script, once with legacy .bat ending and once with .cmd ending
|
||||
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
|
||||
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
|
||||
:skipRcPost
|
||||
|
||||
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
|
||||
if "%MAVEN_BATCH_PAUSE%" == "on" pause
|
||||
|
||||
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
|
||||
|
||||
exit /B %ERROR_CODE%
|
||||
@@ -0,0 +1,50 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>kafka-streams-to-rabbitmq-message-channel</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>kafka-streams-to-rabbitmq-message-channel</name>
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<parent>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>spring-cloud-stream-samples-parent</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../..</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,151 @@
|
||||
/*
|
||||
* Copyright 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.streams.message.channel;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.Materialized;
|
||||
import org.apache.kafka.streams.kstream.Serialized;
|
||||
import org.apache.kafka.streams.kstream.TimeWindows;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.Output;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
|
||||
@SpringBootApplication
|
||||
public class KafkaStreamsWordCountApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(KafkaStreamsWordCountApplication.class, args);
|
||||
}
|
||||
|
||||
@EnableBinding(MultipleProcessor.class)
|
||||
public static class WordCountProcessorApplication {
|
||||
|
||||
@StreamListener("kstreamIn")
|
||||
@SendTo("kstreamOut")
|
||||
public KStream<?, WordCount> process(KStream<Object, String> input) {
|
||||
|
||||
return input
|
||||
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
|
||||
.map((key, value) -> new KeyValue<>(value, value))
|
||||
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
|
||||
.windowedBy(TimeWindows.of(20_000))
|
||||
.count(Materialized.as("WordCounts-1"))
|
||||
.toStream()
|
||||
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
|
||||
}
|
||||
|
||||
@StreamListener("fromKafka")
|
||||
@SendTo("toRabbit")
|
||||
public WordCount sink(WordCount input) {
|
||||
return input;
|
||||
}
|
||||
|
||||
@StreamListener("testInputFromRabbit")
|
||||
public void receive(String data) {
|
||||
System.out.println("Data received..." + data);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
interface MultipleProcessor {
|
||||
|
||||
String KSTREAM_IN = "kstreamIn";
|
||||
String KSTREAM_OUT = "kstreamOut";
|
||||
|
||||
String FROM_KAFKA = "fromKafka";
|
||||
String TO_RABBIT = "toRabbit";
|
||||
|
||||
String TEST_INPUT_FROM_RABBIT = "testInputFromRabbit";
|
||||
|
||||
@Input(KSTREAM_IN)
|
||||
KStream<?, ?> kstreamIn();
|
||||
|
||||
@Output(KSTREAM_OUT)
|
||||
KStream<?, ?> kstreamOut();
|
||||
|
||||
@Input(FROM_KAFKA)
|
||||
SubscribableChannel fromKafka();
|
||||
|
||||
@Output(TO_RABBIT)
|
||||
MessageChannel toRabbit();
|
||||
|
||||
@Input(TEST_INPUT_FROM_RABBIT)
|
||||
SubscribableChannel testInputFromRabbit();
|
||||
}
|
||||
|
||||
static class WordCount {
|
||||
|
||||
private String word;
|
||||
|
||||
private long count;
|
||||
|
||||
private Date start;
|
||||
|
||||
private Date end;
|
||||
|
||||
WordCount(String word, long count, Date start, Date end) {
|
||||
this.word = word;
|
||||
this.count = count;
|
||||
this.start = start;
|
||||
this.end = end;
|
||||
}
|
||||
|
||||
public String getWord() {
|
||||
return word;
|
||||
}
|
||||
|
||||
public void setWord(String word) {
|
||||
this.word = word;
|
||||
}
|
||||
|
||||
public long getCount() {
|
||||
return count;
|
||||
}
|
||||
|
||||
public void setCount(long count) {
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
public Date getStart() {
|
||||
return start;
|
||||
}
|
||||
|
||||
public void setStart(Date start) {
|
||||
this.start = start;
|
||||
}
|
||||
|
||||
public Date getEnd() {
|
||||
return end;
|
||||
}
|
||||
|
||||
public void setEnd(Date end) {
|
||||
this.end = end;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
spring.cloud.stream.bindings.singleOutput.contentType: application/json
|
||||
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
|
||||
spring.cloud.stream.kafka.streams:
|
||||
binder.configuration:
|
||||
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
bindings.kstreamIn.consumer.application-id: kafka-streams-to-rabbitmq
|
||||
spring.cloud.stream.bindings.kstreamIn:
|
||||
destination: words
|
||||
spring.cloud.stream.bindings.kstreamOut:
|
||||
destination: counts
|
||||
spring.cloud.stream.bindings.fromKafka:
|
||||
destination: counts
|
||||
binder: kafka
|
||||
spring.cloud.stream.bindings.toRabbit:
|
||||
destination: countsInRabbit
|
||||
binder: rabbit
|
||||
|
||||
spring.cloud.stream.bindings.testInputFromRabbit:
|
||||
destination: countsInRabbit
|
||||
binder: rabbit
|
||||
|
||||
spring.cloud.stream.kafka.streams.binder:
|
||||
brokers: localhost
|
||||
@@ -0,0 +1,12 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
<root level="INFO">
|
||||
<appender-ref ref="stdout"/>
|
||||
</root>
|
||||
<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
|
||||
</configuration>
|
||||
@@ -0,0 +1,18 @@
|
||||
package kafka.streams.message.channel;
|
||||
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
public class KafkaStreamsWordCountApplicationTests {
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void contextLoads() {
|
||||
}
|
||||
|
||||
}
|
||||
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM http://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM https://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>kafka-streams-word-count</artifactId>
|
||||
@@ -11,7 +11,7 @@
|
||||
<description>Demo project for Spring Boot</description>
|
||||
|
||||
<parent>
|
||||
<groupId>spring.cloud.stream.samples</groupId>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>spring-cloud-stream-samples-parent</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../..</relativePath>
|
||||
@@ -31,6 +31,17 @@
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-streams-test-utils</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -17,6 +17,7 @@
|
||||
package kafka.streams.word.count;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.Materialized;
|
||||
@@ -42,15 +43,19 @@ public class KafkaStreamsWordCountApplication {
|
||||
@EnableBinding(KafkaStreamsProcessor.class)
|
||||
public static class WordCountProcessorApplication {
|
||||
|
||||
@StreamListener("input")
|
||||
@SendTo("output")
|
||||
public KStream<?, WordCount> process(KStream<Object, String> input) {
|
||||
public static final String INPUT_TOPIC = "input";
|
||||
public static final String OUTPUT_TOPIC = "output";
|
||||
public static final int WINDOW_SIZE_MS = 30000;
|
||||
|
||||
@StreamListener(INPUT_TOPIC)
|
||||
@SendTo(OUTPUT_TOPIC)
|
||||
public KStream<Bytes, WordCount> process(KStream<Bytes, String> input) {
|
||||
|
||||
return input
|
||||
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
|
||||
.map((key, value) -> new KeyValue<>(value, value))
|
||||
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
|
||||
.windowedBy(TimeWindows.of(30000))
|
||||
.windowedBy(TimeWindows.of(WINDOW_SIZE_MS))
|
||||
.count(Materialized.as("WordCounts-1"))
|
||||
.toStream()
|
||||
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
|
||||
@@ -67,6 +72,21 @@ public class KafkaStreamsWordCountApplication {
|
||||
|
||||
private Date end;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuffer sb = new StringBuffer("WordCount{");
|
||||
sb.append("word='").append(word).append('\'');
|
||||
sb.append(", count=").append(count);
|
||||
sb.append(", start=").append(start);
|
||||
sb.append(", end=").append(end);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
WordCount() {
|
||||
|
||||
}
|
||||
|
||||
WordCount(String word, long count, Date start, Date end) {
|
||||
this.word = word;
|
||||
this.count = count;
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -1,17 +1,14 @@
|
||||
spring.cloud.stream.bindings.output.contentType: application/json
|
||||
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
|
||||
spring.cloud.stream.kafka.streams.binder.configuration:
|
||||
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
spring.cloud.stream.kafka.streams:
|
||||
binder.configuration:
|
||||
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
bindings.input.consumer.application-id: basic-word-count
|
||||
spring.cloud.stream.bindings.output:
|
||||
destination: counts
|
||||
producer:
|
||||
headerMode: raw
|
||||
#useNativeEncoding: true
|
||||
spring.cloud.stream.bindings.input:
|
||||
destination: words
|
||||
consumer:
|
||||
headerMode: raw
|
||||
#For testing
|
||||
spring.cloud.stream.bindings.input1.destination: counts
|
||||
spring.cloud.stream.bindings.output1.destination: words
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
<root level="INFO">
|
||||
<appender-ref ref="stdout"/>
|
||||
</root>
|
||||
<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
|
||||
</configuration>
|
||||
@@ -1,18 +1,93 @@
|
||||
/*
|
||||
* Copyright 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.streams.word.count;
|
||||
|
||||
import org.junit.Ignore;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.test.EmbeddedKafkaBroker;
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
@SpringBootTest(
|
||||
webEnvironment = SpringBootTest.WebEnvironment.NONE,
|
||||
properties = {"server.port=0",
|
||||
"spring.jmx.enabled=false",
|
||||
"spring.cloud.stream.bindings.input.destination=words",
|
||||
"spring.cloud.stream.bindings.output.destination=counts",
|
||||
"spring.cloud.stream.kafka.streams.default.consumer.application-id=basic-word-count",
|
||||
"spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde"})
|
||||
public class KafkaStreamsWordCountApplicationTests {
|
||||
|
||||
@ClassRule
|
||||
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "words", "counts");
|
||||
|
||||
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
|
||||
|
||||
private static Consumer<String, String> consumer;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "false", embeddedKafka);
|
||||
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
|
||||
consumer = cf.createConsumer();
|
||||
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts");
|
||||
//Since there are both binders present in this app, we resort to the spring kafka broker property.
|
||||
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
consumer.close();
|
||||
System.clearProperty("spring.kafka.bootstrap-servers");
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void contextLoads() {
|
||||
public void testKafkaStreamsWordCountProcessor() throws Exception {
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
try {
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
|
||||
template.setDefaultTopic("words");
|
||||
template.sendDefault("foobar");
|
||||
ConsumerRecords<String, String> cr = KafkaTestUtils.getRecords(consumer);
|
||||
assertThat(cr.count()).isGreaterThanOrEqualTo(1);
|
||||
}
|
||||
finally {
|
||||
pf.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,162 @@
|
||||
/*
|
||||
* Copyright 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.streams.word.count;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.serialization.Serde;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.TopologyTestDriver;
|
||||
import org.apache.kafka.streams.kstream.Consumed;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.Produced;
|
||||
import org.apache.kafka.streams.test.ConsumerRecordFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.kafka.support.serializer.JsonSerde;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* TopologyTestDriver based test about stream processing of {@link KafkaStreamsWordCountApplication}
|
||||
*
|
||||
* @author Jukka Karvanen / jukinimi.com
|
||||
*/
|
||||
|
||||
public class WordCountProcessorApplicationTests {
|
||||
private TopologyTestDriver testDriver;
|
||||
public static final String INPUT_TOPIC = KafkaStreamsWordCountApplication.WordCountProcessorApplication.INPUT_TOPIC;
|
||||
public static final String OUTPUT_TOPIC = KafkaStreamsWordCountApplication.WordCountProcessorApplication.OUTPUT_TOPIC;
|
||||
|
||||
final Serde<String> stringSerde = Serdes.String();
|
||||
final JsonSerde<KafkaStreamsWordCountApplication.WordCount> countSerde = new JsonSerde<>(KafkaStreamsWordCountApplication.WordCount.class);
|
||||
final Serde<Bytes> nullSerde = Serdes.Bytes(); //Serde for not used key
|
||||
private ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(
|
||||
stringSerde.serializer(), stringSerde.serializer()); //Key feed in as string, even read as Bytes
|
||||
|
||||
static Properties getStreamsConfiguration() {
|
||||
final Properties streamsConfiguration = new Properties();
|
||||
// Need to be set even these do not matter with TopologyTestDriver
|
||||
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "TopologyTestDriver");
|
||||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
|
||||
return streamsConfiguration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup Stream topology
|
||||
* Add KStream based on @StreamListener annotation
|
||||
* Add to(topic) based @SendTo annotation
|
||||
*/
|
||||
@Before
|
||||
public void setup() {
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
KStream<Bytes, String> input = builder.stream(INPUT_TOPIC, Consumed.with(nullSerde, stringSerde));
|
||||
KafkaStreamsWordCountApplication.WordCountProcessorApplication app = new KafkaStreamsWordCountApplication.WordCountProcessorApplication();
|
||||
KStream<Bytes, KafkaStreamsWordCountApplication.WordCount> output = app.process(input);
|
||||
output.to(OUTPUT_TOPIC, Produced.with(nullSerde, countSerde));
|
||||
testDriver = new TopologyTestDriver(builder.build(), getStreamsConfiguration());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
try {
|
||||
testDriver.close();
|
||||
} catch (final RuntimeException e) {
|
||||
// https://issues.apache.org/jira/browse/KAFKA-6647 causes exception when executed in Windows, ignoring it
|
||||
// Logged stacktrace cannot be avoided
|
||||
System.out.println("Ignoring exception, test failing in Windows due this exception:" + e.getLocalizedMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read one Record from output topic.
|
||||
*
|
||||
* @return ProducerRecord containing WordCount as value
|
||||
*/
|
||||
private ProducerRecord<Bytes, KafkaStreamsWordCountApplication.WordCount> readOutput() {
|
||||
return testDriver.readOutput(OUTPUT_TOPIC, nullSerde.deserializer(), countSerde.deserializer());
|
||||
}
|
||||
|
||||
/**
|
||||
* Read counts from output to map ignoring start and end dates
|
||||
* If existing word is incremented, it can appear twice in output and is replaced in map
|
||||
*
|
||||
* @return Map of Word and counts
|
||||
*/
|
||||
private Map<String, Long> getOutputList() {
|
||||
final Map<String, Long> output = new HashMap<>();
|
||||
ProducerRecord<Bytes, KafkaStreamsWordCountApplication.WordCount> outputRow;
|
||||
while ((outputRow = readOutput()) != null) {
|
||||
output.put(outputRow.value().getWord(), outputRow.value().getCount());
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple test validating count of one word
|
||||
*/
|
||||
@Test
|
||||
public void testOneWord() {
|
||||
final String nullKey = null;
|
||||
//Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
|
||||
testDriver.pipeInput(recordFactory.create(INPUT_TOPIC, nullKey, "Hello", 1L));
|
||||
//Read and validate output
|
||||
final ProducerRecord<Bytes, KafkaStreamsWordCountApplication.WordCount> output = readOutput();
|
||||
assertThat(output).isNotNull();
|
||||
assertThat(output.value()).isEqualToComparingFieldByField(new KafkaStreamsWordCountApplication.WordCount("hello", 1L, new Date(0), new Date(KafkaStreamsWordCountApplication.WordCountProcessorApplication.WINDOW_SIZE_MS)));
|
||||
//No more output in topic
|
||||
assertThat(readOutput()).isNull();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Word count of sentence list.
|
||||
*/
|
||||
@Test
|
||||
public void shouldCountWords() {
|
||||
final List<String> inputLines = Arrays.asList(
|
||||
"Kafka Streams Examples",
|
||||
"Spring Cloud Stream Sample",
|
||||
"Using Kafka Streams Test Utils"
|
||||
);
|
||||
final List<KeyValue<String, String>> inputRecords = inputLines.stream().map(v -> new KeyValue<String, String>(null, v)).collect(Collectors.toList());
|
||||
|
||||
final Map<String, Long> expectedWordCounts = new HashMap<>();
|
||||
expectedWordCounts.put("spring", 1L);
|
||||
expectedWordCounts.put("cloud", 1L);
|
||||
expectedWordCounts.put("examples", 1L);
|
||||
expectedWordCounts.put("sample", 1L);
|
||||
expectedWordCounts.put("streams", 2L);
|
||||
expectedWordCounts.put("stream", 1L);
|
||||
expectedWordCounts.put("test", 1L);
|
||||
expectedWordCounts.put("utils", 1L);
|
||||
expectedWordCounts.put("kafka", 2L);
|
||||
expectedWordCounts.put("using", 1L);
|
||||
|
||||
testDriver.pipeInput(recordFactory.create(INPUT_TOPIC, inputRecords, 1L, 1000L)); //All feed in same 30s time window
|
||||
final Map<String, Long> actualWordCounts = getOutputList();
|
||||
assertThat(actualWordCounts).containsAllEntriesOf(expectedWordCounts).hasSameSizeAs(expectedWordCounts);
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>spring.cloud.stream.samples</groupId>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>kafka-streams-samples</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<packaging>pom</packaging>
|
||||
@@ -13,10 +13,13 @@
|
||||
<module>kafka-streams-branching</module>
|
||||
<module>kafka-streams-dlq-sample</module>
|
||||
<module>kafka-streams-table-join</module>
|
||||
<module>kafka-streams-global-table-join</module>
|
||||
<module>kafka-streams-interactive-query-basic</module>
|
||||
<module>kafka-streams-interactive-query-advanced</module>
|
||||
<module>kafka-streams-message-channel</module>
|
||||
<module>kafka-streams-product-tracker</module>
|
||||
<module>kafka-streams-aggregate</module>
|
||||
<module>kafka-streams-to-rabbitmq-message-channel</module>
|
||||
</modules>
|
||||
|
||||
</project>
|
||||
|
||||
2
kinesis-samples/kinesis-produce-consume/mvnw
vendored
2
kinesis-samples/kinesis-produce-consume/mvnw
vendored
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM http://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM https://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>kinesis-produce-consume</artifactId>
|
||||
@@ -10,7 +10,7 @@
|
||||
<description>Spring Cloud Stream Kinesis Sample</description>
|
||||
|
||||
<parent>
|
||||
<groupId>spring.cloud.stream.samples</groupId>
|
||||
<groupId>io.spring.cloud.stream.sample</groupId>
|
||||
<artifactId>spring-cloud-stream-samples-parent</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../..</relativePath>
|
||||
@@ -19,8 +19,8 @@
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-kinesis</artifactId>
|
||||
<version>1.0.0.BUILD-SNAPSHOT</version>
|
||||
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
|
||||
<version>1.0.0.RELEASE</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
1
kinesis-samples/kinesis-to-webflux/.mvn
Normal file
1
kinesis-samples/kinesis-to-webflux/.mvn
Normal file
@@ -0,0 +1 @@
|
||||
../../.mvn
|
||||
10
kinesis-samples/kinesis-to-webflux/README.adoc
Normal file
10
kinesis-samples/kinesis-to-webflux/README.adoc
Normal file
@@ -0,0 +1,10 @@
|
||||
== Reactive Spring Cloud Stream AWS Kinesis Binder to SSE via WebFlux
|
||||
|
||||
This sample demonstrate a simple bridging of AWS Kinesis stream records to the Server Side Events subscribers.
|
||||
The `@StreamListener` sink side is based on the Spring Cloud Stream Reactive support, streaming incoming messages to the `Flux` argument which, in turn, is used as a source for the `@GetMapping` controller.
|
||||
|
||||
The `CloudStreamKinesisToWebfluxApplicationTests` demonstrates:
|
||||
|
||||
- an `AmazonKinesisAsync` client configured against local Kineselite on the `4568` port;
|
||||
- the `TestSource` binding for producing records into the Kinesis stream;
|
||||
- a `WebTestClient` to perform SSE request against embedded Netty started by the `CloudStreamKinesisToWebfluxApplication` Spring Boot application on the random port and subsequent verification of the data produced by the `Flux` from the `@StreamListener` against Kinesis Binder consumer.
|
||||
2
schema-registry-samples/schema-registry-confluent/producer2-confluent/mvnw → kinesis-samples/kinesis-to-webflux/mvnw
vendored
Executable file → Normal file
2
schema-registry-samples/schema-registry-confluent/producer2-confluent/mvnw → kinesis-samples/kinesis-to-webflux/mvnw
vendored
Executable file → Normal file
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user