GH-233: Add batch consumer for Kinesis sample
Fixes https://github.com/spring-cloud/spring-cloud-stream-samples/issues/233
This commit is contained in:
@@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
package demo.stream;
|
package demo.stream;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
@@ -54,4 +55,20 @@ public class OrderStreamConfiguration {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Consumer<List<Event>> processOrders(OrderRepository orders) {
|
||||||
|
return eventList -> {
|
||||||
|
//log the number of orders received and each order
|
||||||
|
logger.info("Received " + eventList.size() + " orders");
|
||||||
|
for(Event event: eventList) {
|
||||||
|
if (!event.getOriginator().equals("KinesisProducer")) {
|
||||||
|
logger.info("An order has been received " + event.toString());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
logger.info("An order has been placed from this service " + event.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,11 +20,23 @@ spring:
|
|||||||
content-type: application/json
|
content-type: application/json
|
||||||
producer:
|
producer:
|
||||||
partitionKeyExpression: "1"
|
partitionKeyExpression: "1"
|
||||||
|
headerMode: none
|
||||||
processOrder-in-0:
|
processOrder-in-0:
|
||||||
destination: test_stream
|
destination: test_stream
|
||||||
content-type: application/json
|
content-type: application/json
|
||||||
|
processOrders-in-0:
|
||||||
|
destination: test_stream
|
||||||
|
content-type: application/json
|
||||||
|
consumer:
|
||||||
|
headerMode: none
|
||||||
|
kinesis:
|
||||||
|
bindings:
|
||||||
|
processOrders-in-0:
|
||||||
|
consumer:
|
||||||
|
listenerMode: batch
|
||||||
|
recordsLimit: 5
|
||||||
function:
|
function:
|
||||||
definition: processOrder;produceOrder
|
definition: processOrder;produceOrder;processOrders
|
||||||
datasource:
|
datasource:
|
||||||
url: jdbc:h2:mem:testdb
|
url: jdbc:h2:mem:testdb
|
||||||
driverClassName: org.h2.Driver
|
driverClassName: org.h2.Driver
|
||||||
|
|||||||
Reference in New Issue
Block a user