Compare commits

..

4 Commits

Author SHA1 Message Date
Spring Operator
c9c596002e URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://repo.spring.io/libs-milestone-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-milestone-local ([https](https://repo.spring.io/libs-milestone-local) result 302).
* [ ] http://repo.spring.io/libs-snapshot-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-snapshot-local ([https](https://repo.spring.io/libs-snapshot-local) result 302).
* [ ] http://repo.spring.io/release with 1 occurrences migrated to:
  https://repo.spring.io/release ([https](https://repo.spring.io/release) result 302).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 86 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 43 occurrences
2019-04-24 12:52:10 -04:00
Spring Operator
47900bd265 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed But Review Recommended
These URLs were fixed, but the https status was not OK. However, the https status was the same as the http request or http redirected to an https URL, so they were migrated. Your review is recommended.

* [ ] http://docs.spring.io/spring-kafka/reference/htmlsingle/ (301) with 2 occurrences migrated to:
  https://docs.spring.io/spring-kafka/reference/htmlsingle/ ([https](https://docs.spring.io/spring-kafka/reference/htmlsingle/) result 404).

# Ignored
These URLs were intentionally ignored.

* http://127.0.0.1:8081/config with 1 occurrences
* http://localhost:15672 with 1 occurrences
* http://localhost:64398/ with 1 occurrences
* http://localhost:64399/orders with 1 occurrences
* http://localhost:8080 with 8 occurrences
* http://localhost:8080/ with 1 occurrences
* http://localhost:8080/charts/top-five?genre=Punk with 1 occurrences
* http://localhost:8080/events with 1 occurrences
* http://localhost:8081 with 8 occurrences
* http://localhost:8990 with 3 occurrences
* http://localhost:9009/messages with 6 occurrences
* http://localhost:9009/messagesX with 1 occurrences
* http://localhost:9010/messages with 4 occurrences
* http://localhost:9010/messagesX with 1 occurrences
2019-04-24 12:51:23 -04:00
Spring Operator
5148dabe5a URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed But Review Recommended
These URLs were fixed, but the https status was not OK. However, the https status was the same as the http request or http redirected to an https URL, so they were migrated. Your review is recommended.

* http://packages.confluent.io/maven/ (404) with 4 occurrences migrated to:
  https://packages.confluent.io/maven/ ([https](https://packages.confluent.io/maven/) result 404).

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* http://maven.apache.org/xsd/maven-4.0.0.xsd with 43 occurrences migrated to:
  https://maven.apache.org/xsd/maven-4.0.0.xsd ([https](https://maven.apache.org/xsd/maven-4.0.0.xsd) result 200).
* http://www.apache.org/licenses/LICENSE-2.0 with 66 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
* http://repo.spring.io/libs-milestone-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-milestone-local ([https](https://repo.spring.io/libs-milestone-local) result 302).
* http://repo.spring.io/libs-release-local with 1 occurrences migrated to:
  https://repo.spring.io/libs-release-local ([https](https://repo.spring.io/libs-release-local) result 302).
* http://repo.spring.io/libs-snapshot-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-snapshot-local ([https](https://repo.spring.io/libs-snapshot-local) result 302).
* http://repo.spring.io/release with 1 occurrences migrated to:
  https://repo.spring.io/release ([https](https://repo.spring.io/release) result 302).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 86 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 43 occurrences
2019-03-21 15:20:06 -04:00
Spring Operator
5f9e842b83 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://www.apache.org/licenses/ with 1 occurrences migrated to:
  https://www.apache.org/licenses/ ([https](https://www.apache.org/licenses/) result 200).
* [ ] http://www.apache.org/licenses/LICENSE-2.0 with 67 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
2019-03-21 15:17:24 -04:00
288 changed files with 408 additions and 12187 deletions

2
.gitignore vendored
View File

@@ -18,7 +18,7 @@ _site/
*.iml
*.ipr
*.iws
.idea/
.idea/*
*/.idea
.factorypath
spring-xd-samples/*/xd

View File

@@ -11,7 +11,7 @@
<description>Demo project for Spring Boot</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>

View File

@@ -1,8 +1,10 @@
spring.application.name: kafka-streams-aggregate-sample
spring.cloud.stream.bindings.input:
destination: foobar
consumer:
headerMode: raw
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
zkNodes: localhost #192.168.99.100
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$BytesSerde

View File

@@ -11,7 +11,7 @@
<description>Demo project for Spring Boot</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>

View File

@@ -7,13 +7,21 @@ spring.cloud.stream.kafka.streams.binder.configuration:
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.output1:
destination: english-counts
producer:
headerMode: raw
spring.cloud.stream.bindings.output2:
destination: french-counts
producer:
headerMode: raw
spring.cloud.stream.bindings.output3:
destination: spanish-counts
producer:
headerMode: raw
spring.cloud.stream.bindings.input:
destination: words
group: group1
consumer:
headerMode: raw
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100 #localhost
spring.application.name: kafka-streams-branching-sample
zkNodes: localhost #192.168.99.100 #localhost

View File

@@ -15,7 +15,7 @@ This sample uses lambda expressions and thus requires Java 8+.
Go to the root of the repository and do: `./mvnw clean package`
`java -jar target/kafka-streams-dlq-sample-0.0.1-SNAPSHOT.jar`
`java -jar target/kstream-word-count-0.0.1-SNAPSHOT.jar`
The default application.yml file demonstrates native decoding by Kafka.
The default value serializer is set to IntegerSerde to force a deserialization errors.
@@ -38,4 +38,4 @@ You will not see any messages coming to the regular destination counts.
There is another yaml file provided (by-framework-decoding.yml).
Use that as application.yml to see how it works when the deserialization done by the framework.
In this case also, the messages on error appear in the DLQ topic.
In this case also, the messages on error appear in the DLQ topic.

View File

@@ -11,7 +11,7 @@
<description>Demo project for Spring Boot</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>

View File

@@ -19,8 +19,6 @@ package kafka.streams.dlq.sample;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
@@ -43,6 +41,9 @@ public class KafkaStreamsDlqSample {
@EnableBinding(KafkaStreamsProcessor.class)
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
@@ -50,9 +51,8 @@ public class KafkaStreamsDlqSample {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-1"))
.groupByKey(Serdes.String(), Serdes.String())
.count(timeWindows, "WordCounts-1")
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}

View File

@@ -1,20 +1,25 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kafka.streams.binder:
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
application.id: default
brokers: localhost #192.168.99.100
serdeError: sendToDlq
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams.binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
application.id: default
spring.cloud.stream.bindings.output:
destination: counts
producer:
headerMode: raw
#useNativeEncoding: true
spring.cloud.stream.bindings.input:
destination: words
group: group1
consumer:
headerMode: raw
useNativeDecoding: true
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: words-count-dlq
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
zkNodes: localhost #192.168.99.100
serdeError: sendToDlq

View File

@@ -1,12 +1,9 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
serdeError: sendToDlq
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
application.id: default
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams.binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
application.id: default
spring.cloud.stream.bindings.output:
destination: counts
producer:
@@ -20,9 +17,9 @@ spring.cloud.stream.bindings.input:
headerMode: raw
useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: words-count-dlq
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
zkNodes: localhost #192.168.99.100
serdeError: sendToDlq

View File

@@ -1 +0,0 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip

View File

@@ -1,25 +0,0 @@
== What is this app?
This is an example of a Spring Cloud Stream processor using Kafka Streams support.
The application uses two inputs - one KStream for user-clicks and a GlobalKTable for user-regions.
Then it joins the information from stream to table to find out total clicks per region. You could compare the this with ktable join sample.
=== Running the app:
Go to the root of the repository.
`docker-compose up -d`
`./mvnw clean package`
`java -jar target/kafka-streams-global-table-join-0.0.1-SNAPSHOT.jar`
`docker exec -it kafka-join /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic user-regions`
`docker exec -it kafka-join /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic user-regions --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.StringDeserializer --property print.key=true --property key.separator="-"`
`docker exec -it kafka-join /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic output-topic --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true --property key.separator="-"`
Run the stand-alone `Producers` application to generate some data and watch the output on the console consumer above.

View File

@@ -1,19 +0,0 @@
version: '3'
services:
kafka:
image: wurstmeister/kafka
container_name: kafka-join
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
- KAFKA_ADVERTISED_PORT=9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
depends_on:
- zookeeper
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
environment:
- KAFKA_ADVERTISED_HOST_NAME=zookeeper

View File

@@ -1,42 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-streams-global-table-join</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-streams-global-table-join</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,96 +0,0 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.streams.globalktable.join;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Serialized;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.messaging.handler.annotation.SendTo;
/**
* This is the PR that added this sample:
* https://github.com/spring-cloud/spring-cloud-stream-samples/pull/112
*/
@SpringBootApplication
public class KafkaStreamsGlobalKTableJoin {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsGlobalKTableJoin.class, args);
}
@EnableBinding(KStreamProcessorX.class)
public static class KStreamToTableJoinApplication {
@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
@Input("inputTable") GlobalKTable<String, String> userRegionsTable) {
return userClicksStream
.leftJoin(userRegionsTable,
(name,value) -> name,
(clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks)
)
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
.reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
.toStream();
}
}
interface KStreamProcessorX extends KafkaStreamsProcessor {
@Input("inputTable")
GlobalKTable<?, ?> inputKTable();
}
private static final class RegionWithClicks {
private final String region;
private final long clicks;
public RegionWithClicks(String region, long clicks) {
if (region == null || region.isEmpty()) {
throw new IllegalArgumentException("region must be set");
}
if (clicks < 0) {
throw new IllegalArgumentException("clicks must not be negative");
}
this.region = region;
this.clicks = clicks;
}
public String getRegion() {
return region;
}
public long getClicks() {
return clicks;
}
}
}

View File

