diff --git a/.gitignore b/.gitignore
index c33d7f5..6b0a529 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,8 @@ dump.rdb
*.war
*.ear
+.DS_Store
+
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
diff --git a/.travis.yml b/.travis.yml
index 51a11a0..a72a427 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,6 +2,9 @@ language: java
jdk:
- oraclejdk8
services:
-- rabbitmq
- redis
+- docker
install: mvn clean install -DskipDockerBuild
+before_install:
+- docker pull spotify/kafka
+- docker run -p 2181:2181 -p 9092:9092 spotify/kafka
\ No newline at end of file
diff --git a/account/account-web/pom.xml b/account/account-web/pom.xml
index f9870cc..9f24cf4 100644
--- a/account/account-web/pom.xml
+++ b/account/account-web/pom.xml
@@ -41,7 +41,7 @@
org.springframework.cloud
- spring-cloud-starter-stream-rabbit
+ spring-cloud-starter-stream-kafka
org.springframework.boot
diff --git a/account/account-web/src/main/java/demo/account/action/ActivateAccount.java b/account/account-web/src/main/java/demo/account/action/ActivateAccount.java
index 0b668bb..e431ddd 100644
--- a/account/account-web/src/main/java/demo/account/action/ActivateAccount.java
+++ b/account/account-web/src/main/java/demo/account/action/ActivateAccount.java
@@ -9,10 +9,10 @@ import demo.account.event.AccountEventType;
import demo.domain.Action;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import java.util.Arrays;
-import java.util.function.Function;
import static demo.account.domain.AccountStatus.*;
@@ -22,38 +22,35 @@ import static demo.account.domain.AccountStatus.*;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class ActivateAccount extends Action {
private final Logger log = Logger.getLogger(this.getClass());
- public Function getFunction() {
- return (account) -> {
- Assert.isTrue(account.getStatus() != ACCOUNT_ACTIVE, "The account is already active");
- Assert.isTrue(Arrays.asList(ACCOUNT_CONFIRMED, ACCOUNT_SUSPENDED, ACCOUNT_ARCHIVED)
- .contains(account.getStatus()), "The account cannot be activated");
+ public Account apply(Account account) {
+ Assert.isTrue(account.getStatus() != ACCOUNT_ACTIVE, "The account is already active");
+ Assert.isTrue(Arrays.asList(ACCOUNT_CONFIRMED, ACCOUNT_SUSPENDED, ACCOUNT_ARCHIVED)
+ .contains(account.getStatus()), "The account cannot be activated");
- AccountService accountService = account.getModule(AccountModule.class)
- .getDefaultService();
+ AccountService accountService = account.getModule(AccountModule.class)
+ .getDefaultService();
- AccountStatus status = account.getStatus();
+ AccountStatus status = account.getStatus();
- // Activate the account
- account.setStatus(AccountStatus.ACCOUNT_ACTIVE);
+ // Activate the account
+ account.setStatus(AccountStatus.ACCOUNT_ACTIVE);
+ account = accountService.update(account);
+
+ try {
+ // Trigger the account activated event
+ account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED, account));
+ } catch (Exception ex) {
+ log.error("Account could not be activated", ex);
+
+ // Rollback the operation
+ account.setStatus(status);
account = accountService.update(account);
+ }
- try {
- // Trigger the account activated event
- account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED, account));
- } catch (Exception ex) {
- log.error("Account could not be activated", ex);
-
- // Rollback the operation
- account.setStatus(status);
- accountService.update(account);
-
- throw ex;
- }
-
- return account;
- };
+ return account;
}
}
diff --git a/account/account-web/src/main/java/demo/account/action/ArchiveAccount.java b/account/account-web/src/main/java/demo/account/action/ArchiveAccount.java
index 08f13bf..9548923 100644
--- a/account/account-web/src/main/java/demo/account/action/ArchiveAccount.java
+++ b/account/account-web/src/main/java/demo/account/action/ArchiveAccount.java
@@ -9,10 +9,9 @@ import demo.account.event.AccountEventType;
import demo.domain.Action;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
-import java.util.function.Function;
-
import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE;
import static demo.account.domain.AccountStatus.ACCOUNT_ARCHIVED;
@@ -22,38 +21,35 @@ import static demo.account.domain.AccountStatus.ACCOUNT_ARCHIVED;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class ArchiveAccount extends Action {
private final Logger log = Logger.getLogger(this.getClass());
- public Function getFunction() {
- return (account) -> {
- Assert.isTrue(account.getStatus() != ACCOUNT_ARCHIVED, "The account is already archived");
- Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be archived");
-
- AccountService accountService = account.getModule(AccountModule.class)
- .getDefaultService();
+ public Account apply(Account account) {
+ Assert.isTrue(account.getStatus() != ACCOUNT_ARCHIVED, "The account is already archived");
+ Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be archived");
- AccountStatus status = account.getStatus();
+ AccountService accountService = account.getModule(AccountModule.class)
+ .getDefaultService();
- // Archive the account
- account.setStatus(AccountStatus.ACCOUNT_ARCHIVED);
+ AccountStatus status = account.getStatus();
+
+ // Archive the account
+ account.setStatus(AccountStatus.ACCOUNT_ARCHIVED);
+ account = accountService.update(account);
+
+ try {
+ // Trigger the account archived event
+ account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_ARCHIVED, account));
+ } catch (Exception ex) {
+ log.error("Account could not be archived", ex);
+
+ // Rollback the operation
+ account.setStatus(status);
account = accountService.update(account);
+ }
- try {
- // Trigger the account archived event
- account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_ARCHIVED, account));
- } catch (Exception ex) {
- log.error("Account could not be archived", ex);
-
- // Rollback the operation
- account.setStatus(status);
- accountService.update(account);
-
- throw ex;
- }
-
- return account;
- };
+ return account;
}
}
diff --git a/account/account-web/src/main/java/demo/account/action/ConfirmAccount.java b/account/account-web/src/main/java/demo/account/action/ConfirmAccount.java
index 555740e..1402013 100644
--- a/account/account-web/src/main/java/demo/account/action/ConfirmAccount.java
+++ b/account/account-web/src/main/java/demo/account/action/ConfirmAccount.java
@@ -9,10 +9,9 @@ import demo.account.event.AccountEventType;
import demo.domain.Action;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
-import java.util.function.Function;
-
import static demo.account.domain.AccountStatus.ACCOUNT_CONFIRMED;
import static demo.account.domain.AccountStatus.ACCOUNT_PENDING;
@@ -22,38 +21,35 @@ import static demo.account.domain.AccountStatus.ACCOUNT_PENDING;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class ConfirmAccount extends Action {
private final Logger log = Logger.getLogger(this.getClass());
- public Function getFunction() {
- return (account) -> {
- Assert.isTrue(account.getStatus() != ACCOUNT_CONFIRMED, "The account has already been confirmed");
- Assert.isTrue(account.getStatus() == ACCOUNT_PENDING, "The account has already been confirmed");
+ public Account apply(Account account) {
+ Assert.isTrue(account.getStatus() != ACCOUNT_CONFIRMED, "The account has already been confirmed");
+ Assert.isTrue(account.getStatus() == ACCOUNT_PENDING, "The account has already been confirmed");
- AccountService accountService = account.getModule(AccountModule.class)
- .getDefaultService();
+ AccountService accountService = account.getModule(AccountModule.class)
+ .getDefaultService();
- AccountStatus status = account.getStatus();
+ AccountStatus status = account.getStatus();
- // Activate the account
- account.setStatus(AccountStatus.ACCOUNT_CONFIRMED);
+ // Activate the account
+ account.setStatus(AccountStatus.ACCOUNT_CONFIRMED);
+ account = accountService.update(account);
+
+ try {
+ // Trigger the account confirmed event
+ account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED, account));
+ } catch (Exception ex) {
+ log.error("Account could not be confirmed", ex);
+
+ // Rollback the operation
+ account.setStatus(status);
account = accountService.update(account);
+ }
- try {
- // Trigger the account confirmed event
- account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED, account));
- } catch (Exception ex) {
- log.error("Account could not be confirmed", ex);
-
- // Rollback the operation
- account.setStatus(status);
- accountService.update(account);
-
- throw ex;
- }
-
- return account;
- };
+ return account;
}
}
diff --git a/account/account-web/src/main/java/demo/account/action/GetOrders.java b/account/account-web/src/main/java/demo/account/action/GetOrders.java
index b054a77..46b6f28 100644
--- a/account/account-web/src/main/java/demo/account/action/GetOrders.java
+++ b/account/account-web/src/main/java/demo/account/action/GetOrders.java
@@ -5,8 +5,7 @@ import demo.domain.Action;
import demo.order.domain.OrderModule;
import demo.order.domain.Orders;
import org.springframework.stereotype.Service;
-
-import java.util.function.Function;
+import org.springframework.transaction.annotation.Transactional;
/**
* Query action to get {@link demo.order.domain.Order}s for an an {@link Account}
@@ -14,6 +13,7 @@ import java.util.function.Function;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class GetOrders extends Action {
private OrderModule orderModule;
@@ -22,11 +22,9 @@ public class GetOrders extends Action {
this.orderModule = orderModule;
}
- public Function getFunction() {
- return (account) -> {
- // Get orders from the order service
- return orderModule.getDefaultService()
- .findOrdersByAccountId(account.getIdentity());
- };
+ public Orders apply(Account account) {
+ // Get orders from the order service
+ return orderModule.getDefaultService()
+ .findOrdersByAccountId(account.getIdentity());
}
}
diff --git a/account/account-web/src/main/java/demo/account/action/PostOrder.java b/account/account-web/src/main/java/demo/account/action/PostOrder.java
index d2476ff..a3f29ff 100644
--- a/account/account-web/src/main/java/demo/account/action/PostOrder.java
+++ b/account/account-web/src/main/java/demo/account/action/PostOrder.java
@@ -9,6 +9,7 @@ import org.apache.log4j.Logger;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.web.client.RestClientResponseException;
@@ -16,7 +17,6 @@ import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
-import java.util.function.BiFunction;
import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE;
@@ -26,33 +26,32 @@ import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class PostOrder extends Action {
private final Logger log = Logger.getLogger(this.getClass());
- public BiFunction getFunction() {
- return (account, order) -> {
- Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "Only active accounts can create an order");
- order = order.post();
+ public Order apply(Account account, Order order) {
+ Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "Only active accounts can create an order");
+ order = order.post();
- try {
- // Create traverson for the new order
- Traverson traverson = new Traverson(URI.create(order.getLink("self")
- .getHref()), MediaTypes.HAL_JSON);
+ try {
+ // Create traverson for the new order
+ Traverson traverson = new Traverson(URI.create(order.getLink("self")
+ .getHref()), MediaTypes.HAL_JSON);
- Map params = new HashMap<>();
- params.put("accountId", account.getIdentity());
+ Map params = new HashMap<>();
+ params.put("accountId", account.getIdentity());
- order = traverson.follow("commands", "connectAccount")
- .withTemplateParameters(params)
- .toObject(Order.class);
- } catch (RestClientResponseException ex) {
- log.error("New order could not be posted for the account", ex);
- throw new IllegalStateException(getHttpStatusMessage(ex));
- }
+ order = traverson.follow("commands", "connectAccount")
+ .withTemplateParameters(params)
+ .toObject(Order.class);
+ } catch (RestClientResponseException ex) {
+ log.error("New order could not be posted for the account", ex);
+ throw new IllegalStateException(getHttpStatusMessage(ex));
+ }
- return order;
- };
+ return order;
}
private String getHttpStatusMessage(RestClientResponseException ex) {
diff --git a/account/account-web/src/main/java/demo/account/action/SuspendAccount.java b/account/account-web/src/main/java/demo/account/action/SuspendAccount.java
index c2d7f56..306fcfa 100644
--- a/account/account-web/src/main/java/demo/account/action/SuspendAccount.java
+++ b/account/account-web/src/main/java/demo/account/action/SuspendAccount.java
@@ -9,10 +9,9 @@ import demo.account.event.AccountEventType;
import demo.domain.Action;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
-import java.util.function.Function;
-
import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE;
import static demo.account.domain.AccountStatus.ACCOUNT_SUSPENDED;
@@ -22,38 +21,35 @@ import static demo.account.domain.AccountStatus.ACCOUNT_SUSPENDED;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class SuspendAccount extends Action {
private final Logger log = Logger.getLogger(this.getClass());
- public Function getFunction() {
- return (account) -> {
- Assert.isTrue(account.getStatus() != ACCOUNT_SUSPENDED, "The account is already suspended");
- Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be suspended");
+ public Account apply(Account account) {
+ Assert.isTrue(account.getStatus() != ACCOUNT_SUSPENDED, "The account is already suspended");
+ Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be suspended");
- AccountService accountService = account.getModule(AccountModule.class)
- .getDefaultService();
+ AccountService accountService = account.getModule(AccountModule.class)
+ .getDefaultService();
- AccountStatus status = account.getStatus();
+ AccountStatus status = account.getStatus();
- // Suspend the account
- account.setStatus(AccountStatus.ACCOUNT_SUSPENDED);
+ // Suspend the account
+ account.setStatus(AccountStatus.ACCOUNT_SUSPENDED);
+ account = accountService.update(account);
+
+ try {
+ // Trigger the account suspended event
+ account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_SUSPENDED, account));
+ } catch (Exception ex) {
+ log.error("Account could not be suspended", ex);
+
+ // Rollback the operation
+ account.setStatus(status);
account = accountService.update(account);
+ }
- try {
- // Trigger the account suspended event
- account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_SUSPENDED, account));
- } catch (Exception ex) {
- log.error("Account could not be suspended", ex);
-
- // Rollback the operation
- account.setStatus(status);
- accountService.update(account);
-
- throw ex;
- }
-
- return account;
- };
+ return account;
}
}
diff --git a/account/account-web/src/main/java/demo/account/domain/Account.java b/account/account-web/src/main/java/demo/account/domain/Account.java
index 7e84787..a99c5cf 100644
--- a/account/account-web/src/main/java/demo/account/domain/Account.java
+++ b/account/account-web/src/main/java/demo/account/domain/Account.java
@@ -88,42 +88,36 @@ public class Account extends AbstractEntity {
@JsonIgnore
public Orders getOrders() {
return getAction(GetOrders.class)
- .getFunction()
.apply(this);
}
@Command(method = "activate", controller = AccountController.class)
public Account activate() {
return getAction(ActivateAccount.class)
- .getFunction()
.apply(this);
}
@Command(method = "archive", controller = AccountController.class)
public Account archive() {
return getAction(ArchiveAccount.class)
- .getFunction()
.apply(this);
}
@Command(method = "confirm", controller = AccountController.class)
public Account confirm() {
return getAction(ConfirmAccount.class)
- .getFunction()
.apply(this);
}
@Command(method = "suspend", controller = AccountController.class)
public Account suspend() {
return getAction(SuspendAccount.class)
- .getFunction()
.apply(this);
}
@Command(method = "postOrder", controller = AccountController.class)
public Order postOrder(Order order) {
return getAction(PostOrder.class)
- .getFunction()
.apply(this, order);
}
diff --git a/account/account-web/src/main/resources/application.yml b/account/account-web/src/main/resources/application.yml
index 175c834..ed7a66b 100644
--- a/account/account-web/src/main/resources/application.yml
+++ b/account/account-web/src/main/resources/application.yml
@@ -23,11 +23,12 @@ spring:
---
spring:
profiles: docker
- rabbitmq:
- host: ${DOCKER_IP:192.168.99.100}
- port: 5672
cloud:
stream:
+ kafka:
+ binder:
+ brokers: ${DOCKER_IP:192.168.99.100}
+ zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings:
output:
destination: account
diff --git a/account/account-worker/pom.xml b/account/account-worker/pom.xml
index 8ccc4c7..c18c579 100644
--- a/account/account-worker/pom.xml
+++ b/account/account-worker/pom.xml
@@ -37,7 +37,7 @@
org.springframework.cloud
- spring-cloud-starter-stream-rabbit
+ spring-cloud-starter-stream-kafka
org.springframework.boot
diff --git a/account/account-worker/src/main/resources/application.yml b/account/account-worker/src/main/resources/application.yml
index 356d783..e7d7f9d 100644
--- a/account/account-worker/src/main/resources/application.yml
+++ b/account/account-worker/src/main/resources/application.yml
@@ -28,11 +28,12 @@ eureka:
---
spring:
profiles: docker
- rabbitmq:
- host: ${DOCKER_IP:192.168.99.100}
- port: 5672
cloud:
stream:
+ kafka:
+ binder:
+ brokers: ${DOCKER_IP:192.168.99.100}
+ zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings:
input:
destination: account
diff --git a/docker-compose.yml b/docker-compose.yml
index e84ac5a..1a17348 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,10 +10,13 @@ redis:
container_name: redis
image: redis:latest
net: host
-rabbit:
- container_name: rabbit
- image: rabbitmq:3-management
+kafka:
+ container_name: kafka
+ image: spotify/kafka:latest
net: host
+ ports:
+ - 2181:2181
+ - 9092:9092
account-web:
image: account-web
environment:
diff --git a/order/order-web/pom.xml b/order/order-web/pom.xml
index be2422d..eb8fa53 100644
--- a/order/order-web/pom.xml
+++ b/order/order-web/pom.xml
@@ -41,7 +41,7 @@
org.springframework.cloud
- spring-cloud-starter-stream-rabbit
+ spring-cloud-starter-stream-kafka
org.springframework.boot
diff --git a/order/order-web/src/main/java/demo/order/action/AddReservation.java b/order/order-web/src/main/java/demo/order/action/AddReservation.java
index 9162589..74576eb 100644
--- a/order/order-web/src/main/java/demo/order/action/AddReservation.java
+++ b/order/order-web/src/main/java/demo/order/action/AddReservation.java
@@ -12,49 +12,46 @@ import org.springframework.hateoas.Link;
import org.springframework.hateoas.TemplateVariable;
import org.springframework.hateoas.UriTemplate;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
-import java.util.function.BiFunction;
-
/**
* Connects an {@link Order} to an Account.
*
* @author Kenny Bastani
*/
@Service
+@Transactional
public class AddReservation extends Action {
private final Logger log = Logger.getLogger(this.getClass());
- public BiFunction getFunction() {
- return (order, reservationId) -> {
- Assert.isTrue(order
- .getStatus() == OrderStatus.RESERVATION_PENDING, "Order must be in a pending reservation state");
- Assert.isTrue(!order.getReservationIds().contains(reservationId), "Reservation already added to order");
+ public Order apply(Order order, Long reservationId) {
+ Assert.isTrue(order
+ .getStatus() == OrderStatus.RESERVATION_PENDING, "Order must be in a pending reservation state");
+ Assert.isTrue(!order.getReservationIds().contains(reservationId), "Reservation already added to order");
- OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
+ OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
- order.getReservationIds().add(reservationId);
+ order.getReservationIds().add(reservationId);
+ order = orderService.update(order);
+
+ Link reservationLink = new Link(new UriTemplate("http://warehouse-web/v1/reservations/{id}")
+ .with("id", TemplateVariable.VariableType.PATH_VARIABLE)
+ .expand(reservationId)
+ .toString()).withRel("reservation");
+
+ try {
+ // Trigger reservation added event
+ order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_ADDED, order), reservationLink);
+ } catch (Exception ex) {
+ log.error("Could not add reservation to order", ex);
+ order.getReservationIds().remove(reservationId);
+ order.setStatus(OrderStatus.RESERVATION_FAILED);
order = orderService.update(order);
+ order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_FAILED, order), reservationLink);
+ }
- Link reservationLink = new Link(new UriTemplate("http://warehouse-web/v1/reservations/{id}")
- .with("id", TemplateVariable.VariableType.PATH_VARIABLE)
- .expand(reservationId)
- .toString()).withRel("reservation");
-
- try {
- // Trigger reservation added event
- order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_ADDED, order), reservationLink);
- } catch (Exception ex) {
- log.error("Could not add reservation to order", ex);
- order.getReservationIds().remove(reservationId);
- order.setStatus(OrderStatus.RESERVATION_FAILED);
- orderService.update(order);
- order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_FAILED, order), reservationLink);
- throw ex;
- }
-
- return order;
- };
+ return order;
}
}
diff --git a/order/order-web/src/main/java/demo/order/action/CompleteOrder.java b/order/order-web/src/main/java/demo/order/action/CompleteOrder.java
index a6c0447..d16474c 100644
--- a/order/order-web/src/main/java/demo/order/action/CompleteOrder.java
+++ b/order/order-web/src/main/java/demo/order/action/CompleteOrder.java
@@ -9,10 +9,10 @@ import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import java.util.Arrays;
-import java.util.function.Function;
/**
* Completes the {@link Order} and applies a final status
@@ -20,39 +20,37 @@ import java.util.function.Function;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class CompleteOrder extends Action {
private final Logger log = Logger.getLogger(CompleteOrder.class);
- public Function getFunction() {
- return (order) -> {
- Assert.isTrue(Arrays.asList(OrderStatus.PAYMENT_FAILED,
- OrderStatus.PAYMENT_SUCCEEDED,
- OrderStatus.RESERVATION_FAILED).contains(order.getStatus()), "Order must be in a terminal state");
+ public Order apply(Order order) {
+ Assert.isTrue(Arrays.asList(OrderStatus.PAYMENT_FAILED,
+ OrderStatus.PAYMENT_SUCCEEDED,
+ OrderStatus.RESERVATION_FAILED).contains(order.getStatus()), "Order must be in a terminal state");
- OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
+ OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
- OrderStatus status = order.getStatus();
+ OrderStatus status = order.getStatus();
- try {
- if (order.getStatus() == OrderStatus.PAYMENT_SUCCEEDED) {
- order.setStatus(OrderStatus.ORDER_SUCCEEDED);
- order = orderService.update(order);
- order.sendAsyncEvent(new OrderEvent(OrderEventType.ORDER_SUCCEEDED, order));
- } else {
- order.setStatus(OrderStatus.ORDER_FAILED);
- order = orderService.update(order);
- order.sendAsyncEvent(new OrderEvent(OrderEventType.ORDER_FAILED, order));
- }
- } catch (RuntimeException ex) {
- log.error("Error completing the order", ex);
- // Rollback status change
- order.setStatus(status);
+ try {
+ if (order.getStatus() == OrderStatus.PAYMENT_SUCCEEDED) {
+ order.setStatus(OrderStatus.ORDER_SUCCEEDED);
order = orderService.update(order);
- throw ex;
+ order.sendAsyncEvent(new OrderEvent(OrderEventType.ORDER_SUCCEEDED, order));
+ } else {
+ order.setStatus(OrderStatus.ORDER_FAILED);
+ order = orderService.update(order);
+ order.sendAsyncEvent(new OrderEvent(OrderEventType.ORDER_FAILED, order));
}
+ } catch (RuntimeException ex) {
+ log.error("Error completing the order", ex);
+ // Rollback status change
+ order.setStatus(status);
+ order = orderService.update(order);
+ }
- return order;
- };
+ return order;
}
}
diff --git a/order/order-web/src/main/java/demo/order/action/CompleteReservation.java b/order/order-web/src/main/java/demo/order/action/CompleteReservation.java
index 5b7f2c6..8630b4e 100644
--- a/order/order-web/src/main/java/demo/order/action/CompleteReservation.java
+++ b/order/order-web/src/main/java/demo/order/action/CompleteReservation.java
@@ -11,10 +11,10 @@ import demo.reservation.domain.Reservation;
import demo.reservation.domain.ReservationStatus;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import java.util.List;
-import java.util.function.Function;
import java.util.stream.Collectors;
import static demo.order.domain.OrderStatus.RESERVATION_FAILED;
@@ -26,56 +26,54 @@ import static demo.order.domain.OrderStatus.RESERVATION_SUCCEEDED;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class CompleteReservation extends Action {
private final Logger log = Logger.getLogger(CompleteReservation.class);
- public Function getFunction() {
- return (order) -> {
- if (order.getStatus() != RESERVATION_SUCCEEDED && order.getStatus() != RESERVATION_FAILED) {
- Assert.isTrue(order.getStatus() == OrderStatus.RESERVATION_PENDING,
- "The order must be in a reservation pending state");
- } else {
- // Reservation has already completed
- return order;
- }
-
- OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
-
- OrderStatus status = order.getStatus();
-
- try {
- List reservations = order.getReservations().getContent().stream()
- .collect(Collectors.toList());
-
- // Check if all inventory has been reserved
- Boolean orderReserved = reservations.stream()
- .allMatch(r -> r.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED);
-
- // Check if any inventory reservations have failed
- Boolean reservationFailed = reservations.stream()
- .anyMatch(r -> r.getStatus() == ReservationStatus.RESERVATION_FAILED);
-
- if (orderReserved && order.getStatus() == OrderStatus.RESERVATION_PENDING) {
- // Succeed the reservation and commit all inventory associated with order
- order.setStatus(RESERVATION_SUCCEEDED);
- order = orderService.update(order);
- order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_SUCCEEDED, order));
- } else if (reservationFailed && order.getStatus() == OrderStatus.RESERVATION_PENDING) {
- // Fail the reservation and release all inventory associated with order
- order.setStatus(RESERVATION_FAILED);
- order = orderService.update(order);
- order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_FAILED, order));
- }
- } catch (RuntimeException ex) {
- log.error("Error completing reservation", ex);
- // Rollback status change
- order.setStatus(status);
- order = orderService.update(order);
- throw ex;
- }
-
+ public Order apply(Order order) {
+ if (order.getStatus() != RESERVATION_SUCCEEDED && order.getStatus() != RESERVATION_FAILED) {
+ Assert.isTrue(order.getStatus() == OrderStatus.RESERVATION_PENDING,
+ "The order must be in a reservation pending state");
+ } else {
+ // Reservation has already completed
return order;
- };
+ }
+
+ OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
+
+ OrderStatus status = order.getStatus();
+
+ try {
+ List reservations = order.getReservations().getContent().stream()
+ .collect(Collectors.toList());
+
+ // Check if all inventory has been reserved
+ Boolean orderReserved = reservations.stream()
+ .allMatch(r -> r.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED);
+
+ // Check if any inventory reservations have failed
+ Boolean reservationFailed = reservations.stream()
+ .anyMatch(r -> r.getStatus() == ReservationStatus.RESERVATION_FAILED);
+
+ if (orderReserved && order.getStatus() == OrderStatus.RESERVATION_PENDING) {
+ // Succeed the reservation and commit all inventory associated with order
+ order.setStatus(RESERVATION_SUCCEEDED);
+ order = orderService.update(order);
+ order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_SUCCEEDED, order));
+ } else if (reservationFailed && order.getStatus() == OrderStatus.RESERVATION_PENDING) {
+ // Fail the reservation and release all inventory associated with order
+ order.setStatus(RESERVATION_FAILED);
+ order = orderService.update(order);
+ order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_FAILED, order));
+ }
+ } catch (RuntimeException ex) {
+ log.error("Error completing reservation", ex);
+ // Rollback status change
+ order.setStatus(status);
+ order = orderService.update(order);
+ }
+
+ return order;
}
}
diff --git a/order/order-web/src/main/java/demo/order/action/ConnectAccount.java b/order/order-web/src/main/java/demo/order/action/ConnectAccount.java
index 29c3fab..b0f4897 100644
--- a/order/order-web/src/main/java/demo/order/action/ConnectAccount.java
+++ b/order/order-web/src/main/java/demo/order/action/ConnectAccount.java
@@ -9,43 +9,40 @@ import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
-import java.util.function.BiFunction;
-
/**
* Connects an {@link Order} to an Account.
*
* @author Kenny Bastani
*/
@Service
+@Transactional
public class ConnectAccount extends Action {
private final Logger log = Logger.getLogger(this.getClass());
- public BiFunction getFunction() {
- return (order, accountId) -> {
- Assert.isTrue(order.getStatus() == OrderStatus.ORDER_CREATED, "Order must be in a created state");
+ public Order apply(Order order, Long accountId) {
+ Assert.isTrue(order.getStatus() == OrderStatus.ORDER_CREATED, "Order must be in a created state");
- OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
+ OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
- // Connect the account
- order.setAccountId(accountId);
- order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
+ // Connect the account
+ order.setAccountId(accountId);
+ order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
+ order = orderService.update(order);
+
+ try {
+ // Trigger the account connected event
+ order.sendAsyncEvent(new OrderEvent(OrderEventType.ACCOUNT_CONNECTED, order));
+ } catch (Exception ex) {
+ log.error("Could not connect order to account", ex);
+ order.setAccountId(null);
+ order.setStatus(OrderStatus.ORDER_CREATED);
order = orderService.update(order);
+ }
- try {
- // Trigger the account connected event
- order.sendAsyncEvent(new OrderEvent(OrderEventType.ACCOUNT_CONNECTED, order));
- } catch (Exception ex) {
- log.error("Could not connect order to account", ex);
- order.setAccountId(null);
- order.setStatus(OrderStatus.ORDER_CREATED);
- orderService.update(order);
- throw ex;
- }
-
- return order;
- };
+ return order;
}
}
diff --git a/order/order-web/src/main/java/demo/order/action/ConnectPayment.java b/order/order-web/src/main/java/demo/order/action/ConnectPayment.java
index 90db472..98f44bf 100644
--- a/order/order-web/src/main/java/demo/order/action/ConnectPayment.java
+++ b/order/order-web/src/main/java/demo/order/action/ConnectPayment.java
@@ -10,43 +10,40 @@ import demo.order.event.OrderEventType;
import demo.payment.domain.Payment;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
-import java.util.function.BiFunction;
-
/**
* Connects a {@link Payment} to an {@link Order}.
*
* @author Kenny Bastani
*/
@Service
+@Transactional
public class ConnectPayment extends Action {
private final Logger log = Logger.getLogger(this.getClass());
- public BiFunction getFunction() {
- return (order, paymentId) -> {
- Assert.isTrue(order
- .getStatus() == OrderStatus.PAYMENT_CREATED, "Order must be in a payment created state");
+ public Order apply(Order order, Long paymentId) {
+ Assert.isTrue(order
+ .getStatus() == OrderStatus.PAYMENT_CREATED, "Order must be in a payment created state");
- OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
+ OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
- // Connect the payment
- order.setPaymentId(paymentId);
- order.setStatus(OrderStatus.PAYMENT_CONNECTED);
+ // Connect the payment
+ order.setPaymentId(paymentId);
+ order.setStatus(OrderStatus.PAYMENT_CONNECTED);
+ order = orderService.update(order);
+
+ try {
+ // Trigger the payment connected event
+ order.sendAsyncEvent(new OrderEvent(OrderEventType.PAYMENT_CONNECTED, order));
+ } catch (Exception ex) {
+ log.error("Could not connect payment to order", ex);
+ order.setPaymentId(null);
+ order.setStatus(OrderStatus.ORDER_CREATED);
order = orderService.update(order);
+ }
- try {
- // Trigger the payment connected event
- order.sendAsyncEvent(new OrderEvent(OrderEventType.PAYMENT_CONNECTED, order));
- } catch (Exception ex) {
- log.error("Could not connect payment to order", ex);
- order.setPaymentId(null);
- order.setStatus(OrderStatus.ORDER_CREATED);
- orderService.update(order);
- throw ex;
- }
-
- return order;
- };
+ return order;
}
}
diff --git a/order/order-web/src/main/java/demo/order/action/CreatePayment.java b/order/order-web/src/main/java/demo/order/action/CreatePayment.java
index 5b9ac64..8c27616 100644
--- a/order/order-web/src/main/java/demo/order/action/CreatePayment.java
+++ b/order/order-web/src/main/java/demo/order/action/CreatePayment.java
@@ -12,10 +12,10 @@ import demo.payment.domain.PaymentMethod;
import demo.payment.domain.PaymentService;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import java.util.Arrays;
-import java.util.function.Function;
/**
* Creates a {@link Payment} for an {@link Order}.
@@ -23,6 +23,7 @@ import java.util.function.Function;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class CreatePayment extends Action {
private final Logger log = Logger.getLogger(this.getClass());
@@ -32,53 +33,49 @@ public class CreatePayment extends Action {
this.paymentService = paymentService;
}
- public Function getFunction() {
- return order -> {
- Assert.isTrue(order.getPaymentId() == null, "Payment has already been created");
- Assert.isTrue(!Arrays.asList(OrderStatus.PAYMENT_CREATED,
- OrderStatus.PAYMENT_CONNECTED,
- OrderStatus.PAYMENT_SUCCEEDED,
- OrderStatus.PAYMENT_PENDING).contains(order.getStatus()), "Payment has already been created");
- Assert.isTrue(order.getStatus() == OrderStatus.RESERVATION_SUCCEEDED,
- "Inventory reservations for the order must be made first before creating a payment");
+ public Order apply(Order order) {
+ Assert.isTrue(order.getPaymentId() == null, "Payment has already been created");
+ Assert.isTrue(!Arrays.asList(OrderStatus.PAYMENT_CREATED,
+ OrderStatus.PAYMENT_CONNECTED,
+ OrderStatus.PAYMENT_SUCCEEDED,
+ OrderStatus.PAYMENT_PENDING).contains(order.getStatus()), "Payment has already been created");
+ Assert.isTrue(order.getStatus() == OrderStatus.RESERVATION_SUCCEEDED,
+ "Inventory reservations for the order must be made first before creating a payment");
- // Get entity services
- OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
+ // Get entity services
+ OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
- // Update the order status
- order.setStatus(OrderStatus.PAYMENT_PENDING);
+ // Update the order status
+ order.setStatus(OrderStatus.PAYMENT_PENDING);
+ order = orderService.update(order);
+
+ Payment payment = new Payment();
+ payment.setAmount(order.calculateTotal());
+ payment.setPaymentMethod(PaymentMethod.CREDIT_CARD);
+ payment = paymentService.create(payment);
+
+ // Update the order status
+ order.setStatus(OrderStatus.PAYMENT_CREATED);
+ order = orderService.update(order);
+
+ try {
+ OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_CREATED, order);
+ event.add(payment.getLink("self").withRel("payment"));
+
+ // Trigger payment created event
+ order.sendAsyncEvent(event);
+ } catch (Exception ex) {
+ log.error("The order's payment could not be created", ex);
+
+ // Rollback the payment creation
+ if (payment.getIdentity() != null)
+ paymentService.delete(payment.getIdentity());
+
+ order.setPaymentId(null);
+ order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
order = orderService.update(order);
+ }
- Payment payment = new Payment();
- payment.setAmount(order.calculateTotal());
- payment.setPaymentMethod(PaymentMethod.CREDIT_CARD);
- payment = paymentService.create(payment);
-
- // Update the order status
- order.setStatus(OrderStatus.PAYMENT_CREATED);
- order = orderService.update(order);
-
- try {
- OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_CREATED, order);
- event.add(payment.getLink("self").withRel("payment"));
-
- // Trigger payment created event
- order.sendAsyncEvent(event);
- } catch (Exception ex) {
- log.error("The order's payment could not be created", ex);
-
- // Rollback the payment creation
- if (payment.getIdentity() != null)
- paymentService.delete(payment.getIdentity());
-
- order.setPaymentId(null);
- order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
- orderService.update(order);
-
- throw ex;
- }
-
- return order;
- };
+ return order;
}
}
diff --git a/order/order-web/src/main/java/demo/order/action/DeleteOrder.java b/order/order-web/src/main/java/demo/order/action/DeleteOrder.java
index 6629fc1..aba03cb 100644
--- a/order/order-web/src/main/java/demo/order/action/DeleteOrder.java
+++ b/order/order-web/src/main/java/demo/order/action/DeleteOrder.java
@@ -7,8 +7,7 @@ import demo.payment.domain.Payment;
import demo.payment.domain.PaymentService;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
-
-import java.util.function.Consumer;
+import org.springframework.transaction.annotation.Transactional;
/**
* Processes a {@link Payment} for an {@link Order}.
@@ -16,6 +15,7 @@ import java.util.function.Consumer;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class DeleteOrder extends Action {
private final Logger log = Logger.getLogger(this.getClass());
@@ -25,16 +25,14 @@ public class DeleteOrder extends Action {
this.paymentService = paymentService;
}
- public Consumer getConsumer() {
- return (order) -> {
- // Delete payment
- if (order.getPaymentId() != null)
- paymentService.delete(order.getPaymentId());
+ public void apply(Order order) {
+ // Delete payment
+ if (order.getPaymentId() != null)
+ paymentService.delete(order.getPaymentId());
- // Delete order
- order.getModule(OrderModule.class)
- .getDefaultService()
- .delete(order.getIdentity());
- };
+ // Delete order
+ order.getModule(OrderModule.class)
+ .getDefaultService()
+ .delete(order.getIdentity());
}
}
diff --git a/order/order-web/src/main/java/demo/order/action/GetReservations.java b/order/order-web/src/main/java/demo/order/action/GetReservations.java
index 6714f91..4c09aba 100644
--- a/order/order-web/src/main/java/demo/order/action/GetReservations.java
+++ b/order/order-web/src/main/java/demo/order/action/GetReservations.java
@@ -5,8 +5,7 @@ import demo.order.domain.Order;
import demo.reservation.domain.ReservationModule;
import demo.reservation.domain.Reservations;
import org.springframework.stereotype.Service;
-
-import java.util.function.Function;
+import org.springframework.transaction.annotation.Transactional;
/**
* Query action to get {@link demo.order.domain.Order}s for an an {@link Order}
@@ -14,6 +13,7 @@ import java.util.function.Function;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class GetReservations extends Action {
private final ReservationModule reservationModule;
@@ -22,11 +22,9 @@ public class GetReservations extends Action {
this.reservationModule = reservationModule;
}
- public Function getFunction() {
- return (order) -> {
- // Get orders from the order service
- return reservationModule.getDefaultService()
- .findReservationsByOrderId(order.getIdentity());
- };
+ public Reservations apply(Order order) {
+ // Get orders from the order service
+ return reservationModule.getDefaultService()
+ .findReservationsByOrderId(order.getIdentity());
}
}
diff --git a/order/order-web/src/main/java/demo/order/action/ProcessPayment.java b/order/order-web/src/main/java/demo/order/action/ProcessPayment.java
index 6922f86..b3fecb1 100644
--- a/order/order-web/src/main/java/demo/order/action/ProcessPayment.java
+++ b/order/order-web/src/main/java/demo/order/action/ProcessPayment.java
@@ -12,11 +12,11 @@ import org.apache.log4j.Logger;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import java.net.URI;
import java.util.Arrays;
-import java.util.function.Function;
/**
* Processes a {@link Payment} for an {@link Order}.
@@ -24,51 +24,55 @@ import java.util.function.Function;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class ProcessPayment extends Action {
private final Logger log = Logger.getLogger(this.getClass());
- public Function getFunction() {
- return order -> {
- Assert.isTrue(!Arrays
- .asList(OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING, OrderStatus.PAYMENT_FAILED)
- .contains(order.getStatus()), "Payment has already been processed");
- Assert.isTrue(order.getStatus() == OrderStatus.PAYMENT_CONNECTED,
- "Order must be in a payment connected state");
+ public Order apply(Order order) {
+ Assert.isTrue(!Arrays
+ .asList(OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING, OrderStatus.PAYMENT_FAILED)
+ .contains(order.getStatus()), "Payment has already been processed");
+ Assert.isTrue(order.getStatus() == OrderStatus.PAYMENT_CONNECTED,
+ "Order must be in a payment connected state");
- // Get entity services
- OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
+ // Get entity services
+ OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
- // Get the payment
- Payment payment = order.getPayment();
+ // Get the payment
+ Payment payment = order.getPayment();
- // Update the order status
- order.setStatus(OrderStatus.PAYMENT_PENDING);
- order = orderService.update(order);
+ // Update the order status
+ order.setStatus(OrderStatus.PAYMENT_PENDING);
+ order = orderService.update(order);
- try {
- // Create traverson for the new order
- Traverson traverson = new Traverson(URI.create(payment.getLink("self").getHref()), MediaTypes.HAL_JSON);
- payment = traverson.follow("commands", "processPayment").toObject(Payment.class);
- } catch (Exception ex) {
- log.error("The order's payment could not be processed", ex);
+ boolean paymentSuccess = false;
- OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_FAILED, order);
- event.add(payment.getLink("self").withRel("payment"));
+ try {
+ // Create traverson for the new order
+ Traverson traverson = new Traverson(URI.create(payment.getLink("self").getHref()), MediaTypes.HAL_JSON);
+ payment = traverson.follow("commands", "processPayment").toObject(Payment.class);
+ paymentSuccess = true;
+ } catch (Exception ex) {
+ log.error("The order's payment could not be processed", ex);
- // Trigger payment failed event
- order.sendAsyncEvent(event);
+ OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_FAILED, order);
+ event.add(payment.getLink("self").withRel("payment"));
- throw ex;
- } finally {
+ // Trigger payment failed event
+ order.sendAsyncEvent(event);
+
+ paymentSuccess = false;
+ } finally {
+ if(paymentSuccess) {
OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_SUCCEEDED, order);
event.add(payment.getLink("self").withRel("payment"));
// Trigger payment succeeded event
order.sendAsyncEvent(event);
}
+ }
- return order;
- };
+ return order;
}
}
diff --git a/order/order-web/src/main/java/demo/order/action/ReserveInventory.java b/order/order-web/src/main/java/demo/order/action/ReserveInventory.java
index 9b65f46..b112279 100644
--- a/order/order-web/src/main/java/demo/order/action/ReserveInventory.java
+++ b/order/order-web/src/main/java/demo/order/action/ReserveInventory.java
@@ -12,10 +12,10 @@ import demo.warehouse.domain.WarehouseService;
import demo.warehouse.exception.WarehouseNotFoundException;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import java.util.Arrays;
-import java.util.function.Function;
/**
* Reserves inventory for an {@link Order}.
@@ -23,6 +23,7 @@ import java.util.function.Function;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class ReserveInventory extends Action {
private final Logger log = Logger.getLogger(ReserveInventory.class);
@@ -32,63 +33,61 @@ public class ReserveInventory extends Action {
this.warehouseService = warehouseService;
}
- public Function getFunction() {
- return (order) -> {
- Assert.isTrue(!Arrays
- .asList(OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING,
- OrderStatus.PAYMENT_FAILED, OrderStatus.INVENTORY_RESERVED,
- OrderStatus.RESERVATION_SUCCEEDED, OrderStatus.RESERVATION_PENDING,
- OrderStatus.RESERVATION_FAILED)
- .contains(order.getStatus()), "Inventory has already been reserved");
- Assert.isTrue(order
- .getStatus() == OrderStatus.ACCOUNT_CONNECTED, "The order must be connected to an account");
+ public Order apply(Order order) {
+ Assert.isTrue(!Arrays
+ .asList(OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING,
+ OrderStatus.PAYMENT_FAILED, OrderStatus.INVENTORY_RESERVED,
+ OrderStatus.RESERVATION_SUCCEEDED, OrderStatus.RESERVATION_PENDING,
+ OrderStatus.RESERVATION_FAILED)
+ .contains(order.getStatus()), "Inventory has already been reserved");
+ Assert.isTrue(order
+ .getStatus() == OrderStatus.ACCOUNT_CONNECTED, "The order must be connected to an account");
- Warehouse warehouse;
+ Warehouse warehouse;
- OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
+ OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
- OrderStatus status = order.getStatus();
- order.setStatus(OrderStatus.RESERVATION_PENDING);
+ OrderStatus status = order.getStatus();
+ order.setStatus(OrderStatus.RESERVATION_PENDING);
+ order = orderService.update(order);
+
+ try {
+ warehouse = warehouseService.findWarehouseWithInventory(order);
+ } catch (WarehouseNotFoundException ex) {
+ log.error("The order contains items that are not available at any warehouse", ex);
+ throw ex;
+ } catch (RuntimeException ex) {
+ log.error("Error connecting to warehouse service", ex);
+ // Rollback status change
+ order.setStatus(status);
+ order = orderService.update(order);
+ throw ex;
+ }
+
+ try {
+ // Reserve inventory for the order from the returned warehouse
+ warehouse = warehouseService.reserveInventory(warehouse, order);
+ } catch (Exception ex) {
+ log.error("Could not reserve inventory for the order", ex);
+
+ order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
order = orderService.update(order);
- try {
- warehouse = warehouseService.findWarehouseWithInventory(order);
- } catch (WarehouseNotFoundException ex) {
- log.error("The order contains items that are not available at any warehouse", ex);
- throw ex;
- } catch (RuntimeException ex) {
- log.error("Error connecting to warehouse service", ex);
- // Rollback status change
- order.setStatus(status);
- order = orderService.update(order);
- throw ex;
- }
+ OrderEvent event = new OrderEvent(OrderEventType.RESERVATION_FAILED, order);
+ event.add(warehouse.getLink("self").withRel("warehouse"));
- try {
- // Reserve inventory for the order from the returned warehouse
- warehouse = warehouseService.reserveInventory(warehouse, order);
- } catch (Exception ex) {
- log.error("Could not reserve inventory for the order", ex);
-
- order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
- order = orderService.update(order);
-
- OrderEvent event = new OrderEvent(OrderEventType.RESERVATION_FAILED, order);
- event.add(warehouse.getLink("self").withRel("warehouse"));
-
- // Trigger reservation failed
- order.sendAsyncEvent(event);
-
- throw ex;
- } finally {
+ // Trigger reservation failed
+ order.sendAsyncEvent(event);
+ } finally {
+ if(order.getStatus() != OrderStatus.ACCOUNT_CONNECTED) {
OrderEvent event = new OrderEvent(OrderEventType.RESERVATION_PENDING, order);
event.add(warehouse.getLink("self").withRel("warehouse"));
// Trigger reservation pending event
order.sendAsyncEvent(event);
}
+ }
- return order;
- };
+ return order;
}
}
diff --git a/order/order-web/src/main/java/demo/order/action/UpdateOrderStatus.java b/order/order-web/src/main/java/demo/order/action/UpdateOrderStatus.java
index e45dd8d..49d46b8 100644
--- a/order/order-web/src/main/java/demo/order/action/UpdateOrderStatus.java
+++ b/order/order-web/src/main/java/demo/order/action/UpdateOrderStatus.java
@@ -6,8 +6,7 @@ import demo.order.domain.OrderService;
import demo.order.domain.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
-
-import java.util.function.BiFunction;
+import org.springframework.transaction.annotation.Transactional;
/**
* Updates the status of a {@link Order} entity.
@@ -15,6 +14,7 @@ import java.util.function.BiFunction;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class UpdateOrderStatus extends Action {
private final Logger log = Logger.getLogger(this.getClass());
@@ -24,24 +24,21 @@ public class UpdateOrderStatus extends Action {
this.orderService = orderService;
}
- public BiFunction getFunction() {
- return (order, orderStatus) -> {
+ public Order apply(Order order, OrderStatus orderStatus) {
- // Save rollback status
- OrderStatus rollbackStatus = order.getStatus();
+ // Save rollback status
+ OrderStatus rollbackStatus = order.getStatus();
- try {
- // Update status
- order.setStatus(orderStatus);
- order = orderService.update(order);
- } catch (Exception ex) {
- log.error("Could not update the status", ex);
- order.setStatus(rollbackStatus);
- order = orderService.update(order);
- throw ex;
- }
+ try {
+ // Update status
+ order.setStatus(orderStatus);
+ order = orderService.update(order);
+ } catch (Exception ex) {
+ log.error("Could not update the status", ex);
+ order.setStatus(rollbackStatus);
+ order = orderService.update(order);
+ }
- return order;
- };
+ return order;
}
}
diff --git a/order/order-web/src/main/java/demo/order/domain/Order.java b/order/order-web/src/main/java/demo/order/domain/Order.java
index 7939023..f2592b8 100644
--- a/order/order-web/src/main/java/demo/order/domain/Order.java
+++ b/order/order-web/src/main/java/demo/order/domain/Order.java
@@ -117,77 +117,66 @@ public class Order extends AbstractEntity {
@JsonIgnore
public Reservations getReservations() {
return getAction(GetReservations.class)
- .getFunction()
.apply(this);
}
@Command(method = "connectAccount", controller = OrderController.class)
public Order connectAccount(Long accountId) {
return getAction(ConnectAccount.class)
- .getFunction()
.apply(this, accountId);
}
@Command(method = "connectPayment", controller = OrderController.class)
public Order connectPayment(Long paymentId) {
return getAction(ConnectPayment.class)
- .getFunction()
.apply(this, paymentId);
}
@Command(method = "createPayment", controller = OrderController.class)
public Order createPayment() {
return getAction(CreatePayment.class)
- .getFunction()
.apply(this);
}
@Command(method = "processPayment", controller = OrderController.class)
public Order processPayment() {
return getAction(ProcessPayment.class)
- .getFunction()
.apply(this);
}
@Command(method = "reserveInventory", controller = OrderController.class)
public Order reserveInventory() {
return getAction(ReserveInventory.class)
- .getFunction()
.apply(this);
}
@Command(method = "addReservation", controller = OrderController.class)
public Order addReservation(Long reservationId) {
return getAction(AddReservation.class)
- .getFunction()
.apply(this, reservationId);
}
@Command(method = "completeReservation", controller = OrderController.class)
public Order completeReservation() {
return getAction(CompleteReservation.class)
- .getFunction()
.apply(this);
}
@Command(method = "completeOrder", controller = OrderController.class)
public Order completeOrder() {
return getAction(CompleteOrder.class)
- .getFunction()
.apply(this);
}
@Command(method = "updateOrderStatus", controller = OrderController.class)
public Order updateOrderStatus(OrderStatus orderStatus) {
return getAction(UpdateOrderStatus.class)
- .getFunction()
.apply(this, orderStatus);
}
public boolean delete() {
getAction(DeleteOrder.class)
- .getConsumer()
- .accept(this);
+ .apply(this);
return true;
}
diff --git a/order/order-web/src/main/resources/application.yml b/order/order-web/src/main/resources/application.yml
index 29527b2..4bf3de8 100644
--- a/order/order-web/src/main/resources/application.yml
+++ b/order/order-web/src/main/resources/application.yml
@@ -23,11 +23,12 @@ spring:
---
spring:
profiles: docker
- rabbitmq:
- host: ${DOCKER_IP:192.168.99.100}
- port: 5672
cloud:
stream:
+ kafka:
+ binder:
+ brokers: ${DOCKER_IP:192.168.99.100}
+ zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings:
output:
destination: order
diff --git a/order/order-worker/pom.xml b/order/order-worker/pom.xml
index 5a6b75b..ba5ecbf 100644
--- a/order/order-worker/pom.xml
+++ b/order/order-worker/pom.xml
@@ -37,7 +37,7 @@
org.springframework.cloud
- spring-cloud-starter-stream-rabbit
+ spring-cloud-starter-stream-kafka
org.springframework.boot
diff --git a/order/order-worker/src/main/java/demo/config/StateMachineConfig.java b/order/order-worker/src/main/java/demo/config/StateMachineConfig.java
index 26201ca..8532c83 100644
--- a/order/order-worker/src/main/java/demo/config/StateMachineConfig.java
+++ b/order/order-worker/src/main/java/demo/config/StateMachineConfig.java
@@ -375,26 +375,33 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter r.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED)
+ .filter(r -> r.getStatus() != ReservationStatus.RESERVATION_FAILED)
.parallel()
.forEach(r -> {
- Traverson res = new Traverson(
- URI.create(r.getLink("self").getHref()),
- MediaTypes.HAL_JSON
- );
+ try {
+ Traverson res = new Traverson(
+ URI.create(r.getLink("self").getHref()),
+ MediaTypes.HAL_JSON
+ );
- res.follow("self", "commands", "releaseInventory")
- .toObject(Reservation.class);
+ res.follow("self", "commands", "releaseInventory")
+ .toObject(Reservation.class);
+ } catch (Exception ex) {
+ log.error("Could not release inventory for reservation", ex);
+ }
});
- return traverson.follow("self", "commands", "completeOrder")
- .toEntity(Order.class)
- .getBody();
+ return order;
+
}));
}
diff --git a/order/order-worker/src/main/resources/application.yml b/order/order-worker/src/main/resources/application.yml
index d2dceae..3df889d 100644
--- a/order/order-worker/src/main/resources/application.yml
+++ b/order/order-worker/src/main/resources/application.yml
@@ -22,11 +22,12 @@ spring:
---
spring:
profiles: docker
- rabbitmq:
- host: ${DOCKER_IP:192.168.99.100}
- port: 5672
cloud:
stream:
+ kafka:
+ binder:
+ brokers: ${DOCKER_IP:192.168.99.100}
+ zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings:
input:
contentType: 'application/json'
diff --git a/payment/payment-web/pom.xml b/payment/payment-web/pom.xml
index 53227fc..627bf47 100644
--- a/payment/payment-web/pom.xml
+++ b/payment/payment-web/pom.xml
@@ -41,7 +41,7 @@
org.springframework.cloud
- spring-cloud-starter-stream-rabbit
+ spring-cloud-starter-stream-kafka
org.springframework.boot
diff --git a/payment/payment-web/src/main/java/demo/payment/action/ConnectOrder.java b/payment/payment-web/src/main/java/demo/payment/action/ConnectOrder.java
index 4b8f5c3..e660e77 100644
--- a/payment/payment-web/src/main/java/demo/payment/action/ConnectOrder.java
+++ b/payment/payment-web/src/main/java/demo/payment/action/ConnectOrder.java
@@ -9,42 +9,38 @@ import demo.payment.event.PaymentEvent;
import demo.payment.event.PaymentEventType;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
-import java.util.function.BiFunction;
-
@Service
+@Transactional
public class ConnectOrder extends Action {
private final Logger log = Logger.getLogger(this.getClass());
- public BiFunction getFunction() {
- return (payment, orderId) -> {
- Assert.isTrue(payment
- .getStatus() == PaymentStatus.PAYMENT_CREATED, "Payment has already been connected to an order");
+ public Payment apply(Payment payment, Long orderId) {
+ Assert.isTrue(payment
+ .getStatus() == PaymentStatus.PAYMENT_CREATED, "Payment has already been connected to an order");
- PaymentService paymentService = payment.getModule(PaymentModule.class)
- .getDefaultService();
+ PaymentService paymentService = payment.getModule(PaymentModule.class)
+ .getDefaultService();
- // Connect the payment to the order
- payment.setOrderId(orderId);
- payment.setStatus(PaymentStatus.ORDER_CONNECTED);
+ // Connect the payment to the order
+ payment.setOrderId(orderId);
+ payment.setStatus(PaymentStatus.ORDER_CONNECTED);
+ payment = paymentService.update(payment);
+
+ try {
+ // Trigger the payment connected
+ payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.ORDER_CONNECTED, payment));
+ } catch (IllegalStateException ex) {
+ log.error("Payment could not be connected to order", ex);
+
+ // Rollback operation
+ payment.setStatus(PaymentStatus.PAYMENT_CREATED);
+ payment.setOrderId(null);
payment = paymentService.update(payment);
+ }
- try {
- // Trigger the payment connected
- payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.ORDER_CONNECTED, payment));
- } catch (IllegalStateException ex) {
- log.error("Payment could not be connected to order", ex);
-
- // Rollback operation
- payment.setStatus(PaymentStatus.PAYMENT_CREATED);
- payment.setOrderId(null);
- paymentService.update(payment);
-
- throw ex;
- }
-
- return payment;
- };
+ return payment;
}
}
diff --git a/payment/payment-web/src/main/java/demo/payment/action/ProcessPayment.java b/payment/payment-web/src/main/java/demo/payment/action/ProcessPayment.java
index efce0ae..32a6d64 100644
--- a/payment/payment-web/src/main/java/demo/payment/action/ProcessPayment.java
+++ b/payment/payment-web/src/main/java/demo/payment/action/ProcessPayment.java
@@ -8,15 +8,16 @@ import demo.payment.event.PaymentEvent;
import demo.payment.event.PaymentEventType;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import java.util.Arrays;
-import java.util.function.Function;
import static demo.payment.domain.PaymentStatus.PAYMENT_FAILED;
import static demo.payment.domain.PaymentStatus.PAYMENT_SUCCEEDED;
@Service
+@Transactional
public class ProcessPayment extends Action {
private final Logger log = Logger.getLogger(this.getClass());
private final PaymentService paymentService;
@@ -25,32 +26,31 @@ public class ProcessPayment extends Action {
this.paymentService = paymentService;
}
- public Function getFunction() {
- return payment -> {
- // Validations
- Assert.isTrue(!Arrays.asList(PAYMENT_SUCCEEDED,
- PaymentStatus.PAYMENT_PENDING,
- PaymentStatus.PAYMENT_FAILED).contains(payment.getStatus()), "Payment has already been processed");
- Assert.isTrue(payment.getStatus() == PaymentStatus.ORDER_CONNECTED,
- "Payment must be connected to an order");
+ public Payment apply(Payment payment) {
+ // Validations
+ Assert.isTrue(!Arrays.asList(PAYMENT_SUCCEEDED,
+ PaymentStatus.PAYMENT_PENDING,
+ PaymentStatus.PAYMENT_FAILED).contains(payment.getStatus()), "Payment has already been processed");
+ Assert.isTrue(payment.getStatus() == PaymentStatus.ORDER_CONNECTED,
+ "Payment must be connected to an order");
- payment.setStatus(PaymentStatus.PAYMENT_PROCESSED);
- payment = paymentService.update(payment);
+ payment.setStatus(PaymentStatus.PAYMENT_PROCESSED);
+ payment = paymentService.update(payment);
- try {
- // Trigger the payment processed event
- payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.PAYMENT_PROCESSED, payment));
- } catch (Exception ex) {
- log.error("Payment could not be processed", ex);
- finalizePayment(payment, PAYMENT_FAILED);
- throw ex;
- } finally {
+ try {
+ // Trigger the payment processed event
+ payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.PAYMENT_PROCESSED, payment));
+ } catch (Exception ex) {
+ log.error("Payment could not be processed", ex);
+ payment = finalizePayment(payment, PAYMENT_FAILED);
+ } finally {
+ if(payment.getStatus() != PAYMENT_FAILED) {
// Handle the result asynchronously
- finalizePayment(payment, PAYMENT_SUCCEEDED);
+ payment = finalizePayment(payment, PAYMENT_SUCCEEDED);
}
+ }
- return payment;
- };
+ return payment;
}
private Payment finalizePayment(Payment payment, PaymentStatus paymentStatus) {
diff --git a/payment/payment-web/src/main/java/demo/payment/domain/Payment.java b/payment/payment-web/src/main/java/demo/payment/domain/Payment.java
index 8624219..f2b1d4b 100644
--- a/payment/payment-web/src/main/java/demo/payment/domain/Payment.java
+++ b/payment/payment-web/src/main/java/demo/payment/domain/Payment.java
@@ -91,14 +91,12 @@ public class Payment extends AbstractEntity {
@Command(method = "connectOrder", controller = PaymentController.class)
public Payment connectOrder(Long orderId) {
return getAction(ConnectOrder.class)
- .getFunction()
.apply(this, orderId);
}
@Command(method = "processPayment", controller = PaymentController.class)
public Payment processPayment() {
return getAction(ProcessPayment.class)
- .getFunction()
.apply(this);
}
diff --git a/payment/payment-web/src/main/resources/application.yml b/payment/payment-web/src/main/resources/application.yml
index 3d3d06d..bb3e739 100644
--- a/payment/payment-web/src/main/resources/application.yml
+++ b/payment/payment-web/src/main/resources/application.yml
@@ -23,11 +23,12 @@ spring:
---
spring:
profiles: docker
- rabbitmq:
- host: ${DOCKER_IP:192.168.99.100}
- port: 5672
cloud:
stream:
+ kafka:
+ binder:
+ brokers: ${DOCKER_IP:192.168.99.100}
+ zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings:
output:
contentType: 'application/json'
diff --git a/payment/payment-worker/pom.xml b/payment/payment-worker/pom.xml
index 42643d9..f6da94e 100644
--- a/payment/payment-worker/pom.xml
+++ b/payment/payment-worker/pom.xml
@@ -37,7 +37,7 @@
org.springframework.cloud
- spring-cloud-starter-stream-rabbit
+ spring-cloud-starter-stream-kafka
org.springframework.boot
diff --git a/payment/payment-worker/src/main/resources/application.yml b/payment/payment-worker/src/main/resources/application.yml
index 58ce116..9556c86 100644
--- a/payment/payment-worker/src/main/resources/application.yml
+++ b/payment/payment-worker/src/main/resources/application.yml
@@ -22,11 +22,12 @@ spring:
---
spring:
profiles: docker
- rabbitmq:
- host: ${DOCKER_IP:192.168.99.100}
- port: 5672
cloud:
stream:
+ kafka:
+ binder:
+ brokers: ${DOCKER_IP:192.168.99.100}
+ zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings:
input:
contentType: 'application/json'
diff --git a/warehouse/warehouse-web/pom.xml b/warehouse/warehouse-web/pom.xml
index f148e24..d228dc2 100644
--- a/warehouse/warehouse-web/pom.xml
+++ b/warehouse/warehouse-web/pom.xml
@@ -51,7 +51,7 @@
org.springframework.cloud
- spring-cloud-starter-stream-rabbit
+ spring-cloud-starter-stream-kafka
org.springframework.boot
diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/action/ReserveInventory.java b/warehouse/warehouse-web/src/main/java/demo/inventory/action/ReserveInventory.java
index 31887d8..a32b368 100644
--- a/warehouse/warehouse-web/src/main/java/demo/inventory/action/ReserveInventory.java
+++ b/warehouse/warehouse-web/src/main/java/demo/inventory/action/ReserveInventory.java
@@ -12,8 +12,6 @@ import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
-import java.util.function.BiFunction;
-
import static demo.inventory.domain.InventoryStatus.RESERVATION_CONNECTED;
/**
@@ -33,34 +31,31 @@ public class ReserveInventory extends Action {
this.inventoryService = inventoryService;
}
- public BiFunction getFunction() {
- return (inventory, reservationId) -> {
- Assert.isTrue(inventory.getStatus() == InventoryStatus.RESERVATION_CONNECTED,
- "Inventory must be in a reservation connected state");
- Assert.isTrue(inventory.getReservation() == null,
- "There is already a reservation attached to the inventory");
+ public Inventory apply(Inventory inventory, Long reservationId) {
+ Assert.isTrue(inventory.getStatus() == InventoryStatus.RESERVATION_CONNECTED,
+ "Inventory must be in a reservation connected state");
+ Assert.isTrue(inventory.getReservation() == null,
+ "There is already a reservation attached to the inventory");
- Reservation reservation = reservationService.get(reservationId);
- Assert.notNull(reservation, "Reserve inventory failed, the reservation does not exist");
+ Reservation reservation = reservationService.get(reservationId);
+ Assert.notNull(reservation, "Reserve inventory failed, the reservation does not exist");
- try {
- // Trigger the reservation connected event
- inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.RESERVATION_CONNECTED, inventory));
- } catch (Exception ex) {
- log.error("Could not connect reservation to inventory", ex);
- inventory.setReservation(null);
- inventory.setStatus(InventoryStatus.RESERVATION_PENDING);
- inventoryService.update(inventory);
- throw ex;
- } finally {
- if (inventory.getStatus() == RESERVATION_CONNECTED && inventory.getReservation() != null) {
- inventory.setStatus(InventoryStatus.INVENTORY_RESERVED);
- inventory = inventoryService.update(inventory);
- inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.INVENTORY_RESERVED, inventory));
- }
+ try {
+ // Trigger the reservation connected event
+ inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.RESERVATION_CONNECTED, inventory));
+ } catch (Exception ex) {
+ log.error("Could not connect reservation to inventory", ex);
+ inventory.setReservation(null);
+ inventory.setStatus(InventoryStatus.RESERVATION_PENDING);
+ inventory = inventoryService.update(inventory);
+ } finally {
+ if (inventory.getStatus() == RESERVATION_CONNECTED && inventory.getReservation() != null) {
+ inventory.setStatus(InventoryStatus.INVENTORY_RESERVED);
+ inventory = inventoryService.update(inventory);
+ inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.INVENTORY_RESERVED, inventory));
}
+ }
- return inventory;
- };
+ return inventory;
}
}
diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/action/UpdateInventoryStatus.java b/warehouse/warehouse-web/src/main/java/demo/inventory/action/UpdateInventoryStatus.java
index 57fe360..1c41e08 100644
--- a/warehouse/warehouse-web/src/main/java/demo/inventory/action/UpdateInventoryStatus.java
+++ b/warehouse/warehouse-web/src/main/java/demo/inventory/action/UpdateInventoryStatus.java
@@ -7,8 +7,7 @@ import demo.inventory.domain.InventoryStatus;
import demo.reservation.domain.ReservationService;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
-
-import java.util.function.BiFunction;
+import org.springframework.transaction.annotation.Transactional;
/**
* Updates the status of a {@link Inventory} entity.
@@ -16,6 +15,7 @@ import java.util.function.BiFunction;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class UpdateInventoryStatus extends Action {
private final Logger log = Logger.getLogger(this.getClass());
@@ -27,24 +27,20 @@ public class UpdateInventoryStatus extends Action {
this.inventoryService = inventoryService;
}
- public BiFunction getFunction() {
- return (inventory, inventoryStatus) -> {
+ public Inventory apply(Inventory inventory, InventoryStatus inventoryStatus) {
+ // Save rollback status
+ InventoryStatus rollbackStatus = inventory.getStatus();
- // Save rollback status
- InventoryStatus rollbackStatus = inventory.getStatus();
+ try {
+ // Update status
+ inventory.setStatus(inventoryStatus);
+ inventory = inventoryService.update(inventory);
+ } catch (Exception ex) {
+ log.error("Could not update the status", ex);
+ inventory.setStatus(rollbackStatus);
+ inventory = inventoryService.update(inventory);
+ }
- try {
- // Update status
- inventory.setStatus(inventoryStatus);
- inventory = inventoryService.update(inventory);
- } catch (Exception ex) {
- log.error("Could not update the status", ex);
- inventory.setStatus(rollbackStatus);
- inventory = inventoryService.update(inventory);
- throw ex;
- }
-
- return inventory;
- };
+ return inventory;
}
}
diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/domain/Inventory.java b/warehouse/warehouse-web/src/main/java/demo/inventory/domain/Inventory.java
index 55001a6..fc4d2a2 100644
--- a/warehouse/warehouse-web/src/main/java/demo/inventory/domain/Inventory.java
+++ b/warehouse/warehouse-web/src/main/java/demo/inventory/domain/Inventory.java
@@ -87,14 +87,12 @@ public class Inventory extends AbstractEntity {
@Command(method = "reserve", controller = InventoryController.class)
public Inventory reserve(Long reservationId) {
return getAction(ReserveInventory.class)
- .getFunction()
.apply(this, reservationId);
}
@Command(method = "updateInventoryStatus", controller = InventoryController.class)
public Inventory updateStatus(InventoryStatus status) {
return getAction(UpdateInventoryStatus.class)
- .getFunction()
.apply(this, status);
}
diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/domain/InventoryService.java b/warehouse/warehouse-web/src/main/java/demo/inventory/domain/InventoryService.java
index c98342c..7193052 100644
--- a/warehouse/warehouse-web/src/main/java/demo/inventory/domain/InventoryService.java
+++ b/warehouse/warehouse-web/src/main/java/demo/inventory/domain/InventoryService.java
@@ -105,7 +105,7 @@ public class InventoryService extends Service {
Boolean lock = false;
try {
- lock = inventoryLock.tryLock(30, 5000, TimeUnit.MILLISECONDS);
+ lock = inventoryLock.tryLock(1, 5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted while acquiring lock on inventory", e);
}
diff --git a/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectInventory.java b/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectInventory.java
index b69c111..1770e8f 100644
--- a/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectInventory.java
+++ b/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectInventory.java
@@ -21,7 +21,6 @@ import org.springframework.util.Assert;
import java.util.List;
import java.util.Random;
-import java.util.function.Function;
import static demo.reservation.event.ReservationEventType.*;
@@ -41,67 +40,73 @@ public class ConnectInventory extends Action {
this.inventoryService = inventoryService;
}
- public Function getFunction() {
- return (reservation) -> {
- Assert.isTrue(reservation.getStatus() == ReservationStatus.ORDER_CONNECTED,
- "Reservation must be in an order connected state");
+ public Reservation apply(Reservation reservation) {
+ Assert.isTrue(reservation.getStatus() == ReservationStatus.ORDER_CONNECTED,
+ "Reservation must be in an order connected state");
- ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService();
+ ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService();
- // Set reservation to pending
- reservation.setStatus(ReservationStatus.RESERVATION_PENDING);
- reservation = reservationService.update(reservation);
+ // Set reservation to pending
+ reservation.setStatus(ReservationStatus.RESERVATION_PENDING);
+ reservation = reservationService.update(reservation);
- // Get available inventory and connect reservation in an atomic transaction
- Inventory inventory = inventoryService.findAvailableInventory(reservation);
+ // Get available inventory and connect reservation in an atomic transaction
+ Inventory inventory = inventoryService.findAvailableInventory(reservation);
- try {
- if (inventory == null) {
- // Inventory is out of stock, fail the reservation process
- reservation.setStatus(ReservationStatus.RESERVATION_FAILED);
- reservation = reservationService.update(reservation);
-
- // Trigger reservation failed event
- reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_FAILED, reservation));
-
- // Throw the out of stock exception
- throw new OutOfStockException("Inventory for reservation is unavailable in warehouse: "
- .concat(reservation.getId().toString()));
- }
-
- // Set inventory on reservation and mark successful
- reservation.setInventory(inventory);
- reservation.setStatus(ReservationStatus.RESERVATION_SUCCEEDED);
+ try {
+ if (inventory == null) {
+ // Inventory is out of stock, fail the reservation process
+ reservation.setStatus(ReservationStatus.RESERVATION_FAILED);
reservation = reservationService.update(reservation);
- // Trigger the inventory connected event
- reservation.sendAsyncEvent(new ReservationEvent(INVENTORY_CONNECTED, reservation),
- reservation.getInventory().getId().withRel("inventory"));
- } catch (Exception ex) {
- log.error("Could not connect reservation to order", ex);
- if (reservation.getStatus() != ReservationStatus.RESERVATION_FAILED) {
- // Rollback the reservation attempt
- if (inventory != null) {
- inventory.setReservation(null);
- inventory.setStatus(InventoryStatus.RESERVATION_PENDING);
- inventory = inventoryService.update(inventory);
- }
+ // Trigger reservation failed event
+ reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_FAILED, reservation));
- reservation.setInventory(null);
- reservation.setStatus(ReservationStatus.ORDER_CONNECTED);
- reservation = reservationService.update(reservation);
- }
- throw ex;
- } finally {
- if (reservation.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED) {
- Link inventoryLink = reservation.getInventory().getId().withRel("inventory");
- Link orderLink = getRemoteLink("order-web", "/v1/orders/{id}", reservation.getOrderId(), "order");
- reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_SUCCEEDED, reservation), inventoryLink, orderLink);
- }
+ // Throw the out of stock exception
+ throw new OutOfStockException("Inventory for reservation is unavailable in warehouse: "
+ .concat(reservation.getId().toString()));
}
- return reservation;
- };
+ inventory.setReservation(reservation);
+ inventory.setStatus(InventoryStatus.RESERVATION_CONNECTED);
+ inventory = inventoryService.update(inventory);
+
+ // Set inventory on reservation and mark successful
+ reservation.setInventory(inventory);
+ reservation.setStatus(ReservationStatus.RESERVATION_SUCCEEDED);
+ reservation = reservationService.update(reservation);
+
+
+ // Trigger the inventory connected event
+ reservation.sendAsyncEvent(new ReservationEvent(INVENTORY_CONNECTED, reservation),
+ reservation.getInventory().getId().withRel("inventory"));
+ } catch (Exception ex) {
+ log.error("Could not connect reservation to order", ex);
+ if (reservation.getStatus() != ReservationStatus.RESERVATION_FAILED) {
+ // Rollback the reservation attempt
+ if (inventory != null) {
+ inventory.setReservation(null);
+ inventory.setStatus(InventoryStatus.RESERVATION_PENDING);
+ inventoryService.update(inventory);
+ }
+
+ reservation.setInventory(null);
+ reservation.setStatus(ReservationStatus.ORDER_CONNECTED);
+ reservation = reservationService.update(reservation);
+ }
+
+ throw ex;
+ } finally {
+ if (reservation.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED) {
+ Link inventoryLink = reservation.getInventory().getId().withRel("inventory");
+ Link orderLink = getRemoteLink("order-web", "/v1/orders/{id}", reservation.getOrderId(), "order");
+ reservation
+ .sendAsyncEvent(new ReservationEvent(RESERVATION_SUCCEEDED, reservation), inventoryLink,
+ orderLink);
+ }
+ }
+
+ return reservation;
}
private Link getRemoteLink(String service, String relative, Object identifier, String rel) {
diff --git a/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectOrder.java b/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectOrder.java
index 113c3a0..4a7fb51 100644
--- a/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectOrder.java
+++ b/warehouse/warehouse-web/src/main/java/demo/reservation/action/ConnectOrder.java
@@ -9,42 +9,40 @@ import demo.reservation.event.ReservationEvent;
import demo.reservation.event.ReservationEventType;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
-import java.util.function.BiFunction;
-
/**
* Connects an {@link Reservation} to an Order.
*
* @author Kenny Bastani
*/
@Service
+@Transactional
public class ConnectOrder extends Action {
private final Logger log = Logger.getLogger(this.getClass());
- public BiFunction getFunction() {
- return (reservation, orderId) -> {
- Assert.isTrue(reservation.getStatus() == ReservationStatus.RESERVATION_CREATED, "Reservation must be in a created state");
+ public Reservation apply(Reservation reservation, Long orderId) {
+ Assert.isTrue(reservation
+ .getStatus() == ReservationStatus.RESERVATION_CREATED, "Reservation must be in a created state");
- ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService();
+ ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService();
- // Connect the order
- reservation.setOrderId(orderId);
- reservation.setStatus(ReservationStatus.ORDER_CONNECTED);
+ // Connect the order
+ reservation.setOrderId(orderId);
+ reservation.setStatus(ReservationStatus.ORDER_CONNECTED);
+ reservation = reservationService.update(reservation);
+
+ try {
+ // Trigger the order connected event
+ reservation.sendAsyncEvent(new ReservationEvent(ReservationEventType.ORDER_CONNECTED, reservation));
+ } catch (Exception ex) {
+ log.error("Could not connect reservation to order", ex);
+ reservation.setOrderId(null);
+ reservation.setStatus(ReservationStatus.RESERVATION_CREATED);
reservation = reservationService.update(reservation);
+ }
- try {
- // Trigger the order connected event
- reservation.sendAsyncEvent(new ReservationEvent(ReservationEventType.ORDER_CONNECTED, reservation));
- } catch (Exception ex) {
- log.error("Could not connect reservation to order", ex);
- reservation.setOrderId(null);
- reservation.setStatus(ReservationStatus.RESERVATION_CREATED);
- reservationService.update(reservation);
- throw ex;
- }
-
- return reservation;
- };
+ return reservation;
}
}
diff --git a/warehouse/warehouse-web/src/main/java/demo/reservation/action/ReleaseInventory.java b/warehouse/warehouse-web/src/main/java/demo/reservation/action/ReleaseInventory.java
index 4e453fe..0f1b0ca 100644
--- a/warehouse/warehouse-web/src/main/java/demo/reservation/action/ReleaseInventory.java
+++ b/warehouse/warehouse-web/src/main/java/demo/reservation/action/ReleaseInventory.java
@@ -12,10 +12,9 @@ import demo.reservation.domain.ReservationStatus;
import demo.reservation.event.ReservationEvent;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
-import java.util.function.Function;
-
import static demo.inventory.event.InventoryEventType.INVENTORY_RELEASED;
import static demo.reservation.event.ReservationEventType.RESERVATION_FAILED;
@@ -25,6 +24,7 @@ import static demo.reservation.event.ReservationEventType.RESERVATION_FAILED;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class ReleaseInventory extends Action {
private final Logger log = Logger.getLogger(this.getClass());
private final InventoryService inventoryService;
@@ -33,34 +33,32 @@ public class ReleaseInventory extends Action {
this.inventoryService = inventoryService;
}
- public Function getFunction() {
- return (reservation) -> {
- Assert.isTrue(reservation.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED,
- "Reservation must be in a succeeded state");
- Assert.notNull(reservation.getInventory(), "The reservation has no connected inventory");
+ public Reservation apply(Reservation reservation) {
+ Assert.isTrue(reservation.getStatus() != ReservationStatus.RESERVATION_FAILED,
+ "Reservation is already in a failed state");
- ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService();
+ ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService();
- Inventory inventory = reservation.getInventory();
+ Inventory inventory = reservation.getInventory();
- try {
- // Remove the inventory and set the reservation to failed
- reservation.setInventory(null);
- reservation.setStatus(ReservationStatus.RESERVATION_FAILED);
+ try {
+ // Remove the inventory and set the reservation to failed
+ reservation.setInventory(null);
+ reservation.setStatus(ReservationStatus.RESERVATION_FAILED);
+ reservation = reservationService.update(reservation);
+
+ // Trigger the reservation failed event
+ reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_FAILED, reservation));
+ } catch (Exception ex) {
+ log.error("Could not release the reservation's inventory", ex);
+ if (reservation.getStatus() == ReservationStatus.RESERVATION_FAILED) {
+ // Rollback the attempt
+ reservation.setInventory(inventory);
+ reservation.setStatus(ReservationStatus.RESERVATION_SUCCEEDED);
reservation = reservationService.update(reservation);
-
- // Trigger the reservation failed event
- reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_FAILED, reservation));
- } catch (Exception ex) {
- log.error("Could not release the reservation's inventory", ex);
- if (reservation.getStatus() == ReservationStatus.RESERVATION_FAILED) {
- // Rollback the attempt
- reservation.setInventory(inventory);
- reservation.setStatus(ReservationStatus.RESERVATION_SUCCEEDED);
- reservation = reservationService.update(reservation);
- }
- throw ex;
- } finally {
+ }
+ } finally {
+ if (inventory != null && reservation.getStatus() != ReservationStatus.RESERVATION_SUCCEEDED) {
// Release the inventory
inventory.setReservation(null);
inventory.setStatus(InventoryStatus.RESERVATION_PENDING);
@@ -69,8 +67,8 @@ public class ReleaseInventory extends Action {
// Trigger the inventory released event
inventory.sendAsyncEvent(new InventoryEvent(INVENTORY_RELEASED, inventory));
}
+ }
- return reservation;
- };
+ return reservation;
}
}
diff --git a/warehouse/warehouse-web/src/main/java/demo/reservation/domain/Reservation.java b/warehouse/warehouse-web/src/main/java/demo/reservation/domain/Reservation.java
index 3fc2942..947a6d1 100644
--- a/warehouse/warehouse-web/src/main/java/demo/reservation/domain/Reservation.java
+++ b/warehouse/warehouse-web/src/main/java/demo/reservation/domain/Reservation.java
@@ -111,21 +111,18 @@ public class Reservation extends AbstractEntity {
@Command(method = "connectInventory", controller = ReservationController.class)
public Reservation connectInventory() {
return getAction(ConnectInventory.class)
- .getFunction()
.apply(this);
}
@Command(method = "releaseInventory", controller = ReservationController.class)
public Reservation releaseInventory() {
return getAction(ReleaseInventory.class)
- .getFunction()
.apply(this);
}
@Command(method = "connectOrder", controller = ReservationController.class)
public Reservation connectOrder(Long orderId) {
return getAction(ConnectOrder.class)
- .getFunction()
.apply(this, orderId);
}
diff --git a/warehouse/warehouse-web/src/main/java/demo/warehouse/action/ReserveOrder.java b/warehouse/warehouse-web/src/main/java/demo/warehouse/action/ReserveOrder.java
index 81aa57c..fe3c26c 100644
--- a/warehouse/warehouse-web/src/main/java/demo/warehouse/action/ReserveOrder.java
+++ b/warehouse/warehouse-web/src/main/java/demo/warehouse/action/ReserveOrder.java
@@ -9,9 +9,9 @@ import demo.reservation.event.ReservationEventType;
import demo.warehouse.domain.Warehouse;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import java.util.List;
-import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -21,6 +21,7 @@ import java.util.stream.IntStream;
* @author Kenny Bastani
*/
@Service
+@Transactional
public class ReserveOrder extends Action {
private final Logger log = Logger.getLogger(ReserveOrder.class);
@@ -30,25 +31,22 @@ public class ReserveOrder extends Action {
this.reservationService = reservationService;
}
- public BiConsumer getConsumer() {
- return (warehouse, order) -> {
+ public void apply(Warehouse warehouse, Order order) {
+ // Create reservations for each order item
+ List reservations = order.getLineItems().stream()
+ .map(item -> IntStream.rangeClosed(1, item.getQuantity())
+ .mapToObj(a -> new Reservation(item.getProductId(), order.getIdentity(), warehouse)))
+ .flatMap(a -> a)
+ .collect(Collectors.toList());
- // Create reservations for each order item
- List reservations = order.getLineItems().stream()
- .map(item -> IntStream.rangeClosed(1, item.getQuantity())
- .mapToObj(a -> new Reservation(item.getProductId(), order.getIdentity(), warehouse)))
- .flatMap(a -> a)
- .collect(Collectors.toList());
+ // Save the reservations
+ reservations = reservationService.create(reservations);
- // Save the reservations
- reservations = reservationService.create(reservations);
-
- // Trigger reservation requests for each order item
- reservations.forEach(r -> {
- ReservationEvent event = new ReservationEvent(ReservationEventType.RESERVATION_REQUESTED, r);
- event.add(order.getLink("self").withRel("order"));
- r.sendAsyncEvent(event);
- });
- };
+ // Trigger reservation requests for each order item
+ reservations.forEach(r -> {
+ ReservationEvent event = new ReservationEvent(ReservationEventType.RESERVATION_REQUESTED, r);
+ event.add(order.getLink("self").withRel("order"));
+ r.sendAsyncEvent(event);
+ });
}
}
diff --git a/warehouse/warehouse-web/src/main/java/demo/warehouse/domain/Warehouse.java b/warehouse/warehouse-web/src/main/java/demo/warehouse/domain/Warehouse.java
index 235c2b1..7d7b1f2 100644
--- a/warehouse/warehouse-web/src/main/java/demo/warehouse/domain/Warehouse.java
+++ b/warehouse/warehouse-web/src/main/java/demo/warehouse/domain/Warehouse.java
@@ -77,8 +77,7 @@ public class Warehouse extends AbstractEntity {
@Command(method = "reserveOrder", controller = WarehouseController.class)
public Warehouse reserveOrder(Order order) {
getAction(ReserveOrder.class)
- .getConsumer()
- .accept(this, order);
+ .apply(this, order);
return this;
}
diff --git a/warehouse/warehouse-worker/pom.xml b/warehouse/warehouse-worker/pom.xml
index ff9e4ba..1725523 100644
--- a/warehouse/warehouse-worker/pom.xml
+++ b/warehouse/warehouse-worker/pom.xml
@@ -37,7 +37,7 @@
org.springframework.cloud
- spring-cloud-starter-stream-rabbit
+ spring-cloud-starter-stream-kafka
org.springframework.boot
diff --git a/warehouse/warehouse-worker/src/main/java/demo/reservation/config/ReservationStateMachineConfig.java b/warehouse/warehouse-worker/src/main/java/demo/reservation/config/ReservationStateMachineConfig.java
index a097bc2..cab26a7 100644
--- a/warehouse/warehouse-worker/src/main/java/demo/reservation/config/ReservationStateMachineConfig.java
+++ b/warehouse/warehouse-worker/src/main/java/demo/reservation/config/ReservationStateMachineConfig.java
@@ -26,6 +26,8 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
+import static demo.order.domain.OrderStatus.ORDER_FAILED;
+import static demo.order.domain.OrderStatus.RESERVATION_FAILED;
import static demo.order.domain.OrderStatus.RESERVATION_PENDING;
/**
@@ -243,12 +245,24 @@ public class ReservationStateMachineConfig extends EnumStateMachineConfigurerAda
MediaTypes.HAL_JSON
);
- traverson.follow("self", "order", "commands", "completeReservation")
- .toObject(Order.class);
+ Order order = traverson.follow("self", "order").toObject(Order.class);
+ Reservation reservation = null;
- return traverson.follow("self")
- .toEntity(Reservation.class)
- .getBody();
+ // Check order status and release inventory if it has failed
+ if (order.getStatus() == RESERVATION_FAILED || order.getStatus() == ORDER_FAILED) {
+ reservation = traverson.follow("self", "commands", "releaseInventory")
+ .toObject(Reservation.class);
+ } else if (order.getStatus() == RESERVATION_PENDING) {
+ traverson.follow("self", "order", "commands", "completeReservation")
+ .toObject(Order.class);
+ }
+
+ if (reservation == null)
+ reservation = traverson.follow("self")
+ .toEntity(Reservation.class)
+ .getBody();
+
+ return reservation;
}));
}
diff --git a/warehouse/warehouse-worker/src/main/resources/application.yml b/warehouse/warehouse-worker/src/main/resources/application.yml
index 023a4ac..7b786c2 100644
--- a/warehouse/warehouse-worker/src/main/resources/application.yml
+++ b/warehouse/warehouse-worker/src/main/resources/application.yml
@@ -33,11 +33,12 @@ spring:
---
spring:
profiles: docker
- rabbitmq:
- host: ${DOCKER_IP:192.168.99.100}
- port: 5672
cloud:
stream:
+ kafka:
+ binder:
+ brokers: ${DOCKER_IP:192.168.99.100}
+ zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings:
warehouse:
contentType: 'application/json'