diff --git a/axon/README.md b/axon/README.md
index 18f5d568e6..493aebd822 100644
--- a/axon/README.md
+++ b/axon/README.md
@@ -2,6 +2,12 @@
This module contains articles about Axon
+## Scripts
+
+One script is included to easily start middleware using Docker:
+
+- `start_axon_server.sh` to start an Axon Server instance
+
### Relevant articles
- [A Guide to the Axon Framework](https://www.baeldung.com/axon-cqrs-event-sourcing)
diff --git a/axon/pom.xml b/axon/pom.xml
index f37344dc81..32f12f8c43 100644
--- a/axon/pom.xml
+++ b/axon/pom.xml
@@ -1,7 +1,7 @@
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
axon
axon
@@ -31,11 +31,6 @@
org.axonframework
axon-spring-boot-starter
-
- org.axonframework
- axon-test
- test
-
org.springframework.boot
spring-boot-autoconfigure
@@ -49,15 +44,34 @@
org.springframework.boot
spring-boot-starter-data-jpa
+
+ io.projectreactor
+ reactor-core
+
com.h2database
h2
runtime
+
+ org.axonframework
+ axon-test
+ test
+
+
+ org.springframework
+ spring-test
+ test
+
+
+ io.projectreactor
+ reactor-test
+ test
+
- 4.5.13
+ 4.5.17
\ No newline at end of file
diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/queries/OrderUpdatesQuery.java b/axon/src/main/java/com/baeldung/axon/coreapi/queries/OrderUpdatesQuery.java
new file mode 100644
index 0000000000..37d2e67445
--- /dev/null
+++ b/axon/src/main/java/com/baeldung/axon/coreapi/queries/OrderUpdatesQuery.java
@@ -0,0 +1,39 @@
+package com.baeldung.axon.coreapi.queries;
+
+import java.util.Objects;
+
+public class OrderUpdatesQuery {
+
+ private final String orderId;
+
+ public OrderUpdatesQuery(String orderId) {
+ this.orderId = orderId;
+ }
+
+ public String getOrderId() {
+ return orderId;
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OrderUpdatesQuery that = (OrderUpdatesQuery) o;
+ return Objects.equals(orderId, that.orderId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(orderId);
+ }
+
+ @Override
+ public String toString() {
+ return "OrderUpdatesQuery{" +
+ "orderId='" + orderId + '\'' +
+ '}';
+ }
+}
diff --git a/axon/src/main/java/com/baeldung/axon/coreapi/queries/TotalProductsShippedQuery.java b/axon/src/main/java/com/baeldung/axon/coreapi/queries/TotalProductsShippedQuery.java
new file mode 100644
index 0000000000..3a4129685b
--- /dev/null
+++ b/axon/src/main/java/com/baeldung/axon/coreapi/queries/TotalProductsShippedQuery.java
@@ -0,0 +1,39 @@
+package com.baeldung.axon.coreapi.queries;
+
+import java.util.Objects;
+
+public class TotalProductsShippedQuery {
+
+ private final String productId;
+
+ public TotalProductsShippedQuery(String productId) {
+ this.productId = productId;
+ }
+
+ public String getProductId() {
+ return productId;
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TotalProductsShippedQuery that = (TotalProductsShippedQuery) o;
+ return Objects.equals(productId, that.productId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(productId);
+ }
+
+ @Override
+ public String toString() {
+ return "TotalProductsShippedQuery{" +
+ "productId='" + productId + '\'' +
+ '}';
+ }
+}
diff --git a/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java b/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java
index 11e03bf6a5..9901f725fd 100644
--- a/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java
+++ b/axon/src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java
@@ -6,15 +6,15 @@ import com.baeldung.axon.coreapi.commands.CreateOrderCommand;
import com.baeldung.axon.coreapi.commands.DecrementProductCountCommand;
import com.baeldung.axon.coreapi.commands.IncrementProductCountCommand;
import com.baeldung.axon.coreapi.commands.ShipOrderCommand;
-import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
-import com.baeldung.axon.coreapi.queries.Order;
+import com.baeldung.axon.querymodel.OrderQueryService;
+import com.baeldung.axon.querymodel.OrderResponse;
import org.axonframework.commandhandling.gateway.CommandGateway;
-import org.axonframework.messaging.responsetypes.ResponseTypes;
-import org.axonframework.queryhandling.QueryGateway;
+import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Flux;
import java.util.List;
import java.util.UUID;
@@ -24,11 +24,11 @@ import java.util.concurrent.CompletableFuture;
public class OrderRestEndpoint {
private final CommandGateway commandGateway;
- private final QueryGateway queryGateway;
+ private final OrderQueryService orderQueryService;
- public OrderRestEndpoint(CommandGateway commandGateway, QueryGateway queryGateway) {
+ public OrderRestEndpoint(CommandGateway commandGateway, OrderQueryService orderQueryService) {
this.commandGateway = commandGateway;
- this.queryGateway = queryGateway;
+ this.orderQueryService = orderQueryService;
}
@PostMapping("/ship-order")
@@ -88,7 +88,17 @@ public class OrderRestEndpoint {
}
@GetMapping("/all-orders")
- public CompletableFuture> findAllOrders() {
- return queryGateway.query(new FindAllOrderedProductsQuery(), ResponseTypes.multipleInstancesOf(Order.class));
+ public CompletableFuture> findAllOrders() {
+ return orderQueryService.findAllOrders();
+ }
+
+ @GetMapping("/total-shipped/{product-id}")
+ public Integer totalShipped(@PathVariable("product-id") String productId) {
+ return orderQueryService.totalShipped(productId);
+ }
+
+ @GetMapping(path = "/order-updates/{order-id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux orderUpdates(@PathVariable("order-id") String orderId) {
+ return orderQueryService.orderUpdates(orderId);
}
}
diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java b/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java
new file mode 100644
index 0000000000..202b48abfd
--- /dev/null
+++ b/axon/src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java
@@ -0,0 +1,130 @@
+package com.baeldung.axon.querymodel;
+
+import com.baeldung.axon.coreapi.events.OrderConfirmedEvent;
+import com.baeldung.axon.coreapi.events.OrderCreatedEvent;
+import com.baeldung.axon.coreapi.events.OrderShippedEvent;
+import com.baeldung.axon.coreapi.events.ProductAddedEvent;
+import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent;
+import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent;
+import com.baeldung.axon.coreapi.events.ProductRemovedEvent;
+import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
+import com.baeldung.axon.coreapi.queries.Order;
+import com.baeldung.axon.coreapi.queries.OrderStatus;
+import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
+import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
+
+import org.axonframework.config.ProcessingGroup;
+import org.axonframework.eventhandling.EventHandler;
+import org.axonframework.queryhandling.QueryHandler;
+import org.axonframework.queryhandling.QueryUpdateEmitter;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@Service
+@ProcessingGroup("orders")
+public class InMemoryOrdersEventHandler implements OrdersEventHandler {
+
+ private final Map orders = new HashMap<>();
+ private final QueryUpdateEmitter emitter;
+
+ public InMemoryOrdersEventHandler(QueryUpdateEmitter emitter) {
+ this.emitter = emitter;
+ }
+
+ @EventHandler
+ public void on(OrderCreatedEvent event) {
+ String orderId = event.getOrderId();
+ orders.put(orderId, new Order(orderId));
+ }
+
+ @EventHandler
+ public void on(ProductAddedEvent event) {
+ orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
+ order.addProduct(event.getProductId());
+ emitUpdate(order);
+ return order;
+ });
+ }
+
+ @EventHandler
+ public void on(ProductCountIncrementedEvent event) {
+ orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
+ order.incrementProductInstance(event.getProductId());
+ emitUpdate(order);
+ return order;
+ });
+ }
+
+ @EventHandler
+ public void on(ProductCountDecrementedEvent event) {
+ orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
+ order.decrementProductInstance(event.getProductId());
+ emitUpdate(order);
+ return order;
+ });
+ }
+
+ @EventHandler
+ public void on(ProductRemovedEvent event) {
+ orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
+ order.removeProduct(event.getProductId());
+ emitUpdate(order);
+ return order;
+ });
+ }
+
+ @EventHandler
+ public void on(OrderConfirmedEvent event) {
+ orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
+ order.setOrderConfirmed();
+ emitUpdate(order);
+ return order;
+ });
+ }
+
+ @EventHandler
+ public void on(OrderShippedEvent event) {
+ orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
+ order.setOrderShipped();
+ emitUpdate(order);
+ return order;
+ });
+ }
+
+ @QueryHandler
+ public List handle(FindAllOrderedProductsQuery query) {
+ return new ArrayList<>(orders.values());
+ }
+
+ @QueryHandler
+ public Integer handle(TotalProductsShippedQuery query) {
+ return orders.values()
+ .stream()
+ .filter(o -> o.getOrderStatus() == OrderStatus.SHIPPED)
+ .map(o -> Optional.ofNullable(o.getProducts()
+ .get(query.getProductId()))
+ .orElse(0))
+ .reduce(0, Integer::sum);
+ }
+
+ @QueryHandler
+ public Order handle(OrderUpdatesQuery query) {
+ return orders.get(query.getOrderId());
+ }
+
+ private void emitUpdate(Order order) {
+ emitter.emit(OrderUpdatesQuery.class, q -> order.getOrderId()
+ .equals(q.getOrderId()), order);
+ }
+
+ @Override
+ public void reset(List orderList) {
+ orders.clear();
+ orderList.forEach(o -> orders.put(o.getOrderId(), o));
+ }
+}
diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/LegacyQueryHandler.java b/axon/src/main/java/com/baeldung/axon/querymodel/LegacyQueryHandler.java
new file mode 100644
index 0000000000..f2dfdeb9fb
--- /dev/null
+++ b/axon/src/main/java/com/baeldung/axon/querymodel/LegacyQueryHandler.java
@@ -0,0 +1,22 @@
+package com.baeldung.axon.querymodel;
+
+import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
+
+import org.axonframework.queryhandling.QueryHandler;
+import org.springframework.stereotype.Service;
+
+@Service
+public class LegacyQueryHandler {
+
+ @QueryHandler
+ public Integer handle(TotalProductsShippedQuery query) {
+ switch (query.getProductId()) {
+ case "Deluxe Chair":
+ return 234;
+ case "a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3":
+ return 10;
+ default:
+ return 0;
+ }
+ }
+}
diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java b/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java
new file mode 100644
index 0000000000..192fc78226
--- /dev/null
+++ b/axon/src/main/java/com/baeldung/axon/querymodel/OrderQueryService.java
@@ -0,0 +1,53 @@
+package com.baeldung.axon.querymodel;
+
+import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
+import com.baeldung.axon.coreapi.queries.Order;
+import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
+import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
+
+import org.axonframework.messaging.responsetypes.ResponseType;
+import org.axonframework.messaging.responsetypes.ResponseTypes;
+import org.axonframework.queryhandling.QueryGateway;
+import org.axonframework.queryhandling.SubscriptionQueryResult;
+import org.springframework.stereotype.Service;
+
+import reactor.core.publisher.Flux;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@Service
+public class OrderQueryService {
+
+ private final QueryGateway queryGateway;
+
+ public OrderQueryService(QueryGateway queryGateway) {
+ this.queryGateway = queryGateway;
+ }
+
+ public CompletableFuture> findAllOrders() {
+ return queryGateway.query(new FindAllOrderedProductsQuery(), ResponseTypes.multipleInstancesOf(Order.class))
+ .thenApply(r -> r.stream()
+ .map(OrderResponse::new)
+ .collect(Collectors.toList()));
+ }
+
+ public Integer totalShipped(String productId) {
+ return queryGateway.scatterGather(new TotalProductsShippedQuery(productId),
+ ResponseTypes.instanceOf(Integer.class), 10L, TimeUnit.SECONDS)
+ .reduce(0, Integer::sum);
+ }
+
+ public Flux orderUpdates(String orderId) {
+ return subscriptionQuery(new OrderUpdatesQuery(orderId), ResponseTypes.instanceOf(Order.class)).map(OrderResponse::new);
+ }
+
+ private Flux subscriptionQuery(Q query, ResponseType resultType) {
+ SubscriptionQueryResult result = queryGateway.subscriptionQuery(query, resultType, resultType);
+ return result.initialResult()
+ .concatWith(result.updates())
+ .doFinally(signal -> result.close());
+ }
+}
diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/OrderResponse.java b/axon/src/main/java/com/baeldung/axon/querymodel/OrderResponse.java
new file mode 100644
index 0000000000..8e384edd9e
--- /dev/null
+++ b/axon/src/main/java/com/baeldung/axon/querymodel/OrderResponse.java
@@ -0,0 +1,38 @@
+package com.baeldung.axon.querymodel;
+
+import com.baeldung.axon.coreapi.queries.Order;
+
+import java.util.Map;
+
+import static com.baeldung.axon.querymodel.OrderStatusResponse.toResponse;
+
+public class OrderResponse {
+
+ private String orderId;
+ private Map products;
+ private OrderStatusResponse orderStatus;
+
+ OrderResponse(Order order) {
+ this.orderId = order.getOrderId();
+ this.products = order.getProducts();
+ this.orderStatus = toResponse(order.getOrderStatus());
+ }
+
+ /**
+ * Added for the integration test, since it's using Jackson for the response
+ */
+ OrderResponse() {
+ }
+
+ public String getOrderId() {
+ return orderId;
+ }
+
+ public Map getProducts() {
+ return products;
+ }
+
+ public OrderStatusResponse getOrderStatus() {
+ return orderStatus;
+ }
+}
diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/OrderStatusResponse.java b/axon/src/main/java/com/baeldung/axon/querymodel/OrderStatusResponse.java
new file mode 100644
index 0000000000..90430b1e3d
--- /dev/null
+++ b/axon/src/main/java/com/baeldung/axon/querymodel/OrderStatusResponse.java
@@ -0,0 +1,16 @@
+package com.baeldung.axon.querymodel;
+
+import com.baeldung.axon.coreapi.queries.OrderStatus;
+
+public enum OrderStatusResponse {
+ CREATED, CONFIRMED, SHIPPED, UNKNOWN;
+
+ static OrderStatusResponse toResponse(OrderStatus status) {
+ for (OrderStatusResponse response : values()) {
+ if (response.toString().equals(status.toString())) {
+ return response;
+ }
+ }
+ return UNKNOWN;
+ }
+}
diff --git a/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java b/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java
index 25666b0bf3..0d4aad4c97 100644
--- a/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java
+++ b/axon/src/main/java/com/baeldung/axon/querymodel/OrdersEventHandler.java
@@ -9,78 +9,32 @@ import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent;
import com.baeldung.axon.coreapi.events.ProductRemovedEvent;
import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
import com.baeldung.axon.coreapi.queries.Order;
-import org.axonframework.config.ProcessingGroup;
-import org.axonframework.eventhandling.EventHandler;
-import org.axonframework.queryhandling.QueryHandler;
-import org.springframework.stereotype.Service;
+import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
+import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-@Service
-@ProcessingGroup("orders")
-public class OrdersEventHandler {
+public interface OrdersEventHandler {
- private final Map orders = new HashMap<>();
+ void on(OrderCreatedEvent event);
- @EventHandler
- public void on(OrderCreatedEvent event) {
- String orderId = event.getOrderId();
- orders.put(orderId, new Order(orderId));
- }
+ void on(ProductAddedEvent event);
- @EventHandler
- public void on(ProductAddedEvent event) {
- orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
- order.addProduct(event.getProductId());
- return order;
- });
- }
+ void on(ProductCountIncrementedEvent event);
- @EventHandler
- public void on(ProductCountIncrementedEvent event) {
- orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
- order.incrementProductInstance(event.getProductId());
- return order;
- });
- }
+ void on(ProductCountDecrementedEvent event);
- @EventHandler
- public void on(ProductCountDecrementedEvent event) {
- orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
- order.decrementProductInstance(event.getProductId());
- return order;
- });
- }
+ void on(ProductRemovedEvent event);
- @EventHandler
- public void on(ProductRemovedEvent event) {
- orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
- order.removeProduct(event.getProductId());
- return order;
- });
- }
+ void on(OrderConfirmedEvent event);
- @EventHandler
- public void on(OrderConfirmedEvent event) {
- orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
- order.setOrderConfirmed();
- return order;
- });
- }
+ void on(OrderShippedEvent event);
- @EventHandler
- public void on(OrderShippedEvent event) {
- orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
- order.setOrderShipped();
- return order;
- });
- }
+ List handle(FindAllOrderedProductsQuery query);
- @QueryHandler
- public List handle(FindAllOrderedProductsQuery query) {
- return new ArrayList<>(orders.values());
- }
-}
\ No newline at end of file
+ Integer handle(TotalProductsShippedQuery query);
+
+ Order handle(OrderUpdatesQuery query);
+
+ void reset(List orderList);
+}
diff --git a/axon/src/main/resources/order-api.http b/axon/src/main/resources/order-api.http
index 6c06c48989..d5f358cbb9 100644
--- a/axon/src/main/resources/order-api.http
+++ b/axon/src/main/resources/order-api.http
@@ -34,4 +34,16 @@ POST http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768/confirm
POST http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768/ship
+### Retrieve shipped Deluxe Chairs
+
+GET http://localhost:8080/total-shipped/Deluxe Chair
+
+### Retrieve shipped a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3
+
+GET http://localhost:8080/total-shipped/a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3
+
+### Receive updates for 666a1661-474d-4046-8b12-8b5896312768
+
+GET http://localhost:8080/order-updates/666a1661-474d-4046-8b12-8b5896312768
+
###
diff --git a/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java b/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java
new file mode 100644
index 0000000000..daef0b684d
--- /dev/null
+++ b/axon/src/test/java/com/baeldung/axon/querymodel/AbstractOrdersEventHandlerUnitTest.java
@@ -0,0 +1,171 @@
+package com.baeldung.axon.querymodel;
+
+import com.baeldung.axon.coreapi.events.OrderConfirmedEvent;
+import com.baeldung.axon.coreapi.events.OrderCreatedEvent;
+import com.baeldung.axon.coreapi.events.OrderShippedEvent;
+import com.baeldung.axon.coreapi.events.ProductAddedEvent;
+import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent;
+import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent;
+import com.baeldung.axon.coreapi.events.ProductRemovedEvent;
+import com.baeldung.axon.coreapi.queries.FindAllOrderedProductsQuery;
+import com.baeldung.axon.coreapi.queries.Order;
+import com.baeldung.axon.coreapi.queries.OrderStatus;
+import com.baeldung.axon.coreapi.queries.OrderUpdatesQuery;
+import com.baeldung.axon.coreapi.queries.TotalProductsShippedQuery;
+import org.axonframework.queryhandling.QueryUpdateEmitter;
+import org.junit.jupiter.api.*;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+public abstract class AbstractOrdersEventHandlerUnitTest {
+
+ private static final String ORDER_ID_1 = UUID.randomUUID().toString();
+ private static final String ORDER_ID_2 = UUID.randomUUID().toString();
+ private static final String PRODUCT_ID_1 = UUID.randomUUID().toString();
+ private static final String PRODUCT_ID_2 = UUID.randomUUID().toString();
+ private OrdersEventHandler handler;
+ private static Order orderOne;
+ private static Order orderTwo;
+ QueryUpdateEmitter emitter = mock(QueryUpdateEmitter.class);
+
+ @BeforeAll
+ static void createOrders() {
+ orderOne = new Order(ORDER_ID_1);
+ orderOne.getProducts().put(PRODUCT_ID_1, 3);
+ orderOne.setOrderShipped();
+
+ orderTwo = new Order(ORDER_ID_2);
+ orderTwo.getProducts().put(PRODUCT_ID_1, 1);
+ orderTwo.getProducts().put(PRODUCT_ID_2, 1);
+ orderTwo.setOrderConfirmed();
+ }
+
+ @BeforeEach
+ void setUp() {
+ handler = getHandler();
+ }
+
+ protected abstract OrdersEventHandler getHandler();
+
+ @Test
+ void givenTwoOrdersPlacedOfWhichOneNotShipped_whenFindAllOrderedProductsQuery_thenCorrectOrdersAreReturned() {
+ resetWithTwoOrders();
+
+ List result = handler.handle(new FindAllOrderedProductsQuery());
+
+ assertNotNull(result);
+ assertEquals(2, result.size());
+
+ Order order_1 = result.stream().filter(o -> o.getOrderId().equals(ORDER_ID_1)).findFirst().orElse(null);
+ assertEquals(orderOne, order_1);
+
+ Order order_2 = result.stream().filter(o -> o.getOrderId().equals(ORDER_ID_2)).findFirst().orElse(null);
+ assertEquals(orderTwo, order_2);
+ }
+
+ @Test
+ void givenNoOrdersPlaced_whenTotalProductsShippedQuery_thenZeroReturned() {
+ assertEquals(0, handler.handle(new TotalProductsShippedQuery(PRODUCT_ID_1)));
+ }
+
+ @Test
+ void givenTwoOrdersPlacedOfWhichOneNotShipped_whenTotalProductsShippedQuery_thenOnlyCountProductsFirstOrder() {
+ resetWithTwoOrders();
+
+ assertEquals(3, handler.handle(new TotalProductsShippedQuery(PRODUCT_ID_1)));
+ assertEquals(0, handler.handle(new TotalProductsShippedQuery(PRODUCT_ID_2)));
+ }
+
+ @Test
+ void givenTwoOrdersPlacedAndShipped_whenTotalProductsShippedQuery_thenCountBothOrders() {
+ resetWithTwoOrders();
+ handler.on(new OrderShippedEvent(ORDER_ID_2));
+
+ assertEquals(4, handler.handle(new TotalProductsShippedQuery(PRODUCT_ID_1)));
+ assertEquals(1, handler.handle(new TotalProductsShippedQuery(PRODUCT_ID_2)));
+ }
+
+ @Test
+ void givenOrderExist_whenOrderUpdatesQuery_thenOrderReturned() {
+ resetWithTwoOrders();
+
+ Order result = handler.handle(new OrderUpdatesQuery(ORDER_ID_1));
+ assertNotNull(result);
+ assertEquals(ORDER_ID_1, result.getOrderId());
+ assertEquals(3, result.getProducts().get(PRODUCT_ID_1));
+ assertEquals(OrderStatus.SHIPPED, result.getOrderStatus());
+ }
+
+ @Test
+ void givenOrderExist_whenProductAddedEvent_thenUpdateEmittedOnce() {
+ handler.on(new OrderCreatedEvent(ORDER_ID_1));
+
+ handler.on(new ProductAddedEvent(ORDER_ID_1, PRODUCT_ID_1));
+
+ verify(emitter, times(1)).emit(eq(OrderUpdatesQuery.class), any(), any(Order.class));
+ }
+
+ @Test
+ void givenOrderWithProductExist_whenProductCountDecrementedEvent_thenUpdateEmittedOnce() {
+ handler.on(new OrderCreatedEvent(ORDER_ID_1));
+ handler.on(new ProductAddedEvent(ORDER_ID_1, PRODUCT_ID_1));
+ reset(emitter);
+
+ handler.on(new ProductCountDecrementedEvent(ORDER_ID_1, PRODUCT_ID_1));
+
+ verify(emitter, times(1)).emit(eq(OrderUpdatesQuery.class), any(), any(Order.class));
+ }
+
+ @Test
+ void givenOrderWithProductExist_whenProductRemovedEvent_thenUpdateEmittedOnce() {
+ handler.on(new OrderCreatedEvent(ORDER_ID_1));
+ handler.on(new ProductAddedEvent(ORDER_ID_1, PRODUCT_ID_1));
+ reset(emitter);
+
+ handler.on(new ProductRemovedEvent(ORDER_ID_1, PRODUCT_ID_1));
+
+ verify(emitter, times(1)).emit(eq(OrderUpdatesQuery.class), any(), any(Order.class));
+ }
+
+ @Test
+ void givenOrderWithProductExist_whenProductCountIncrementedEvent_thenUpdateEmittedOnce() {
+ handler.on(new OrderCreatedEvent(ORDER_ID_1));
+ handler.on(new ProductAddedEvent(ORDER_ID_1, PRODUCT_ID_1));
+ reset(emitter);
+
+ handler.on(new ProductCountIncrementedEvent(ORDER_ID_1, PRODUCT_ID_1));
+
+ verify(emitter, times(1)).emit(eq(OrderUpdatesQuery.class), any(), any(Order.class));
+ }
+
+ @Test
+ void givenOrderWithProductExist_whenOrderConfirmedEvent_thenUpdateEmittedOnce() {
+ handler.on(new OrderCreatedEvent(ORDER_ID_1));
+ handler.on(new ProductAddedEvent(ORDER_ID_1, PRODUCT_ID_1));
+ reset(emitter);
+
+ handler.on(new OrderConfirmedEvent(ORDER_ID_1));
+
+ verify(emitter, times(1)).emit(eq(OrderUpdatesQuery.class), any(), any(Order.class));
+ }
+
+ @Test
+ void givenOrderWithProductAndConfirmationExist_whenOrderShippedEvent_thenUpdateEmittedOnce() {
+ handler.on(new OrderCreatedEvent(ORDER_ID_1));
+ handler.on(new ProductAddedEvent(ORDER_ID_1, PRODUCT_ID_1));
+ reset(emitter);
+
+ handler.on(new OrderShippedEvent(ORDER_ID_1));
+
+ verify(emitter, times(1)).emit(eq(OrderUpdatesQuery.class), any(), any(Order.class));
+ }
+
+ private void resetWithTwoOrders() {
+ handler.reset(Arrays.asList(orderOne, orderTwo));
+ }
+}
diff --git a/axon/src/test/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandlerUnitTest.java b/axon/src/test/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandlerUnitTest.java
new file mode 100644
index 0000000000..8eddbb0f4f
--- /dev/null
+++ b/axon/src/test/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandlerUnitTest.java
@@ -0,0 +1,9 @@
+package com.baeldung.axon.querymodel;
+
+public class InMemoryOrdersEventHandlerUnitTest extends AbstractOrdersEventHandlerUnitTest {
+
+ @Override
+ protected OrdersEventHandler getHandler() {
+ return new InMemoryOrdersEventHandler(emitter);
+ }
+}
diff --git a/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java b/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java
new file mode 100644
index 0000000000..60808b2271
--- /dev/null
+++ b/axon/src/test/java/com/baeldung/axon/querymodel/OrderQueryServiceIntegrationTest.java
@@ -0,0 +1,129 @@
+package com.baeldung.axon.querymodel;
+
+import com.baeldung.axon.OrderApplication;
+import com.baeldung.axon.coreapi.events.OrderConfirmedEvent;
+import com.baeldung.axon.coreapi.events.OrderShippedEvent;
+import com.baeldung.axon.coreapi.events.ProductAddedEvent;
+import com.baeldung.axon.coreapi.events.ProductCountDecrementedEvent;
+import com.baeldung.axon.coreapi.events.ProductCountIncrementedEvent;
+import com.baeldung.axon.coreapi.queries.Order;
+
+import org.axonframework.eventhandling.gateway.EventGateway;
+import org.junit.jupiter.api.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import reactor.test.StepVerifier;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+@SpringBootTest(classes = OrderApplication.class)
+class OrderQueryServiceIntegrationTest {
+
+ @Autowired
+ OrderQueryService queryService;
+
+ @Autowired
+ EventGateway eventGateway;
+
+ @Autowired
+ OrdersEventHandler handler;
+
+ private String orderId;
+ private final String productId = "Deluxe Chair";
+
+ @BeforeEach
+ void setUp() {
+ orderId = UUID.randomUUID()
+ .toString();
+ Order order = new Order(orderId);
+ handler.reset(Collections.singletonList(order));
+ }
+
+ @Test
+ void givenOrderCreatedEventSend_whenCallingAllOrders_thenOneCreatedOrderIsReturned() throws ExecutionException, InterruptedException {
+ List result = queryService.findAllOrders()
+ .get();
+ assertEquals(1, result.size());
+ OrderResponse response = result.get(0);
+ assertEquals(orderId, response.getOrderId());
+ assertEquals(OrderStatusResponse.CREATED, response.getOrderStatus());
+ assertTrue(response.getProducts()
+ .isEmpty());
+ }
+
+ @Test
+ void givenThreeDeluxeChairsShipped_whenCallingAllShippedChairs_then234PlusTreeIsReturned() {
+ Order order = new Order(orderId);
+ order.getProducts()
+ .put(productId, 3);
+ order.setOrderShipped();
+ handler.reset(Collections.singletonList(order));
+
+ assertEquals(237, queryService.totalShipped(productId));
+ }
+
+ @Test
+ void givenOrdersAreUpdated_whenCallingOrderUpdates_thenUpdatesReturned() {
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ executor.schedule(this::addIncrementDecrementConfirmAndShip, 100L, TimeUnit.MILLISECONDS);
+ try {
+ StepVerifier.create(queryService.orderUpdates(orderId))
+ .assertNext(order -> assertTrue(order.getProducts()
+ .isEmpty()))
+ .assertNext(order -> assertEquals(1, order.getProducts()
+ .get(productId)))
+ .assertNext(order -> assertEquals(2, order.getProducts()
+ .get(productId)))
+ .assertNext(order -> assertEquals(1, order.getProducts()
+ .get(productId)))
+ .assertNext(order -> assertEquals(OrderStatusResponse.CONFIRMED, order.getOrderStatus()))
+ .assertNext(order -> assertEquals(OrderStatusResponse.SHIPPED, order.getOrderStatus()))
+ .thenCancel()
+ .verify();
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ private void addIncrementDecrementConfirmAndShip() {
+ sendProductAddedEvent();
+ sendProductCountIncrementEvent();
+ sendProductCountDecrementEvent();
+ sendOrderConfirmedEvent();
+ sendOrderShippedEvent();
+ }
+
+ private void sendProductAddedEvent() {
+ ProductAddedEvent event = new ProductAddedEvent(orderId, productId);
+ eventGateway.publish(event);
+ }
+
+ private void sendProductCountIncrementEvent() {
+ ProductCountIncrementedEvent event = new ProductCountIncrementedEvent(orderId, productId);
+ eventGateway.publish(event);
+ }
+
+ private void sendProductCountDecrementEvent() {
+ ProductCountDecrementedEvent event = new ProductCountDecrementedEvent(orderId, productId);
+ eventGateway.publish(event);
+ }
+
+ private void sendOrderConfirmedEvent() {
+ OrderConfirmedEvent event = new OrderConfirmedEvent(orderId);
+ eventGateway.publish(event);
+ }
+
+ private void sendOrderShippedEvent() {
+ OrderShippedEvent event = new OrderShippedEvent(orderId);
+ eventGateway.publish(event);
+ }
+}
diff --git a/axon/src/test/resources/application.properties b/axon/src/test/resources/application.properties
new file mode 100644
index 0000000000..35b5452b57
--- /dev/null
+++ b/axon/src/test/resources/application.properties
@@ -0,0 +1 @@
+axon.axonserver.enabled=false
\ No newline at end of file
diff --git a/axon/start_axon_server.sh b/axon/start_axon_server.sh
new file mode 100755
index 0000000000..e3fcff3346
--- /dev/null
+++ b/axon/start_axon_server.sh
@@ -0,0 +1,7 @@
+docker run \
+ -d \
+ --name axon_server \
+ -p 8024:8024 \
+ -p 8124:8124 \
+ -e AXONIQ_AXONSERVER_NAME=order_demo \
+ axoniq/axonserver
\ No newline at end of file