@@ -1,89 +0,0 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.streams.globalktable.join;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author Soby Chacko
*/
public class Producers {
public static void main(String... args) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
List<KeyValue<String, Long>> userClicks = Arrays.asList(
new KeyValue<>("alice", 13L),
new KeyValue<>("bob", 4L),
new KeyValue<>("chao", 25L),
new KeyValue<>("bob", 19L),
new KeyValue<>("dave", 56L),
new KeyValue<>("eve", 78L),
new KeyValue<>("alice", 40L),
new KeyValue<>("fang", 99L)
);
DefaultKafkaProducerFactory<String, Long> pf = new DefaultKafkaProducerFactory<>(props);
KafkaTemplate<String, Long> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("user-clicks3");
for (KeyValue<String,Long> keyValue : userClicks) {
template.sendDefault(keyValue.key, keyValue.value);
}
List<KeyValue<String, String>> userRegions = Arrays.asList(
new KeyValue<>("alice", "asia"), /* Alice lived in Asia originally... */
new KeyValue<>("bob", "americas"),
new KeyValue<>("chao", "asia"),
new KeyValue<>("dave", "europe"),
new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. */
new KeyValue<>("eve", "americas"),
new KeyValue<>("fang", "asia")
);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
DefaultKafkaProducerFactory<String, String> pf1 = new DefaultKafkaProducerFactory<>(props);
KafkaTemplate<String, String> template1 = new KafkaTemplate<>(pf1, true);
template1.setDefaultTopic("user-regions");
for (KeyValue<String,String> keyValue : userRegions) {
template1.sendDefault(keyValue.key, keyValue.value);
}
}
}

View File

@@ -1,33 +0,0 @@
spring.application.name: stream-global-table-sample
spring.cloud.stream.bindings.input:
destination: user-clicks3
consumer:
useNativeDecoding: true
spring.cloud.stream.bindings.inputTable:
destination: user-regions
contentType: application/avro
consumer:
useNativeDecoding: true
spring.cloud.stream.bindings.output:
destination: output-topic
producer:
useNativeEncoding: true
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.bindings.inputTable:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
materializedAs: all-regions
spring.cloud.stream.kafka.streams.bindings.output:
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
commit.interval.ms: 1000

View File

@@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
</configuration>

View File

@@ -1,18 +0,0 @@
package kafka.streams.globalktable.join;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaStreamsGlobalKTableJoinTests {
@Test
@Ignore
public void contextLoads() {
}
}

View File

