diff --git a/kinesis-samples/kinesis-produce-consume/README.adoc b/kinesis-samples/kinesis-produce-consume/README.adoc
index bb383a3..65e00e0 100644
--- a/kinesis-samples/kinesis-produce-consume/README.adoc
+++ b/kinesis-samples/kinesis-produce-consume/README.adoc
@@ -20,12 +20,11 @@ This sample is a Spring Boot application that uses Spring Cloud Stream to produc
The sample has the following components:
* KinesisApplication - The Spring Boot Main Application
-* OrderController - The Controller exposing POST and GET endpoints
+* OrderController - The Controller exposing POST and GET endpoints
* OrderRepository - An in memory database for storing Order objects
-* OrderProcessor - An interface defining the input `ordersIn` and output `ordersOut` channel bindings
-* OrderSource - The class that produces messages for the stream
-* OrderStreamConfiguration - Listens to the Kinesis stream using ` @StreamListener(OrderProcessor.INPUT)` logs receiving messages from the stream
-
+* OrderSource - The class that produces records for the stream
+* OrderStreamConfiguration - Listens to the Kinesis stream and logs receiving records from the stream
+
## Building with Maven
Build the sample by executing:
@@ -45,7 +44,7 @@ Start the Consumer using:
`kinesis-produce-consume>$ java -jar target/spring-cloud-stream-sample-kinesis-0.0.1.BUILD-SNAPSHOT.jar --originator=KinesisConsumer --server.port=64399`
-The originator is a key added to messages to tell the receiving application who sent the message.
+The originator is a key added to messages to tell the receiving application who sent the message.
If the consumer sent the message nothing is done, if the producer sent the message, then it is saved to an in memory database.
@@ -88,4 +87,4 @@ The returned Order should match the Order that was POSTed to the Producer.
"name": "pencil"
}
]`
-
+
diff --git a/kinesis-samples/kinesis-produce-consume/pom.xml b/kinesis-samples/kinesis-produce-consume/pom.xml
index cd4b7a4..b614b80 100644
--- a/kinesis-samples/kinesis-produce-consume/pom.xml
+++ b/kinesis-samples/kinesis-produce-consume/pom.xml
@@ -9,18 +9,35 @@
kinesis-produce-consume
Spring Cloud Stream Kinesis Sample
+
- io.spring.cloud.stream.sample
- spring-cloud-stream-samples-parent
- 0.0.1-SNAPSHOT
- ../..
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.2.0.RELEASE
+
+
+ Hoxton.BUILD-SNAPSHOT
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+
+
org.springframework.cloud
spring-cloud-stream-binder-kinesis
- 1.0.0.RELEASE
+ 2.0.0.BUILD-SNAPSHOT
diff --git a/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrderProcessor.java b/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrderProcessor.java
deleted file mode 100644
index 74d3713..0000000
--- a/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrderProcessor.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2017 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package demo.stream;
-
-import org.springframework.cloud.stream.annotation.Input;
-import org.springframework.cloud.stream.annotation.Output;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.SubscribableChannel;
-
-/**
- *
- * @author Peter Oates
- *
- */
-public interface OrderProcessor {
-
- String INPUT = "ordersIn";
-
- @Output
- MessageChannel ordersOut();
-
- @Input
- SubscribableChannel ordersIn();
-
-}
diff --git a/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrderStreamConfiguration.java b/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrderStreamConfiguration.java
index eacc87e..78041b6 100644
--- a/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrderStreamConfiguration.java
+++ b/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrderStreamConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017 the original author or authors.
+ * Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,38 +16,40 @@
package demo.stream;
+import java.util.function.Consumer;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.context.annotation.Bean;
import demo.repository.OrderRepository;
/**
*
* @author Peter Oates
+ * @author Artem Bilan
*
*/
-@EnableBinding(OrderProcessor.class)
public class OrderStreamConfiguration {
private final Log logger = LogFactory.getLog(getClass());
- @Autowired
- private OrderRepository orders;
-
- @StreamListener(OrderProcessor.INPUT)
- public void processOrder(Event event) {
- //log the order received
- if (!event.getOriginator().equals("KinesisProducer")) {
- logger.info("An order has been received " + event.toString());
- orders.save(event.getSubject());
- }
- else {
- logger.info("An order has been placed from this service " + event.toString());
- }
+ @Bean
+ public Consumer processOrder(OrderRepository orders) {
+ return event -> {
+ //log the order received
+ if (!event.getOriginator().equals("KinesisProducer")) {
+ logger.info("An order has been received " + event.toString());
+ orders.save(event.getSubject());
+ }
+ else {
+ logger.info("An order has been placed from this service " + event.toString());
+ }
+ };
}
}
diff --git a/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrdersSource.java b/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrdersSource.java
index 1b917aa..fb3f401 100644
--- a/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrdersSource.java
+++ b/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrdersSource.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017 the original author or authors.
+ * Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,18 +16,21 @@
package demo.stream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.integration.support.MessageBuilder;
-import org.springframework.messaging.support.GenericMessage;
+import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
*
* @author Peter Oates
+ * @author Artem Bilan
*
*/
@Component
@@ -35,15 +38,15 @@ public class OrdersSource {
private final Log logger = LogFactory.getLog(getClass());
- private OrderProcessor orderOut;
+ private BlockingQueue orderEvent = new LinkedBlockingQueue<>();
- @Autowired
- public OrdersSource(OrderProcessor orderOut) {
- this.orderOut = orderOut;
+ @Bean
+ public Supplier produceOrder() {
+ return () -> this.orderEvent.poll();
}
public void sendOrder(Event event) {
- orderOut.ordersOut().send(new GenericMessage<>(event));
+ this.orderEvent.offer(event);
logger.info("Event sent: " + event.toString());
}
diff --git a/kinesis-samples/kinesis-produce-consume/src/main/resources/application.yml b/kinesis-samples/kinesis-produce-consume/src/main/resources/application.yml
index e3247b3..9941e7a 100644
--- a/kinesis-samples/kinesis-produce-consume/src/main/resources/application.yml
+++ b/kinesis-samples/kinesis-produce-consume/src/main/resources/application.yml
@@ -2,39 +2,41 @@ originator: KinesisProducer
server:
port: 64398
management:
- port: 8082
- context-path: /manage
-security:
- user:
- name: admin
- password: 2c76788d-e661-49fd-baba-4b41e7c1dd47
+ server:
+ port: 8082
+ servlet:
+ context-path: /manage
-
spring:
+ security:
+ user:
+ name: admin
+ password: 2c76788d-e661-49fd-baba-4b41e7c1dd47
cloud:
stream:
bindings:
- ordersOut:
+ produceOrder-out-0:
destination: test_stream
content-type: application/json
- producer:
- partitionKeyExpression: "1"
- ordersIn:
+ producer:
+ partitionKeyExpression: "1"
+ processOrder-in-0:
destination: test_stream
content-type: application/json
-
-
-
-cloud:
- aws:
- region:
- static: eu-west-1
+ function:
+ definition: processOrder;produceOrder
-logging:
+cloud:
+ aws:
+ region:
+ static: eu-west-1
+
+
+logging:
level:
- com:
+ com:
amazonaws: INFO
org:
- apache:
- http: INFO
\ No newline at end of file
+ apache:
+ http: INFO
diff --git a/kinesis-samples/kinesis-to-webflux/README.adoc b/kinesis-samples/kinesis-to-webflux/README.adoc
index fafa203..ec4523f 100644
--- a/kinesis-samples/kinesis-to-webflux/README.adoc
+++ b/kinesis-samples/kinesis-to-webflux/README.adoc
@@ -1,10 +1,10 @@
== Reactive Spring Cloud Stream AWS Kinesis Binder to SSE via WebFlux
This sample demonstrate a simple bridging of AWS Kinesis stream records to the Server Side Events subscribers.
-The `@StreamListener` sink side is based on the Spring Cloud Stream Reactive support, streaming incoming messages to the `Flux` argument which, in turn, is used as a source for the `@GetMapping` controller.
+The `Consumer` sink side is based on the Spring Cloud Stream Reactive support, streaming incoming messages to the `Flux` argument which, in turn, is used as a source for the `@GetMapping` controller.
The `CloudStreamKinesisToWebfluxApplicationTests` demonstrates:
- an `AmazonKinesisAsync` client configured against local Kineselite on the `4568` port;
-- the `TestSource` binding for producing records into the Kinesis stream;
-- a `WebTestClient` to perform SSE request against embedded Netty started by the `CloudStreamKinesisToWebfluxApplication` Spring Boot application on the random port and subsequent verification of the data produced by the `Flux` from the `@StreamListener` against Kinesis Binder consumer.
+- the `Supplier` binding for producing records into the Kinesis stream;
+- a `WebTestClient` to perform SSE request against embedded Netty started by the `CloudStreamKinesisToWebfluxApplication` Spring Boot application on the random port and subsequent verification of the data produced by the `Flux` from the `Consumer` against Kinesis Binder.
diff --git a/kinesis-samples/kinesis-to-webflux/pom.xml b/kinesis-samples/kinesis-to-webflux/pom.xml
index 5be9e20..b073070 100644
--- a/kinesis-samples/kinesis-to-webflux/pom.xml
+++ b/kinesis-samples/kinesis-to-webflux/pom.xml
@@ -12,25 +12,37 @@
Spring Cloud Stream Kinesis to WebFlux Sample
- io.spring.cloud.stream.sample
- spring-cloud-stream-samples-parent
- 0.0.1-SNAPSHOT
- ../..
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.2.0.RELEASE
+
+
+ Hoxton.BUILD-SNAPSHOT
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+
+
org.springframework.boot
spring-boot-starter-webflux
-
- org.springframework.cloud
- spring-cloud-stream-reactive
-
org.springframework.cloud
spring-cloud-stream-binder-kinesis
- 1.0.0.RELEASE
+ 2.0.0.BUILD-SNAPSHOT
@@ -43,12 +55,6 @@
reactor-test
test
-
- org.springframework.cloud
- spring-cloud-stream-test-support-internal
- 2.1.0.BUILD-SNAPSHOT
- test
-
diff --git a/kinesis-samples/kinesis-to-webflux/src/main/java/kinesis/webflux/CloudStreamKinesisToWebfluxApplication.java b/kinesis-samples/kinesis-to-webflux/src/main/java/kinesis/webflux/CloudStreamKinesisToWebfluxApplication.java
index 718a444..9671004 100644
--- a/kinesis-samples/kinesis-to-webflux/src/main/java/kinesis/webflux/CloudStreamKinesisToWebfluxApplication.java
+++ b/kinesis-samples/kinesis-to-webflux/src/main/java/kinesis/webflux/CloudStreamKinesisToWebfluxApplication.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 the original author or authors.
+ * Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,39 +18,43 @@ package kinesis.webflux;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.function.Consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.cloud.stream.annotation.EnableBinding;
-import org.springframework.cloud.stream.annotation.StreamListener;
-import org.springframework.cloud.stream.messaging.Sink;
+import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.amazonaws.services.kinesis.model.Record;
+import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
+/**
+ * @author Artem Bilan
+ */
@SpringBootApplication
-@EnableBinding(Sink.class)
@RestController
public class CloudStreamKinesisToWebfluxApplication {
- private volatile Flux recordFlux;
+ private final EmitterProcessor recordProcessor = EmitterProcessor.create();
@GetMapping(value = "/sseFromKinesis", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux getSeeFromKinesis() {
- return this.recordFlux;
+ return this.recordProcessor;
}
- @StreamListener(Sink.INPUT)
- public void kinesisSink(Flux> recordFlux) {
- this.recordFlux = recordFlux
- .flatMap(Flux::fromIterable)
- .map(record -> new String(record.getData().array(), StandardCharsets.UTF_8));
+ @Bean
+ public Consumer>> kinesisSink() {
+ return recordFlux ->
+ recordFlux
+ .flatMap(Flux::fromIterable)
+ .map(record -> new String(record.getData().array(), StandardCharsets.UTF_8))
+ .doOnNext(this.recordProcessor::onNext)
+ .subscribe();
}
-
public static void main(String[] args) {
SpringApplication.run(CloudStreamKinesisToWebfluxApplication.class, args);
}
diff --git a/kinesis-samples/kinesis-to-webflux/src/main/resources/application.yml b/kinesis-samples/kinesis-to-webflux/src/main/resources/application.yml
index 8c6f378..f981089 100644
--- a/kinesis-samples/kinesis-to-webflux/src/main/resources/application.yml
+++ b/kinesis-samples/kinesis-to-webflux/src/main/resources/application.yml
@@ -2,7 +2,7 @@ spring:
cloud:
stream:
bindings:
- input:
+ kinesisSink-in-0:
destination: SSE_DATA
group: kinesis-to-sse
consumer:
@@ -10,7 +10,7 @@ spring:
useNativeDecoding: true
kinesis:
bindings:
- input:
+ kinesisSink-in-0:
consumer:
listenerMode: batch
diff --git a/kinesis-samples/kinesis-to-webflux/src/test/java/kinesis/webflux/CloudStreamKinesisToWebfluxApplicationTests.java b/kinesis-samples/kinesis-to-webflux/src/test/java/kinesis/webflux/CloudStreamKinesisToWebfluxApplicationTests.java
index 3cd38e1..4867f21 100644
--- a/kinesis-samples/kinesis-to-webflux/src/test/java/kinesis/webflux/CloudStreamKinesisToWebfluxApplicationTests.java
+++ b/kinesis-samples/kinesis-to-webflux/src/test/java/kinesis/webflux/CloudStreamKinesisToWebfluxApplicationTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 the original author or authors.
+ * Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,10 +16,11 @@
package kinesis.webflux;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Supplier;
+
+import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -27,20 +28,12 @@ import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWeb
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.cloud.aws.autoconfigure.context.ContextResourceLoaderAutoConfiguration;
-import org.springframework.cloud.stream.annotation.EnableBinding;
-import org.springframework.cloud.stream.annotation.Output;
-import org.springframework.cloud.stream.messaging.Processor;
+import org.springframework.cloud.aws.autoconfigure.context.ContextStackAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
-import org.springframework.integration.metadata.MetadataStore;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.support.GenericMessage;
-import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import com.amazonaws.ClientConfiguration;
@@ -49,36 +42,36 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
-@RunWith(SpringRunner.class)
+/**
+ * @author Artem Bilan
+ */
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
- "spring.cloud.stream.bindings." + CloudStreamKinesisToWebfluxApplicationTests.TestSource.TO_KINESIS_OUTPUT + ".destination = SSE_DATA",
- "spring.cloud.stream.bindings." + CloudStreamKinesisToWebfluxApplicationTests.TestSource.TO_KINESIS_OUTPUT + ".producer.headerMode = none",
- "logging.level.org.springframework.integration=TRACE"
+ "spring.cloud.stream.bindings.kinesisSource-out-0.destination = SSE_DATA",
+ "spring.cloud.stream.bindings.kinesisSource-out-0.producer.headerMode = none",
+ "spring.cloud.function.definition=kinesisSink;kinesisSource"
}
)
@AutoConfigureWebTestClient
public class CloudStreamKinesisToWebfluxApplicationTests {
- @ClassRule
- public static LocalKinesisResource localKinesisResource = new LocalKinesisResource();
-
@Autowired
private WebTestClient webTestClient;
@Autowired
- private TestSource testSource;
+ private KinesisTestConfiguration kinesisTestConfiguration;
@Test
- public void testKinesisToWebFlux() {
- this.testSource.toKinesisOutput().send(new GenericMessage<>("foo"));
- this.testSource.toKinesisOutput().send(new GenericMessage<>("bar"));
- this.testSource.toKinesisOutput().send(new GenericMessage<>("baz"));
+ void testKinesisToWebFlux() {
+ this.kinesisTestConfiguration.eventQueue.offer("foo");
+ this.kinesisTestConfiguration.eventQueue.offer("bar");
+ this.kinesisTestConfiguration.eventQueue.offer("baz");
Flux seeFlux =
this.webTestClient.get().uri("/sseFromKinesis")
@@ -94,11 +87,19 @@ public class CloudStreamKinesisToWebfluxApplicationTests {
}
@TestConfiguration
- @EnableBinding(TestSource.class)
- @EnableAutoConfiguration(exclude = ContextResourceLoaderAutoConfiguration.class)
+ @EnableAutoConfiguration(exclude =
+ { ContextResourceLoaderAutoConfiguration.class,
+ ContextStackAutoConfiguration.class })
public static class KinesisTestConfiguration {
- public static final int DEFAULT_KINESALITE_PORT = 4568;
+ private static final int DEFAULT_KINESALITE_PORT = 4568;
+
+ private BlockingQueue eventQueue = new LinkedBlockingQueue<>();
+
+ @Bean
+ public Supplier kinesisSource() {
+ return () -> this.eventQueue.poll();
+ }
@Bean
public AmazonKinesisAsync amazonKinesis() {
@@ -126,14 +127,12 @@ public class CloudStreamKinesisToWebfluxApplicationTests {
public ConcurrentMetadataStore simpleMetadataStore() {
return new SimpleMetadataStore();
}
- }
- interface TestSource {
- String TO_KINESIS_OUTPUT = "toKinesisOutput";
-
- @Output(TO_KINESIS_OUTPUT)
- MessageChannel toKinesisOutput();
+ @Bean
+ public AmazonDynamoDBAsync dynamoDB() {
+ return null;
+ }
}
diff --git a/kinesis-samples/kinesis-to-webflux/src/test/java/kinesis/webflux/LocalKinesisResource.java b/kinesis-samples/kinesis-to-webflux/src/test/java/kinesis/webflux/LocalKinesisResource.java
deleted file mode 100644
index 04af197..0000000
--- a/kinesis-samples/kinesis-to-webflux/src/test/java/kinesis/webflux/LocalKinesisResource.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright 2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kinesis.webflux;
-
-import java.util.List;
-
-import org.springframework.cloud.stream.test.junit.AbstractExternalResourceTestSupport;
-
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.SDKGlobalConfiguration;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisAsync;
-import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;
-import com.amazonaws.services.kinesis.model.ListStreamsRequest;
-import com.amazonaws.services.kinesis.model.ListStreamsResult;
-import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
-
-/**
- * An {@link AbstractExternalResourceTestSupport} implementation for Kinesis local service.
- *
- * @author Artem Bilan
- * @author Jacob Severson
- *
- */
-public class LocalKinesisResource
- extends AbstractExternalResourceTestSupport {
-
- /**
- * The default port for the local Kinesis service.
- */
- public static final int DEFAULT_PORT = 4568;
-
- private final int port;
-
- public LocalKinesisResource() {
- this(DEFAULT_PORT);
- }
-
- public LocalKinesisResource(int port) {
- super("KINESIS");
- this.port = port;
- }
-
- @Override
- protected void obtainResource() {
- // See https://github.com/mhart/kinesalite#cbor-protocol-issues-with-the-java-sdk
- System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
- "true");
-
- this.resource = AmazonKinesisAsyncClientBuilder.standard()
- .withClientConfiguration(new ClientConfiguration().withMaxErrorRetry(0)
- .withConnectionTimeout(1000))
- .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
- "http://localhost:" + this.port,
- Regions.DEFAULT_REGION.getName()))
- .withCredentials(
- new AWSStaticCredentialsProvider(new BasicAWSCredentials("", "")))
- .build();
-
- // Check connection
- this.resource.listStreams();
- }
-
- @Override
- protected void cleanupResource() {
- ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
- ListStreamsResult listStreamsResult = this.resource
- .listStreams(listStreamsRequest);
-
- List streamNames = listStreamsResult.getStreamNames();
-
- while (listStreamsResult.getHasMoreStreams()) {
- if (streamNames.size() > 0) {
- listStreamsRequest.setExclusiveStartStreamName(
- streamNames.get(streamNames.size() - 1));
- }
- listStreamsResult = this.resource.listStreams(listStreamsRequest);
- streamNames.addAll(listStreamsResult.getStreamNames());
- }
-
- for (String stream : streamNames) {
- this.resource.deleteStream(stream);
- while (true) {
- try {
- this.resource.describeStream(stream);
- try {
- Thread.sleep(100);
- }
- catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException(ex);
- }
- }
- catch (ResourceNotFoundException ex) {
- break;
- }
- }
- }
-
- System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY);
- this.resource.shutdown();
- }
-
-}
diff --git a/kinesis-samples/pom.xml b/kinesis-samples/pom.xml
index 166c009..c488a1b 100644
--- a/kinesis-samples/pom.xml
+++ b/kinesis-samples/pom.xml
@@ -6,11 +6,11 @@
0.0.1-SNAPSHOT
pom
kinesis-samples
- Collection of Spring Cloud Stream Aggregate Samples
+ Collection of Spring Cloud Stream Samples with Kinesis Binder
kinesis-produce-consume
-
+ kinesis-to-webflux