From b07961aa1b89fcffad5447486de2730f13000c85 Mon Sep 17 00:00:00 2001 From: augustine-p <121275043+augustine-p@users.noreply.github.com> Date: Tue, 3 Jan 2023 23:35:36 +0530 Subject: [PATCH] GH-233: Add batch consumer for Kinesis sample Fixes https://github.com/spring-cloud/spring-cloud-stream-samples/issues/233 --- .../demo/stream/OrderStreamConfiguration.java | 17 +++++++++++++++++ .../src/main/resources/application.yml | 14 +++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrderStreamConfiguration.java b/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrderStreamConfiguration.java index b0b5918..a6708ad 100644 --- a/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrderStreamConfiguration.java +++ b/kinesis-samples/kinesis-produce-consume/src/main/java/demo/stream/OrderStreamConfiguration.java @@ -16,6 +16,7 @@ package demo.stream; +import java.util.List; import java.util.function.Consumer; import org.apache.commons.logging.Log; @@ -54,4 +55,20 @@ public class OrderStreamConfiguration { }; } + @Bean + public Consumer> processOrders(OrderRepository orders) { + return eventList -> { + //log the number of orders received and each order + logger.info("Received " + eventList.size() + " orders"); + for(Event event: eventList) { + if (!event.getOriginator().equals("KinesisProducer")) { + logger.info("An order has been received " + event.toString()); + } + else { + logger.info("An order has been placed from this service " + event.toString()); + } + } + }; + } + } diff --git a/kinesis-samples/kinesis-produce-consume/src/main/resources/application.yml b/kinesis-samples/kinesis-produce-consume/src/main/resources/application.yml index e628d26..2d35f96 100644 --- a/kinesis-samples/kinesis-produce-consume/src/main/resources/application.yml +++ b/kinesis-samples/kinesis-produce-consume/src/main/resources/application.yml @@ -20,11 +20,23 @@ spring: content-type: application/json producer: partitionKeyExpression: "1" + headerMode: none processOrder-in-0: destination: test_stream content-type: application/json + processOrders-in-0: + destination: test_stream + content-type: application/json + consumer: + headerMode: none + kinesis: + bindings: + processOrders-in-0: + consumer: + listenerMode: batch + recordsLimit: 5 function: - definition: processOrder;produceOrder + definition: processOrder;produceOrder;processOrders datasource: url: jdbc:h2:mem:testdb driverClassName: org.h2.Driver