@@ -9,33 +9,18 @@ There is a REST service provided as part of the application that can be used to
=== Running the app:
We will run 2 instances of the processor application to demonstrate that regardless of which instance hosts the keys, the REST endpoint will serve the requests.
For more information on how this is done, please take a look at the application code.
1. `docker-compose up -d`
2. Start the confluent schema registry: The following command is based on the confluent platform and assume that you are at the root of the confluent platform's root directory.
2. Start the confluent schema registry: The following command is based on the confluent platform.
`./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties`
3. Go to the root of the repository and do: `./mvnw clean package`
4. `java -jar target/kafka-streams-interactive-query-advanced-0.0.1-SNAPSHOT.jar`
5. On another terminal session:
`java -jar target/kafka-streams-interactive-query-advanced-0.0.1-SNAPSHOT.jar --server.port=8082 --spring.cloud.stream.kafka.streams.binder.configuration.application.server=localhost:8082'
4. `java -jar target/kafka-streams-interactive-query-0.0.1-SNAPSHOT.jar`
5. Run the stand-alone `Producers` application to generate data and start the processing.
Keep it running for a while.
6. Go to the URL: http://localhost:8080/charts/top-five?genre=Punk
keep refreshing the URL and you will see the song play count information changes.
Take a look at the console sessions for the applications and you will see that it may not be the processor started on 8080 that serves this request.
7. Go to the URL: http://localhost:8082/charts/top-five?genre=Punk
Take a look at the console sessions for the applications and you will see that it may not be the processor started on 8082 that serves this request.
8. Once you are done with running the sample, stop the docker containers and the schema registry.

View File

@@ -11,7 +11,7 @@
<description>Spring Cloud Stream sample for KStream interactive queries</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
@@ -47,7 +47,6 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -21,13 +21,13 @@ import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import kafka.streams.interactive.query.avro.PlayEvent;
import kafka.streams.interactive.query.avro.Song;
import kafka.streams.interactive.query.avro.SongPlayCount;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -37,11 +37,10 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import java.io.*;
import java.util.*;
@@ -55,7 +54,7 @@ public class KafkaStreamsInteractiveQuerySample {
static final String ALL_SONGS = "all-songs";
@Autowired
private InteractiveQueryService interactiveQueryService;
private QueryableStoreRegistry queryableStoreRegistry;
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsInteractiveQuerySample.class, args);
@@ -170,45 +169,16 @@ public class KafkaStreamsInteractiveQuerySample {
@RestController
public class FooController {
private final Log logger = LogFactory.getLog(getClass());
@RequestMapping("/song/idx")
public SongBean song(@RequestParam(value="id") Long id) {
final ReadOnlyKeyValueStore<Long, Song> songStore =
interactiveQueryService.getQueryableStore(KafkaStreamsInteractiveQuerySample.ALL_SONGS, QueryableStoreTypes.<Long, Song>keyValueStore());
final Song song = songStore.get(id);
if (song == null) {
throw new IllegalArgumentException("hi");
}
return new SongBean(song.getArtist(), song.getAlbum(), song.getName());
}
@RequestMapping("/charts/top-five")
@SuppressWarnings("unchecked")
public List<SongPlayCountBean> topFive(@RequestParam(value="genre") String genre) {
HostInfo hostInfo = interactiveQueryService.getHostInfo(KafkaStreamsInteractiveQuerySample.TOP_FIVE_SONGS_STORE,
KafkaStreamsInteractiveQuerySample.TOP_FIVE_KEY, new StringSerializer());
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
logger.info("Top Five songs request served from same host: " + hostInfo);
return topFiveSongs(KafkaStreamsInteractiveQuerySample.TOP_FIVE_KEY, KafkaStreamsInteractiveQuerySample.TOP_FIVE_SONGS_STORE);
}
else {
//find the store from the proper instance.
logger.info("Top Five songs request served from different host: " + hostInfo);
RestTemplate restTemplate = new RestTemplate();
return restTemplate.postForObject(
String.format("https://%s:%d/%s", hostInfo.host(),
hostInfo.port(), "charts/top-five?genre=Punk"), "punk", List.class);
}
public List<SongPlayCountBean> greeting(@RequestParam(value="genre") String genre) {
return topFiveSongs(KafkaStreamsInteractiveQuerySample.TOP_FIVE_KEY, KafkaStreamsInteractiveQuerySample.TOP_FIVE_SONGS_STORE);
}
private List<SongPlayCountBean> topFiveSongs(final String key,
final String storeName) {
final ReadOnlyKeyValueStore<String, TopFiveSongs> topFiveStore =
interactiveQueryService.getQueryableStore(storeName, QueryableStoreTypes.<String, TopFiveSongs>keyValueStore());
queryableStoreRegistry.getQueryableStoreType(storeName, QueryableStoreTypes.<String, TopFiveSongs>keyValueStore());
// Get the value from the store
final TopFiveSongs value = topFiveStore.get(key);
@@ -217,31 +187,12 @@ public class KafkaStreamsInteractiveQuerySample {
}
final List<SongPlayCountBean> results = new ArrayList<>();
value.forEach(songPlayCount -> {
final ReadOnlyKeyValueStore<Long, Song> songStore =
queryableStoreRegistry.getQueryableStoreType(KafkaStreamsInteractiveQuerySample.ALL_SONGS, QueryableStoreTypes.<Long, Song>keyValueStore());
HostInfo hostInfo = interactiveQueryService.getHostInfo(KafkaStreamsInteractiveQuerySample.ALL_SONGS,
songPlayCount.getSongId(), new LongSerializer());
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
logger.info("Song info request served from same host: " + hostInfo);
final ReadOnlyKeyValueStore<Long, Song> songStore =
interactiveQueryService.getQueryableStore(KafkaStreamsInteractiveQuerySample.ALL_SONGS, QueryableStoreTypes.<Long, Song>keyValueStore());
final Song song = songStore.get(songPlayCount.getSongId());
results.add(new SongPlayCountBean(song.getArtist(),song.getAlbum(), song.getName(),
songPlayCount.getPlays()));
}
else {
logger.info("Song info request served from different host: " + hostInfo);
RestTemplate restTemplate = new RestTemplate();
SongBean song = restTemplate.postForObject(
String.format("https://%s:%d/%s", hostInfo.host(),
hostInfo.port(), "song/idx?id=" + songPlayCount.getSongId()), "id", SongBean.class);
results.add(new SongPlayCountBean(song.getArtist(),song.getAlbum(), song.getName(),
songPlayCount.getPlays()));
}
final Song song = songStore.get(songPlayCount.getSongId());
results.add(new SongPlayCountBean(song.getArtist(),song.getAlbum(), song.getName(),
songPlayCount.getPlays()));
});
return results;
}

View File

@@ -1,75 +0,0 @@
package kafka.streams.interactive.query;
import java.util.Objects;
/**
* @author Soby Chacko
*/
public class SongBean {
private String artist;
private String album;
private String name;
public SongBean() {}
public SongBean(final String artist, final String album, final String name) {
this.artist = artist;
this.album = album;
this.name = name;
}
public String getArtist() {
return artist;
}
public void setArtist(final String artist) {
this.artist = artist;
}
public String getAlbum() {
return album;
}
public void setAlbum(final String album) {
this.album = album;
}
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
@Override
public String toString() {
return "SongBean{" +
"artist='" + artist + '\'' +
", album='" + album + '\'' +
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final SongBean that = (SongBean) o;
return Objects.equals(artist, that.artist) &&
Objects.equals(album, that.album) &&
Objects.equals(name, that.name);
}
@Override
public int hashCode() {
return Objects.hash(artist, album, name);
}
}

View File

@@ -2,10 +2,12 @@ spring.cloud.stream.bindings.input:
destination: play-events
consumer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.bindings.inputX:
destination: song-feed
consumer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
@@ -17,10 +19,7 @@ spring.cloud.stream.kafka.streams.bindings.inputX:
materializedAs: all-songs
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
zkNodes: localhost #192.168.99.100
configuration:
schema.registry.url: http://localhost:8081
commit.interval.ms: 1000
spring.cloud.stream.kafka.streams.binder.autoAddPartitions: true
spring.cloud.stream.kafka.streams.binder.minPartitionCount: 4
spring.cloud.stream.kafka.streams.binder.configuration.application.server: localhost:8080
spring.applicaiton.name: kafka-streams-iq-advanced-sample
commit.interval.ms: 1000

View File

@@ -11,7 +11,7 @@
<description>Spring Cloud Stream sample for KStream interactive queries</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>

View File

@@ -17,12 +17,8 @@
package kafka.streams.product.tracker;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
@@ -32,7 +28,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.messaging.handler.annotation.SendTo;
@@ -58,8 +54,7 @@ public class KafkaStreamsInteractiveQueryApplication {
private static final String STORE_NAME = "prod-id-count-store";
@Autowired
private InteractiveQueryService queryService;
private QueryableStoreRegistry queryableStoreRegistry;
@Autowired
ProductTrackerProperties productTrackerProperties;
@@ -73,10 +68,8 @@ public class KafkaStreamsInteractiveQueryApplication {
return input
.filter((key, product) -> productIds().contains(product.getId()))
.map((key, value) -> new KeyValue<>(value.id, value))
.groupByKey(Serialized.with(Serdes.Integer(), new JsonSerde<>(Product.class)))
.count(Materialized.<Integer, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
.withKeySerde(Serdes.Integer())
.withValueSerde(Serdes.Long()))
.groupByKey(new Serdes.IntegerSerde(), new JsonSerde<>(Product.class))
.count(STORE_NAME)
.toStream();
}
@@ -89,7 +82,7 @@ public class KafkaStreamsInteractiveQueryApplication {
@Scheduled(fixedRate = 30000, initialDelay = 5000)
public void printProductCounts() {
if (keyValueStore == null) {
keyValueStore = queryService.getQueryableStore(STORE_NAME, QueryableStoreTypes.keyValueStore());
keyValueStore = queryableStoreRegistry.getQueryableStoreType(STORE_NAME, QueryableStoreTypes.keyValueStore());
}
for (Integer id : productIds()) {

View File

@@ -10,10 +10,12 @@ spring.cloud.stream.kafka.streams:
spring.cloud.stream.bindings.output:
destination: product-counts
producer:
headerMode: raw
useNativeEncoding: true
spring.cloud.stream.bindings.input:
destination: products
consumer:
headerMode: raw
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
spring.application.name: kafka-streams-iq-basic-sample
zkNodes: localhost #192.168.99.100

View File

@@ -11,7 +11,7 @@
<description>Demo project for Spring Boot</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>

View File

@@ -5,12 +5,20 @@ spring.cloud.stream.kafka.streams.binder.configuration:
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.singleOutput:
destination: counts
producer:
headerMode: raw
#useNativeEncoding: true
spring.cloud.stream.bindings.binding2:
destination: words
consumer:
headerMode: raw
spring.cloud.stream.bindings.binding1:
destination: words
consumer:
headerMode: raw
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
spring.applicaiton.name: kafka-streams-message-channel-sample
zkNodes: localhost #192.168.99.100

View File

@@ -11,7 +11,7 @@
<description>Demo project for Spring Boot</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>

View File

@@ -18,8 +18,6 @@ package kafka.streams.product.tracker;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
@@ -62,9 +60,8 @@ public class KafkaStreamsProductTrackerApplication {
return input
.filter((key, product) -> productIds().contains(product.getId()))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)))
.windowedBy(timeWindows)
.count(Materialized.as("product-counts"))
.groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
.count(timeWindows, "product-counts")
.toStream()
.map((key, value) -> new KeyValue<>(key.key().id, new ProductStatus(key.key().id,
value, Instant.ofEpochMilli(key.window().start()).atZone(ZoneId.systemDefault()).toLocalTime(),

View File

@@ -8,8 +8,13 @@ spring.cloud.stream.kafka.streams:
keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.cloud.stream.bindings.output:
destination: product-counts
producer:
headerMode: raw
#useNativeEncoding: true
spring.cloud.stream.bindings.input:
destination: products
consumer:
headerMode: raw
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
spring.applicaiton.name: kafka-streams-product-tracker-sample
zkNodes: localhost #192.168.99.100

View File

@@ -11,7 +11,7 @@
<description>Demo project for Spring Boot</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>

View File

@@ -44,7 +44,7 @@ public class KafkaStreamsTableJoin {
@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
@Input("inputTable") KTable<String, String> userRegionsTable) {
@Input("inputX") KTable<String, String> userRegionsTable) {
return userClicksStream
.leftJoin(userRegionsTable,
@@ -60,8 +60,8 @@ public class KafkaStreamsTableJoin {
interface KStreamProcessorX extends KafkaStreamsProcessor {
@Input("inputTable")
KTable<?, ?> inputKTable();
@Input("inputX")
KTable<?, ?> inputX();
}
private static final class RegionWithClicks {

View File

@@ -1,22 +1,23 @@
spring.application.name: stream-table-sample
spring.cloud.stream.bindings.input:
destination: user-clicks3
consumer:
useNativeDecoding: true
spring.cloud.stream.bindings.inputTable:
headerMode: raw
spring.cloud.stream.bindings.inputX:
destination: user-regions
contentType: application/avro
consumer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.bindings.output:
destination: output-topic
producer:
useNativeEncoding: true
headerMode: raw
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.bindings.inputTable:
spring.cloud.stream.kafka.streams.bindings.inputX:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
@@ -26,6 +27,7 @@ spring.cloud.stream.kafka.streams.bindings.output:
valueSerde: org.apache.kafka.common.serialization.Serdes$LongSerde
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
zkNodes: localhost #192.168.99.100
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

View File

@@ -1,24 +0,0 @@
target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
nbproject/private/
build/
nbbuild/
dist/
nbdist/
.nb-gradle/

View File

@@ -1 +0,0 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip

View File

@@ -1,32 +0,0 @@
== What is this app?
This application contains two processors, a regular Kafka Streams processor and another one that consumes data from Kafka and produces into Rabbitmq.
The example is based on the word count application from the https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java[reference documentation].
The application receives from Kafka through KStream and output to Kafka through KStream.
Then another processor listens from the same topic where the kafka streams processor output data and then it sends data to a Rabbit exchange.
There is a convenient test processor provided as part of the application that logs messages from the Rabbit destination.
=== Running the app:
Go to the root of the repository and do:
`docker-compose up -d`
(This starts both Kafka and Rabbitmq in docker containers)
`./mvnw clean package`
`java -jar target/kafka-streams-to-rabbitmq-message-channel-0.0.1-SNAPSHOT.jar`
Issue the following commands:
`docker exec -it kafka-streams-multibinder /opt/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic words`
On another terminal:
`docker exec -it kafka-streams-multibinder /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic counts`
Enter some text in the console producer (the first terminal above) and watch the output in the console consumer (second terminal).
Also watch the console for logging statements from the test consumer that listens from the Rabbit exchange.

View File

@@ -1,25 +0,0 @@
version: '3'
services:
kafka:
image: wurstmeister/kafka
container_name: kafka-streams-multibinder
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
- KAFKA_ADVERTISED_PORT=9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
depends_on:
- zookeeper
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
environment:
- KAFKA_ADVERTISED_HOST_NAME=zookeeper
rabbitmq:
image: rabbitmq:management
container_name: rabbit-multibinder-1
ports:
- 5672:5672
- 15672:15672

View File

@@ -1,225 +0,0 @@
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Maven2 Start Up Batch script
#
# Required ENV vars:
# ------------------
# JAVA_HOME - location of a JDK home dir
#
# Optional ENV vars
# -----------------
# M2_HOME - location of maven2's installed home dir
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
# e.g. to debug Maven itself, use
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
# ----------------------------------------------------------------------------
if [ -z "$MAVEN_SKIP_RC" ] ; then
if [ -f /etc/mavenrc ] ; then
. /etc/mavenrc
fi
if [ -f "$HOME/.mavenrc" ] ; then
. "$HOME/.mavenrc"
fi
fi
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "`uname`" in
CYGWIN*) cygwin=true ;;
MINGW*) mingw=true;;
Darwin*) darwin=true
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
if [ -z "$JAVA_HOME" ]; then
if [ -x "/usr/libexec/java_home" ]; then
export JAVA_HOME="`/usr/libexec/java_home`"
else
export JAVA_HOME="/Library/Java/Home"
fi
fi
;;
esac
if [ -z "$JAVA_HOME" ] ; then
if [ -r /etc/gentoo-release ] ; then
JAVA_HOME=`java-config --jre-home`
fi
fi
if [ -z "$M2_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
M2_HOME=`dirname "$PRG"`/..
# make it fully qualified
M2_HOME=`cd "$M2_HOME" && pwd`
cd "$saveddir"
# echo Using m2 at $M2_HOME
fi
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --unix "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
fi
# For Migwn, ensure paths are in UNIX format before anything is touched
if $mingw ; then
[ -n "$M2_HOME" ] &&
M2_HOME="`(cd "$M2_HOME"; pwd)`"
[ -n "$JAVA_HOME" ] &&
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
# TODO classpath?
fi
if [ -z "$JAVA_HOME" ]; then
javaExecutable="`which javac`"
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
# readlink(1) is not available as standard on Solaris 10.
readLink=`which readlink`
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
if $darwin ; then
javaHome="`dirname \"$javaExecutable\"`"
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
else
javaExecutable="`readlink -f \"$javaExecutable\"`"
fi
javaHome="`dirname \"$javaExecutable\"`"
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
JAVA_HOME="$javaHome"
export JAVA_HOME
fi
fi
fi
if [ -z "$JAVACMD" ] ; then
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
else
JAVACMD="`which java`"
fi
fi
if [ ! -x "$JAVACMD" ] ; then
echo "Error: JAVA_HOME is not defined correctly." >&2
echo " We cannot execute $JAVACMD" >&2
exit 1
fi
if [ -z "$JAVA_HOME" ] ; then
echo "Warning: JAVA_HOME environment variable is not set."
fi
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
# traverses directory structure from process work directory to filesystem root
# first directory with .mvn subdirectory is considered project base directory
find_maven_basedir() {
if [ -z "$1" ]
then
echo "Path not specified to find_maven_basedir"
return 1
fi
basedir="$1"
wdir="$1"
while [ "$wdir" != '/' ] ; do
if [ -d "$wdir"/.mvn ] ; then
basedir=$wdir
break
fi
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
if [ -d "${wdir}" ]; then
wdir=`cd "$wdir/.."; pwd`
fi
# end of workaround
done
echo "${basedir}"
}
# concatenates all lines of a file
concat_lines() {
if [ -f "$1" ]; then
echo "$(tr -s '\n' ' ' < "$1")"
fi
}
BASE_DIR=`find_maven_basedir "$(pwd)"`
if [ -z "$BASE_DIR" ]; then
exit 1;
fi
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
echo $MAVEN_PROJECTBASEDIR
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --path --windows "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
fi
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
exec "$JAVACMD" \
$MAVEN_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"

View File

