GH-161: Make Kinesis samples Function-based (#162)

* GH-161: Make Kinesis samples Function-based

Fixes https://github.com/spring-cloud/spring-cloud-stream-samples/issues/161

* * Fix READMEs to avoid `@EnableBinding` and brothers
This commit is contained in:
Artem Bilan
2019-10-29 15:48:36 -04:00
committed by Soby Chacko
parent de7f31d87d
commit dee9053bd0
13 changed files with 160 additions and 288 deletions

View File

@@ -20,12 +20,11 @@ This sample is a Spring Boot application that uses Spring Cloud Stream to produc
The sample has the following components:
* KinesisApplication - The Spring Boot Main Application
* OrderController - The Controller exposing POST and GET endpoints
* OrderController - The Controller exposing POST and GET endpoints
* OrderRepository - An in memory database for storing Order objects
* OrderProcessor - An interface defining the input `ordersIn` and output `ordersOut` channel bindings
* OrderSource - The class that produces messages for the stream
* OrderStreamConfiguration - Listens to the Kinesis stream using ` @StreamListener(OrderProcessor.INPUT)` logs receiving messages from the stream
* OrderSource - The class that produces records for the stream
* OrderStreamConfiguration - Listens to the Kinesis stream and logs receiving records from the stream
## Building with Maven
Build the sample by executing:
@@ -45,7 +44,7 @@ Start the Consumer using:
`kinesis-produce-consume>$ java -jar target/spring-cloud-stream-sample-kinesis-0.0.1.BUILD-SNAPSHOT.jar --originator=KinesisConsumer --server.port=64399`
The originator is a key added to messages to tell the receiving application who sent the message.
The originator is a key added to messages to tell the receiving application who sent the message.
If the consumer sent the message nothing is done, if the producer sent the message, then it is saved to an in memory database.
@@ -88,4 +87,4 @@ The returned Order should match the Order that was POSTed to the Producer.
"name": "pencil"
}
]`

View File

@@ -9,18 +9,35 @@
<name>kinesis-produce-consume</name>
<description>Spring Cloud Stream Kinesis Sample</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<spring-cloud.version>Hoxton.BUILD-SNAPSHOT</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<version>1.0.0.RELEASE</version>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>

View File

@@ -1,39 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo.stream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
*
* @author Peter Oates
*
*/
public interface OrderProcessor {
String INPUT = "ordersIn";
@Output
MessageChannel ordersOut();
@Input
SubscribableChannel ordersIn();
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017 the original author or authors.
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,38 +16,40 @@
package demo.stream;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Bean;
import demo.repository.OrderRepository;
/**
*
* @author Peter Oates
* @author Artem Bilan
*
*/
@EnableBinding(OrderProcessor.class)
public class OrderStreamConfiguration {
private final Log logger = LogFactory.getLog(getClass());
@Autowired
private OrderRepository orders;
@StreamListener(OrderProcessor.INPUT)
public void processOrder(Event event) {
//log the order received
if (!event.getOriginator().equals("KinesisProducer")) {
logger.info("An order has been received " + event.toString());
orders.save(event.getSubject());
}
else {
logger.info("An order has been placed from this service " + event.toString());
}
@Bean
public Consumer<Event> processOrder(OrderRepository orders) {
return event -> {
//log the order received
if (!event.getOriginator().equals("KinesisProducer")) {
logger.info("An order has been received " + event.toString());
orders.save(event.getSubject());
}
else {
logger.info("An order has been placed from this service " + event.toString());
}
};
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017 the original author or authors.
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,18 +16,21 @@
package demo.stream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
*
* @author Peter Oates
* @author Artem Bilan
*
*/
@Component
@@ -35,15 +38,15 @@ public class OrdersSource {
private final Log logger = LogFactory.getLog(getClass());
private OrderProcessor orderOut;
private BlockingQueue<Event> orderEvent = new LinkedBlockingQueue<>();
@Autowired
public OrdersSource(OrderProcessor orderOut) {
this.orderOut = orderOut;
@Bean
public Supplier<Event> produceOrder() {
return () -> this.orderEvent.poll();
}
public void sendOrder(Event event) {
orderOut.ordersOut().send(new GenericMessage<>(event));
this.orderEvent.offer(event);
logger.info("Event sent: " + event.toString());
}

View File

@@ -2,39 +2,41 @@ originator: KinesisProducer
server:
port: 64398
management:
port: 8082
context-path: /manage
security:
user:
name: admin
password: 2c76788d-e661-49fd-baba-4b41e7c1dd47
server:
port: 8082
servlet:
context-path: /manage
spring:
security:
user:
name: admin
password: 2c76788d-e661-49fd-baba-4b41e7c1dd47
cloud:
stream:
bindings:
ordersOut:
produceOrder-out-0:
destination: test_stream
content-type: application/json
producer:
partitionKeyExpression: "1"
ordersIn:
producer:
partitionKeyExpression: "1"
processOrder-in-0:
destination: test_stream
content-type: application/json
cloud:
aws:
region:
static: eu-west-1
function:
definition: processOrder;produceOrder
logging:
cloud:
aws:
region:
static: eu-west-1
logging:
level:
com:
com:
amazonaws: INFO
org:
apache:
http: INFO
apache:
http: INFO

View File

@@ -1,10 +1,10 @@
== Reactive Spring Cloud Stream AWS Kinesis Binder to SSE via WebFlux
This sample demonstrate a simple bridging of AWS Kinesis stream records to the Server Side Events subscribers.
The `@StreamListener` sink side is based on the Spring Cloud Stream Reactive support, streaming incoming messages to the `Flux` argument which, in turn, is used as a source for the `@GetMapping` controller.
The `Consumer` sink side is based on the Spring Cloud Stream Reactive support, streaming incoming messages to the `Flux` argument which, in turn, is used as a source for the `@GetMapping` controller.
The `CloudStreamKinesisToWebfluxApplicationTests` demonstrates:
- an `AmazonKinesisAsync` client configured against local Kineselite on the `4568` port;
- the `TestSource` binding for producing records into the Kinesis stream;
- a `WebTestClient` to perform SSE request against embedded Netty started by the `CloudStreamKinesisToWebfluxApplication` Spring Boot application on the random port and subsequent verification of the data produced by the `Flux` from the `@StreamListener` against Kinesis Binder consumer.
- the `Supplier` binding for producing records into the Kinesis stream;
- a `WebTestClient` to perform SSE request against embedded Netty started by the `CloudStreamKinesisToWebfluxApplication` Spring Boot application on the random port and subsequent verification of the data produced by the `Flux` from the `Consumer` against Kinesis Binder.

View File

@@ -12,25 +12,37 @@
<description>Spring Cloud Stream Kinesis to WebFlux Sample</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<spring-cloud.version>Hoxton.BUILD-SNAPSHOT</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<version>1.0.0.RELEASE</version>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
@@ -43,12 +55,6 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018 the original author or authors.
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,39 +18,43 @@ package kinesis.webflux;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.amazonaws.services.kinesis.model.Record;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
/**
* @author Artem Bilan
*/
@SpringBootApplication
@EnableBinding(Sink.class)
@RestController
public class CloudStreamKinesisToWebfluxApplication {
private volatile Flux<String> recordFlux;
private final EmitterProcessor<String> recordProcessor = EmitterProcessor.create();
@GetMapping(value = "/sseFromKinesis", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getSeeFromKinesis() {
return this.recordFlux;
return this.recordProcessor;
}
@StreamListener(Sink.INPUT)
public void kinesisSink(Flux<List<Record>> recordFlux) {
this.recordFlux = recordFlux
.flatMap(Flux::fromIterable)
.map(record -> new String(record.getData().array(), StandardCharsets.UTF_8));
@Bean
public Consumer<Flux<List<Record>>> kinesisSink() {
return recordFlux ->
recordFlux
.flatMap(Flux::fromIterable)
.map(record -> new String(record.getData().array(), StandardCharsets.UTF_8))
.doOnNext(this.recordProcessor::onNext)
.subscribe();
}
public static void main(String[] args) {
SpringApplication.run(CloudStreamKinesisToWebfluxApplication.class, args);
}

View File

@@ -2,7 +2,7 @@ spring:
cloud:
stream:
bindings:
input:
kinesisSink-in-0:
destination: SSE_DATA
group: kinesis-to-sse
consumer:
@@ -10,7 +10,7 @@ spring:
useNativeDecoding: true
kinesis:
bindings:
input:
kinesisSink-in-0:
consumer:
listenerMode: batch

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018 the original author or authors.
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,10 +16,11 @@
package kinesis.webflux;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -27,20 +28,12 @@ import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWeb
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.cloud.aws.autoconfigure.context.ContextResourceLoaderAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.aws.autoconfigure.context.ContextStackAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.metadata.MetadataStore;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import com.amazonaws.ClientConfiguration;
@@ -49,36 +42,36 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
@RunWith(SpringRunner.class)
/**
* @author Artem Bilan
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
"spring.cloud.stream.bindings." + CloudStreamKinesisToWebfluxApplicationTests.TestSource.TO_KINESIS_OUTPUT + ".destination = SSE_DATA",
"spring.cloud.stream.bindings." + CloudStreamKinesisToWebfluxApplicationTests.TestSource.TO_KINESIS_OUTPUT + ".producer.headerMode = none",
"logging.level.org.springframework.integration=TRACE"
"spring.cloud.stream.bindings.kinesisSource-out-0.destination = SSE_DATA",
"spring.cloud.stream.bindings.kinesisSource-out-0.producer.headerMode = none",
"spring.cloud.function.definition=kinesisSink;kinesisSource"
}
)
@AutoConfigureWebTestClient
public class CloudStreamKinesisToWebfluxApplicationTests {
@ClassRule
public static LocalKinesisResource localKinesisResource = new LocalKinesisResource();
@Autowired
private WebTestClient webTestClient;
@Autowired
private TestSource testSource;
private KinesisTestConfiguration kinesisTestConfiguration;
@Test
public void testKinesisToWebFlux() {
this.testSource.toKinesisOutput().send(new GenericMessage<>("foo"));
this.testSource.toKinesisOutput().send(new GenericMessage<>("bar"));
this.testSource.toKinesisOutput().send(new GenericMessage<>("baz"));
void testKinesisToWebFlux() {
this.kinesisTestConfiguration.eventQueue.offer("foo");
this.kinesisTestConfiguration.eventQueue.offer("bar");
this.kinesisTestConfiguration.eventQueue.offer("baz");
Flux<String> seeFlux =
this.webTestClient.get().uri("/sseFromKinesis")
@@ -94,11 +87,19 @@ public class CloudStreamKinesisToWebfluxApplicationTests {
}
@TestConfiguration
@EnableBinding(TestSource.class)
@EnableAutoConfiguration(exclude = ContextResourceLoaderAutoConfiguration.class)
@EnableAutoConfiguration(exclude =
{ ContextResourceLoaderAutoConfiguration.class,
ContextStackAutoConfiguration.class })
public static class KinesisTestConfiguration {
public static final int DEFAULT_KINESALITE_PORT = 4568;
private static final int DEFAULT_KINESALITE_PORT = 4568;
private BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
@Bean
public Supplier<String> kinesisSource() {
return () -> this.eventQueue.poll();
}
@Bean
public AmazonKinesisAsync amazonKinesis() {
@@ -126,14 +127,12 @@ public class CloudStreamKinesisToWebfluxApplicationTests {
public ConcurrentMetadataStore simpleMetadataStore() {
return new SimpleMetadataStore();
}
}
interface TestSource {
String TO_KINESIS_OUTPUT = "toKinesisOutput";
@Output(TO_KINESIS_OUTPUT)
MessageChannel toKinesisOutput();
@Bean
public AmazonDynamoDBAsync dynamoDB() {
return null;
}
}

View File

@@ -1,121 +0,0 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kinesis.webflux;
import java.util.List;
import org.springframework.cloud.stream.test.junit.AbstractExternalResourceTestSupport;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.SDKGlobalConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;
import com.amazonaws.services.kinesis.model.ListStreamsRequest;
import com.amazonaws.services.kinesis.model.ListStreamsResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
/**
* An {@link AbstractExternalResourceTestSupport} implementation for Kinesis local service.
*
* @author Artem Bilan
* @author Jacob Severson
*
*/
public class LocalKinesisResource
extends AbstractExternalResourceTestSupport<AmazonKinesisAsync> {
/**
* The default port for the local Kinesis service.
*/
public static final int DEFAULT_PORT = 4568;
private final int port;
public LocalKinesisResource() {
this(DEFAULT_PORT);
}
public LocalKinesisResource(int port) {
super("KINESIS");
this.port = port;
}
@Override
protected void obtainResource() {
// See https://github.com/mhart/kinesalite#cbor-protocol-issues-with-the-java-sdk
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
"true");
this.resource = AmazonKinesisAsyncClientBuilder.standard()
.withClientConfiguration(new ClientConfiguration().withMaxErrorRetry(0)
.withConnectionTimeout(1000))
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
"http://localhost:" + this.port,
Regions.DEFAULT_REGION.getName()))
.withCredentials(
new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
.build();
// Check connection
this.resource.listStreams();
}
@Override
protected void cleanupResource() {
ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
ListStreamsResult listStreamsResult = this.resource
.listStreams(listStreamsRequest);
List<String> streamNames = listStreamsResult.getStreamNames();
while (listStreamsResult.getHasMoreStreams()) {
if (streamNames.size() > 0) {
listStreamsRequest.setExclusiveStartStreamName(
streamNames.get(streamNames.size() - 1));
}
listStreamsResult = this.resource.listStreams(listStreamsRequest);
streamNames.addAll(listStreamsResult.getStreamNames());
}
for (String stream : streamNames) {
this.resource.deleteStream(stream);
while (true) {
try {
this.resource.describeStream(stream);
try {
Thread.sleep(100);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException(ex);
}
}
catch (ResourceNotFoundException ex) {
break;
}
}
}
System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY);
this.resource.shutdown();
}
}

View File

@@ -6,11 +6,11 @@
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>kinesis-samples</name>
<description>Collection of Spring Cloud Stream Aggregate Samples</description>
<description>Collection of Spring Cloud Stream Samples with Kinesis Binder</description>
<modules>
<module>kinesis-produce-consume</module>
<!-- <module>kinesis-to-webflux</module> -->
<module>kinesis-to-webflux</module>
</modules>
</project>