diff --git a/axon/pom.xml b/axon/pom.xml
index 32f12f8c43..ae199d27f9 100644
--- a/axon/pom.xml
+++ b/axon/pom.xml
@@ -71,7 +71,7 @@
- 4.5.17
+ 4.6.0
\ No newline at end of file
diff --git a/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java b/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java
index 9901f725fd..64058d5eca 100644
--- a/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java
+++ b/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java
@@ -92,6 +92,11 @@ public class OrderRestEndpoint {
return orderQueryService.findAllOrders();
}
+ @GetMapping(path = "/all-orders-streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux allOrdersStreaming() {
+ return orderQueryService.allOrdersStreaming();
+ }
+
@GetMapping("/total-shipped/{product-id}")
public Integer totalShipped(@PathVariable("product-id") String productId) {
return orderQueryService.totalShipped(productId);
diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java b/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java
index 202b48abfd..fbdf819961 100644
--- a/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java
+++ b/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java
@@ -17,7 +17,10 @@ import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.axonframework.queryhandling.QueryUpdateEmitter;
+import org.reactivestreams.Publisher;
import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.HashMap;
@@ -101,6 +104,11 @@ public class InMemoryOrdersEventHandler implements OrdersEventHandler {
return new ArrayList<>(orders.values());
}
+ @QueryHandler
+ public Publisher handleStreaming(FindAllOrderedProductsQuery query) {
+ return Mono.fromCallable(orders::values).flatMapMany(Flux::fromIterable);
+ }
+
@QueryHandler
public Integer handle(TotalProductsShippedQuery query) {
return orders.values()
diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java b/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java
index 192fc78226..ae391c3cb1 100644
--- a/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java
+++ b/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java
@@ -9,6 +9,7 @@ import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.queryhandling.SubscriptionQueryResult;
+import org.reactivestreams.Publisher;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
@@ -34,6 +35,11 @@ public class OrderQueryService {
.collect(Collectors.toList()));
}
+ public Flux allOrdersStreaming() {
+ Publisher publisher = queryGateway.streamingQuery(new FindAllOrderedProductsQuery(), Order.class);
+ return Flux.from(publisher).map(OrderResponse::new);
+ }
+
public Integer totalShipped(String productId) {
return queryGateway.scatterGather(new TotalProductsShippedQuery(productId),
ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS)
diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java b/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java
index 0d4aad4c97..7e49abf93b 100644
--- a/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java
+++ b/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java
@@ -11,6 +11,7 @@ import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
import com.baeldung.axon.coreapi.queries.Order;
import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
+import org.reactivestreams.Publisher;
import java.util.List;
@@ -32,6 +33,8 @@ public interface OrdersEventHandler {
List handle(FindAllOrderedProductsQuery query);
+ Publisher handleStreaming(FindAllOrderedProductsQuery query);
+
Integer handle(TotalProductsShippedQuery query);
Order handle(OrderUpdatesQuery query);
diff --git a/axon/src/main/resources/order-api.http b/axon/src/main/resources/order-api.http
index d5f358cbb9..bd2a4289ab 100644
--- a/axon/src/main/resources/order-api.http
+++ b/axon/src/main/resources/order-api.http
@@ -10,6 +10,10 @@ POST http://localhost:8080/ship-unconfirmed-order
GET http://localhost:8080/all-orders
+### Receive all existing orders using a stream
+
+GET http://localhost:8080/all-orders-streaming
+
### Create Order with id 666a1661-474d-4046-8b12-8b5896312768
POST http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768
diff --git a/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java b/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java
index daef0b684d..2396d0f10b 100644
--- a/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java
+++ b/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java
@@ -14,10 +14,13 @@ import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.junit.jupiter.api.*;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
@@ -68,6 +71,26 @@ public abstract class AbstractOrdersEventHandlerUnitTest {
assertEquals(orderTwo, order_2);
}
+ @Test
+ void givenTwoOrdersPlacedOfWhichOneNotShipped_whenFindAllOrderedProductsQueryStreaming_thenCorrectOrdersAreReturned() {
+ resetWithTwoOrders();
+ final Consumer orderVerifier = order -> {
+ if (order.getOrderId().equals(orderOne.getOrderId())) {
+ assertEquals(orderOne, order);
+ } else if (order.getOrderId().equals(orderTwo.getOrderId())) {
+ assertEquals(orderTwo, order);
+ } else {
+ throw new RuntimeException("Would expect either order one or order two");
+ }
+ };
+
+ StepVerifier.create(Flux.from(handler.handleStreaming(new FindAllOrderedProductsQuery())))
+ .assertNext(orderVerifier)
+ .assertNext(orderVerifier)
+ .expectComplete()
+ .verify();
+ }
+
@Test
void givenNoOrdersPlaced_whenTotalProductsShippedQuery_thenZeroReturned() {
assertEquals(0, handler.handle(new TotalProductsShippedQuery(PRODUCT_ID_1)));
diff --git a/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java b/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java
index 60808b2271..dfb4881fdc 100644
--- a/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java
+++ b/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java
@@ -6,6 +6,7 @@ import com.baeldung.axon.coreapi.events.OrderShippedEvent;
import com.baeldung.axon.coreapi.events.ProductAddedEvent;
import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent;
import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent;
+import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
import com.baeldung.axon.coreapi.queries.Order;
import org.axonframework.eventhandling.gateway.EventGateway;
@@ -13,6 +14,7 @@ import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.util.Collections;
@@ -60,6 +62,15 @@ class OrderQueryServiceIntegrationTest {
.isEmpty());
}
+ @Test
+ void givenOrderCreatedEventSend_whenCallingAllOrdersStreaming_thenOneOrderIsReturned() {
+ Flux result = queryService.allOrdersStreaming();
+ StepVerifier.create(result)
+ .assertNext(order -> assertEquals(orderId, order.getOrderId()))
+ .expectComplete()
+ .verify();
+ }
+
@Test
void givenThreeDeluxeChairsShipped_whenCallingAllShippedChairs_then234PlusTreeIsReturned() {
Order order = new Order(orderId);