@@ -1,143 +0,0 @@
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM https://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Maven2 Start Up Batch script
@REM
@REM Required ENV vars:
@REM JAVA_HOME - location of a JDK home dir
@REM
@REM Optional ENV vars
@REM M2_HOME - location of maven2's installed home dir
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
@REM e.g. to debug Maven itself, use
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
@REM ----------------------------------------------------------------------------
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
@echo off
@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
@REM set %HOME% to equivalent of $HOME
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
@REM Execute a user defined script before this one
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
:skipRcPre
@setlocal
set ERROR_CODE=0
@REM To isolate internal variables from possible post scripts, we use another setlocal
@setlocal
@REM ==== START VALIDATION ====
if not "%JAVA_HOME%" == "" goto OkJHome
echo.
echo Error: JAVA_HOME not found in your environment. >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
:OkJHome
if exist "%JAVA_HOME%\bin\java.exe" goto init
echo.
echo Error: JAVA_HOME is set to an invalid directory. >&2
echo JAVA_HOME = "%JAVA_HOME%" >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
@REM ==== END VALIDATION ====
:init
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
@REM Fallback to current working directory if not found.
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
set EXEC_DIR=%CD%
set WDIR=%EXEC_DIR%
:findBaseDir
IF EXIST "%WDIR%"\.mvn goto baseDirFound
cd ..
IF "%WDIR%"=="%CD%" goto baseDirNotFound
set WDIR=%CD%
goto findBaseDir
:baseDirFound
set MAVEN_PROJECTBASEDIR=%WDIR%
cd "%EXEC_DIR%"
goto endDetectBaseDir
:baseDirNotFound
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
cd "%EXEC_DIR%"
:endDetectBaseDir
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
@setlocal EnableExtensions EnableDelayedExpansion
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
:endReadAdditionalConfig
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
if ERRORLEVEL 1 goto error
goto end
:error
set ERROR_CODE=1
:end
@endlocal & set ERROR_CODE=%ERROR_CODE%
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
@REM check for post script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
:skipRcPost
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
if "%MAVEN_BATCH_PAUSE%" == "on" pause
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
exit /B %ERROR_CODE%

View File

@@ -1,50 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-streams-to-rabbitmq-message-channel</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-streams-to-rabbitmq-message-channel</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,151 +0,0 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.streams.message.channel;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import java.util.Arrays;
import java.util.Date;
@SpringBootApplication
public class KafkaStreamsWordCountApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsWordCountApplication.class, args);
}
@EnableBinding(MultipleProcessor.class)
public static class WordCountProcessorApplication {
@StreamListener("kstreamIn")
@SendTo("kstreamOut")
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(20_000))
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
@StreamListener("fromKafka")
@SendTo("toRabbit")
public WordCount sink(WordCount input) {
return input;
}
@StreamListener("testInputFromRabbit")
public void receive(String data) {
System.out.println("Data received..." + data);
}
}
interface MultipleProcessor {
String KSTREAM_IN = "kstreamIn";
String KSTREAM_OUT = "kstreamOut";
String FROM_KAFKA = "fromKafka";
String TO_RABBIT = "toRabbit";
String TEST_INPUT_FROM_RABBIT = "testInputFromRabbit";
@Input(KSTREAM_IN)
KStream<?, ?> kstreamIn();
@Output(KSTREAM_OUT)
KStream<?, ?> kstreamOut();
@Input(FROM_KAFKA)
SubscribableChannel fromKafka();
@Output(TO_RABBIT)
MessageChannel toRabbit();
@Input(TEST_INPUT_FROM_RABBIT)
SubscribableChannel testInputFromRabbit();
}
static class WordCount {
private String word;
private long count;
private Date start;
private Date end;
WordCount(String word, long count, Date start, Date end) {
this.word = word;
this.count = count;
this.start = start;
this.end = end;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
public Date getStart() {
return start;
}
public void setStart(Date start) {
this.start = start;
}
public Date getEnd() {
return end;
}
public void setEnd(Date end) {
this.end = end;
}
}
}

View File

@@ -1,24 +0,0 @@
spring.cloud.stream.bindings.singleOutput.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings.kstreamIn.consumer.application-id: kafka-streams-to-rabbitmq
spring.cloud.stream.bindings.kstreamIn:
destination: words
spring.cloud.stream.bindings.kstreamOut:
destination: counts
spring.cloud.stream.bindings.fromKafka:
destination: counts
binder: kafka
spring.cloud.stream.bindings.toRabbit:
destination: countsInRabbit
binder: rabbit
spring.cloud.stream.bindings.testInputFromRabbit:
destination: countsInRabbit
binder: rabbit
spring.cloud.stream.kafka.streams.binder:
brokers: localhost

View File

@@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
</configuration>

View File

@@ -1,18 +0,0 @@
package kafka.streams.message.channel;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaStreamsWordCountApplicationTests {
@Test
@Ignore
public void contextLoads() {
}
}

View File

@@ -11,7 +11,7 @@
<description>Demo project for Spring Boot</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
@@ -31,17 +31,6 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@@ -17,7 +17,6 @@
package kafka.streams.word.count;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
@@ -43,19 +42,15 @@ public class KafkaStreamsWordCountApplication {
@EnableBinding(KafkaStreamsProcessor.class)
public static class WordCountProcessorApplication {
public static final String INPUT_TOPIC = "input";
public static final String OUTPUT_TOPIC = "output";
public static final int WINDOW_SIZE_MS = 30000;
@StreamListener(INPUT_TOPIC)
@SendTo(OUTPUT_TOPIC)
public KStream<Bytes, WordCount> process(KStream<Bytes, String> input) {
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(WINDOW_SIZE_MS))
.windowedBy(TimeWindows.of(30000))
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
@@ -72,21 +67,6 @@ public class KafkaStreamsWordCountApplication {
private Date end;
@Override
public String toString() {
final StringBuffer sb = new StringBuffer("WordCount{");
sb.append("word='").append(word).append('\'');
sb.append(", count=").append(count);
sb.append(", start=").append(start);
sb.append(", end=").append(end);
sb.append('}');
return sb.toString();
}
WordCount() {
}
WordCount(String word, long count, Date start, Date end) {
this.word = word;
this.count = count;

View File

@@ -1,14 +1,17 @@
spring.cloud.stream.bindings.output.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings.input.consumer.application-id: basic-word-count
spring.cloud.stream.kafka.streams.binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.output:
destination: counts
producer:
headerMode: raw
#useNativeEncoding: true
spring.cloud.stream.bindings.input:
destination: words
consumer:
headerMode: raw
#For testing
spring.cloud.stream.bindings.input1.destination: counts
spring.cloud.stream.bindings.output1.destination: words

View File

@@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="stdout"/>
</root>
<logger name="org.apache.kafka.streams.processor.internals" level="WARN"/>
</configuration>

View File

@@ -1,93 +1,18 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.streams.word.count;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.NONE,
properties = {"server.port=0",
"spring.jmx.enabled=false",
"spring.cloud.stream.bindings.input.destination=words",
"spring.cloud.stream.bindings.output.destination=counts",
"spring.cloud.stream.kafka.streams.default.consumer.application-id=basic-word-count",
"spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde"})
@SpringBootTest
public class KafkaStreamsWordCountApplicationTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "words", "counts");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
private static Consumer<String, String> consumer;
@BeforeClass
public static void setUp() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts");
//Since there are both binders present in this app, we resort to the spring kafka broker property.
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
}
@AfterClass
public static void tearDown() {
consumer.close();
System.clearProperty("spring.kafka.bootstrap-servers");
}
@Test
public void testKafkaStreamsWordCountProcessor() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("words");
template.sendDefault("foobar");
ConsumerRecords<String, String> cr = KafkaTestUtils.getRecords(consumer);
assertThat(cr.count()).isGreaterThanOrEqualTo(1);
}
finally {
pf.destroy();
}
@Ignore
public void contextLoads() {
}
}

View File

