From f2604672207322aee2288285f2a510a1cf85bc17 Mon Sep 17 00:00:00 2001 From: Kenny Bastani Date: Mon, 16 Jan 2017 00:01:00 -0800 Subject: [PATCH] Consistency testing --- .gitignore | 2 + .travis.yml | 5 +- account/account-web/pom.xml | 2 +- .../demo/account/action/ActivateAccount.java | 49 ++++---- .../demo/account/action/ArchiveAccount.java | 50 ++++---- .../demo/account/action/ConfirmAccount.java | 48 ++++---- .../java/demo/account/action/GetOrders.java | 14 +-- .../java/demo/account/action/PostOrder.java | 39 +++--- .../demo/account/action/SuspendAccount.java | 48 ++++---- .../java/demo/account/domain/Account.java | 6 - .../src/main/resources/application.yml | 7 +- account/account-worker/pom.xml | 2 +- .../src/main/resources/application.yml | 7 +- docker-compose.yml | 9 +- order/order-web/pom.xml | 2 +- .../demo/order/action/AddReservation.java | 53 ++++----- .../java/demo/order/action/CompleteOrder.java | 48 ++++---- .../order/action/CompleteReservation.java | 92 +++++++-------- .../demo/order/action/ConnectAccount.java | 41 +++---- .../demo/order/action/ConnectPayment.java | 43 ++++--- .../java/demo/order/action/CreatePayment.java | 87 +++++++------- .../java/demo/order/action/DeleteOrder.java | 22 ++-- .../demo/order/action/GetReservations.java | 14 +-- .../demo/order/action/ProcessPayment.java | 62 +++++----- .../demo/order/action/ReserveInventory.java | 91 +++++++------- .../demo/order/action/UpdateOrderStatus.java | 33 +++--- .../main/java/demo/order/domain/Order.java | 13 +- .../src/main/resources/application.yml | 7 +- order/order-worker/pom.xml | 2 +- .../java/demo/config/StateMachineConfig.java | 27 +++-- .../src/main/resources/application.yml | 7 +- payment/payment-web/pom.xml | 2 +- .../demo/payment/action/ConnectOrder.java | 50 ++++---- .../demo/payment/action/ProcessPayment.java | 44 +++---- .../java/demo/payment/domain/Payment.java | 2 - .../src/main/resources/application.yml | 7 +- payment/payment-worker/pom.xml | 2 +- .../src/main/resources/application.yml | 7 +- warehouse/warehouse-web/pom.xml | 2 +- .../inventory/action/ReserveInventory.java | 49 ++++---- .../action/UpdateInventoryStatus.java | 34 +++--- .../java/demo/inventory/domain/Inventory.java | 2 - .../inventory/domain/InventoryService.java | 2 +- .../reservation/action/ConnectInventory.java | 111 +++++++++--------- .../demo/reservation/action/ConnectOrder.java | 42 ++++--- .../reservation/action/ReleaseInventory.java | 54 ++++----- .../demo/reservation/domain/Reservation.java | 3 - .../demo/warehouse/action/ReserveOrder.java | 36 +++--- .../java/demo/warehouse/domain/Warehouse.java | 3 +- warehouse/warehouse-worker/pom.xml | 2 +- .../config/ReservationStateMachineConfig.java | 24 +++- .../src/main/resources/application.yml | 7 +- 52 files changed, 688 insertions(+), 729 deletions(-) diff --git a/.gitignore b/.gitignore index c33d7f5..6b0a529 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,8 @@ dump.rdb *.war *.ear +.DS_Store + # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* diff --git a/.travis.yml b/.travis.yml index 51a11a0..a72a427 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,6 +2,9 @@ language: java jdk: - oraclejdk8 services: -- rabbitmq - redis +- docker install: mvn clean install -DskipDockerBuild +before_install: +- docker pull spotify/kafka +- docker run -p 2181:2181 -p 9092:9092 spotify/kafka \ No newline at end of file diff --git a/account/account-web/pom.xml b/account/account-web/pom.xml index f9870cc..9f24cf4 100644 --- a/account/account-web/pom.xml +++ b/account/account-web/pom.xml @@ -41,7 +41,7 @@ org.springframework.cloud - spring-cloud-starter-stream-rabbit + spring-cloud-starter-stream-kafka org.springframework.boot diff --git a/account/account-web/src/main/java/demo/account/action/ActivateAccount.java b/account/account-web/src/main/java/demo/account/action/ActivateAccount.java index 0b668bb..e431ddd 100644 --- a/account/account-web/src/main/java/demo/account/action/ActivateAccount.java +++ b/account/account-web/src/main/java/demo/account/action/ActivateAccount.java @@ -9,10 +9,10 @@ import demo.account.event.AccountEventType; import demo.domain.Action; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import java.util.Arrays; -import java.util.function.Function; import static demo.account.domain.AccountStatus.*; @@ -22,38 +22,35 @@ import static demo.account.domain.AccountStatus.*; * @author Kenny Bastani */ @Service +@Transactional public class ActivateAccount extends Action { private final Logger log = Logger.getLogger(this.getClass()); - public Function getFunction() { - return (account) -> { - Assert.isTrue(account.getStatus() != ACCOUNT_ACTIVE, "The account is already active"); - Assert.isTrue(Arrays.asList(ACCOUNT_CONFIRMED, ACCOUNT_SUSPENDED, ACCOUNT_ARCHIVED) - .contains(account.getStatus()), "The account cannot be activated"); + public Account apply(Account account) { + Assert.isTrue(account.getStatus() != ACCOUNT_ACTIVE, "The account is already active"); + Assert.isTrue(Arrays.asList(ACCOUNT_CONFIRMED, ACCOUNT_SUSPENDED, ACCOUNT_ARCHIVED) + .contains(account.getStatus()), "The account cannot be activated"); - AccountService accountService = account.getModule(AccountModule.class) - .getDefaultService(); + AccountService accountService = account.getModule(AccountModule.class) + .getDefaultService(); - AccountStatus status = account.getStatus(); + AccountStatus status = account.getStatus(); - // Activate the account - account.setStatus(AccountStatus.ACCOUNT_ACTIVE); + // Activate the account + account.setStatus(AccountStatus.ACCOUNT_ACTIVE); + account = accountService.update(account); + + try { + // Trigger the account activated event + account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED, account)); + } catch (Exception ex) { + log.error("Account could not be activated", ex); + + // Rollback the operation + account.setStatus(status); account = accountService.update(account); + } - try { - // Trigger the account activated event - account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED, account)); - } catch (Exception ex) { - log.error("Account could not be activated", ex); - - // Rollback the operation - account.setStatus(status); - accountService.update(account); - - throw ex; - } - - return account; - }; + return account; } } diff --git a/account/account-web/src/main/java/demo/account/action/ArchiveAccount.java b/account/account-web/src/main/java/demo/account/action/ArchiveAccount.java index 08f13bf..9548923 100644 --- a/account/account-web/src/main/java/demo/account/action/ArchiveAccount.java +++ b/account/account-web/src/main/java/demo/account/action/ArchiveAccount.java @@ -9,10 +9,9 @@ import demo.account.event.AccountEventType; import demo.domain.Action; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; -import java.util.function.Function; - import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE; import static demo.account.domain.AccountStatus.ACCOUNT_ARCHIVED; @@ -22,38 +21,35 @@ import static demo.account.domain.AccountStatus.ACCOUNT_ARCHIVED; * @author Kenny Bastani */ @Service +@Transactional public class ArchiveAccount extends Action { private final Logger log = Logger.getLogger(this.getClass()); - public Function getFunction() { - return (account) -> { - Assert.isTrue(account.getStatus() != ACCOUNT_ARCHIVED, "The account is already archived"); - Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be archived"); - - AccountService accountService = account.getModule(AccountModule.class) - .getDefaultService(); + public Account apply(Account account) { + Assert.isTrue(account.getStatus() != ACCOUNT_ARCHIVED, "The account is already archived"); + Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be archived"); - AccountStatus status = account.getStatus(); + AccountService accountService = account.getModule(AccountModule.class) + .getDefaultService(); - // Archive the account - account.setStatus(AccountStatus.ACCOUNT_ARCHIVED); + AccountStatus status = account.getStatus(); + + // Archive the account + account.setStatus(AccountStatus.ACCOUNT_ARCHIVED); + account = accountService.update(account); + + try { + // Trigger the account archived event + account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_ARCHIVED, account)); + } catch (Exception ex) { + log.error("Account could not be archived", ex); + + // Rollback the operation + account.setStatus(status); account = accountService.update(account); + } - try { - // Trigger the account archived event - account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_ARCHIVED, account)); - } catch (Exception ex) { - log.error("Account could not be archived", ex); - - // Rollback the operation - account.setStatus(status); - accountService.update(account); - - throw ex; - } - - return account; - }; + return account; } } diff --git a/account/account-web/src/main/java/demo/account/action/ConfirmAccount.java b/account/account-web/src/main/java/demo/account/action/ConfirmAccount.java index 555740e..1402013 100644 --- a/account/account-web/src/main/java/demo/account/action/ConfirmAccount.java +++ b/account/account-web/src/main/java/demo/account/action/ConfirmAccount.java @@ -9,10 +9,9 @@ import demo.account.event.AccountEventType; import demo.domain.Action; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; -import java.util.function.Function; - import static demo.account.domain.AccountStatus.ACCOUNT_CONFIRMED; import static demo.account.domain.AccountStatus.ACCOUNT_PENDING; @@ -22,38 +21,35 @@ import static demo.account.domain.AccountStatus.ACCOUNT_PENDING; * @author Kenny Bastani */ @Service +@Transactional public class ConfirmAccount extends Action { private final Logger log = Logger.getLogger(this.getClass()); - public Function getFunction() { - return (account) -> { - Assert.isTrue(account.getStatus() != ACCOUNT_CONFIRMED, "The account has already been confirmed"); - Assert.isTrue(account.getStatus() == ACCOUNT_PENDING, "The account has already been confirmed"); + public Account apply(Account account) { + Assert.isTrue(account.getStatus() != ACCOUNT_CONFIRMED, "The account has already been confirmed"); + Assert.isTrue(account.getStatus() == ACCOUNT_PENDING, "The account has already been confirmed"); - AccountService accountService = account.getModule(AccountModule.class) - .getDefaultService(); + AccountService accountService = account.getModule(AccountModule.class) + .getDefaultService(); - AccountStatus status = account.getStatus(); + AccountStatus status = account.getStatus(); - // Activate the account - account.setStatus(AccountStatus.ACCOUNT_CONFIRMED); + // Activate the account + account.setStatus(AccountStatus.ACCOUNT_CONFIRMED); + account = accountService.update(account); + + try { + // Trigger the account confirmed event + account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED, account)); + } catch (Exception ex) { + log.error("Account could not be confirmed", ex); + + // Rollback the operation + account.setStatus(status); account = accountService.update(account); + } - try { - // Trigger the account confirmed event - account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED, account)); - } catch (Exception ex) { - log.error("Account could not be confirmed", ex); - - // Rollback the operation - account.setStatus(status); - accountService.update(account); - - throw ex; - } - - return account; - }; + return account; } } diff --git a/account/account-web/src/main/java/demo/account/action/GetOrders.java b/account/account-web/src/main/java/demo/account/action/GetOrders.java index b054a77..46b6f28 100644 --- a/account/account-web/src/main/java/demo/account/action/GetOrders.java +++ b/account/account-web/src/main/java/demo/account/action/GetOrders.java @@ -5,8 +5,7 @@ import demo.domain.Action; import demo.order.domain.OrderModule; import demo.order.domain.Orders; import org.springframework.stereotype.Service; - -import java.util.function.Function; +import org.springframework.transaction.annotation.Transactional; /** * Query action to get {@link demo.order.domain.Order}s for an an {@link Account} @@ -14,6 +13,7 @@ import java.util.function.Function; * @author Kenny Bastani */ @Service +@Transactional public class GetOrders extends Action { private OrderModule orderModule; @@ -22,11 +22,9 @@ public class GetOrders extends Action { this.orderModule = orderModule; } - public Function getFunction() { - return (account) -> { - // Get orders from the order service - return orderModule.getDefaultService() - .findOrdersByAccountId(account.getIdentity()); - }; + public Orders apply(Account account) { + // Get orders from the order service + return orderModule.getDefaultService() + .findOrdersByAccountId(account.getIdentity()); } } diff --git a/account/account-web/src/main/java/demo/account/action/PostOrder.java b/account/account-web/src/main/java/demo/account/action/PostOrder.java index d2476ff..a3f29ff 100644 --- a/account/account-web/src/main/java/demo/account/action/PostOrder.java +++ b/account/account-web/src/main/java/demo/account/action/PostOrder.java @@ -9,6 +9,7 @@ import org.apache.log4j.Logger; import org.springframework.hateoas.MediaTypes; import org.springframework.hateoas.client.Traverson; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import org.springframework.web.client.RestClientResponseException; @@ -16,7 +17,6 @@ import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.Map; -import java.util.function.BiFunction; import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE; @@ -26,33 +26,32 @@ import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE; * @author Kenny Bastani */ @Service +@Transactional public class PostOrder extends Action { private final Logger log = Logger.getLogger(this.getClass()); - public BiFunction getFunction() { - return (account, order) -> { - Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "Only active accounts can create an order"); - order = order.post(); + public Order apply(Account account, Order order) { + Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "Only active accounts can create an order"); + order = order.post(); - try { - // Create traverson for the new order - Traverson traverson = new Traverson(URI.create(order.getLink("self") - .getHref()), MediaTypes.HAL_JSON); + try { + // Create traverson for the new order + Traverson traverson = new Traverson(URI.create(order.getLink("self") + .getHref()), MediaTypes.HAL_JSON); - Map params = new HashMap<>(); - params.put("accountId", account.getIdentity()); + Map params = new HashMap<>(); + params.put("accountId", account.getIdentity()); - order = traverson.follow("commands", "connectAccount") - .withTemplateParameters(params) - .toObject(Order.class); - } catch (RestClientResponseException ex) { - log.error("New order could not be posted for the account", ex); - throw new IllegalStateException(getHttpStatusMessage(ex)); - } + order = traverson.follow("commands", "connectAccount") + .withTemplateParameters(params) + .toObject(Order.class); + } catch (RestClientResponseException ex) { + log.error("New order could not be posted for the account", ex); + throw new IllegalStateException(getHttpStatusMessage(ex)); + } - return order; - }; + return order; } private String getHttpStatusMessage(RestClientResponseException ex) { diff --git a/account/account-web/src/main/java/demo/account/action/SuspendAccount.java b/account/account-web/src/main/java/demo/account/action/SuspendAccount.java index c2d7f56..306fcfa 100644 --- a/account/account-web/src/main/java/demo/account/action/SuspendAccount.java +++ b/account/account-web/src/main/java/demo/account/action/SuspendAccount.java @@ -9,10 +9,9 @@ import demo.account.event.AccountEventType; import demo.domain.Action; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; -import java.util.function.Function; - import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE; import static demo.account.domain.AccountStatus.ACCOUNT_SUSPENDED; @@ -22,38 +21,35 @@ import static demo.account.domain.AccountStatus.ACCOUNT_SUSPENDED; * @author Kenny Bastani */ @Service +@Transactional public class SuspendAccount extends Action { private final Logger log = Logger.getLogger(this.getClass()); - public Function getFunction() { - return (account) -> { - Assert.isTrue(account.getStatus() != ACCOUNT_SUSPENDED, "The account is already suspended"); - Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be suspended"); + public Account apply(Account account) { + Assert.isTrue(account.getStatus() != ACCOUNT_SUSPENDED, "The account is already suspended"); + Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be suspended"); - AccountService accountService = account.getModule(AccountModule.class) - .getDefaultService(); + AccountService accountService = account.getModule(AccountModule.class) + .getDefaultService(); - AccountStatus status = account.getStatus(); + AccountStatus status = account.getStatus(); - // Suspend the account - account.setStatus(AccountStatus.ACCOUNT_SUSPENDED); + // Suspend the account + account.setStatus(AccountStatus.ACCOUNT_SUSPENDED); + account = accountService.update(account); + + try { + // Trigger the account suspended event + account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_SUSPENDED, account)); + } catch (Exception ex) { + log.error("Account could not be suspended", ex); + + // Rollback the operation + account.setStatus(status); account = accountService.update(account); + } - try { - // Trigger the account suspended event - account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_SUSPENDED, account)); - } catch (Exception ex) { - log.error("Account could not be suspended", ex); - - // Rollback the operation - account.setStatus(status); - accountService.update(account); - - throw ex; - } - - return account; - }; + return account; } } diff --git a/account/account-web/src/main/java/demo/account/domain/Account.java b/account/account-web/src/main/java/demo/account/domain/Account.java index 7e84787..a99c5cf 100644 --- a/account/account-web/src/main/java/demo/account/domain/Account.java +++ b/account/account-web/src/main/java/demo/account/domain/Account.java @@ -88,42 +88,36 @@ public class Account extends AbstractEntity { @JsonIgnore public Orders getOrders() { return getAction(GetOrders.class) - .getFunction() .apply(this); } @Command(method = "activate", controller = AccountController.class) public Account activate() { return getAction(ActivateAccount.class) - .getFunction() .apply(this); } @Command(method = "archive", controller = AccountController.class) public Account archive() { return getAction(ArchiveAccount.class) - .getFunction() .apply(this); } @Command(method = "confirm", controller = AccountController.class) public Account confirm() { return getAction(ConfirmAccount.class) - .getFunction() .apply(this); } @Command(method = "suspend", controller = AccountController.class) public Account suspend() { return getAction(SuspendAccount.class) - .getFunction() .apply(this); } @Command(method = "postOrder", controller = AccountController.class) public Order postOrder(Order order) { return getAction(PostOrder.class) - .getFunction() .apply(this, order); } diff --git a/account/account-web/src/main/resources/application.yml b/account/account-web/src/main/resources/application.yml index 175c834..ed7a66b 100644 --- a/account/account-web/src/main/resources/application.yml +++ b/account/account-web/src/main/resources/application.yml @@ -23,11 +23,12 @@ spring: --- spring: profiles: docker - rabbitmq: - host: ${DOCKER_IP:192.168.99.100} - port: 5672 cloud: stream: + kafka: + binder: + brokers: ${DOCKER_IP:192.168.99.100} + zk-nodes: ${DOCKER_IP:192.168.99.100} bindings: output: destination: account diff --git a/account/account-worker/pom.xml b/account/account-worker/pom.xml index 8ccc4c7..c18c579 100644 --- a/account/account-worker/pom.xml +++ b/account/account-worker/pom.xml @@ -37,7 +37,7 @@ org.springframework.cloud - spring-cloud-starter-stream-rabbit + spring-cloud-starter-stream-kafka org.springframework.boot diff --git a/account/account-worker/src/main/resources/application.yml b/account/account-worker/src/main/resources/application.yml index 356d783..e7d7f9d 100644 --- a/account/account-worker/src/main/resources/application.yml +++ b/account/account-worker/src/main/resources/application.yml @@ -28,11 +28,12 @@ eureka: --- spring: profiles: docker - rabbitmq: - host: ${DOCKER_IP:192.168.99.100} - port: 5672 cloud: stream: + kafka: + binder: + brokers: ${DOCKER_IP:192.168.99.100} + zk-nodes: ${DOCKER_IP:192.168.99.100} bindings: input: destination: account diff --git a/docker-compose.yml b/docker-compose.yml index e84ac5a..1a17348 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,10 +10,13 @@ redis: container_name: redis image: redis:latest net: host -rabbit: - container_name: rabbit - image: rabbitmq:3-management +kafka: + container_name: kafka + image: spotify/kafka:latest net: host + ports: + - 2181:2181 + - 9092:9092 account-web: image: account-web environment: diff --git a/order/order-web/pom.xml b/order/order-web/pom.xml index be2422d..eb8fa53 100644 --- a/order/order-web/pom.xml +++ b/order/order-web/pom.xml @@ -41,7 +41,7 @@ org.springframework.cloud - spring-cloud-starter-stream-rabbit + spring-cloud-starter-stream-kafka org.springframework.boot diff --git a/order/order-web/src/main/java/demo/order/action/AddReservation.java b/order/order-web/src/main/java/demo/order/action/AddReservation.java index 9162589..74576eb 100644 --- a/order/order-web/src/main/java/demo/order/action/AddReservation.java +++ b/order/order-web/src/main/java/demo/order/action/AddReservation.java @@ -12,49 +12,46 @@ import org.springframework.hateoas.Link; import org.springframework.hateoas.TemplateVariable; import org.springframework.hateoas.UriTemplate; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; -import java.util.function.BiFunction; - /** * Connects an {@link Order} to an Account. * * @author Kenny Bastani */ @Service +@Transactional public class AddReservation extends Action { private final Logger log = Logger.getLogger(this.getClass()); - public BiFunction getFunction() { - return (order, reservationId) -> { - Assert.isTrue(order - .getStatus() == OrderStatus.RESERVATION_PENDING, "Order must be in a pending reservation state"); - Assert.isTrue(!order.getReservationIds().contains(reservationId), "Reservation already added to order"); + public Order apply(Order order, Long reservationId) { + Assert.isTrue(order + .getStatus() == OrderStatus.RESERVATION_PENDING, "Order must be in a pending reservation state"); + Assert.isTrue(!order.getReservationIds().contains(reservationId), "Reservation already added to order"); - OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); + OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); - order.getReservationIds().add(reservationId); + order.getReservationIds().add(reservationId); + order = orderService.update(order); + + Link reservationLink = new Link(new UriTemplate("http://warehouse-web/v1/reservations/{id}") + .with("id", TemplateVariable.VariableType.PATH_VARIABLE) + .expand(reservationId) + .toString()).withRel("reservation"); + + try { + // Trigger reservation added event + order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_ADDED, order), reservationLink); + } catch (Exception ex) { + log.error("Could not add reservation to order", ex); + order.getReservationIds().remove(reservationId); + order.setStatus(OrderStatus.RESERVATION_FAILED); order = orderService.update(order); + order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_FAILED, order), reservationLink); + } - Link reservationLink = new Link(new UriTemplate("http://warehouse-web/v1/reservations/{id}") - .with("id", TemplateVariable.VariableType.PATH_VARIABLE) - .expand(reservationId) - .toString()).withRel("reservation"); - - try { - // Trigger reservation added event - order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_ADDED, order), reservationLink); - } catch (Exception ex) { - log.error("Could not add reservation to order", ex); - order.getReservationIds().remove(reservationId); - order.setStatus(OrderStatus.RESERVATION_FAILED); - orderService.update(order); - order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_FAILED, order), reservationLink); - throw ex; - } - - return order; - }; + return order; } } diff --git a/order/order-web/src/main/java/demo/order/action/CompleteOrder.java b/order/order-web/src/main/java/demo/order/action/CompleteOrder.java index a6c0447..d16474c 100644 --- a/order/order-web/src/main/java/demo/order/action/CompleteOrder.java +++ b/order/order-web/src/main/java/demo/order/action/CompleteOrder.java @@ -9,10 +9,10 @@ import demo.order.event.OrderEvent; import demo.order.event.OrderEventType; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import java.util.Arrays; -import java.util.function.Function; /** * Completes the {@link Order} and applies a final status @@ -20,39 +20,37 @@ import java.util.function.Function; * @author Kenny Bastani */ @Service +@Transactional public class CompleteOrder extends Action { private final Logger log = Logger.getLogger(CompleteOrder.class); - public Function getFunction() { - return (order) -> { - Assert.isTrue(Arrays.asList(OrderStatus.PAYMENT_FAILED, - OrderStatus.PAYMENT_SUCCEEDED, - OrderStatus.RESERVATION_FAILED).contains(order.getStatus()), "Order must be in a terminal state"); + public Order apply(Order order) { + Assert.isTrue(Arrays.asList(OrderStatus.PAYMENT_FAILED, + OrderStatus.PAYMENT_SUCCEEDED, + OrderStatus.RESERVATION_FAILED).contains(order.getStatus()), "Order must be in a terminal state"); - OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); + OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); - OrderStatus status = order.getStatus(); + OrderStatus status = order.getStatus(); - try { - if (order.getStatus() == OrderStatus.PAYMENT_SUCCEEDED) { - order.setStatus(OrderStatus.ORDER_SUCCEEDED); - order = orderService.update(order); - order.sendAsyncEvent(new OrderEvent(OrderEventType.ORDER_SUCCEEDED, order)); - } else { - order.setStatus(OrderStatus.ORDER_FAILED); - order = orderService.update(order); - order.sendAsyncEvent(new OrderEvent(OrderEventType.ORDER_FAILED, order)); - } - } catch (RuntimeException ex) { - log.error("Error completing the order", ex); - // Rollback status change - order.setStatus(status); + try { + if (order.getStatus() == OrderStatus.PAYMENT_SUCCEEDED) { + order.setStatus(OrderStatus.ORDER_SUCCEEDED); order = orderService.update(order); - throw ex; + order.sendAsyncEvent(new OrderEvent(OrderEventType.ORDER_SUCCEEDED, order)); + } else { + order.setStatus(OrderStatus.ORDER_FAILED); + order = orderService.update(order); + order.sendAsyncEvent(new OrderEvent(OrderEventType.ORDER_FAILED, order)); } + } catch (RuntimeException ex) { + log.error("Error completing the order", ex); + // Rollback status change + order.setStatus(status); + order = orderService.update(order); + } - return order; - }; + return order; } } diff --git a/order/order-web/src/main/java/demo/order/action/CompleteReservation.java b/order/order-web/src/main/java/demo/order/action/CompleteReservation.java index 5b7f2c6..8630b4e 100644 --- a/order/order-web/src/main/java/demo/order/action/CompleteReservation.java +++ b/order/order-web/src/main/java/demo/order/action/CompleteReservation.java @@ -11,10 +11,10 @@ import demo.reservation.domain.Reservation; import demo.reservation.domain.ReservationStatus; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import java.util.List; -import java.util.function.Function; import java.util.stream.Collectors; import static demo.order.domain.OrderStatus.RESERVATION_FAILED; @@ -26,56 +26,54 @@ import static demo.order.domain.OrderStatus.RESERVATION_SUCCEEDED; * @author Kenny Bastani */ @Service +@Transactional public class CompleteReservation extends Action { private final Logger log = Logger.getLogger(CompleteReservation.class); - public Function getFunction() { - return (order) -> { - if (order.getStatus() != RESERVATION_SUCCEEDED && order.getStatus() != RESERVATION_FAILED) { - Assert.isTrue(order.getStatus() == OrderStatus.RESERVATION_PENDING, - "The order must be in a reservation pending state"); - } else { - // Reservation has already completed - return order; - } - - OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); - - OrderStatus status = order.getStatus(); - - try { - List reservations = order.getReservations().getContent().stream() - .collect(Collectors.toList()); - - // Check if all inventory has been reserved - Boolean orderReserved = reservations.stream() - .allMatch(r -> r.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED); - - // Check if any inventory reservations have failed - Boolean reservationFailed = reservations.stream() - .anyMatch(r -> r.getStatus() == ReservationStatus.RESERVATION_FAILED); - - if (orderReserved && order.getStatus() == OrderStatus.RESERVATION_PENDING) { - // Succeed the reservation and commit all inventory associated with order - order.setStatus(RESERVATION_SUCCEEDED); - order = orderService.update(order); - order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_SUCCEEDED, order)); - } else if (reservationFailed && order.getStatus() == OrderStatus.RESERVATION_PENDING) { - // Fail the reservation and release all inventory associated with order - order.setStatus(RESERVATION_FAILED); - order = orderService.update(order); - order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_FAILED, order)); - } - } catch (RuntimeException ex) { - log.error("Error completing reservation", ex); - // Rollback status change - order.setStatus(status); - order = orderService.update(order); - throw ex; - } - + public Order apply(Order order) { + if (order.getStatus() != RESERVATION_SUCCEEDED && order.getStatus() != RESERVATION_FAILED) { + Assert.isTrue(order.getStatus() == OrderStatus.RESERVATION_PENDING, + "The order must be in a reservation pending state"); + } else { + // Reservation has already completed return order; - }; + } + + OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); + + OrderStatus status = order.getStatus(); + + try { + List reservations = order.getReservations().getContent().stream() + .collect(Collectors.toList()); + + // Check if all inventory has been reserved + Boolean orderReserved = reservations.stream() + .allMatch(r -> r.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED); + + // Check if any inventory reservations have failed + Boolean reservationFailed = reservations.stream() + .anyMatch(r -> r.getStatus() == ReservationStatus.RESERVATION_FAILED); + + if (orderReserved && order.getStatus() == OrderStatus.RESERVATION_PENDING) { + // Succeed the reservation and commit all inventory associated with order + order.setStatus(RESERVATION_SUCCEEDED); + order = orderService.update(order); + order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_SUCCEEDED, order)); + } else if (reservationFailed && order.getStatus() == OrderStatus.RESERVATION_PENDING) { + // Fail the reservation and release all inventory associated with order + order.setStatus(RESERVATION_FAILED); + order = orderService.update(order); + order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_FAILED, order)); + } + } catch (RuntimeException ex) { + log.error("Error completing reservation", ex); + // Rollback status change + order.setStatus(status); + order = orderService.update(order); + } + + return order; } } diff --git a/order/order-web/src/main/java/demo/order/action/ConnectAccount.java b/order/order-web/src/main/java/demo/order/action/ConnectAccount.java index 29c3fab..b0f4897 100644 --- a/order/order-web/src/main/java/demo/order/action/ConnectAccount.java +++ b/order/order-web/src/main/java/demo/order/action/ConnectAccount.java @@ -9,43 +9,40 @@ import demo.order.event.OrderEvent; import demo.order.event.OrderEventType; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; -import java.util.function.BiFunction; - /** * Connects an {@link Order} to an Account. * * @author Kenny Bastani */ @Service +@Transactional public class ConnectAccount extends Action { private final Logger log = Logger.getLogger(this.getClass()); - public BiFunction getFunction() { - return (order, accountId) -> { - Assert.isTrue(order.getStatus() == OrderStatus.ORDER_CREATED, "Order must be in a created state"); + public Order apply(Order order, Long accountId) { + Assert.isTrue(order.getStatus() == OrderStatus.ORDER_CREATED, "Order must be in a created state"); - OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); + OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); - // Connect the account - order.setAccountId(accountId); - order.setStatus(OrderStatus.ACCOUNT_CONNECTED); + // Connect the account + order.setAccountId(accountId); + order.setStatus(OrderStatus.ACCOUNT_CONNECTED); + order = orderService.update(order); + + try { + // Trigger the account connected event + order.sendAsyncEvent(new OrderEvent(OrderEventType.ACCOUNT_CONNECTED, order)); + } catch (Exception ex) { + log.error("Could not connect order to account", ex); + order.setAccountId(null); + order.setStatus(OrderStatus.ORDER_CREATED); order = orderService.update(order); + } - try { - // Trigger the account connected event - order.sendAsyncEvent(new OrderEvent(OrderEventType.ACCOUNT_CONNECTED, order)); - } catch (Exception ex) { - log.error("Could not connect order to account", ex); - order.setAccountId(null); - order.setStatus(OrderStatus.ORDER_CREATED); - orderService.update(order); - throw ex; - } - - return order; - }; + return order; } } diff --git a/order/order-web/src/main/java/demo/order/action/ConnectPayment.java b/order/order-web/src/main/java/demo/order/action/ConnectPayment.java index 90db472..98f44bf 100644 --- a/order/order-web/src/main/java/demo/order/action/ConnectPayment.java +++ b/order/order-web/src/main/java/demo/order/action/ConnectPayment.java @@ -10,43 +10,40 @@ import demo.order.event.OrderEventType; import demo.payment.domain.Payment; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; -import java.util.function.BiFunction; - /** * Connects a {@link Payment} to an {@link Order}. * * @author Kenny Bastani */ @Service +@Transactional public class ConnectPayment extends Action { private final Logger log = Logger.getLogger(this.getClass()); - public BiFunction getFunction() { - return (order, paymentId) -> { - Assert.isTrue(order - .getStatus() == OrderStatus.PAYMENT_CREATED, "Order must be in a payment created state"); + public Order apply(Order order, Long paymentId) { + Assert.isTrue(order + .getStatus() == OrderStatus.PAYMENT_CREATED, "Order must be in a payment created state"); - OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); + OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); - // Connect the payment - order.setPaymentId(paymentId); - order.setStatus(OrderStatus.PAYMENT_CONNECTED); + // Connect the payment + order.setPaymentId(paymentId); + order.setStatus(OrderStatus.PAYMENT_CONNECTED); + order = orderService.update(order); + + try { + // Trigger the payment connected event + order.sendAsyncEvent(new OrderEvent(OrderEventType.PAYMENT_CONNECTED, order)); + } catch (Exception ex) { + log.error("Could not connect payment to order", ex); + order.setPaymentId(null); + order.setStatus(OrderStatus.ORDER_CREATED); order = orderService.update(order); + } - try { - // Trigger the payment connected event - order.sendAsyncEvent(new OrderEvent(OrderEventType.PAYMENT_CONNECTED, order)); - } catch (Exception ex) { - log.error("Could not connect payment to order", ex); - order.setPaymentId(null); - order.setStatus(OrderStatus.ORDER_CREATED); - orderService.update(order); - throw ex; - } - - return order; - }; + return order; } } diff --git a/order/order-web/src/main/java/demo/order/action/CreatePayment.java b/order/order-web/src/main/java/demo/order/action/CreatePayment.java index 5b9ac64..8c27616 100644 --- a/order/order-web/src/main/java/demo/order/action/CreatePayment.java +++ b/order/order-web/src/main/java/demo/order/action/CreatePayment.java @@ -12,10 +12,10 @@ import demo.payment.domain.PaymentMethod; import demo.payment.domain.PaymentService; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import java.util.Arrays; -import java.util.function.Function; /** * Creates a {@link Payment} for an {@link Order}. @@ -23,6 +23,7 @@ import java.util.function.Function; * @author Kenny Bastani */ @Service +@Transactional public class CreatePayment extends Action { private final Logger log = Logger.getLogger(this.getClass()); @@ -32,53 +33,49 @@ public class CreatePayment extends Action { this.paymentService = paymentService; } - public Function getFunction() { - return order -> { - Assert.isTrue(order.getPaymentId() == null, "Payment has already been created"); - Assert.isTrue(!Arrays.asList(OrderStatus.PAYMENT_CREATED, - OrderStatus.PAYMENT_CONNECTED, - OrderStatus.PAYMENT_SUCCEEDED, - OrderStatus.PAYMENT_PENDING).contains(order.getStatus()), "Payment has already been created"); - Assert.isTrue(order.getStatus() == OrderStatus.RESERVATION_SUCCEEDED, - "Inventory reservations for the order must be made first before creating a payment"); + public Order apply(Order order) { + Assert.isTrue(order.getPaymentId() == null, "Payment has already been created"); + Assert.isTrue(!Arrays.asList(OrderStatus.PAYMENT_CREATED, + OrderStatus.PAYMENT_CONNECTED, + OrderStatus.PAYMENT_SUCCEEDED, + OrderStatus.PAYMENT_PENDING).contains(order.getStatus()), "Payment has already been created"); + Assert.isTrue(order.getStatus() == OrderStatus.RESERVATION_SUCCEEDED, + "Inventory reservations for the order must be made first before creating a payment"); - // Get entity services - OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); + // Get entity services + OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); - // Update the order status - order.setStatus(OrderStatus.PAYMENT_PENDING); + // Update the order status + order.setStatus(OrderStatus.PAYMENT_PENDING); + order = orderService.update(order); + + Payment payment = new Payment(); + payment.setAmount(order.calculateTotal()); + payment.setPaymentMethod(PaymentMethod.CREDIT_CARD); + payment = paymentService.create(payment); + + // Update the order status + order.setStatus(OrderStatus.PAYMENT_CREATED); + order = orderService.update(order); + + try { + OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_CREATED, order); + event.add(payment.getLink("self").withRel("payment")); + + // Trigger payment created event + order.sendAsyncEvent(event); + } catch (Exception ex) { + log.error("The order's payment could not be created", ex); + + // Rollback the payment creation + if (payment.getIdentity() != null) + paymentService.delete(payment.getIdentity()); + + order.setPaymentId(null); + order.setStatus(OrderStatus.ACCOUNT_CONNECTED); order = orderService.update(order); + } - Payment payment = new Payment(); - payment.setAmount(order.calculateTotal()); - payment.setPaymentMethod(PaymentMethod.CREDIT_CARD); - payment = paymentService.create(payment); - - // Update the order status - order.setStatus(OrderStatus.PAYMENT_CREATED); - order = orderService.update(order); - - try { - OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_CREATED, order); - event.add(payment.getLink("self").withRel("payment")); - - // Trigger payment created event - order.sendAsyncEvent(event); - } catch (Exception ex) { - log.error("The order's payment could not be created", ex); - - // Rollback the payment creation - if (payment.getIdentity() != null) - paymentService.delete(payment.getIdentity()); - - order.setPaymentId(null); - order.setStatus(OrderStatus.ACCOUNT_CONNECTED); - orderService.update(order); - - throw ex; - } - - return order; - }; + return order; } } diff --git a/order/order-web/src/main/java/demo/order/action/DeleteOrder.java b/order/order-web/src/main/java/demo/order/action/DeleteOrder.java index 6629fc1..aba03cb 100644 --- a/order/order-web/src/main/java/demo/order/action/DeleteOrder.java +++ b/order/order-web/src/main/java/demo/order/action/DeleteOrder.java @@ -7,8 +7,7 @@ import demo.payment.domain.Payment; import demo.payment.domain.PaymentService; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; - -import java.util.function.Consumer; +import org.springframework.transaction.annotation.Transactional; /** * Processes a {@link Payment} for an {@link Order}. @@ -16,6 +15,7 @@ import java.util.function.Consumer; * @author Kenny Bastani */ @Service +@Transactional public class DeleteOrder extends Action { private final Logger log = Logger.getLogger(this.getClass()); @@ -25,16 +25,14 @@ public class DeleteOrder extends Action { this.paymentService = paymentService; } - public Consumer getConsumer() { - return (order) -> { - // Delete payment - if (order.getPaymentId() != null) - paymentService.delete(order.getPaymentId()); + public void apply(Order order) { + // Delete payment + if (order.getPaymentId() != null) + paymentService.delete(order.getPaymentId()); - // Delete order - order.getModule(OrderModule.class) - .getDefaultService() - .delete(order.getIdentity()); - }; + // Delete order + order.getModule(OrderModule.class) + .getDefaultService() + .delete(order.getIdentity()); } } diff --git a/order/order-web/src/main/java/demo/order/action/GetReservations.java b/order/order-web/src/main/java/demo/order/action/GetReservations.java index 6714f91..4c09aba 100644 --- a/order/order-web/src/main/java/demo/order/action/GetReservations.java +++ b/order/order-web/src/main/java/demo/order/action/GetReservations.java @@ -5,8 +5,7 @@ import demo.order.domain.Order; import demo.reservation.domain.ReservationModule; import demo.reservation.domain.Reservations; import org.springframework.stereotype.Service; - -import java.util.function.Function; +import org.springframework.transaction.annotation.Transactional; /** * Query action to get {@link demo.order.domain.Order}s for an an {@link Order} @@ -14,6 +13,7 @@ import java.util.function.Function; * @author Kenny Bastani */ @Service +@Transactional public class GetReservations extends Action { private final ReservationModule reservationModule; @@ -22,11 +22,9 @@ public class GetReservations extends Action { this.reservationModule = reservationModule; } - public Function getFunction() { - return (order) -> { - // Get orders from the order service - return reservationModule.getDefaultService() - .findReservationsByOrderId(order.getIdentity()); - }; + public Reservations apply(Order order) { + // Get orders from the order service + return reservationModule.getDefaultService() + .findReservationsByOrderId(order.getIdentity()); } } diff --git a/order/order-web/src/main/java/demo/order/action/ProcessPayment.java b/order/order-web/src/main/java/demo/order/action/ProcessPayment.java index 6922f86..b3fecb1 100644 --- a/order/order-web/src/main/java/demo/order/action/ProcessPayment.java +++ b/order/order-web/src/main/java/demo/order/action/ProcessPayment.java @@ -12,11 +12,11 @@ import org.apache.log4j.Logger; import org.springframework.hateoas.MediaTypes; import org.springframework.hateoas.client.Traverson; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import java.net.URI; import java.util.Arrays; -import java.util.function.Function; /** * Processes a {@link Payment} for an {@link Order}. @@ -24,51 +24,55 @@ import java.util.function.Function; * @author Kenny Bastani */ @Service +@Transactional public class ProcessPayment extends Action { private final Logger log = Logger.getLogger(this.getClass()); - public Function getFunction() { - return order -> { - Assert.isTrue(!Arrays - .asList(OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING, OrderStatus.PAYMENT_FAILED) - .contains(order.getStatus()), "Payment has already been processed"); - Assert.isTrue(order.getStatus() == OrderStatus.PAYMENT_CONNECTED, - "Order must be in a payment connected state"); + public Order apply(Order order) { + Assert.isTrue(!Arrays + .asList(OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING, OrderStatus.PAYMENT_FAILED) + .contains(order.getStatus()), "Payment has already been processed"); + Assert.isTrue(order.getStatus() == OrderStatus.PAYMENT_CONNECTED, + "Order must be in a payment connected state"); - // Get entity services - OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); + // Get entity services + OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); - // Get the payment - Payment payment = order.getPayment(); + // Get the payment + Payment payment = order.getPayment(); - // Update the order status - order.setStatus(OrderStatus.PAYMENT_PENDING); - order = orderService.update(order); + // Update the order status + order.setStatus(OrderStatus.PAYMENT_PENDING); + order = orderService.update(order); - try { - // Create traverson for the new order - Traverson traverson = new Traverson(URI.create(payment.getLink("self").getHref()), MediaTypes.HAL_JSON); - payment = traverson.follow("commands", "processPayment").toObject(Payment.class); - } catch (Exception ex) { - log.error("The order's payment could not be processed", ex); + boolean paymentSuccess = false; - OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_FAILED, order); - event.add(payment.getLink("self").withRel("payment")); + try { + // Create traverson for the new order + Traverson traverson = new Traverson(URI.create(payment.getLink("self").getHref()), MediaTypes.HAL_JSON); + payment = traverson.follow("commands", "processPayment").toObject(Payment.class); + paymentSuccess = true; + } catch (Exception ex) { + log.error("The order's payment could not be processed", ex); - // Trigger payment failed event - order.sendAsyncEvent(event); + OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_FAILED, order); + event.add(payment.getLink("self").withRel("payment")); - throw ex; - } finally { + // Trigger payment failed event + order.sendAsyncEvent(event); + + paymentSuccess = false; + } finally { + if(paymentSuccess) { OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_SUCCEEDED, order); event.add(payment.getLink("self").withRel("payment")); // Trigger payment succeeded event order.sendAsyncEvent(event); } + } - return order; - }; + return order; } } diff --git a/order/order-web/src/main/java/demo/order/action/ReserveInventory.java b/order/order-web/src/main/java/demo/order/action/ReserveInventory.java index 9b65f46..b112279 100644 --- a/order/order-web/src/main/java/demo/order/action/ReserveInventory.java +++ b/order/order-web/src/main/java/demo/order/action/ReserveInventory.java @@ -12,10 +12,10 @@ import demo.warehouse.domain.WarehouseService; import demo.warehouse.exception.WarehouseNotFoundException; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import java.util.Arrays; -import java.util.function.Function; /** * Reserves inventory for an {@link Order}. @@ -23,6 +23,7 @@ import java.util.function.Function; * @author Kenny Bastani */ @Service +@Transactional public class ReserveInventory extends Action { private final Logger log = Logger.getLogger(ReserveInventory.class); @@ -32,63 +33,61 @@ public class ReserveInventory extends Action { this.warehouseService = warehouseService; } - public Function getFunction() { - return (order) -> { - Assert.isTrue(!Arrays - .asList(OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING, - OrderStatus.PAYMENT_FAILED, OrderStatus.INVENTORY_RESERVED, - OrderStatus.RESERVATION_SUCCEEDED, OrderStatus.RESERVATION_PENDING, - OrderStatus.RESERVATION_FAILED) - .contains(order.getStatus()), "Inventory has already been reserved"); - Assert.isTrue(order - .getStatus() == OrderStatus.ACCOUNT_CONNECTED, "The order must be connected to an account"); + public Order apply(Order order) { + Assert.isTrue(!Arrays + .asList(OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING, + OrderStatus.PAYMENT_FAILED, OrderStatus.INVENTORY_RESERVED, + OrderStatus.RESERVATION_SUCCEEDED, OrderStatus.RESERVATION_PENDING, + OrderStatus.RESERVATION_FAILED) + .contains(order.getStatus()), "Inventory has already been reserved"); + Assert.isTrue(order + .getStatus() == OrderStatus.ACCOUNT_CONNECTED, "The order must be connected to an account"); - Warehouse warehouse; + Warehouse warehouse; - OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); + OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); - OrderStatus status = order.getStatus(); - order.setStatus(OrderStatus.RESERVATION_PENDING); + OrderStatus status = order.getStatus(); + order.setStatus(OrderStatus.RESERVATION_PENDING); + order = orderService.update(order); + + try { + warehouse = warehouseService.findWarehouseWithInventory(order); + } catch (WarehouseNotFoundException ex) { + log.error("The order contains items that are not available at any warehouse", ex); + throw ex; + } catch (RuntimeException ex) { + log.error("Error connecting to warehouse service", ex); + // Rollback status change + order.setStatus(status); + order = orderService.update(order); + throw ex; + } + + try { + // Reserve inventory for the order from the returned warehouse + warehouse = warehouseService.reserveInventory(warehouse, order); + } catch (Exception ex) { + log.error("Could not reserve inventory for the order", ex); + + order.setStatus(OrderStatus.ACCOUNT_CONNECTED); order = orderService.update(order); - try { - warehouse = warehouseService.findWarehouseWithInventory(order); - } catch (WarehouseNotFoundException ex) { - log.error("The order contains items that are not available at any warehouse", ex); - throw ex; - } catch (RuntimeException ex) { - log.error("Error connecting to warehouse service", ex); - // Rollback status change - order.setStatus(status); - order = orderService.update(order); - throw ex; - } + OrderEvent event = new OrderEvent(OrderEventType.RESERVATION_FAILED, order); + event.add(warehouse.getLink("self").withRel("warehouse")); - try { - // Reserve inventory for the order from the returned warehouse - warehouse = warehouseService.reserveInventory(warehouse, order); - } catch (Exception ex) { - log.error("Could not reserve inventory for the order", ex); - - order.setStatus(OrderStatus.ACCOUNT_CONNECTED); - order = orderService.update(order); - - OrderEvent event = new OrderEvent(OrderEventType.RESERVATION_FAILED, order); - event.add(warehouse.getLink("self").withRel("warehouse")); - - // Trigger reservation failed - order.sendAsyncEvent(event); - - throw ex; - } finally { + // Trigger reservation failed + order.sendAsyncEvent(event); + } finally { + if(order.getStatus() != OrderStatus.ACCOUNT_CONNECTED) { OrderEvent event = new OrderEvent(OrderEventType.RESERVATION_PENDING, order); event.add(warehouse.getLink("self").withRel("warehouse")); // Trigger reservation pending event order.sendAsyncEvent(event); } + } - return order; - }; + return order; } } diff --git a/order/order-web/src/main/java/demo/order/action/UpdateOrderStatus.java b/order/order-web/src/main/java/demo/order/action/UpdateOrderStatus.java index e45dd8d..49d46b8 100644 --- a/order/order-web/src/main/java/demo/order/action/UpdateOrderStatus.java +++ b/order/order-web/src/main/java/demo/order/action/UpdateOrderStatus.java @@ -6,8 +6,7 @@ import demo.order.domain.OrderService; import demo.order.domain.OrderStatus; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; - -import java.util.function.BiFunction; +import org.springframework.transaction.annotation.Transactional; /** * Updates the status of a {@link Order} entity. @@ -15,6 +14,7 @@ import java.util.function.BiFunction; * @author Kenny Bastani */ @Service +@Transactional public class UpdateOrderStatus extends Action { private final Logger log = Logger.getLogger(this.getClass()); @@ -24,24 +24,21 @@ public class UpdateOrderStatus extends Action { this.orderService = orderService; } - public BiFunction getFunction() { - return (order, orderStatus) -> { + public Order apply(Order order, OrderStatus orderStatus) { - // Save rollback status - OrderStatus rollbackStatus = order.getStatus(); + // Save rollback status + OrderStatus rollbackStatus = order.getStatus(); - try { - // Update status - order.setStatus(orderStatus); - order = orderService.update(order); - } catch (Exception ex) { - log.error("Could not update the status", ex); - order.setStatus(rollbackStatus); - order = orderService.update(order); - throw ex; - } + try { + // Update status + order.setStatus(orderStatus); + order = orderService.update(order); + } catch (Exception ex) { + log.error("Could not update the status", ex); + order.setStatus(rollbackStatus); + order = orderService.update(order); + } - return order; - }; + return order; } } diff --git a/order/order-web/src/main/java/demo/order/domain/Order.java b/order/order-web/src/main/java/demo/order/domain/Order.java index 7939023..f2592b8 100644 --- a/order/order-web/src/main/java/demo/order/domain/Order.java +++ b/order/order-web/src/main/java/demo/order/domain/Order.java @@ -117,77 +117,66 @@ public class Order extends AbstractEntity { @JsonIgnore public Reservations getReservations() { return getAction(GetReservations.class) - .getFunction() .apply(this); } @Command(method = "connectAccount", controller = OrderController.class) public Order connectAccount(Long accountId) { return getAction(ConnectAccount.class) - .getFunction() .apply(this, accountId); } @Command(method = "connectPayment", controller = OrderController.class) public Order connectPayment(Long paymentId) { return getAction(ConnectPayment.class) - .getFunction() .apply(this, paymentId); } @Command(method = "createPayment", controller = OrderController.class) public Order createPayment() { return getAction(CreatePayment.class) - .getFunction() .apply(this); } @Command(method = "processPayment", controller = OrderController.class) public Order processPayment() { return getAction(ProcessPayment.class) - .getFunction() .apply(this); } @Command(method = "reserveInventory", controller = OrderController.class) public Order reserveInventory() { return getAction(ReserveInventory.class) - .getFunction() .apply(this); } @Command(method = "addReservation", controller = OrderController.class) public Order addReservation(Long reservationId) { return getAction(AddReservation.class) - .getFunction() .apply(this, reservationId); } @Command(method = "completeReservation", controller = OrderController.class) public Order completeReservation() { return getAction(CompleteReservation.class) - .getFunction() .apply(this); } @Command(method = "completeOrder", controller = OrderController.class) public Order completeOrder() { return getAction(CompleteOrder.class) - .getFunction() .apply(this); } @Command(method = "updateOrderStatus", controller = OrderController.class) public Order updateOrderStatus(OrderStatus orderStatus) { return getAction(UpdateOrderStatus.class) - .getFunction() .apply(this, orderStatus); } public boolean delete() { getAction(DeleteOrder.class) - .getConsumer() - .accept(this); + .apply(this); return true; } diff --git a/order/order-web/src/main/resources/application.yml b/order/order-web/src/main/resources/application.yml index 29527b2..4bf3de8 100644 --- a/order/order-web/src/main/resources/application.yml +++ b/order/order-web/src/main/resources/application.yml @@ -23,11 +23,12 @@ spring: --- spring: profiles: docker - rabbitmq: - host: ${DOCKER_IP:192.168.99.100} - port: 5672 cloud: stream: + kafka: + binder: + brokers: ${DOCKER_IP:192.168.99.100} + zk-nodes: ${DOCKER_IP:192.168.99.100} bindings: output: destination: order diff --git a/order/order-worker/pom.xml b/order/order-worker/pom.xml index 5a6b75b..ba5ecbf 100644 --- a/order/order-worker/pom.xml +++ b/order/order-worker/pom.xml @@ -37,7 +37,7 @@ org.springframework.cloud - spring-cloud-starter-stream-rabbit + spring-cloud-starter-stream-kafka org.springframework.boot diff --git a/order/order-worker/src/main/java/demo/config/StateMachineConfig.java b/order/order-worker/src/main/java/demo/config/StateMachineConfig.java index 26201ca..8532c83 100644 --- a/order/order-worker/src/main/java/demo/config/StateMachineConfig.java +++ b/order/order-worker/src/main/java/demo/config/StateMachineConfig.java @@ -375,26 +375,33 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter r.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED) + .filter(r -> r.getStatus() != ReservationStatus.RESERVATION_FAILED) .parallel() .forEach(r -> { - Traverson res = new Traverson( - URI.create(r.getLink("self").getHref()), - MediaTypes.HAL_JSON - ); + try { + Traverson res = new Traverson( + URI.create(r.getLink("self").getHref()), + MediaTypes.HAL_JSON + ); - res.follow("self", "commands", "releaseInventory") - .toObject(Reservation.class); + res.follow("self", "commands", "releaseInventory") + .toObject(Reservation.class); + } catch (Exception ex) { + log.error("Could not release inventory for reservation", ex); + } }); - return traverson.follow("self", "commands", "completeOrder") - .toEntity(Order.class) - .getBody(); + return order; + })); } diff --git a/order/order-worker/src/main/resources/application.yml b/order/order-worker/src/main/resources/application.yml index d2dceae..3df889d 100644 --- a/order/order-worker/src/main/resources/application.yml +++ b/order/order-worker/src/main/resources/application.yml @@ -22,11 +22,12 @@ spring: --- spring: profiles: docker - rabbitmq: - host: ${DOCKER_IP:192.168.99.100} - port: 5672 cloud: stream: + kafka: + binder: + brokers: ${DOCKER_IP:192.168.99.100} + zk-nodes: ${DOCKER_IP:192.168.99.100} bindings: input: contentType: 'application/json' diff --git a/payment/payment-web/pom.xml b/payment/payment-web/pom.xml index 53227fc..627bf47 100644 --- a/payment/payment-web/pom.xml +++ b/payment/payment-web/pom.xml @@ -41,7 +41,7 @@ org.springframework.cloud - spring-cloud-starter-stream-rabbit + spring-cloud-starter-stream-kafka org.springframework.boot diff --git a/payment/payment-web/src/main/java/demo/payment/action/ConnectOrder.java b/payment/payment-web/src/main/java/demo/payment/action/ConnectOrder.java index 4b8f5c3..e660e77 100644 --- a/payment/payment-web/src/main/java/demo/payment/action/ConnectOrder.java +++ b/payment/payment-web/src/main/java/demo/payment/action/ConnectOrder.java @@ -9,42 +9,38 @@ import demo.payment.event.PaymentEvent; import demo.payment.event.PaymentEventType; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; -import java.util.function.BiFunction; - @Service +@Transactional public class ConnectOrder extends Action { private final Logger log = Logger.getLogger(this.getClass()); - public BiFunction getFunction() { - return (payment, orderId) -> { - Assert.isTrue(payment - .getStatus() == PaymentStatus.PAYMENT_CREATED, "Payment has already been connected to an order"); + public Payment apply(Payment payment, Long orderId) { + Assert.isTrue(payment + .getStatus() == PaymentStatus.PAYMENT_CREATED, "Payment has already been connected to an order"); - PaymentService paymentService = payment.getModule(PaymentModule.class) - .getDefaultService(); + PaymentService paymentService = payment.getModule(PaymentModule.class) + .getDefaultService(); - // Connect the payment to the order - payment.setOrderId(orderId); - payment.setStatus(PaymentStatus.ORDER_CONNECTED); + // Connect the payment to the order + payment.setOrderId(orderId); + payment.setStatus(PaymentStatus.ORDER_CONNECTED); + payment = paymentService.update(payment); + + try { + // Trigger the payment connected + payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.ORDER_CONNECTED, payment)); + } catch (IllegalStateException ex) { + log.error("Payment could not be connected to order", ex); + + // Rollback operation + payment.setStatus(PaymentStatus.PAYMENT_CREATED); + payment.setOrderId(null); payment = paymentService.update(payment); + } - try { - // Trigger the payment connected - payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.ORDER_CONNECTED, payment)); - } catch (IllegalStateException ex) { - log.error("Payment could not be connected to order", ex); - - // Rollback operation - payment.setStatus(PaymentStatus.PAYMENT_CREATED); - payment.setOrderId(null); - paymentService.update(payment); - - throw ex; - } - - return payment; - }; + return payment; } } diff --git a/payment/payment-web/src/main/java/demo/payment/action/ProcessPayment.java b/payment/payment-web/src/main/java/demo/payment/action/ProcessPayment.java index efce0ae..32a6d64 100644 --- a/payment/payment-web/src/main/java/demo/payment/action/ProcessPayment.java +++ b/payment/payment-web/src/main/java/demo/payment/action/ProcessPayment.java @@ -8,15 +8,16 @@ import demo.payment.event.PaymentEvent; import demo.payment.event.PaymentEventType; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import java.util.Arrays; -import java.util.function.Function; import static demo.payment.domain.PaymentStatus.PAYMENT_FAILED; import static demo.payment.domain.PaymentStatus.PAYMENT_SUCCEEDED; @Service +@Transactional public class ProcessPayment extends Action { private final Logger log = Logger.getLogger(this.getClass()); private final PaymentService paymentService; @@ -25,32 +26,31 @@ public class ProcessPayment extends Action { this.paymentService = paymentService; } - public Function getFunction() { - return payment -> { - // Validations - Assert.isTrue(!Arrays.asList(PAYMENT_SUCCEEDED, - PaymentStatus.PAYMENT_PENDING, - PaymentStatus.PAYMENT_FAILED).contains(payment.getStatus()), "Payment has already been processed"); - Assert.isTrue(payment.getStatus() == PaymentStatus.ORDER_CONNECTED, - "Payment must be connected to an order"); + public Payment apply(Payment payment) { + // Validations + Assert.isTrue(!Arrays.asList(PAYMENT_SUCCEEDED, + PaymentStatus.PAYMENT_PENDING, + PaymentStatus.PAYMENT_FAILED).contains(payment.getStatus()), "Payment has already been processed"); + Assert.isTrue(payment.getStatus() == PaymentStatus.ORDER_CONNECTED, + "Payment must be connected to an order"); - payment.setStatus(PaymentStatus.PAYMENT_PROCESSED); - payment = paymentService.update(payment); + payment.setStatus(PaymentStatus.PAYMENT_PROCESSED); + payment = paymentService.update(payment); - try { - // Trigger the payment processed event - payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.PAYMENT_PROCESSED, payment)); - } catch (Exception ex) { - log.error("Payment could not be processed", ex); - finalizePayment(payment, PAYMENT_FAILED); - throw ex; - } finally { + try { + // Trigger the payment processed event + payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.PAYMENT_PROCESSED, payment)); + } catch (Exception ex) { + log.error("Payment could not be processed", ex); + payment = finalizePayment(payment, PAYMENT_FAILED); + } finally { + if(payment.getStatus() != PAYMENT_FAILED) { // Handle the result asynchronously - finalizePayment(payment, PAYMENT_SUCCEEDED); + payment = finalizePayment(payment, PAYMENT_SUCCEEDED); } + } - return payment; - }; + return payment; } private Payment finalizePayment(Payment payment, PaymentStatus paymentStatus) { diff --git a/payment/payment-web/src/main/java/demo/payment/domain/Payment.java b/payment/payment-web/src/main/java/demo/payment/domain/Payment.java index 8624219..f2b1d4b 100644 --- a/payment/payment-web/src/main/java/demo/payment/domain/Payment.java +++ b/payment/payment-web/src/main/java/demo/payment/domain/Payment.java @@ -91,14 +91,12 @@ public class Payment extends AbstractEntity { @Command(method = "connectOrder", controller = PaymentController.class) public Payment connectOrder(Long orderId) { return getAction(ConnectOrder.class) - .getFunction() .apply(this, orderId); } @Command(method = "processPayment", controller = PaymentController.class) public Payment processPayment() { return getAction(ProcessPayment.class) - .getFunction() .apply(this); } diff --git a/payment/payment-web/src/main/resources/application.yml b/payment/payment-web/src/main/resources/application.yml index 3d3d06d..bb3e739 100644 --- a/payment/payment-web/src/main/resources/application.yml +++ b/payment/payment-web/src/main/resources/application.yml @@ -23,11 +23,12 @@ spring: --- spring: profiles: docker - rabbitmq: - host: ${DOCKER_IP:192.168.99.100} - port: 5672 cloud: stream: + kafka: + binder: + brokers: ${DOCKER_IP:192.168.99.100} + zk-nodes: ${DOCKER_IP:192.168.99.100} bindings: output: contentType: 'application/json' diff --git a/payment/payment-worker/pom.xml b/payment/payment-worker/pom.xml index 42643d9..f6da94e 100644 --- a/payment/payment-worker/pom.xml +++ b/payment/payment-worker/pom.xml @@ -37,7 +37,7 @@ org.springframework.cloud - spring-cloud-starter-stream-rabbit + spring-cloud-starter-stream-kafka org.springframework.boot diff --git a/payment/payment-worker/src/main/resources/application.yml b/payment/payment-worker/src/main/resources/application.yml index 58ce116..9556c86 100644 --- a/payment/payment-worker/src/main/resources/application.yml +++ b/payment/payment-worker/src/main/resources/application.yml @@ -22,11 +22,12 @@ spring: --- spring: profiles: docker - rabbitmq: - host: ${DOCKER_IP:192.168.99.100} - port: 5672 cloud: stream: + kafka: + binder: + brokers: ${DOCKER_IP:192.168.99.100} + zk-nodes: ${DOCKER_IP:192.168.99.100} bindings: input: contentType: 'application/json' diff --git a/warehouse/warehouse-web/pom.xml b/warehouse/warehouse-web/pom.xml index f148e24..d228dc2 100644 --- a/warehouse/warehouse-web/pom.xml +++ b/warehouse/warehouse-web/pom.xml @@ -51,7 +51,7 @@ org.springframework.cloud - spring-cloud-starter-stream-rabbit + spring-cloud-starter-stream-kafka org.springframework.boot diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/action/ReserveInventory.java b/warehouse/warehouse-web/src/main/java/demo/inventory/action/ReserveInventory.java index 31887d8..a32b368 100644 --- a/warehouse/warehouse-web/src/main/java/demo/inventory/action/ReserveInventory.java +++ b/warehouse/warehouse-web/src/main/java/demo/inventory/action/ReserveInventory.java @@ -12,8 +12,6 @@ import org.apache.log4j.Logger; import org.springframework.stereotype.Service; import org.springframework.util.Assert; -import java.util.function.BiFunction; - import static demo.inventory.domain.InventoryStatus.RESERVATION_CONNECTED; /** @@ -33,34 +31,31 @@ public class ReserveInventory extends Action { this.inventoryService = inventoryService; } - public BiFunction getFunction() { - return (inventory, reservationId) -> { - Assert.isTrue(inventory.getStatus() == InventoryStatus.RESERVATION_CONNECTED, - "Inventory must be in a reservation connected state"); - Assert.isTrue(inventory.getReservation() == null, - "There is already a reservation attached to the inventory"); + public Inventory apply(Inventory inventory, Long reservationId) { + Assert.isTrue(inventory.getStatus() == InventoryStatus.RESERVATION_CONNECTED, + "Inventory must be in a reservation connected state"); + Assert.isTrue(inventory.getReservation() == null, + "There is already a reservation attached to the inventory"); - Reservation reservation = reservationService.get(reservationId); - Assert.notNull(reservation, "Reserve inventory failed, the reservation does not exist"); + Reservation reservation = reservationService.get(reservationId); + Assert.notNull(reservation, "Reserve inventory failed, the reservation does not exist"); - try { - // Trigger the reservation connected event - inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.RESERVATION_CONNECTED, inventory)); - } catch (Exception ex) { - log.error("Could not connect reservation to inventory", ex); - inventory.setReservation(null); - inventory.setStatus(InventoryStatus.RESERVATION_PENDING); - inventoryService.update(inventory); - throw ex; - } finally { - if (inventory.getStatus() == RESERVATION_CONNECTED && inventory.getReservation() != null) { - inventory.setStatus(InventoryStatus.INVENTORY_RESERVED); - inventory = inventoryService.update(inventory); - inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.INVENTORY_RESERVED, inventory)); - } + try { + // Trigger the reservation connected event + inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.RESERVATION_CONNECTED, inventory)); + } catch (Exception ex) { + log.error("Could not connect reservation to inventory", ex); + inventory.setReservation(null); + inventory.setStatus(InventoryStatus.RESERVATION_PENDING); + inventory = inventoryService.update(inventory); + } finally { + if (inventory.getStatus() == RESERVATION_CONNECTED && inventory.getReservation() != null) { + inventory.setStatus(InventoryStatus.INVENTORY_RESERVED); + inventory = inventoryService.update(inventory); + inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.INVENTORY_RESERVED, inventory)); } + } - return inventory; - }; + return inventory; } } diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/action/UpdateInventoryStatus.java b/warehouse/warehouse-web/src/main/java/demo/inventory/action/UpdateInventoryStatus.java index 57fe360..1c41e08 100644 --- a/warehouse/warehouse-web/src/main/java/demo/inventory/action/UpdateInventoryStatus.java +++ b/warehouse/warehouse-web/src/main/java/demo/inventory/action/UpdateInventoryStatus.java @@ -7,8 +7,7 @@ import demo.inventory.domain.InventoryStatus; import demo.reservation.domain.ReservationService; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; - -import java.util.function.BiFunction; +import org.springframework.transaction.annotation.Transactional; /** * Updates the status of a {@link Inventory} entity. @@ -16,6 +15,7 @@ import java.util.function.BiFunction; * @author Kenny Bastani */ @Service +@Transactional public class UpdateInventoryStatus extends Action { private final Logger log = Logger.getLogger(this.getClass()); @@ -27,24 +27,20 @@ public class UpdateInventoryStatus extends Action { this.inventoryService = inventoryService; } - public BiFunction getFunction() { - return (inventory, inventoryStatus) -> { + public Inventory apply(Inventory inventory, InventoryStatus inventoryStatus) { + // Save rollback status + InventoryStatus rollbackStatus = inventory.getStatus(); - // Save rollback status - InventoryStatus rollbackStatus = inventory.getStatus(); + try { + // Update status + inventory.setStatus(inventoryStatus); + inventory = inventoryService.update(inventory); + } catch (Exception ex) { + log.error("Could not update the status", ex); + inventory.setStatus(rollbackStatus); + inventory = inventoryService.update(inventory); + } - try { - // Update status - inventory.setStatus(inventoryStatus); - inventory = inventoryService.update(inventory); - } catch (Exception ex) { - log.error("Could not update the status", ex); - inventory.setStatus(rollbackStatus); - inventory = inventoryService.update(inventory); - throw ex; - } - - return inventory; - }; + return inventory; } } diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/domain/Inventory.java b/warehouse/warehouse-web/src/main/java/demo/inventory/domain/Inventory.java index 55001a6..fc4d2a2 100644 --- a/warehouse/warehouse-web/src/main/java/demo/inventory/domain/Inventory.java +++ b/warehouse/warehouse-web/src/main/java/demo/inventory/domain/Inventory.java @@ -87,14 +87,12 @@ public class Inventory extends AbstractEntity { @Command(method = "reserve", controller = InventoryController.class) public Inventory reserve(Long reservationId) { return getAction(ReserveInventory.class) - .getFunction() .apply(this, reservationId); } @Command(method = "updateInventoryStatus", controller = InventoryController.class) public Inventory updateStatus(InventoryStatus status) { return getAction(UpdateInventoryStatus.class) - .getFunction() .apply(this, status); } diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/domain/InventoryService.java b/warehouse/warehouse-web/src/main/java/demo/inventory/domain/InventoryService.java index c98342c..7193052 100644 --- a/warehouse/warehouse-web/src/main/java/demo/inventory/domain/InventoryService.java +++ b/warehouse/warehouse-web/src/main/java/demo/inventory/domain/InventoryService.java @@ -105,7 +105,7 @@ public class InventoryService extends Service { Boolean lock = false; try { - lock = inventoryLock.tryLock(30, 5000, TimeUnit.MILLISECONDS); + lock = inventoryLock.tryLock(1, 5000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted while acquiring lock on inventory", e); } diff --git a/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectInventory.java b/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectInventory.java index b69c111..1770e8f 100644 --- a/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectInventory.java +++ b/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectInventory.java @@ -21,7 +21,6 @@ import org.springframework.util.Assert; import java.util.List; import java.util.Random; -import java.util.function.Function; import static demo.reservation.event.ReservationEventType.*; @@ -41,67 +40,73 @@ public class ConnectInventory extends Action { this.inventoryService = inventoryService; } - public Function getFunction() { - return (reservation) -> { - Assert.isTrue(reservation.getStatus() == ReservationStatus.ORDER_CONNECTED, - "Reservation must be in an order connected state"); + public Reservation apply(Reservation reservation) { + Assert.isTrue(reservation.getStatus() == ReservationStatus.ORDER_CONNECTED, + "Reservation must be in an order connected state"); - ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService(); + ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService(); - // Set reservation to pending - reservation.setStatus(ReservationStatus.RESERVATION_PENDING); - reservation = reservationService.update(reservation); + // Set reservation to pending + reservation.setStatus(ReservationStatus.RESERVATION_PENDING); + reservation = reservationService.update(reservation); - // Get available inventory and connect reservation in an atomic transaction - Inventory inventory = inventoryService.findAvailableInventory(reservation); + // Get available inventory and connect reservation in an atomic transaction + Inventory inventory = inventoryService.findAvailableInventory(reservation); - try { - if (inventory == null) { - // Inventory is out of stock, fail the reservation process - reservation.setStatus(ReservationStatus.RESERVATION_FAILED); - reservation = reservationService.update(reservation); - - // Trigger reservation failed event - reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_FAILED, reservation)); - - // Throw the out of stock exception - throw new OutOfStockException("Inventory for reservation is unavailable in warehouse: " - .concat(reservation.getId().toString())); - } - - // Set inventory on reservation and mark successful - reservation.setInventory(inventory); - reservation.setStatus(ReservationStatus.RESERVATION_SUCCEEDED); + try { + if (inventory == null) { + // Inventory is out of stock, fail the reservation process + reservation.setStatus(ReservationStatus.RESERVATION_FAILED); reservation = reservationService.update(reservation); - // Trigger the inventory connected event - reservation.sendAsyncEvent(new ReservationEvent(INVENTORY_CONNECTED, reservation), - reservation.getInventory().getId().withRel("inventory")); - } catch (Exception ex) { - log.error("Could not connect reservation to order", ex); - if (reservation.getStatus() != ReservationStatus.RESERVATION_FAILED) { - // Rollback the reservation attempt - if (inventory != null) { - inventory.setReservation(null); - inventory.setStatus(InventoryStatus.RESERVATION_PENDING); - inventory = inventoryService.update(inventory); - } + // Trigger reservation failed event + reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_FAILED, reservation)); - reservation.setInventory(null); - reservation.setStatus(ReservationStatus.ORDER_CONNECTED); - reservation = reservationService.update(reservation); - } - throw ex; - } finally { - if (reservation.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED) { - Link inventoryLink = reservation.getInventory().getId().withRel("inventory"); - Link orderLink = getRemoteLink("order-web", "/v1/orders/{id}", reservation.getOrderId(), "order"); - reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_SUCCEEDED, reservation), inventoryLink, orderLink); - } + // Throw the out of stock exception + throw new OutOfStockException("Inventory for reservation is unavailable in warehouse: " + .concat(reservation.getId().toString())); } - return reservation; - }; + inventory.setReservation(reservation); + inventory.setStatus(InventoryStatus.RESERVATION_CONNECTED); + inventory = inventoryService.update(inventory); + + // Set inventory on reservation and mark successful + reservation.setInventory(inventory); + reservation.setStatus(ReservationStatus.RESERVATION_SUCCEEDED); + reservation = reservationService.update(reservation); + + + // Trigger the inventory connected event + reservation.sendAsyncEvent(new ReservationEvent(INVENTORY_CONNECTED, reservation), + reservation.getInventory().getId().withRel("inventory")); + } catch (Exception ex) { + log.error("Could not connect reservation to order", ex); + if (reservation.getStatus() != ReservationStatus.RESERVATION_FAILED) { + // Rollback the reservation attempt + if (inventory != null) { + inventory.setReservation(null); + inventory.setStatus(InventoryStatus.RESERVATION_PENDING); + inventoryService.update(inventory); + } + + reservation.setInventory(null); + reservation.setStatus(ReservationStatus.ORDER_CONNECTED); + reservation = reservationService.update(reservation); + } + + throw ex; + } finally { + if (reservation.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED) { + Link inventoryLink = reservation.getInventory().getId().withRel("inventory"); + Link orderLink = getRemoteLink("order-web", "/v1/orders/{id}", reservation.getOrderId(), "order"); + reservation + .sendAsyncEvent(new ReservationEvent(RESERVATION_SUCCEEDED, reservation), inventoryLink, + orderLink); + } + } + + return reservation; } private Link getRemoteLink(String service, String relative, Object identifier, String rel) { diff --git a/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectOrder.java b/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectOrder.java index 113c3a0..4a7fb51 100644 --- a/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectOrder.java +++ b/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectOrder.java @@ -9,42 +9,40 @@ import demo.reservation.event.ReservationEvent; import demo.reservation.event.ReservationEventType; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; -import java.util.function.BiFunction; - /** * Connects an {@link Reservation} to an Order. * * @author Kenny Bastani */ @Service +@Transactional public class ConnectOrder extends Action { private final Logger log = Logger.getLogger(this.getClass()); - public BiFunction getFunction() { - return (reservation, orderId) -> { - Assert.isTrue(reservation.getStatus() == ReservationStatus.RESERVATION_CREATED, "Reservation must be in a created state"); + public Reservation apply(Reservation reservation, Long orderId) { + Assert.isTrue(reservation + .getStatus() == ReservationStatus.RESERVATION_CREATED, "Reservation must be in a created state"); - ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService(); + ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService(); - // Connect the order - reservation.setOrderId(orderId); - reservation.setStatus(ReservationStatus.ORDER_CONNECTED); + // Connect the order + reservation.setOrderId(orderId); + reservation.setStatus(ReservationStatus.ORDER_CONNECTED); + reservation = reservationService.update(reservation); + + try { + // Trigger the order connected event + reservation.sendAsyncEvent(new ReservationEvent(ReservationEventType.ORDER_CONNECTED, reservation)); + } catch (Exception ex) { + log.error("Could not connect reservation to order", ex); + reservation.setOrderId(null); + reservation.setStatus(ReservationStatus.RESERVATION_CREATED); reservation = reservationService.update(reservation); + } - try { - // Trigger the order connected event - reservation.sendAsyncEvent(new ReservationEvent(ReservationEventType.ORDER_CONNECTED, reservation)); - } catch (Exception ex) { - log.error("Could not connect reservation to order", ex); - reservation.setOrderId(null); - reservation.setStatus(ReservationStatus.RESERVATION_CREATED); - reservationService.update(reservation); - throw ex; - } - - return reservation; - }; + return reservation; } } diff --git a/warehouse/warehouse-web/src/main/java/demo/reservation/action/ReleaseInventory.java b/warehouse/warehouse-web/src/main/java/demo/reservation/action/ReleaseInventory.java index 4e453fe..0f1b0ca 100644 --- a/warehouse/warehouse-web/src/main/java/demo/reservation/action/ReleaseInventory.java +++ b/warehouse/warehouse-web/src/main/java/demo/reservation/action/ReleaseInventory.java @@ -12,10 +12,9 @@ import demo.reservation.domain.ReservationStatus; import demo.reservation.event.ReservationEvent; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; -import java.util.function.Function; - import static demo.inventory.event.InventoryEventType.INVENTORY_RELEASED; import static demo.reservation.event.ReservationEventType.RESERVATION_FAILED; @@ -25,6 +24,7 @@ import static demo.reservation.event.ReservationEventType.RESERVATION_FAILED; * @author Kenny Bastani */ @Service +@Transactional public class ReleaseInventory extends Action { private final Logger log = Logger.getLogger(this.getClass()); private final InventoryService inventoryService; @@ -33,34 +33,32 @@ public class ReleaseInventory extends Action { this.inventoryService = inventoryService; } - public Function getFunction() { - return (reservation) -> { - Assert.isTrue(reservation.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED, - "Reservation must be in a succeeded state"); - Assert.notNull(reservation.getInventory(), "The reservation has no connected inventory"); + public Reservation apply(Reservation reservation) { + Assert.isTrue(reservation.getStatus() != ReservationStatus.RESERVATION_FAILED, + "Reservation is already in a failed state"); - ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService(); + ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService(); - Inventory inventory = reservation.getInventory(); + Inventory inventory = reservation.getInventory(); - try { - // Remove the inventory and set the reservation to failed - reservation.setInventory(null); - reservation.setStatus(ReservationStatus.RESERVATION_FAILED); + try { + // Remove the inventory and set the reservation to failed + reservation.setInventory(null); + reservation.setStatus(ReservationStatus.RESERVATION_FAILED); + reservation = reservationService.update(reservation); + + // Trigger the reservation failed event + reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_FAILED, reservation)); + } catch (Exception ex) { + log.error("Could not release the reservation's inventory", ex); + if (reservation.getStatus() == ReservationStatus.RESERVATION_FAILED) { + // Rollback the attempt + reservation.setInventory(inventory); + reservation.setStatus(ReservationStatus.RESERVATION_SUCCEEDED); reservation = reservationService.update(reservation); - - // Trigger the reservation failed event - reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_FAILED, reservation)); - } catch (Exception ex) { - log.error("Could not release the reservation's inventory", ex); - if (reservation.getStatus() == ReservationStatus.RESERVATION_FAILED) { - // Rollback the attempt - reservation.setInventory(inventory); - reservation.setStatus(ReservationStatus.RESERVATION_SUCCEEDED); - reservation = reservationService.update(reservation); - } - throw ex; - } finally { + } + } finally { + if (inventory != null && reservation.getStatus() != ReservationStatus.RESERVATION_SUCCEEDED) { // Release the inventory inventory.setReservation(null); inventory.setStatus(InventoryStatus.RESERVATION_PENDING); @@ -69,8 +67,8 @@ public class ReleaseInventory extends Action { // Trigger the inventory released event inventory.sendAsyncEvent(new InventoryEvent(INVENTORY_RELEASED, inventory)); } + } - return reservation; - }; + return reservation; } } diff --git a/warehouse/warehouse-web/src/main/java/demo/reservation/domain/Reservation.java b/warehouse/warehouse-web/src/main/java/demo/reservation/domain/Reservation.java index 3fc2942..947a6d1 100644 --- a/warehouse/warehouse-web/src/main/java/demo/reservation/domain/Reservation.java +++ b/warehouse/warehouse-web/src/main/java/demo/reservation/domain/Reservation.java @@ -111,21 +111,18 @@ public class Reservation extends AbstractEntity { @Command(method = "connectInventory", controller = ReservationController.class) public Reservation connectInventory() { return getAction(ConnectInventory.class) - .getFunction() .apply(this); } @Command(method = "releaseInventory", controller = ReservationController.class) public Reservation releaseInventory() { return getAction(ReleaseInventory.class) - .getFunction() .apply(this); } @Command(method = "connectOrder", controller = ReservationController.class) public Reservation connectOrder(Long orderId) { return getAction(ConnectOrder.class) - .getFunction() .apply(this, orderId); } diff --git a/warehouse/warehouse-web/src/main/java/demo/warehouse/action/ReserveOrder.java b/warehouse/warehouse-web/src/main/java/demo/warehouse/action/ReserveOrder.java index 81aa57c..fe3c26c 100644 --- a/warehouse/warehouse-web/src/main/java/demo/warehouse/action/ReserveOrder.java +++ b/warehouse/warehouse-web/src/main/java/demo/warehouse/action/ReserveOrder.java @@ -9,9 +9,9 @@ import demo.reservation.event.ReservationEventType; import demo.warehouse.domain.Warehouse; import org.apache.log4j.Logger; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import java.util.List; -import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -21,6 +21,7 @@ import java.util.stream.IntStream; * @author Kenny Bastani */ @Service +@Transactional public class ReserveOrder extends Action { private final Logger log = Logger.getLogger(ReserveOrder.class); @@ -30,25 +31,22 @@ public class ReserveOrder extends Action { this.reservationService = reservationService; } - public BiConsumer getConsumer() { - return (warehouse, order) -> { + public void apply(Warehouse warehouse, Order order) { + // Create reservations for each order item + List reservations = order.getLineItems().stream() + .map(item -> IntStream.rangeClosed(1, item.getQuantity()) + .mapToObj(a -> new Reservation(item.getProductId(), order.getIdentity(), warehouse))) + .flatMap(a -> a) + .collect(Collectors.toList()); - // Create reservations for each order item - List reservations = order.getLineItems().stream() - .map(item -> IntStream.rangeClosed(1, item.getQuantity()) - .mapToObj(a -> new Reservation(item.getProductId(), order.getIdentity(), warehouse))) - .flatMap(a -> a) - .collect(Collectors.toList()); + // Save the reservations + reservations = reservationService.create(reservations); - // Save the reservations - reservations = reservationService.create(reservations); - - // Trigger reservation requests for each order item - reservations.forEach(r -> { - ReservationEvent event = new ReservationEvent(ReservationEventType.RESERVATION_REQUESTED, r); - event.add(order.getLink("self").withRel("order")); - r.sendAsyncEvent(event); - }); - }; + // Trigger reservation requests for each order item + reservations.forEach(r -> { + ReservationEvent event = new ReservationEvent(ReservationEventType.RESERVATION_REQUESTED, r); + event.add(order.getLink("self").withRel("order")); + r.sendAsyncEvent(event); + }); } } diff --git a/warehouse/warehouse-web/src/main/java/demo/warehouse/domain/Warehouse.java b/warehouse/warehouse-web/src/main/java/demo/warehouse/domain/Warehouse.java index 235c2b1..7d7b1f2 100644 --- a/warehouse/warehouse-web/src/main/java/demo/warehouse/domain/Warehouse.java +++ b/warehouse/warehouse-web/src/main/java/demo/warehouse/domain/Warehouse.java @@ -77,8 +77,7 @@ public class Warehouse extends AbstractEntity { @Command(method = "reserveOrder", controller = WarehouseController.class) public Warehouse reserveOrder(Order order) { getAction(ReserveOrder.class) - .getConsumer() - .accept(this, order); + .apply(this, order); return this; } diff --git a/warehouse/warehouse-worker/pom.xml b/warehouse/warehouse-worker/pom.xml index ff9e4ba..1725523 100644 --- a/warehouse/warehouse-worker/pom.xml +++ b/warehouse/warehouse-worker/pom.xml @@ -37,7 +37,7 @@ org.springframework.cloud - spring-cloud-starter-stream-rabbit + spring-cloud-starter-stream-kafka org.springframework.boot diff --git a/warehouse/warehouse-worker/src/main/java/demo/reservation/config/ReservationStateMachineConfig.java b/warehouse/warehouse-worker/src/main/java/demo/reservation/config/ReservationStateMachineConfig.java index a097bc2..cab26a7 100644 --- a/warehouse/warehouse-worker/src/main/java/demo/reservation/config/ReservationStateMachineConfig.java +++ b/warehouse/warehouse-worker/src/main/java/demo/reservation/config/ReservationStateMachineConfig.java @@ -26,6 +26,8 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import static demo.order.domain.OrderStatus.ORDER_FAILED; +import static demo.order.domain.OrderStatus.RESERVATION_FAILED; import static demo.order.domain.OrderStatus.RESERVATION_PENDING; /** @@ -243,12 +245,24 @@ public class ReservationStateMachineConfig extends EnumStateMachineConfigurerAda MediaTypes.HAL_JSON ); - traverson.follow("self", "order", "commands", "completeReservation") - .toObject(Order.class); + Order order = traverson.follow("self", "order").toObject(Order.class); + Reservation reservation = null; - return traverson.follow("self") - .toEntity(Reservation.class) - .getBody(); + // Check order status and release inventory if it has failed + if (order.getStatus() == RESERVATION_FAILED || order.getStatus() == ORDER_FAILED) { + reservation = traverson.follow("self", "commands", "releaseInventory") + .toObject(Reservation.class); + } else if (order.getStatus() == RESERVATION_PENDING) { + traverson.follow("self", "order", "commands", "completeReservation") + .toObject(Order.class); + } + + if (reservation == null) + reservation = traverson.follow("self") + .toEntity(Reservation.class) + .getBody(); + + return reservation; })); } diff --git a/warehouse/warehouse-worker/src/main/resources/application.yml b/warehouse/warehouse-worker/src/main/resources/application.yml index 023a4ac..7b786c2 100644 --- a/warehouse/warehouse-worker/src/main/resources/application.yml +++ b/warehouse/warehouse-worker/src/main/resources/application.yml @@ -33,11 +33,12 @@ spring: --- spring: profiles: docker - rabbitmq: - host: ${DOCKER_IP:192.168.99.100} - port: 5672 cloud: stream: + kafka: + binder: + brokers: ${DOCKER_IP:192.168.99.100} + zk-nodes: ${DOCKER_IP:192.168.99.100} bindings: warehouse: contentType: 'application/json'