From f486ea0d9bee681d096ef0d664c26d31ffeb8694 Mon Sep 17 00:00:00 2001 From: Gerard Klijs Date: Sun, 2 Oct 2022 06:17:55 +0200 Subject: [PATCH] BAEL-5807 Add streaming query (#12769) --- axon/pom.xml | 2 +- .../baeldung/axon/gui/OrderRestEndpoint.java | 5 ++++ .../InMemoryOrdersEventHandler.java | 8 +++++++ .../axon/querymodel/OrderQueryService.java | 6 +++++ .../axon/querymodel/OrdersEventHandler.java | 3 +++ axon/src/main/resources/order-api.http | 4 ++++ .../AbstractOrdersEventHandlerUnitTest.java | 23 +++++++++++++++++++ .../OrderQueryServiceIntegrationTest.java | 11 +++++++++ 8 files changed, 61 insertions(+), 1 deletion(-) diff --git a/axon/pom.xml b/axon/pom.xml index 32f12f8c43..ae199d27f9 100644 --- a/axon/pom.xml +++ b/axon/pom.xml @@ -71,7 +71,7 @@ - 4.5.17 + 4.6.0 \ No newline at end of file diff --git a/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java b/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java index 9901f725fd..64058d5eca 100644 --- a/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java +++ b/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java @@ -92,6 +92,11 @@ public class OrderRestEndpoint { return orderQueryService.findAllOrders(); } + @GetMapping(path = "/all-orders-streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux allOrdersStreaming() { + return orderQueryService.allOrdersStreaming(); + } + @GetMapping("/total-shipped/{product-id}") public Integer totalShipped(@PathVariable("product-id") String productId) { return orderQueryService.totalShipped(productId); diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java b/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java index 202b48abfd..fbdf819961 100644 --- a/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java +++ b/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java @@ -17,7 +17,10 @@ import org.axonframework.config.ProcessingGroup; import org.axonframework.eventhandling.EventHandler; import org.axonframework.queryhandling.QueryHandler; import org.axonframework.queryhandling.QueryUpdateEmitter; +import org.reactivestreams.Publisher; import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.HashMap; @@ -101,6 +104,11 @@ public class InMemoryOrdersEventHandler implements OrdersEventHandler { return new ArrayList<>(orders.values()); } + @QueryHandler + public Publisher handleStreaming(FindAllOrderedProductsQuery query) { + return Mono.fromCallable(orders::values).flatMapMany(Flux::fromIterable); + } + @QueryHandler public Integer handle(TotalProductsShippedQuery query) { return orders.values() diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java b/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java index 192fc78226..ae391c3cb1 100644 --- a/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java +++ b/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java @@ -9,6 +9,7 @@ import org.axonframework.messaging.responsetypes.ResponseType; import org.axonframework.messaging.responsetypes.ResponseTypes; import org.axonframework.queryhandling.QueryGateway; import org.axonframework.queryhandling.SubscriptionQueryResult; +import org.reactivestreams.Publisher; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; @@ -34,6 +35,11 @@ public class OrderQueryService { .collect(Collectors.toList())); } + public Flux allOrdersStreaming() { + Publisher publisher = queryGateway.streamingQuery(new FindAllOrderedProductsQuery(), Order.class); + return Flux.from(publisher).map(OrderResponse::new); + } + public Integer totalShipped(String productId) { return queryGateway.scatterGather(new TotalProductsShippedQuery(productId), ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS) diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java b/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java index 0d4aad4c97..7e49abf93b 100644 --- a/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java +++ b/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java @@ -11,6 +11,7 @@ import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery; import com.baeldung.axon.coreapi.queries.Order; import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery; import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery; +import org.reactivestreams.Publisher; import java.util.List; @@ -32,6 +33,8 @@ public interface OrdersEventHandler { List handle(FindAllOrderedProductsQuery query); + Publisher handleStreaming(FindAllOrderedProductsQuery query); + Integer handle(TotalProductsShippedQuery query); Order handle(OrderUpdatesQuery query); diff --git a/axon/src/main/resources/order-api.http b/axon/src/main/resources/order-api.http index d5f358cbb9..bd2a4289ab 100644 --- a/axon/src/main/resources/order-api.http +++ b/axon/src/main/resources/order-api.http @@ -10,6 +10,10 @@ POST http://localhost:8080/ship-unconfirmed-order GET http://localhost:8080/all-orders +### Receive all existing orders using a stream + +GET http://localhost:8080/all-orders-streaming + ### Create Order with id 666a1661-474d-4046-8b12-8b5896312768 POST http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768 diff --git a/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java b/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java index daef0b684d..2396d0f10b 100644 --- a/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java +++ b/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java @@ -14,10 +14,13 @@ import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery; import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery; import org.axonframework.queryhandling.QueryUpdateEmitter; import org.junit.jupiter.api.*; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.function.Consumer; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -68,6 +71,26 @@ public abstract class AbstractOrdersEventHandlerUnitTest { assertEquals(orderTwo, order_2); } + @Test + void givenTwoOrdersPlacedOfWhichOneNotShipped_whenFindAllOrderedProductsQueryStreaming_thenCorrectOrdersAreReturned() { + resetWithTwoOrders(); + final Consumer orderVerifier = order -> { + if (order.getOrderId().equals(orderOne.getOrderId())) { + assertEquals(orderOne, order); + } else if (order.getOrderId().equals(orderTwo.getOrderId())) { + assertEquals(orderTwo, order); + } else { + throw new RuntimeException("Would expect either order one or order two"); + } + }; + + StepVerifier.create(Flux.from(handler.handleStreaming(new FindAllOrderedProductsQuery()))) + .assertNext(orderVerifier) + .assertNext(orderVerifier) + .expectComplete() + .verify(); + } + @Test void givenNoOrdersPlaced_whenTotalProductsShippedQuery_thenZeroReturned() { assertEquals(0, handler.handle(new TotalProductsShippedQuery(PRODUCT_ID_1))); diff --git a/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java b/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java index 60808b2271..dfb4881fdc 100644 --- a/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java +++ b/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java @@ -6,6 +6,7 @@ import com.baeldung.axon.coreapi.events.OrderShippedEvent; import com.baeldung.axon.coreapi.events.ProductAddedEvent; import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent; import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent; +import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery; import com.baeldung.axon.coreapi.queries.Order; import org.axonframework.eventhandling.gateway.EventGateway; @@ -13,6 +14,7 @@ import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import reactor.core.publisher.Flux; import reactor.test.StepVerifier; import java.util.Collections; @@ -60,6 +62,15 @@ class OrderQueryServiceIntegrationTest { .isEmpty()); } + @Test + void givenOrderCreatedEventSend_whenCallingAllOrdersStreaming_thenOneOrderIsReturned() { + Flux result = queryService.allOrdersStreaming(); + StepVerifier.create(result) + .assertNext(order -> assertEquals(orderId, order.getOrderId())) + .expectComplete() + .verify(); + } + @Test void givenThreeDeluxeChairsShipped_whenCallingAllShippedChairs_then234PlusTreeIsReturned() { Order order = new Order(orderId);