@@ -1,162 +0,0 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.streams.word.count;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.kafka.support.serializer.JsonSerde;
import java.util.*;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/**
* TopologyTestDriver based test about stream processing of {@link KafkaStreamsWordCountApplication}
*
* @author Jukka Karvanen / jukinimi.com
*/
public class WordCountProcessorApplicationTests {
private TopologyTestDriver testDriver;
public static final String INPUT_TOPIC = KafkaStreamsWordCountApplication.WordCountProcessorApplication.INPUT_TOPIC;
public static final String OUTPUT_TOPIC = KafkaStreamsWordCountApplication.WordCountProcessorApplication.OUTPUT_TOPIC;
final Serde<String> stringSerde = Serdes.String();
final JsonSerde<KafkaStreamsWordCountApplication.WordCount> countSerde = new JsonSerde<>(KafkaStreamsWordCountApplication.WordCount.class);
final Serde<Bytes> nullSerde = Serdes.Bytes(); //Serde for not used key
private ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(
stringSerde.serializer(), stringSerde.serializer()); //Key feed in as string, even read as Bytes
static Properties getStreamsConfiguration() {
final Properties streamsConfiguration = new Properties();
// Need to be set even these do not matter with TopologyTestDriver
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "TopologyTestDriver");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
return streamsConfiguration;
}
/**
* Setup Stream topology
* Add KStream based on @StreamListener annotation
* Add to(topic) based @SendTo annotation
*/
@Before
public void setup() {
final StreamsBuilder builder = new StreamsBuilder();
KStream<Bytes, String> input = builder.stream(INPUT_TOPIC, Consumed.with(nullSerde, stringSerde));
KafkaStreamsWordCountApplication.WordCountProcessorApplication app = new KafkaStreamsWordCountApplication.WordCountProcessorApplication();
KStream<Bytes, KafkaStreamsWordCountApplication.WordCount> output = app.process(input);
output.to(OUTPUT_TOPIC, Produced.with(nullSerde, countSerde));
testDriver = new TopologyTestDriver(builder.build(), getStreamsConfiguration());
}
@After
public void tearDown() {
try {
testDriver.close();
} catch (final RuntimeException e) {
// https://issues.apache.org/jira/browse/KAFKA-6647 causes exception when executed in Windows, ignoring it
// Logged stacktrace cannot be avoided
System.out.println("Ignoring exception, test failing in Windows due this exception:" + e.getLocalizedMessage());
}
}
/**
* Read one Record from output topic.
*
* @return ProducerRecord containing WordCount as value
*/
private ProducerRecord<Bytes, KafkaStreamsWordCountApplication.WordCount> readOutput() {
return testDriver.readOutput(OUTPUT_TOPIC, nullSerde.deserializer(), countSerde.deserializer());
}
/**
* Read counts from output to map ignoring start and end dates
* If existing word is incremented, it can appear twice in output and is replaced in map
*
* @return Map of Word and counts
*/
private Map<String, Long> getOutputList() {
final Map<String, Long> output = new HashMap<>();
ProducerRecord<Bytes, KafkaStreamsWordCountApplication.WordCount> outputRow;
while ((outputRow = readOutput()) != null) {
output.put(outputRow.value().getWord(), outputRow.value().getCount());
}
return output;
}
/**
* Simple test validating count of one word
*/
@Test
public void testOneWord() {
final String nullKey = null;
//Feed word "Hello" to inputTopic and no kafka key, timestamp is irrelevant in this case
testDriver.pipeInput(recordFactory.create(INPUT_TOPIC, nullKey, "Hello", 1L));
//Read and validate output
final ProducerRecord<Bytes, KafkaStreamsWordCountApplication.WordCount> output = readOutput();
assertThat(output).isNotNull();
assertThat(output.value()).isEqualToComparingFieldByField(new KafkaStreamsWordCountApplication.WordCount("hello", 1L, new Date(0), new Date(KafkaStreamsWordCountApplication.WordCountProcessorApplication.WINDOW_SIZE_MS)));
//No more output in topic
assertThat(readOutput()).isNull();
}
/**
* Test Word count of sentence list.
*/
@Test
public void shouldCountWords() {
final List<String> inputLines = Arrays.asList(
"Kafka Streams Examples",
"Spring Cloud Stream Sample",
"Using Kafka Streams Test Utils"
);
final List<KeyValue<String, String>> inputRecords = inputLines.stream().map(v -> new KeyValue<String, String>(null, v)).collect(Collectors.toList());
final Map<String, Long> expectedWordCounts = new HashMap<>();
expectedWordCounts.put("spring", 1L);
expectedWordCounts.put("cloud", 1L);
expectedWordCounts.put("examples", 1L);
expectedWordCounts.put("sample", 1L);
expectedWordCounts.put("streams", 2L);
expectedWordCounts.put("stream", 1L);
expectedWordCounts.put("test", 1L);
expectedWordCounts.put("utils", 1L);
expectedWordCounts.put("kafka", 2L);
expectedWordCounts.put("using", 1L);
testDriver.pipeInput(recordFactory.create(INPUT_TOPIC, inputRecords, 1L, 1000L)); //All feed in same 30s time window
final Map<String, Long> actualWordCounts = getOutputList();
assertThat(actualWordCounts).containsAllEntriesOf(expectedWordCounts).hasSameSizeAs(expectedWordCounts);
}
}

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>kafka-streams-samples</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
@@ -13,13 +13,10 @@
<module>kafka-streams-branching</module>
<module>kafka-streams-dlq-sample</module>
<module>kafka-streams-table-join</module>
<module>kafka-streams-global-table-join</module>
<module>kafka-streams-interactive-query-basic</module>
<module>kafka-streams-interactive-query-advanced</module>
<module>kafka-streams-message-channel</module>
<module>kafka-streams-product-tracker</module>
<module>kafka-streams-aggregate</module>
<module>kafka-streams-to-rabbitmq-message-channel</module>
</modules>
</project>

View File

@@ -10,7 +10,7 @@
<description>Spring Cloud Stream Kinesis Sample</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
@@ -19,8 +19,8 @@
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<version>1.0.0.RELEASE</version>
<artifactId>spring-cloud-starter-stream-kinesis</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -1 +0,0 @@
../../.mvn

View File

@@ -1,10 +0,0 @@
== Reactive Spring Cloud Stream AWS Kinesis Binder to SSE via WebFlux
This sample demonstrate a simple bridging of AWS Kinesis stream records to the Server Side Events subscribers.
The `@StreamListener` sink side is based on the Spring Cloud Stream Reactive support, streaming incoming messages to the `Flux` argument which, in turn, is used as a source for the `@GetMapping` controller.
The `CloudStreamKinesisToWebfluxApplicationTests` demonstrates:
- an `AmazonKinesisAsync` client configured against local Kineselite on the `4568` port;
- the `TestSource` binding for producing records into the Kinesis stream;
- a `WebTestClient` to perform SSE request against embedded Netty started by the `CloudStreamKinesisToWebfluxApplication` Spring Boot application on the random port and subsequent verification of the data produced by the `Flux` from the `@StreamListener` against Kinesis Binder consumer.

View File

@@ -1,63 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>kinesis-to-webflux</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kinesis-to-webflux</name>
<description>Spring Cloud Stream Kinesis to WebFlux Sample</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,58 +0,0 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kinesis.webflux;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.amazonaws.services.kinesis.model.Record;
import reactor.core.publisher.Flux;
@SpringBootApplication
@EnableBinding(Sink.class)
@RestController
public class CloudStreamKinesisToWebfluxApplication {
private volatile Flux<String> recordFlux;
@GetMapping(value = "/sseFromKinesis", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getSeeFromKinesis() {
return this.recordFlux;
}
@StreamListener(Sink.INPUT)
public void kinesisSink(Flux<List<Record>> recordFlux) {
this.recordFlux = recordFlux
.flatMap(Flux::fromIterable)
.map(record -> new String(record.getData().array(), StandardCharsets.UTF_8));
}
public static void main(String[] args) {
SpringApplication.run(CloudStreamKinesisToWebfluxApplication.class, args);
}
}

View File

@@ -1,20 +0,0 @@
spring:
cloud:
stream:
bindings:
input:
destination: SSE_DATA
group: kinesis-to-sse
consumer:
headerMode: none
useNativeDecoding: true
kinesis:
bindings:
input:
consumer:
listenerMode: batch
cloud:
aws:
region:
static: eu-west-1

View File

@@ -1,140 +0,0 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kinesis.webflux;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.cloud.aws.autoconfigure.context.ContextResourceLoaderAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.metadata.MetadataStore;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.SDKGlobalConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
"spring.cloud.stream.bindings." + CloudStreamKinesisToWebfluxApplicationTests.TestSource.TO_KINESIS_OUTPUT + ".destination = SSE_DATA",
"spring.cloud.stream.bindings." + CloudStreamKinesisToWebfluxApplicationTests.TestSource.TO_KINESIS_OUTPUT + ".producer.headerMode = none",
"logging.level.org.springframework.integration=TRACE"
}
)
@AutoConfigureWebTestClient
public class CloudStreamKinesisToWebfluxApplicationTests {
@ClassRule
public static LocalKinesisResource localKinesisResource = new LocalKinesisResource();
@Autowired
private WebTestClient webTestClient;
@Autowired
private TestSource testSource;
@Test
public void testKinesisToWebFlux() {
this.testSource.toKinesisOutput().send(new GenericMessage<>("foo"));
this.testSource.toKinesisOutput().send(new GenericMessage<>("bar"));
this.testSource.toKinesisOutput().send(new GenericMessage<>("baz"));
Flux<String> seeFlux =
this.webTestClient.get().uri("/sseFromKinesis")
.exchange()
.returnResult(String.class)
.getResponseBody();
StepVerifier
.create(seeFlux)
.expectNext("foo", "bar", "baz")
.thenCancel()
.verify();
}
@TestConfiguration
@EnableBinding(TestSource.class)
@EnableAutoConfiguration(exclude = ContextResourceLoaderAutoConfiguration.class)
public static class KinesisTestConfiguration {
public static final int DEFAULT_KINESALITE_PORT = 4568;
@Bean
public AmazonKinesisAsync amazonKinesis() {
// See https://github.com/mhart/kinesalite#cbor-protocol-issues-with-the-java-sdk
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
return AmazonKinesisAsyncClientBuilder.standard()
.withClientConfiguration(
new ClientConfiguration()
.withMaxErrorRetry(0)
.withConnectionTimeout(1000))
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration("http://localhost:" + DEFAULT_KINESALITE_PORT,
Regions.DEFAULT_REGION.getName()))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
.build();
}
@Bean
public LockRegistry lockRegistry() {
return new DefaultLockRegistry();
}
@Bean
public ConcurrentMetadataStore simpleMetadataStore() {
return new SimpleMetadataStore();
}
}
interface TestSource {
String TO_KINESIS_OUTPUT = "toKinesisOutput";
@Output(TO_KINESIS_OUTPUT)
MessageChannel toKinesisOutput();
}
}

View File

@@ -1,121 +0,0 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kinesis.webflux;
import java.util.List;
import org.springframework.cloud.stream.test.junit.AbstractExternalResourceTestSupport;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.SDKGlobalConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;
import com.amazonaws.services.kinesis.model.ListStreamsRequest;
import com.amazonaws.services.kinesis.model.ListStreamsResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
/**
* An {@link AbstractExternalResourceTestSupport} implementation for Kinesis local service.
*
* @author Artem Bilan
* @author Jacob Severson
*
*/
public class LocalKinesisResource
extends AbstractExternalResourceTestSupport<AmazonKinesisAsync> {
/**
* The default port for the local Kinesis service.
*/
public static final int DEFAULT_PORT = 4568;
private final int port;
public LocalKinesisResource() {
this(DEFAULT_PORT);
}
public LocalKinesisResource(int port) {
super("KINESIS");
this.port = port;
}
@Override
protected void obtainResource() {
// See https://github.com/mhart/kinesalite#cbor-protocol-issues-with-the-java-sdk
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
"true");
this.resource = AmazonKinesisAsyncClientBuilder.standard()
.withClientConfiguration(new ClientConfiguration().withMaxErrorRetry(0)
.withConnectionTimeout(1000))
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
"http://localhost:" + this.port,
Regions.DEFAULT_REGION.getName()))
.withCredentials(
new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
.build();
// Check connection
this.resource.listStreams();
}
@Override
protected void cleanupResource() {
ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
ListStreamsResult listStreamsResult = this.resource
.listStreams(listStreamsRequest);
List<String> streamNames = listStreamsResult.getStreamNames();
while (listStreamsResult.getHasMoreStreams()) {
if (streamNames.size() > 0) {
listStreamsRequest.setExclusiveStartStreamName(
streamNames.get(streamNames.size() - 1));
}
listStreamsResult = this.resource.listStreams(listStreamsRequest);
streamNames.addAll(listStreamsResult.getStreamNames());
}
for (String stream : streamNames) {
this.resource.deleteStream(stream);
while (true) {
try {
this.resource.describeStream(stream);
try {
Thread.sleep(100);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException(ex);
}
}
catch (ResourceNotFoundException ex) {
break;
}
}
}
System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY);
this.resource.shutdown();
}
}

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>kinesis-samples</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
@@ -10,7 +10,5 @@
<modules>
<module>kinesis-produce-consume</module>
<module>kinesis-to-webflux</module>
</modules>
</project>

View File

@@ -9,6 +9,8 @@ To run this sample, you will need to have installed:
* Java 8 or Above
This example requires Redis to be running on localhost.
## Code Tour
This sample is a Spring Boot application that bundles multiple application together to showcase how to configure multiple input/output channels.

View File

@@ -9,7 +9,7 @@
<description>Spring Cloud Stream Sample JDBC Source App</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
@@ -40,17 +40,6 @@
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>kafka</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>rabbit-binder</id>
@@ -60,17 +49,6 @@
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>rabbit</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>multi-io-samples</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
@@ -11,5 +11,4 @@
<modules>
<module>multi-io</module>
</modules>
</project>

View File

@@ -1,246 +0,0 @@
== Spring Cloud Stream Multi-binder Kafka Application with security (JAAS)
This is a sample application that demonstrates how to connect to multi kafka clusters with security enabled using multiple binders.
This application uses two Kafka clusters both of them are enabled with security (JAAS - SASL/PLAINTEXT).
## Here are the detailed instructions for setting up your cluster.
If you already have two clusters with security (sasl/plaintext) enabled, you can skip this section. However, it may still benefit to go over these instructions in order to avoid any inconsistencies with your environment.
* Download the latest Apache Kafka distribution (version 1.0.0 or above)
* Unzip the same zip file into two directories (lets call them as cluster-1 and cluster-2 for the purposes of this sample)
* cd cluster-1/config
* Create a new file `kafka_server_jaas.conf` and add the following content:
```
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
user_admin="admin-secret";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
```
* Create another file called `zookeeper_jaas.conf` and add the following content:
```
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
```
* Edit the file server.properties and make sure that the following properties are set. Some of these are already in the configuration and if so, you have to find them and modify.
```
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
listeners=SASL_PLAINTEXT://localhost:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
```
* Edit the file zookeeper.properties and add the following properties:
```
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
```
* Edit the file producer.properties and add the following content:
```
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```
* Edit the file consumer.properties and add the following content:
```
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```
* Terminal 1
```
$ export KAFKA_OPTS="-Djava.security.auth.login.config=<PATH-TO-CLUSTER-1>/config/zookeeper_jaas.conf"
$ ./<PATH-TO-CLUSTER-1>/bin/zookeeper-server-start.sh config/zookeeper.properties
```
* Terminal 2
```
$ export KAFKA_OPTS="-Djava.security.auth.login.config=<PATH-TO-CLUSTER-1>/config/kafka_server_jaas.conf"
$ ./<PATH-TO-CLUSTER-1>/bin/kafka-server-start.sh config/server.properties
```
* Now we need to do the same for the second cluster. Follow along...
* cd cluster-2/config
* Create a new file `kafka_server_jaas.conf` and add the following content:
```
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
user_admin="admin-secret";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
```
* Create another file called `zookeeper_jaas.conf` and add the following content:
```
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
```
* Edit the file server.properties and make sure that the following properties are set. Some of these are already in the configuration and if so, you have to find them and modify.
```
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
listeners=SASL_PLAINTEXT://localhost:9093
advertised.listeners=SASL_PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-cluster2-logs
zookeeper.connect=localhost:2182
```
* Edit the file zookeeper.properties and add the following properties:
```
clientPort=2182
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
```
* Edit the file producer.properties and add the following content:
```
bootstrap.servers=localhost:9093
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```
* Edit the file consumer.properties and add the following content:
```
bootstrap.servers=localhost:9093
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```
* Terminal 3
```
$ export KAFKA_OPTS="-Djava.security.auth.login.config=<PATH-TO-CLUSTER-2>/config/zookeeper_jaas.conf"
$ ./<PATH-TO-CLUSTER-2>/bin/zookeeper-server-start.sh config/zookeeper.properties
```
* Terminal 4
```
$ export KAFKA_OPTS="-Djava.security.auth.login.config=<PATH-TO-CLUSTER-2>/config/kafka_server_jaas.conf"
$ ./<PATH-TO-CLUSTER-2>/bin/kafka-server-start.sh config/server.properties
```
* At this point, you should have 2 zookeeper instances and 2 kafka instances running. Verify this by running `jps` (or other OS tools).
## Running the application
The application's configuration is matched with the plaintext config that is set above (Please review the yml file)
The application contains two `StreamListener` methods. The first one receives records from a topic in cluster-1 and output that to a topic in cluster-2.
The second `StreamListener` method receives records from a topic in cluster-2 and output that to a topic in cluster-1.
Cluster-1 input topic is named as kafka1-in and output topic is kafka1-out. Similarly for cluster-2 they are kafka2-in and kafka2-out respectively.
Run the application `MultiBinderKafkaJaasSample` on the IDE or from CLI.
## Verify that the application is running
* Terminal 5
Create a file called `kafka_client_jaas.conf` and add the following content (Save it to any directory, but for this example, lets say it is in `/home/username`):
```
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
```
* Terminal 6 - Produce data to kafka1 - input.
$ `export KAFKA_OPTS="-Djava.security.auth.login.config=/home/username/kafka_client_jaas.conf"`
Go to the kafka installation directory (It doesn't matter if you are in cluster-1 or cluster-2 directories for the instructions below)
$ `./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka1-in --producer.config=config/producer.properties`
* Terminal 7 (or split the above terminal into 2 window panes) - Consume data from kafka2 - output where the above data is expected to come through the processor.
$ `export KAFKA_OPTS="-Djava.security.auth.login.config=/home/username/kafka_client_jaas.conf"`
Go to the kafka installation directory (It doesn't matter if you are in cluster-1 or cluster-2 at for the instructions below)
$ `./bin/kafka-console-consumer.sh --topic kafka2-out --consumer.config=config/consumer.properties --bootstrap-server=localhost:9093`
* Terminal 8 - Produce data to kafka2 - input.
$ `export KAFKA_OPTS="-Djava.security.auth.login.config=/home/username/kafka_client_jaas.conf"`
Go to the kafka installation directory (It doesn't matter if you are in cluster-1 or cluster-2 at for the instructions below)
$ `./bin/kafka-console-producer.sh --broker-list localhost:9093 --topic kafka2-in --producer.config=config/producer.properties`
* Terminal 9 (or split the above terminal into 2 window panes) - Consume data from kafka1 - output where the above data is expected to come through the second processor in the application.
$ `export KAFKA_OPTS="-Djava.security.auth.login.config=/home/username/kafka_client_jaas.conf"`
Go to the kafka installation directory (It doesn't matter if you are in cluster-1 or cluster-2 at for the instructions below)
$ `./bin/kafka-console-consumer.sh --topic kafka1-out --consumer.config=config/consumer.properties --bootstrap-server=localhost:9092`
* Now start adding some text to the terminal session where you are running console producer on kafka1-in.
Then verify that, you see the same exact text on the the terminal session where you are running the console consumer on kafka2-out.
Similarly, start adding some text to the terminal session where you are running console producer on kafka2-in.
Then verify that, you see the same exact text on the the terminal session where you are running the console consumer on kafka1-out.
PS: Once you are done with the testing, remember to stop the application, console consumers and producers and your local kafka clusters used for testing.

View File

@@ -1,225 +0,0 @@
#!/bin/sh
# ----------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Maven2 Start Up Batch script
#
# Required ENV vars:
# ------------------
# JAVA_HOME - location of a JDK home dir
#
# Optional ENV vars
# -----------------
# M2_HOME - location of maven2's installed home dir
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
# e.g. to debug Maven itself, use
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
# ----------------------------------------------------------------------------
if [ -z "$MAVEN_SKIP_RC" ] ; then
if [ -f /etc/mavenrc ] ; then
. /etc/mavenrc
fi
if [ -f "$HOME/.mavenrc" ] ; then
. "$HOME/.mavenrc"
fi
fi
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "`uname`" in
CYGWIN*) cygwin=true ;;
MINGW*) mingw=true;;
Darwin*) darwin=true
# Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
# See https://developer.apple.com/library/mac/qa/qa1170/_index.html
if [ -z "$JAVA_HOME" ]; then
if [ -x "/usr/libexec/java_home" ]; then
export JAVA_HOME="`/usr/libexec/java_home`"
else
export JAVA_HOME="/Library/Java/Home"
fi
fi
;;
esac
if [ -z "$JAVA_HOME" ] ; then
if [ -r /etc/gentoo-release ] ; then
JAVA_HOME=`java-config --jre-home`
fi
fi
if [ -z "$M2_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
M2_HOME=`dirname "$PRG"`/..
# make it fully qualified
M2_HOME=`cd "$M2_HOME" && pwd`
cd "$saveddir"
# echo Using m2 at $M2_HOME
fi
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --unix "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
fi
# For Migwn, ensure paths are in UNIX format before anything is touched
if $mingw ; then
[ -n "$M2_HOME" ] &&
M2_HOME="`(cd "$M2_HOME"; pwd)`"
[ -n "$JAVA_HOME" ] &&
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
# TODO classpath?
fi
if [ -z "$JAVA_HOME" ]; then
javaExecutable="`which javac`"
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
# readlink(1) is not available as standard on Solaris 10.
readLink=`which readlink`
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
if $darwin ; then
javaHome="`dirname \"$javaExecutable\"`"
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
else
javaExecutable="`readlink -f \"$javaExecutable\"`"
fi
javaHome="`dirname \"$javaExecutable\"`"
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
JAVA_HOME="$javaHome"
export JAVA_HOME
fi
fi
fi
if [ -z "$JAVACMD" ] ; then
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
else
JAVACMD="`which java`"
fi
fi
if [ ! -x "$JAVACMD" ] ; then
echo "Error: JAVA_HOME is not defined correctly." >&2
echo " We cannot execute $JAVACMD" >&2
exit 1
fi
if [ -z "$JAVA_HOME" ] ; then
echo "Warning: JAVA_HOME environment variable is not set."
fi
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
# traverses directory structure from process work directory to filesystem root
# first directory with .mvn subdirectory is considered project base directory
find_maven_basedir() {
if [ -z "$1" ]
then
echo "Path not specified to find_maven_basedir"
return 1
fi
basedir="$1"
wdir="$1"
while [ "$wdir" != '/' ] ; do
if [ -d "$wdir"/.mvn ] ; then
basedir=$wdir
break
fi
# workaround for JBEAP-8937 (on Solaris 10/Sparc)
if [ -d "${wdir}" ]; then
wdir=`cd "$wdir/.."; pwd`
fi
# end of workaround
done
echo "${basedir}"
}
# concatenates all lines of a file
concat_lines() {
if [ -f "$1" ]; then
echo "$(tr -s '\n' ' ' < "$1")"
fi
}
BASE_DIR=`find_maven_basedir "$(pwd)"`
if [ -z "$BASE_DIR" ]; then
exit 1;
fi
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
echo $MAVEN_PROJECTBASEDIR
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
[ -n "$M2_HOME" ] &&
M2_HOME=`cygpath --path --windows "$M2_HOME"`
[ -n "$JAVA_HOME" ] &&
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
[ -n "$CLASSPATH" ] &&
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
[ -n "$MAVEN_PROJECTBASEDIR" ] &&
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
fi
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
exec "$JAVACMD" \
$MAVEN_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"

View File

@@ -1,143 +0,0 @@
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM https://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Maven2 Start Up Batch script
@REM
@REM Required ENV vars:
@REM JAVA_HOME - location of a JDK home dir
@REM
@REM Optional ENV vars
@REM M2_HOME - location of maven2's installed home dir
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
@REM e.g. to debug Maven itself, use
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
@REM ----------------------------------------------------------------------------
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
@echo off
@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
@REM set %HOME% to equivalent of $HOME
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
@REM Execute a user defined script before this one
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
:skipRcPre
@setlocal
set ERROR_CODE=0
@REM To isolate internal variables from possible post scripts, we use another setlocal
@setlocal
@REM ==== START VALIDATION ====
if not "%JAVA_HOME%" == "" goto OkJHome
echo.
echo Error: JAVA_HOME not found in your environment. >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
:OkJHome
if exist "%JAVA_HOME%\bin\java.exe" goto init
echo.
echo Error: JAVA_HOME is set to an invalid directory. >&2
echo JAVA_HOME = "%JAVA_HOME%" >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
@REM ==== END VALIDATION ====
:init
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
@REM Fallback to current working directory if not found.
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
set EXEC_DIR=%CD%
set WDIR=%EXEC_DIR%
:findBaseDir
IF EXIST "%WDIR%"\.mvn goto baseDirFound
cd ..
IF "%WDIR%"=="%CD%" goto baseDirNotFound
set WDIR=%CD%
goto findBaseDir
:baseDirFound
set MAVEN_PROJECTBASEDIR=%WDIR%
cd "%EXEC_DIR%"
goto endDetectBaseDir
:baseDirNotFound
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
cd "%EXEC_DIR%"
:endDetectBaseDir
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
@setlocal EnableExtensions EnableDelayedExpansion
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
:endReadAdditionalConfig
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
if ERRORLEVEL 1 goto error
goto end
:error
set ERROR_CODE=1
:end
@endlocal & set ERROR_CODE=%ERROR_CODE%
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
@REM check for post script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
:skipRcPost
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
if "%MAVEN_BATCH_PAUSE%" == "on" pause
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
exit /B %ERROR_CODE%

View File

@@ -1,46 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-multibinder-jaas</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-multibinder-jaas</name>
<description>Demo project with multi kafka cluster/binder with JAAS</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,67 +0,0 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package multibinder.kafka.jaas;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
@SpringBootApplication
public class MultiBinderKafkaJaasSample {
public static void main(String[] args) {
SpringApplication.run(MultiBinderKafkaJaasSample.class, args);
}
@EnableBinding(CustomProcessor.class)
static class Foo {
@StreamListener("input")
@SendTo("output")
public String receive(String foo) {
return foo;
}
@StreamListener("input1")
@SendTo("output1")
public String receive1(String foo) {
return foo;
}
}
interface CustomProcessor {
@Input("input")
SubscribableChannel input();
@Output("output")
MessageChannel output();
@Input("input1")
SubscribableChannel input1();
@Output("output1")
MessageChannel output1();
}
}

View File

@@ -1,43 +0,0 @@
spring.cloud.stream:
bindings:
input:
destination: kafka1-in
binder: kafka1
output:
destination: kafka2-out
binder: kafka2
input1:
destination: kafka2-in
binder: kafka2
output1:
destination: kafka1-out
binder: kafka1
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder.brokers: localhost:9092
binder.jaas.loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
binder.jaas.options.username: admin
binder.jaas.options.password: admin-secret
kafka2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
zkNodes: localhost:2182
brokers: localhost:9093
jaas.loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
jaas.options.username: admin
jaas.options.password: admin-secret
kafka.binder:
configuration:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN

View File

@@ -1,32 +0,0 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package multibinder.kafka.jaas;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class MultiBinderKafkaJaasSampleTests {
@Test
public void contextLoads() {
}
}

View File

@@ -9,7 +9,7 @@
<description>Spring Cloud Stream Multibinder Sample</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>

View File

@@ -1 +0,0 @@
../.mvn

View File

@@ -1,43 +0,0 @@
== Spring Cloud Stream Multibinder Application with Different Systems
This example shows how to run a Spring Cloud Stream application with the same binder type configured for two separate Kafka clusters.
## Running the application
The following instructions assume that you are running Kafka as a Docker image.
* Go to the application root
* `docker-compose up -d`
This brings up two Kafka clusters in docker containers.
Local ports mapped for kafka are 9092 and 9093 (Zookeeper local parts mapped are 2181 and 2182).
* `./mvnw clean package`
The sample comes with a convenient test producer and consumer to see the processor in action.
After running the program, watch your console, every second some data is sent to Kafka cluster 1 and it is received through Kafka cluster 2.
To run the example, command line parameters for the Zookeeper ensembles and Kafka clusters must be provided, as in the following example:
```
java -jar target/multibinder-kafka-streams-0.0.1-SNAPSHOT.jar --kafkaBroker1=localhost:9092 --zk1=localhost:2181 --kafkaBroker2=localhost:9093 --zk2=localhost:2182```
```
Alternatively, the default values of `localhost:9092` and `localhost:2181` can be provided for both clusters.
Assuming you are running two dockerized Kafka clusters as above.
Issue the following commands:
`docker exec -it kafka-multibinder-1 /opt/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic dataIn`
On another terminal:
`docker exec -it kafka-multibinder-2 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 --topic dataOut`
Enter some text on the first one and the same text appears on the second one.
## Running the Kafka Streams processor:
Run the stand-alone Producers application a few times to to generate some data.
Then go to the URL: http://localhost:8080/events

View File

@@ -1,36 +0,0 @@
version: '3'
services:
kafka1:
image: wurstmeister/kafka
container_name: kafka-multibinder-1
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
- KAFKA_ADVERTISED_PORT=9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181
depends_on:
- zookeeper1
zookeeper1:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
environment:
- KAFKA_ADVERTISED_HOST_NAME=zookeeper1
kafka2:
image: wurstmeister/kafka
container_name: kafka-multibinder-2
ports:
- "9093:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
- KAFKA_ADVERTISED_PORT=9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper2:2181
depends_on:
- zookeeper2
zookeeper2:
image: wurstmeister/zookeeper
ports:
- "2182:2181"
environment:
- KAFKA_ADVERTISED_HOST_NAME=zookeeper2

View File

@@ -1,55 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>multibinder-kafka-streams</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>multibinder-kafka-streams</name>
<description>Spring Cloud Stream Multibinder Two Kafka Clusters (Kafka Streams) Sample</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,153 +0,0 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package multibinder;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author Marius Bogoevici
* @author Soby Chacko
*/
@EnableBinding(Processor.class)
public class BridgeTransformer {
@Autowired
private InteractiveQueryService interactiveQueryService;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object transform(Object payload) {
return payload;
}
//Following source is used as test producer.
@EnableBinding(TestSource.class)
static class TestProducer {
private AtomicBoolean semaphore = new AtomicBoolean(true);
@Bean
@InboundChannelAdapter(channel = TestSource.OUTPUT, poller = @Poller(fixedDelay = "1000"))
public MessageSource<String> sendTestData() {
return () ->
new GenericMessage<>(this.semaphore.getAndSet(!this.semaphore.get()) ? "foo" : "bar");
}
}
//Following sink is used as test consumer for the above processor. It logs the data received through the processor.
@EnableBinding(TestSink.class)
static class TestConsumer {
private final Log logger = LogFactory.getLog(getClass());
@StreamListener(TestSink.INPUT)
public void receive(String data) {
logger.info("Data received..." + data);
}
}
@EnableBinding(KafkaStreamsProcessorX.class)
static class KafkaStreamsAggregateSampleApplication {
@StreamListener("input2")
public void process(KStream<Object, DomainEvent> input) {
ObjectMapper mapper = new ObjectMapper();
Serde<DomainEvent> domainEventSerde = new JsonSerde<>( DomainEvent.class, mapper );
input
.groupBy(
(s, domainEvent) -> domainEvent.boardUuid,
Serialized.with(null, domainEventSerde))
.aggregate(
String::new,
(s, domainEvent, board) -> board.concat(domainEvent.eventType),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots").withKeySerde(Serdes.String()).
withValueSerde(Serdes.String())
);
}
}
@RestController
public class FooController {
@RequestMapping("/events")
public String events() {
final ReadOnlyKeyValueStore<String, String> topFiveStore =
interactiveQueryService.getQueryableStore("test-events-snapshots", QueryableStoreTypes.<String, String>keyValueStore());
return topFiveStore.get("12345");
}
}
interface TestSink {
String INPUT = "input1";
@Input(INPUT)
SubscribableChannel input1();
}
interface TestSource {
String OUTPUT = "output1";
@Output(TestSource.OUTPUT)
MessageChannel output();
}
interface KafkaStreamsProcessorX {
@Input("input2")
KStream<?, ?> input2();
}
}

View File

@@ -1,43 +0,0 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package multibinder;
/**
* @author Soby Chacko
*/
public class DomainEvent {
String eventType;
String boardUuid;
public String getEventType() {
return eventType;
}
public void setEventType(String eventType) {
this.eventType = eventType;
}
public String getBoardUuid() {
return boardUuid;
}
public void setBoardUuid(String boardUuid) {
this.boardUuid = boardUuid;
}
}

View File

@@ -1,29 +0,0 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package multibinder;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MultibinderApplication {
public static void main(String[] args) {
SpringApplication.run(MultibinderApplication.class, args);
}
}

View File

@@ -1,60 +0,0 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package multibinder;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerde;
import java.util.HashMap;
import java.util.Map;
/**
* @author Soby Chacko
*/
public class Producers {
public static void main(String... args) {
ObjectMapper mapper = new ObjectMapper();
Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class, mapper);
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, domainEventSerde.serializer().getClass());
DomainEvent ddEvent = new DomainEvent();
ddEvent.setBoardUuid("12345");
ddEvent.setEventType("thisisanevent");
DefaultKafkaProducerFactory<String, DomainEvent> pf = new DefaultKafkaProducerFactory<>(props);
KafkaTemplate<String, DomainEvent> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("foobar");
template.sendDefault("", ddEvent);
}
}

View File

@@ -1,55 +0,0 @@
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: ${kafkaBroker2}
applicationId: multi-binder-kafka-streams
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$BytesSerde
commit.interval.ms: 1000
bindings:
input:
destination: dataIn
binder: kafka1
group: testGroup
output:
destination: dataOut
binder: kafka2
#Test source binding (used for testing)
output1:
destination: dataIn
binder: kafka1
#Test sink binding (used for testing)
input1:
destination: dataOut
binder: kafka2
input2:
destination: foobar
binder: kafka3
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ${kafkaBroker1}
kafka2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: ${kafkaBroker2}
kafka3:
type: kstream

View File

@@ -1,102 +0,0 @@
/*
* Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package multibinder;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.UUID;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
@RunWith(SpringRunner.class)
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext
public class TwoKafkaBindersApplicationTest {
@ClassRule
public static KafkaEmbedded kafkaTestSupport1 = new KafkaEmbedded(1);
@ClassRule
public static KafkaEmbedded kafkaTestSupport2 = new KafkaEmbedded(1);
@BeforeClass
public static void setupEnvironment() {
System.setProperty("kafkaBroker1", kafkaTestSupport1.getBrokersAsString());
System.setProperty("zk1", kafkaTestSupport1.getZookeeperConnectionString());
System.setProperty("kafkaBroker2", kafkaTestSupport2.getBrokersAsString());
System.setProperty("zk2", kafkaTestSupport2.getZookeeperConnectionString());
}
@Autowired
private BinderFactory binderFactory;
@Test
public void contextLoads() {
Binder<MessageChannel, ?, ?> binder1 = binderFactory.getBinder("kafka1", MessageChannel.class);
KafkaMessageChannelBinder kafka1 = (KafkaMessageChannelBinder) binder1;
DirectFieldAccessor directFieldAccessor1 = new DirectFieldAccessor(kafka1);
KafkaBinderConfigurationProperties configuration1 =
(KafkaBinderConfigurationProperties) directFieldAccessor1.getPropertyValue("configurationProperties");
Assert.assertThat(configuration1.getBrokers(), arrayWithSize(1));
Assert.assertThat(configuration1.getBrokers()[0], equalTo(kafkaTestSupport1.getBrokersAsString()));
Binder<MessageChannel, ?, ?> binder2 = binderFactory.getBinder("kafka2", MessageChannel.class);
KafkaMessageChannelBinder kafka2 = (KafkaMessageChannelBinder) binder2;
DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2);
KafkaBinderConfigurationProperties configuration2 =
(KafkaBinderConfigurationProperties) directFieldAccessor2.getPropertyValue("configurationProperties");
Assert.assertThat(configuration2.getBrokers(), arrayWithSize(1));
Assert.assertThat(configuration2.getBrokers()[0], equalTo(kafkaTestSupport2.getBrokersAsString()));
}
@Test
public void messagingWorks() {
QueueChannel dataConsumer = new QueueChannel();
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka2", MessageChannel.class)).bindConsumer("dataOut", UUID.randomUUID().toString(),
dataConsumer, new ExtendedConsumerProperties<>(new KafkaConsumerProperties()));
//receiving test message sent by the test producer in the application
Message<?> receive = dataConsumer.receive(60_000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo("foo".getBytes()));
}
}

View File

@@ -9,7 +9,7 @@
<description>Spring Cloud Stream Multibinder Two Kafka Clusters Sample</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
@@ -19,12 +19,12 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.spring.cloud.stream.sample</groupId>
<groupId>spring.cloud.stream.samples</groupId>
<artifactId>multibinder-samples</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
@@ -11,8 +11,5 @@
<modules>
<module>multibinder-kafka-rabbit</module>
<module>multibinder-two-kafka-clusters</module>
<module>kafka-multibinder-jaas</module>
<module>multibinder-kafka-streams</module>
</modules>
</project>

View File

@@ -1,87 +0,0 @@
Spring Cloud Stream Partitioning Sample
========================================
This is a collection of applications that demonstrates how partitioning works in Spring Cloud Stream.
## Quick introduction
The producer used in the sample produces messages with text that has a length of 1, 2, 3 or 4.
There is a configuration in the producer's application.yml file for `partition-key-expression` that uses the length of the payload minus 1 as the partition key expression to use.
This value will be used by the binder for selecting the correct partition based on the total number of partitions configured on the destination at the broker.
We use 4 partitions for this demo.
There is a common producer module called partitioning-producer and then there is a consumer for kafka and rabbit - partitioning-consumer-kafka and partitioning-consumer-rabbit respectively.
Follow the instructions below to run the demo for Kafka or RabbitMQ.
## Running the sample for Kafka
The following instructions assume that you are running Kafka as a Docker image.
* `docker-compose up -d`
* cd partitioning-consumer-kafka
* `./mvnw clean package`
* `java -jar target/partitioning-consumer-kafka-0.0.1-SNAPSHOT.jar --server.port=9008`
On another termimal start another instance of the consumer.
* `java -jar target/partitioning-consumer-kafka-0.0.1-SNAPSHOT.jar --server.port=9009`
* cd ../partitioning-producer
* `./mvnw clean package`
* `java -jar target/partitioning-producer-0.0.1-SNAPSHOT.jar --server.port=9010`
Producer sends messages randomly that has string length of 1, 2, 3, or 4.
Watch the consumer console logs and verify that the correct partitions are receiving the messages.
The log message has the payload and partition information in it.
Once you are done testing, stop all the instances.
* `docker-compose down`
## Running the sample for Rabbit
The following instructions assume that you are running Rabbit as a Docker image.
Make sure that you are at the root directory of partitioning samples (partitioning-samples)
Rabbit partitioning demo is slightly different from Kafka.
We need to spin up 4 consumers for each of the four partitions.
* `docker-compose -f docker-compose-rabbit.yml up -d`
* cd partitioning-consumer-rabbit
* `./mvnw clean package`
* `java -jar target/partitioning-consumer-rabbit-0.0.1-SNAPSHOT.jar --server.port=9005`
On another terminal start another instance of the consumer.
* `java -jar target/partitioning-consumer-rabbit-0.0.1-SNAPSHOT.jar --server.port=9006 --spring.cloud.stream.bindings.input.consumer.instanceIndex=1`
On another terminal start another instance of the consumer.
* `java -jar target/partitioning-consumer-rabbit-0.0.1-SNAPSHOT.jar --server.port=9007 --spring.cloud.stream.bindings.input.consumer.instanceIndex=2`
On another terminal start yet another instance of the consumer.
* `java -jar target/partitioning-consumer-rabbit-0.0.1-SNAPSHOT.jar --server.port=9008 --spring.cloud.stream.bindings.input.consumer.instanceIndex=3`
* cd ../partitioning-producer
* `./mvnw clean package -P rabbit-binder`
* `java -jar target/partitioning-producer-0.0.1-SNAPSHOT.jar --server.port=9010`
Producer sends messages randomly that has string length of 1, 2, 3, or 4.
Watch the consumer console logs and verify that the correct instances are receiving the messages.
The first consumer we started should receive messages with a string length of 1 (partition-0), second consumer with `instanceIndex` set to 1 should receive messages with string length of 2 (partittion-1) so on and so forth.
Once you are done testing, stop all the instances.
* `docker-compose -f docker-compose-rabbit.yml down`

View File

@@ -1,7 +0,0 @@
version: '3'
services:
rabbitmq:
image: rabbitmq:management
ports:
- 5672:5672
- 15672:15672

View File

@@ -1,19 +0,0 @@
version: '3'
services:
kafka:
image: wurstmeister/kafka
container_name: kafka-partitioning
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
- KAFKA_ADVERTISED_PORT=9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
depends_on:
- zookeeper
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
environment:
- KAFKA_ADVERTISED_HOST_NAME=zookeeper

View File

@@ -1,54 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>partitioning-consumer-sample-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>partitioning-consumer-sample-kafka</name>
<description>Spring Cloud Stream Partitioning Kafka</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>kafka-binder</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,16 +0,0 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 https://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>uppercase-transformer-kafka</id>
<dependencySets>
<dependencySet>
<includes>
<include>io.spring.cloud.stream.sample:partitioning-consumer-sample-kafka</include>
</includes>
<outputDirectory>.</outputDirectory>
<outputFileNameMapping>partitioning-consumer-sample-kafka.jar</outputFileNameMapping>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -1,40 +0,0 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
/**
* @author Soby Chacko
*/
@EnableBinding(Sink.class)
public class PartitioningKafkaDemo {
private static final Logger logger = LoggerFactory.getLogger(PartitioningKafkaDemo.class);
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
logger.info(in + " received from partition " + partition);
}
}

View File

@@ -1,29 +0,0 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class PartitioningKafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(PartitioningKafkaDemoApplication.class, args);
}
}

View File

@@ -1,11 +0,0 @@
spring:
cloud:
stream:
kafka:
binder:
autoAddPartitions: true
minPartitionCount: 4
bindings:
input:
destination: partitioned.destination
group: myGroup

View File

@@ -1,37 +0,0 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = PartitioningKafkaDemoApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {
@Test
public void contextLoads() {
}
}

View File

@@ -1,54 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>partitioning-consumer-sample-rabbit</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>partitioning-consumer-sample-rabbit</name>
<description>Spring Cloud Stream Partitioning Rabbit</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>rabbit-binder</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,16 +0,0 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 https://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>uppercase-transformer-kafka</id>
<dependencySets>
<dependencySet>
<includes>
<include>io.spring.cloud.stream.sample:partitioning-consumer-sample-rabbit</include>
</includes>
<outputDirectory>.</outputDirectory>
<outputFileNameMapping>partitioning-consumer-sample-rabbit.jar</outputFileNameMapping>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -1,40 +0,0 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
/**
* @author Soby Chacko
*/
@EnableBinding(Sink.class)
public class PartitioningRabbitDemo {
private static final Logger logger = LoggerFactory.getLogger(PartitioningRabbitDemo.class);
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String partition) {
logger.info(in + " received from partition " + partition);
}
}

View File

@@ -1,29 +0,0 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class PartitioningRabbitDemoApplication {
public static void main(String[] args) {
SpringApplication.run(PartitioningRabbitDemoApplication.class, args);
}
}

Some files were not shown because too many files have changed in this diff Show More