Consistency testing

This commit is contained in:
Kenny Bastani
2017-01-16 00:01:00 -08:00
parent 86eb604d48
commit f260467220
52 changed files with 688 additions and 729 deletions

2
.gitignore vendored
View File

@@ -9,6 +9,8 @@ dump.rdb
*.war *.war
*.ear *.ear
.DS_Store
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid* hs_err_pid*

View File

@@ -2,6 +2,9 @@ language: java
jdk: jdk:
- oraclejdk8 - oraclejdk8
services: services:
- rabbitmq
- redis - redis
- docker
install: mvn clean install -DskipDockerBuild install: mvn clean install -DskipDockerBuild
before_install:
- docker pull spotify/kafka
- docker run -p 2181:2181 -p 9092:9092 spotify/kafka

View File

@@ -41,7 +41,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@@ -9,10 +9,10 @@ import demo.account.event.AccountEventType;
import demo.domain.Action; import demo.domain.Action;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.Arrays; import java.util.Arrays;
import java.util.function.Function;
import static demo.account.domain.AccountStatus.*; import static demo.account.domain.AccountStatus.*;
@@ -22,38 +22,35 @@ import static demo.account.domain.AccountStatus.*;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class ActivateAccount extends Action<Account> { public class ActivateAccount extends Action<Account> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
public Function<Account, Account> getFunction() { public Account apply(Account account) {
return (account) -> { Assert.isTrue(account.getStatus() != ACCOUNT_ACTIVE, "The account is already active");
Assert.isTrue(account.getStatus() != ACCOUNT_ACTIVE, "The account is already active"); Assert.isTrue(Arrays.asList(ACCOUNT_CONFIRMED, ACCOUNT_SUSPENDED, ACCOUNT_ARCHIVED)
Assert.isTrue(Arrays.asList(ACCOUNT_CONFIRMED, ACCOUNT_SUSPENDED, ACCOUNT_ARCHIVED) .contains(account.getStatus()), "The account cannot be activated");
.contains(account.getStatus()), "The account cannot be activated");
AccountService accountService = account.getModule(AccountModule.class) AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService(); .getDefaultService();
AccountStatus status = account.getStatus(); AccountStatus status = account.getStatus();
// Activate the account // Activate the account
account.setStatus(AccountStatus.ACCOUNT_ACTIVE); account.setStatus(AccountStatus.ACCOUNT_ACTIVE);
account = accountService.update(account);
try {
// Trigger the account activated event
account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED, account));
} catch (Exception ex) {
log.error("Account could not be activated", ex);
// Rollback the operation
account.setStatus(status);
account = accountService.update(account); account = accountService.update(account);
}
try { return account;
// Trigger the account activated event
account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED, account));
} catch (Exception ex) {
log.error("Account could not be activated", ex);
// Rollback the operation
account.setStatus(status);
accountService.update(account);
throw ex;
}
return account;
};
} }
} }

View File

@@ -9,10 +9,9 @@ import demo.account.event.AccountEventType;
import demo.domain.Action; import demo.domain.Action;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.function.Function;
import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE; import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE;
import static demo.account.domain.AccountStatus.ACCOUNT_ARCHIVED; import static demo.account.domain.AccountStatus.ACCOUNT_ARCHIVED;
@@ -22,38 +21,35 @@ import static demo.account.domain.AccountStatus.ACCOUNT_ARCHIVED;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class ArchiveAccount extends Action<Account> { public class ArchiveAccount extends Action<Account> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
public Function<Account, Account> getFunction() { public Account apply(Account account) {
return (account) -> { Assert.isTrue(account.getStatus() != ACCOUNT_ARCHIVED, "The account is already archived");
Assert.isTrue(account.getStatus() != ACCOUNT_ARCHIVED, "The account is already archived"); Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be archived");
Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be archived");
AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService();
AccountStatus status = account.getStatus(); AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService();
// Archive the account AccountStatus status = account.getStatus();
account.setStatus(AccountStatus.ACCOUNT_ARCHIVED);
// Archive the account
account.setStatus(AccountStatus.ACCOUNT_ARCHIVED);
account = accountService.update(account);
try {
// Trigger the account archived event
account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_ARCHIVED, account));
} catch (Exception ex) {
log.error("Account could not be archived", ex);
// Rollback the operation
account.setStatus(status);
account = accountService.update(account); account = accountService.update(account);
}
try { return account;
// Trigger the account archived event
account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_ARCHIVED, account));
} catch (Exception ex) {
log.error("Account could not be archived", ex);
// Rollback the operation
account.setStatus(status);
accountService.update(account);
throw ex;
}
return account;
};
} }
} }

View File

@@ -9,10 +9,9 @@ import demo.account.event.AccountEventType;
import demo.domain.Action; import demo.domain.Action;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.function.Function;
import static demo.account.domain.AccountStatus.ACCOUNT_CONFIRMED; import static demo.account.domain.AccountStatus.ACCOUNT_CONFIRMED;
import static demo.account.domain.AccountStatus.ACCOUNT_PENDING; import static demo.account.domain.AccountStatus.ACCOUNT_PENDING;
@@ -22,38 +21,35 @@ import static demo.account.domain.AccountStatus.ACCOUNT_PENDING;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class ConfirmAccount extends Action<Account> { public class ConfirmAccount extends Action<Account> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
public Function<Account, Account> getFunction() { public Account apply(Account account) {
return (account) -> { Assert.isTrue(account.getStatus() != ACCOUNT_CONFIRMED, "The account has already been confirmed");
Assert.isTrue(account.getStatus() != ACCOUNT_CONFIRMED, "The account has already been confirmed"); Assert.isTrue(account.getStatus() == ACCOUNT_PENDING, "The account has already been confirmed");
Assert.isTrue(account.getStatus() == ACCOUNT_PENDING, "The account has already been confirmed");
AccountService accountService = account.getModule(AccountModule.class) AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService(); .getDefaultService();
AccountStatus status = account.getStatus(); AccountStatus status = account.getStatus();
// Activate the account // Activate the account
account.setStatus(AccountStatus.ACCOUNT_CONFIRMED); account.setStatus(AccountStatus.ACCOUNT_CONFIRMED);
account = accountService.update(account);
try {
// Trigger the account confirmed event
account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED, account));
} catch (Exception ex) {
log.error("Account could not be confirmed", ex);
// Rollback the operation
account.setStatus(status);
account = accountService.update(account); account = accountService.update(account);
}
try { return account;
// Trigger the account confirmed event
account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED, account));
} catch (Exception ex) {
log.error("Account could not be confirmed", ex);
// Rollback the operation
account.setStatus(status);
accountService.update(account);
throw ex;
}
return account;
};
} }
} }

View File

@@ -5,8 +5,7 @@ import demo.domain.Action;
import demo.order.domain.OrderModule; import demo.order.domain.OrderModule;
import demo.order.domain.Orders; import demo.order.domain.Orders;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.function.Function;
/** /**
* Query action to get {@link demo.order.domain.Order}s for an an {@link Account} * Query action to get {@link demo.order.domain.Order}s for an an {@link Account}
@@ -14,6 +13,7 @@ import java.util.function.Function;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class GetOrders extends Action<Account> { public class GetOrders extends Action<Account> {
private OrderModule orderModule; private OrderModule orderModule;
@@ -22,11 +22,9 @@ public class GetOrders extends Action<Account> {
this.orderModule = orderModule; this.orderModule = orderModule;
} }
public Function<Account, Orders> getFunction() { public Orders apply(Account account) {
return (account) -> { // Get orders from the order service
// Get orders from the order service return orderModule.getDefaultService()
return orderModule.getDefaultService() .findOrdersByAccountId(account.getIdentity());
.findOrdersByAccountId(account.getIdentity());
};
} }
} }

View File

@@ -9,6 +9,7 @@ import org.apache.log4j.Logger;
import org.springframework.hateoas.MediaTypes; import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson; import org.springframework.hateoas.client.Traverson;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.web.client.RestClientResponseException; import org.springframework.web.client.RestClientResponseException;
@@ -16,7 +17,6 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.function.BiFunction;
import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE; import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE;
@@ -26,33 +26,32 @@ import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class PostOrder extends Action<Account> { public class PostOrder extends Action<Account> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
public BiFunction<Account, Order, Order> getFunction() { public Order apply(Account account, Order order) {
return (account, order) -> { Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "Only active accounts can create an order");
Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "Only active accounts can create an order"); order = order.post();
order = order.post();
try { try {
// Create traverson for the new order // Create traverson for the new order
Traverson traverson = new Traverson(URI.create(order.getLink("self") Traverson traverson = new Traverson(URI.create(order.getLink("self")
.getHref()), MediaTypes.HAL_JSON); .getHref()), MediaTypes.HAL_JSON);
Map<String, Object> params = new HashMap<>(); Map<String, Object> params = new HashMap<>();
params.put("accountId", account.getIdentity()); params.put("accountId", account.getIdentity());
order = traverson.follow("commands", "connectAccount") order = traverson.follow("commands", "connectAccount")
.withTemplateParameters(params) .withTemplateParameters(params)
.toObject(Order.class); .toObject(Order.class);
} catch (RestClientResponseException ex) { } catch (RestClientResponseException ex) {
log.error("New order could not be posted for the account", ex); log.error("New order could not be posted for the account", ex);
throw new IllegalStateException(getHttpStatusMessage(ex)); throw new IllegalStateException(getHttpStatusMessage(ex));
} }
return order; return order;
};
} }
private String getHttpStatusMessage(RestClientResponseException ex) { private String getHttpStatusMessage(RestClientResponseException ex) {

View File

@@ -9,10 +9,9 @@ import demo.account.event.AccountEventType;
import demo.domain.Action; import demo.domain.Action;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.function.Function;
import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE; import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE;
import static demo.account.domain.AccountStatus.ACCOUNT_SUSPENDED; import static demo.account.domain.AccountStatus.ACCOUNT_SUSPENDED;
@@ -22,38 +21,35 @@ import static demo.account.domain.AccountStatus.ACCOUNT_SUSPENDED;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class SuspendAccount extends Action<Account> { public class SuspendAccount extends Action<Account> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
public Function<Account, Account> getFunction() { public Account apply(Account account) {
return (account) -> { Assert.isTrue(account.getStatus() != ACCOUNT_SUSPENDED, "The account is already suspended");
Assert.isTrue(account.getStatus() != ACCOUNT_SUSPENDED, "The account is already suspended"); Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be suspended");
Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be suspended");
AccountService accountService = account.getModule(AccountModule.class) AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService(); .getDefaultService();
AccountStatus status = account.getStatus(); AccountStatus status = account.getStatus();
// Suspend the account // Suspend the account
account.setStatus(AccountStatus.ACCOUNT_SUSPENDED); account.setStatus(AccountStatus.ACCOUNT_SUSPENDED);
account = accountService.update(account);
try {
// Trigger the account suspended event
account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_SUSPENDED, account));
} catch (Exception ex) {
log.error("Account could not be suspended", ex);
// Rollback the operation
account.setStatus(status);
account = accountService.update(account); account = accountService.update(account);
}
try { return account;
// Trigger the account suspended event
account.sendAsyncEvent(new AccountEvent(AccountEventType.ACCOUNT_SUSPENDED, account));
} catch (Exception ex) {
log.error("Account could not be suspended", ex);
// Rollback the operation
account.setStatus(status);
accountService.update(account);
throw ex;
}
return account;
};
} }
} }

View File

@@ -88,42 +88,36 @@ public class Account extends AbstractEntity<AccountEvent, Long> {
@JsonIgnore @JsonIgnore
public Orders getOrders() { public Orders getOrders() {
return getAction(GetOrders.class) return getAction(GetOrders.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "activate", controller = AccountController.class) @Command(method = "activate", controller = AccountController.class)
public Account activate() { public Account activate() {
return getAction(ActivateAccount.class) return getAction(ActivateAccount.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "archive", controller = AccountController.class) @Command(method = "archive", controller = AccountController.class)
public Account archive() { public Account archive() {
return getAction(ArchiveAccount.class) return getAction(ArchiveAccount.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "confirm", controller = AccountController.class) @Command(method = "confirm", controller = AccountController.class)
public Account confirm() { public Account confirm() {
return getAction(ConfirmAccount.class) return getAction(ConfirmAccount.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "suspend", controller = AccountController.class) @Command(method = "suspend", controller = AccountController.class)
public Account suspend() { public Account suspend() {
return getAction(SuspendAccount.class) return getAction(SuspendAccount.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "postOrder", controller = AccountController.class) @Command(method = "postOrder", controller = AccountController.class)
public Order postOrder(Order order) { public Order postOrder(Order order) {
return getAction(PostOrder.class) return getAction(PostOrder.class)
.getFunction()
.apply(this, order); .apply(this, order);
} }

View File

@@ -23,11 +23,12 @@ spring:
--- ---
spring: spring:
profiles: docker profiles: docker
rabbitmq:
host: ${DOCKER_IP:192.168.99.100}
port: 5672
cloud: cloud:
stream: stream:
kafka:
binder:
brokers: ${DOCKER_IP:192.168.99.100}
zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings: bindings:
output: output:
destination: account destination: account

View File

@@ -37,7 +37,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@@ -28,11 +28,12 @@ eureka:
--- ---
spring: spring:
profiles: docker profiles: docker
rabbitmq:
host: ${DOCKER_IP:192.168.99.100}
port: 5672
cloud: cloud:
stream: stream:
kafka:
binder:
brokers: ${DOCKER_IP:192.168.99.100}
zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings: bindings:
input: input:
destination: account destination: account

View File

@@ -10,10 +10,13 @@ redis:
container_name: redis container_name: redis
image: redis:latest image: redis:latest
net: host net: host
rabbit: kafka:
container_name: rabbit container_name: kafka
image: rabbitmq:3-management image: spotify/kafka:latest
net: host net: host
ports:
- 2181:2181
- 9092:9092
account-web: account-web:
image: account-web image: account-web
environment: environment:

View File

@@ -41,7 +41,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@@ -12,49 +12,46 @@ import org.springframework.hateoas.Link;
import org.springframework.hateoas.TemplateVariable; import org.springframework.hateoas.TemplateVariable;
import org.springframework.hateoas.UriTemplate; import org.springframework.hateoas.UriTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.function.BiFunction;
/** /**
* Connects an {@link Order} to an Account. * Connects an {@link Order} to an Account.
* *
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class AddReservation extends Action<Order> { public class AddReservation extends Action<Order> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
public BiFunction<Order, Long, Order> getFunction() { public Order apply(Order order, Long reservationId) {
return (order, reservationId) -> { Assert.isTrue(order
Assert.isTrue(order .getStatus() == OrderStatus.RESERVATION_PENDING, "Order must be in a pending reservation state");
.getStatus() == OrderStatus.RESERVATION_PENDING, "Order must be in a pending reservation state"); Assert.isTrue(!order.getReservationIds().contains(reservationId), "Reservation already added to order");
Assert.isTrue(!order.getReservationIds().contains(reservationId), "Reservation already added to order");
OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
order.getReservationIds().add(reservationId); order.getReservationIds().add(reservationId);
order = orderService.update(order);
Link reservationLink = new Link(new UriTemplate("http://warehouse-web/v1/reservations/{id}")
.with("id", TemplateVariable.VariableType.PATH_VARIABLE)
.expand(reservationId)
.toString()).withRel("reservation");
try {
// Trigger reservation added event
order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_ADDED, order), reservationLink);
} catch (Exception ex) {
log.error("Could not add reservation to order", ex);
order.getReservationIds().remove(reservationId);
order.setStatus(OrderStatus.RESERVATION_FAILED);
order = orderService.update(order); order = orderService.update(order);
order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_FAILED, order), reservationLink);
}
Link reservationLink = new Link(new UriTemplate("http://warehouse-web/v1/reservations/{id}") return order;
.with("id", TemplateVariable.VariableType.PATH_VARIABLE)
.expand(reservationId)
.toString()).withRel("reservation");
try {
// Trigger reservation added event
order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_ADDED, order), reservationLink);
} catch (Exception ex) {
log.error("Could not add reservation to order", ex);
order.getReservationIds().remove(reservationId);
order.setStatus(OrderStatus.RESERVATION_FAILED);
orderService.update(order);
order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_FAILED, order), reservationLink);
throw ex;
}
return order;
};
} }
} }

View File

@@ -9,10 +9,10 @@ import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType; import demo.order.event.OrderEventType;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.Arrays; import java.util.Arrays;
import java.util.function.Function;
/** /**
* Completes the {@link Order} and applies a final status * Completes the {@link Order} and applies a final status
@@ -20,39 +20,37 @@ import java.util.function.Function;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class CompleteOrder extends Action<Order> { public class CompleteOrder extends Action<Order> {
private final Logger log = Logger.getLogger(CompleteOrder.class); private final Logger log = Logger.getLogger(CompleteOrder.class);
public Function<Order, Order> getFunction() { public Order apply(Order order) {
return (order) -> { Assert.isTrue(Arrays.asList(OrderStatus.PAYMENT_FAILED,
Assert.isTrue(Arrays.asList(OrderStatus.PAYMENT_FAILED, OrderStatus.PAYMENT_SUCCEEDED,
OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.RESERVATION_FAILED).contains(order.getStatus()), "Order must be in a terminal state");
OrderStatus.RESERVATION_FAILED).contains(order.getStatus()), "Order must be in a terminal state");
OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
OrderStatus status = order.getStatus(); OrderStatus status = order.getStatus();
try { try {
if (order.getStatus() == OrderStatus.PAYMENT_SUCCEEDED) { if (order.getStatus() == OrderStatus.PAYMENT_SUCCEEDED) {
order.setStatus(OrderStatus.ORDER_SUCCEEDED); order.setStatus(OrderStatus.ORDER_SUCCEEDED);
order = orderService.update(order);
order.sendAsyncEvent(new OrderEvent(OrderEventType.ORDER_SUCCEEDED, order));
} else {
order.setStatus(OrderStatus.ORDER_FAILED);
order = orderService.update(order);
order.sendAsyncEvent(new OrderEvent(OrderEventType.ORDER_FAILED, order));
}
} catch (RuntimeException ex) {
log.error("Error completing the order", ex);
// Rollback status change
order.setStatus(status);
order = orderService.update(order); order = orderService.update(order);
throw ex; order.sendAsyncEvent(new OrderEvent(OrderEventType.ORDER_SUCCEEDED, order));
} else {
order.setStatus(OrderStatus.ORDER_FAILED);
order = orderService.update(order);
order.sendAsyncEvent(new OrderEvent(OrderEventType.ORDER_FAILED, order));
} }
} catch (RuntimeException ex) {
log.error("Error completing the order", ex);
// Rollback status change
order.setStatus(status);
order = orderService.update(order);
}
return order; return order;
};
} }
} }

View File

@@ -11,10 +11,10 @@ import demo.reservation.domain.Reservation;
import demo.reservation.domain.ReservationStatus; import demo.reservation.domain.ReservationStatus;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.List; import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static demo.order.domain.OrderStatus.RESERVATION_FAILED; import static demo.order.domain.OrderStatus.RESERVATION_FAILED;
@@ -26,56 +26,54 @@ import static demo.order.domain.OrderStatus.RESERVATION_SUCCEEDED;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class CompleteReservation extends Action<Order> { public class CompleteReservation extends Action<Order> {
private final Logger log = Logger.getLogger(CompleteReservation.class); private final Logger log = Logger.getLogger(CompleteReservation.class);
public Function<Order, Order> getFunction() { public Order apply(Order order) {
return (order) -> { if (order.getStatus() != RESERVATION_SUCCEEDED && order.getStatus() != RESERVATION_FAILED) {
if (order.getStatus() != RESERVATION_SUCCEEDED && order.getStatus() != RESERVATION_FAILED) { Assert.isTrue(order.getStatus() == OrderStatus.RESERVATION_PENDING,
Assert.isTrue(order.getStatus() == OrderStatus.RESERVATION_PENDING, "The order must be in a reservation pending state");
"The order must be in a reservation pending state"); } else {
} else { // Reservation has already completed
// Reservation has already completed
return order;
}
OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
OrderStatus status = order.getStatus();
try {
List<Reservation> reservations = order.getReservations().getContent().stream()
.collect(Collectors.toList());
// Check if all inventory has been reserved
Boolean orderReserved = reservations.stream()
.allMatch(r -> r.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED);
// Check if any inventory reservations have failed
Boolean reservationFailed = reservations.stream()
.anyMatch(r -> r.getStatus() == ReservationStatus.RESERVATION_FAILED);
if (orderReserved && order.getStatus() == OrderStatus.RESERVATION_PENDING) {
// Succeed the reservation and commit all inventory associated with order
order.setStatus(RESERVATION_SUCCEEDED);
order = orderService.update(order);
order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_SUCCEEDED, order));
} else if (reservationFailed && order.getStatus() == OrderStatus.RESERVATION_PENDING) {
// Fail the reservation and release all inventory associated with order
order.setStatus(RESERVATION_FAILED);
order = orderService.update(order);
order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_FAILED, order));
}
} catch (RuntimeException ex) {
log.error("Error completing reservation", ex);
// Rollback status change
order.setStatus(status);
order = orderService.update(order);
throw ex;
}
return order; return order;
}; }
OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
OrderStatus status = order.getStatus();
try {
List<Reservation> reservations = order.getReservations().getContent().stream()
.collect(Collectors.toList());
// Check if all inventory has been reserved
Boolean orderReserved = reservations.stream()
.allMatch(r -> r.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED);
// Check if any inventory reservations have failed
Boolean reservationFailed = reservations.stream()
.anyMatch(r -> r.getStatus() == ReservationStatus.RESERVATION_FAILED);
if (orderReserved && order.getStatus() == OrderStatus.RESERVATION_PENDING) {
// Succeed the reservation and commit all inventory associated with order
order.setStatus(RESERVATION_SUCCEEDED);
order = orderService.update(order);
order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_SUCCEEDED, order));
} else if (reservationFailed && order.getStatus() == OrderStatus.RESERVATION_PENDING) {
// Fail the reservation and release all inventory associated with order
order.setStatus(RESERVATION_FAILED);
order = orderService.update(order);
order.sendAsyncEvent(new OrderEvent(OrderEventType.RESERVATION_FAILED, order));
}
} catch (RuntimeException ex) {
log.error("Error completing reservation", ex);
// Rollback status change
order.setStatus(status);
order = orderService.update(order);
}
return order;
} }
} }

View File

@@ -9,43 +9,40 @@ import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType; import demo.order.event.OrderEventType;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.function.BiFunction;
/** /**
* Connects an {@link Order} to an Account. * Connects an {@link Order} to an Account.
* *
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class ConnectAccount extends Action<Order> { public class ConnectAccount extends Action<Order> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
public BiFunction<Order, Long, Order> getFunction() { public Order apply(Order order, Long accountId) {
return (order, accountId) -> { Assert.isTrue(order.getStatus() == OrderStatus.ORDER_CREATED, "Order must be in a created state");
Assert.isTrue(order.getStatus() == OrderStatus.ORDER_CREATED, "Order must be in a created state");
OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
// Connect the account // Connect the account
order.setAccountId(accountId); order.setAccountId(accountId);
order.setStatus(OrderStatus.ACCOUNT_CONNECTED); order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
order = orderService.update(order);
try {
// Trigger the account connected event
order.sendAsyncEvent(new OrderEvent(OrderEventType.ACCOUNT_CONNECTED, order));
} catch (Exception ex) {
log.error("Could not connect order to account", ex);
order.setAccountId(null);
order.setStatus(OrderStatus.ORDER_CREATED);
order = orderService.update(order); order = orderService.update(order);
}
try { return order;
// Trigger the account connected event
order.sendAsyncEvent(new OrderEvent(OrderEventType.ACCOUNT_CONNECTED, order));
} catch (Exception ex) {
log.error("Could not connect order to account", ex);
order.setAccountId(null);
order.setStatus(OrderStatus.ORDER_CREATED);
orderService.update(order);
throw ex;
}
return order;
};
} }
} }

View File

@@ -10,43 +10,40 @@ import demo.order.event.OrderEventType;
import demo.payment.domain.Payment; import demo.payment.domain.Payment;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.function.BiFunction;
/** /**
* Connects a {@link Payment} to an {@link Order}. * Connects a {@link Payment} to an {@link Order}.
* *
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class ConnectPayment extends Action<Order> { public class ConnectPayment extends Action<Order> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
public BiFunction<Order, Long, Order> getFunction() { public Order apply(Order order, Long paymentId) {
return (order, paymentId) -> { Assert.isTrue(order
Assert.isTrue(order .getStatus() == OrderStatus.PAYMENT_CREATED, "Order must be in a payment created state");
.getStatus() == OrderStatus.PAYMENT_CREATED, "Order must be in a payment created state");
OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
// Connect the payment // Connect the payment
order.setPaymentId(paymentId); order.setPaymentId(paymentId);
order.setStatus(OrderStatus.PAYMENT_CONNECTED); order.setStatus(OrderStatus.PAYMENT_CONNECTED);
order = orderService.update(order);
try {
// Trigger the payment connected event
order.sendAsyncEvent(new OrderEvent(OrderEventType.PAYMENT_CONNECTED, order));
} catch (Exception ex) {
log.error("Could not connect payment to order", ex);
order.setPaymentId(null);
order.setStatus(OrderStatus.ORDER_CREATED);
order = orderService.update(order); order = orderService.update(order);
}
try { return order;
// Trigger the payment connected event
order.sendAsyncEvent(new OrderEvent(OrderEventType.PAYMENT_CONNECTED, order));
} catch (Exception ex) {
log.error("Could not connect payment to order", ex);
order.setPaymentId(null);
order.setStatus(OrderStatus.ORDER_CREATED);
orderService.update(order);
throw ex;
}
return order;
};
} }
} }

View File

@@ -12,10 +12,10 @@ import demo.payment.domain.PaymentMethod;
import demo.payment.domain.PaymentService; import demo.payment.domain.PaymentService;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.Arrays; import java.util.Arrays;
import java.util.function.Function;
/** /**
* Creates a {@link Payment} for an {@link Order}. * Creates a {@link Payment} for an {@link Order}.
@@ -23,6 +23,7 @@ import java.util.function.Function;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class CreatePayment extends Action<Order> { public class CreatePayment extends Action<Order> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
@@ -32,53 +33,49 @@ public class CreatePayment extends Action<Order> {
this.paymentService = paymentService; this.paymentService = paymentService;
} }
public Function<Order, Order> getFunction() { public Order apply(Order order) {
return order -> { Assert.isTrue(order.getPaymentId() == null, "Payment has already been created");
Assert.isTrue(order.getPaymentId() == null, "Payment has already been created"); Assert.isTrue(!Arrays.asList(OrderStatus.PAYMENT_CREATED,
Assert.isTrue(!Arrays.asList(OrderStatus.PAYMENT_CREATED, OrderStatus.PAYMENT_CONNECTED,
OrderStatus.PAYMENT_CONNECTED, OrderStatus.PAYMENT_SUCCEEDED,
OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING).contains(order.getStatus()), "Payment has already been created");
OrderStatus.PAYMENT_PENDING).contains(order.getStatus()), "Payment has already been created"); Assert.isTrue(order.getStatus() == OrderStatus.RESERVATION_SUCCEEDED,
Assert.isTrue(order.getStatus() == OrderStatus.RESERVATION_SUCCEEDED, "Inventory reservations for the order must be made first before creating a payment");
"Inventory reservations for the order must be made first before creating a payment");
// Get entity services // Get entity services
OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
// Update the order status // Update the order status
order.setStatus(OrderStatus.PAYMENT_PENDING); order.setStatus(OrderStatus.PAYMENT_PENDING);
order = orderService.update(order);
Payment payment = new Payment();
payment.setAmount(order.calculateTotal());
payment.setPaymentMethod(PaymentMethod.CREDIT_CARD);
payment = paymentService.create(payment);
// Update the order status
order.setStatus(OrderStatus.PAYMENT_CREATED);
order = orderService.update(order);
try {
OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_CREATED, order);
event.add(payment.getLink("self").withRel("payment"));
// Trigger payment created event
order.sendAsyncEvent(event);
} catch (Exception ex) {
log.error("The order's payment could not be created", ex);
// Rollback the payment creation
if (payment.getIdentity() != null)
paymentService.delete(payment.getIdentity());
order.setPaymentId(null);
order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
order = orderService.update(order); order = orderService.update(order);
}
Payment payment = new Payment(); return order;
payment.setAmount(order.calculateTotal());
payment.setPaymentMethod(PaymentMethod.CREDIT_CARD);
payment = paymentService.create(payment);
// Update the order status
order.setStatus(OrderStatus.PAYMENT_CREATED);
order = orderService.update(order);
try {
OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_CREATED, order);
event.add(payment.getLink("self").withRel("payment"));
// Trigger payment created event
order.sendAsyncEvent(event);
} catch (Exception ex) {
log.error("The order's payment could not be created", ex);
// Rollback the payment creation
if (payment.getIdentity() != null)
paymentService.delete(payment.getIdentity());
order.setPaymentId(null);
order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
orderService.update(order);
throw ex;
}
return order;
};
} }
} }

View File

@@ -7,8 +7,7 @@ import demo.payment.domain.Payment;
import demo.payment.domain.PaymentService; import demo.payment.domain.PaymentService;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.function.Consumer;
/** /**
* Processes a {@link Payment} for an {@link Order}. * Processes a {@link Payment} for an {@link Order}.
@@ -16,6 +15,7 @@ import java.util.function.Consumer;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class DeleteOrder extends Action<Order> { public class DeleteOrder extends Action<Order> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
@@ -25,16 +25,14 @@ public class DeleteOrder extends Action<Order> {
this.paymentService = paymentService; this.paymentService = paymentService;
} }
public Consumer<Order> getConsumer() { public void apply(Order order) {
return (order) -> { // Delete payment
// Delete payment if (order.getPaymentId() != null)
if (order.getPaymentId() != null) paymentService.delete(order.getPaymentId());
paymentService.delete(order.getPaymentId());
// Delete order // Delete order
order.getModule(OrderModule.class) order.getModule(OrderModule.class)
.getDefaultService() .getDefaultService()
.delete(order.getIdentity()); .delete(order.getIdentity());
};
} }
} }

View File

@@ -5,8 +5,7 @@ import demo.order.domain.Order;
import demo.reservation.domain.ReservationModule; import demo.reservation.domain.ReservationModule;
import demo.reservation.domain.Reservations; import demo.reservation.domain.Reservations;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.function.Function;
/** /**
* Query action to get {@link demo.order.domain.Order}s for an an {@link Order} * Query action to get {@link demo.order.domain.Order}s for an an {@link Order}
@@ -14,6 +13,7 @@ import java.util.function.Function;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class GetReservations extends Action<Order> { public class GetReservations extends Action<Order> {
private final ReservationModule reservationModule; private final ReservationModule reservationModule;
@@ -22,11 +22,9 @@ public class GetReservations extends Action<Order> {
this.reservationModule = reservationModule; this.reservationModule = reservationModule;
} }
public Function<Order, Reservations> getFunction() { public Reservations apply(Order order) {
return (order) -> { // Get orders from the order service
// Get orders from the order service return reservationModule.getDefaultService()
return reservationModule.getDefaultService() .findReservationsByOrderId(order.getIdentity());
.findReservationsByOrderId(order.getIdentity());
};
} }
} }

View File

@@ -12,11 +12,11 @@ import org.apache.log4j.Logger;
import org.springframework.hateoas.MediaTypes; import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson; import org.springframework.hateoas.client.Traverson;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.net.URI; import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
import java.util.function.Function;
/** /**
* Processes a {@link Payment} for an {@link Order}. * Processes a {@link Payment} for an {@link Order}.
@@ -24,51 +24,55 @@ import java.util.function.Function;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class ProcessPayment extends Action<Order> { public class ProcessPayment extends Action<Order> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
public Function<Order, Order> getFunction() { public Order apply(Order order) {
return order -> { Assert.isTrue(!Arrays
Assert.isTrue(!Arrays .asList(OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING, OrderStatus.PAYMENT_FAILED)
.asList(OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING, OrderStatus.PAYMENT_FAILED) .contains(order.getStatus()), "Payment has already been processed");
.contains(order.getStatus()), "Payment has already been processed"); Assert.isTrue(order.getStatus() == OrderStatus.PAYMENT_CONNECTED,
Assert.isTrue(order.getStatus() == OrderStatus.PAYMENT_CONNECTED, "Order must be in a payment connected state");
"Order must be in a payment connected state");
// Get entity services // Get entity services
OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
// Get the payment // Get the payment
Payment payment = order.getPayment(); Payment payment = order.getPayment();
// Update the order status // Update the order status
order.setStatus(OrderStatus.PAYMENT_PENDING); order.setStatus(OrderStatus.PAYMENT_PENDING);
order = orderService.update(order); order = orderService.update(order);
try { boolean paymentSuccess = false;
// Create traverson for the new order
Traverson traverson = new Traverson(URI.create(payment.getLink("self").getHref()), MediaTypes.HAL_JSON);
payment = traverson.follow("commands", "processPayment").toObject(Payment.class);
} catch (Exception ex) {
log.error("The order's payment could not be processed", ex);
OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_FAILED, order); try {
event.add(payment.getLink("self").withRel("payment")); // Create traverson for the new order
Traverson traverson = new Traverson(URI.create(payment.getLink("self").getHref()), MediaTypes.HAL_JSON);
payment = traverson.follow("commands", "processPayment").toObject(Payment.class);
paymentSuccess = true;
} catch (Exception ex) {
log.error("The order's payment could not be processed", ex);
// Trigger payment failed event OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_FAILED, order);
order.sendAsyncEvent(event); event.add(payment.getLink("self").withRel("payment"));
throw ex; // Trigger payment failed event
} finally { order.sendAsyncEvent(event);
paymentSuccess = false;
} finally {
if(paymentSuccess) {
OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_SUCCEEDED, order); OrderEvent event = new OrderEvent(OrderEventType.PAYMENT_SUCCEEDED, order);
event.add(payment.getLink("self").withRel("payment")); event.add(payment.getLink("self").withRel("payment"));
// Trigger payment succeeded event // Trigger payment succeeded event
order.sendAsyncEvent(event); order.sendAsyncEvent(event);
} }
}
return order; return order;
};
} }
} }

View File

@@ -12,10 +12,10 @@ import demo.warehouse.domain.WarehouseService;
import demo.warehouse.exception.WarehouseNotFoundException; import demo.warehouse.exception.WarehouseNotFoundException;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.Arrays; import java.util.Arrays;
import java.util.function.Function;
/** /**
* Reserves inventory for an {@link Order}. * Reserves inventory for an {@link Order}.
@@ -23,6 +23,7 @@ import java.util.function.Function;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class ReserveInventory extends Action<Order> { public class ReserveInventory extends Action<Order> {
private final Logger log = Logger.getLogger(ReserveInventory.class); private final Logger log = Logger.getLogger(ReserveInventory.class);
@@ -32,63 +33,61 @@ public class ReserveInventory extends Action<Order> {
this.warehouseService = warehouseService; this.warehouseService = warehouseService;
} }
public Function<Order, Order> getFunction() { public Order apply(Order order) {
return (order) -> { Assert.isTrue(!Arrays
Assert.isTrue(!Arrays .asList(OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING,
.asList(OrderStatus.PAYMENT_SUCCEEDED, OrderStatus.PAYMENT_PENDING, OrderStatus.PAYMENT_FAILED, OrderStatus.INVENTORY_RESERVED,
OrderStatus.PAYMENT_FAILED, OrderStatus.INVENTORY_RESERVED, OrderStatus.RESERVATION_SUCCEEDED, OrderStatus.RESERVATION_PENDING,
OrderStatus.RESERVATION_SUCCEEDED, OrderStatus.RESERVATION_PENDING, OrderStatus.RESERVATION_FAILED)
OrderStatus.RESERVATION_FAILED) .contains(order.getStatus()), "Inventory has already been reserved");
.contains(order.getStatus()), "Inventory has already been reserved"); Assert.isTrue(order
Assert.isTrue(order .getStatus() == OrderStatus.ACCOUNT_CONNECTED, "The order must be connected to an account");
.getStatus() == OrderStatus.ACCOUNT_CONNECTED, "The order must be connected to an account");
Warehouse warehouse; Warehouse warehouse;
OrderService orderService = order.getModule(OrderModule.class).getDefaultService(); OrderService orderService = order.getModule(OrderModule.class).getDefaultService();
OrderStatus status = order.getStatus(); OrderStatus status = order.getStatus();
order.setStatus(OrderStatus.RESERVATION_PENDING); order.setStatus(OrderStatus.RESERVATION_PENDING);
order = orderService.update(order);
try {
warehouse = warehouseService.findWarehouseWithInventory(order);
} catch (WarehouseNotFoundException ex) {
log.error("The order contains items that are not available at any warehouse", ex);
throw ex;
} catch (RuntimeException ex) {
log.error("Error connecting to warehouse service", ex);
// Rollback status change
order.setStatus(status);
order = orderService.update(order);
throw ex;
}
try {
// Reserve inventory for the order from the returned warehouse
warehouse = warehouseService.reserveInventory(warehouse, order);
} catch (Exception ex) {
log.error("Could not reserve inventory for the order", ex);
order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
order = orderService.update(order); order = orderService.update(order);
try { OrderEvent event = new OrderEvent(OrderEventType.RESERVATION_FAILED, order);
warehouse = warehouseService.findWarehouseWithInventory(order); event.add(warehouse.getLink("self").withRel("warehouse"));
} catch (WarehouseNotFoundException ex) {
log.error("The order contains items that are not available at any warehouse", ex);
throw ex;
} catch (RuntimeException ex) {
log.error("Error connecting to warehouse service", ex);
// Rollback status change
order.setStatus(status);
order = orderService.update(order);
throw ex;
}
try { // Trigger reservation failed
// Reserve inventory for the order from the returned warehouse order.sendAsyncEvent(event);
warehouse = warehouseService.reserveInventory(warehouse, order); } finally {
} catch (Exception ex) { if(order.getStatus() != OrderStatus.ACCOUNT_CONNECTED) {
log.error("Could not reserve inventory for the order", ex);
order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
order = orderService.update(order);
OrderEvent event = new OrderEvent(OrderEventType.RESERVATION_FAILED, order);
event.add(warehouse.getLink("self").withRel("warehouse"));
// Trigger reservation failed
order.sendAsyncEvent(event);
throw ex;
} finally {
OrderEvent event = new OrderEvent(OrderEventType.RESERVATION_PENDING, order); OrderEvent event = new OrderEvent(OrderEventType.RESERVATION_PENDING, order);
event.add(warehouse.getLink("self").withRel("warehouse")); event.add(warehouse.getLink("self").withRel("warehouse"));
// Trigger reservation pending event // Trigger reservation pending event
order.sendAsyncEvent(event); order.sendAsyncEvent(event);
} }
}
return order; return order;
};
} }
} }

View File

@@ -6,8 +6,7 @@ import demo.order.domain.OrderService;
import demo.order.domain.OrderStatus; import demo.order.domain.OrderStatus;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.function.BiFunction;
/** /**
* Updates the status of a {@link Order} entity. * Updates the status of a {@link Order} entity.
@@ -15,6 +14,7 @@ import java.util.function.BiFunction;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class UpdateOrderStatus extends Action<Order> { public class UpdateOrderStatus extends Action<Order> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
@@ -24,24 +24,21 @@ public class UpdateOrderStatus extends Action<Order> {
this.orderService = orderService; this.orderService = orderService;
} }
public BiFunction<Order, OrderStatus, Order> getFunction() { public Order apply(Order order, OrderStatus orderStatus) {
return (order, orderStatus) -> {
// Save rollback status // Save rollback status
OrderStatus rollbackStatus = order.getStatus(); OrderStatus rollbackStatus = order.getStatus();
try { try {
// Update status // Update status
order.setStatus(orderStatus); order.setStatus(orderStatus);
order = orderService.update(order); order = orderService.update(order);
} catch (Exception ex) { } catch (Exception ex) {
log.error("Could not update the status", ex); log.error("Could not update the status", ex);
order.setStatus(rollbackStatus); order.setStatus(rollbackStatus);
order = orderService.update(order); order = orderService.update(order);
throw ex; }
}
return order; return order;
};
} }
} }

View File

@@ -117,77 +117,66 @@ public class Order extends AbstractEntity<OrderEvent, Long> {
@JsonIgnore @JsonIgnore
public Reservations getReservations() { public Reservations getReservations() {
return getAction(GetReservations.class) return getAction(GetReservations.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "connectAccount", controller = OrderController.class) @Command(method = "connectAccount", controller = OrderController.class)
public Order connectAccount(Long accountId) { public Order connectAccount(Long accountId) {
return getAction(ConnectAccount.class) return getAction(ConnectAccount.class)
.getFunction()
.apply(this, accountId); .apply(this, accountId);
} }
@Command(method = "connectPayment", controller = OrderController.class) @Command(method = "connectPayment", controller = OrderController.class)
public Order connectPayment(Long paymentId) { public Order connectPayment(Long paymentId) {
return getAction(ConnectPayment.class) return getAction(ConnectPayment.class)
.getFunction()
.apply(this, paymentId); .apply(this, paymentId);
} }
@Command(method = "createPayment", controller = OrderController.class) @Command(method = "createPayment", controller = OrderController.class)
public Order createPayment() { public Order createPayment() {
return getAction(CreatePayment.class) return getAction(CreatePayment.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "processPayment", controller = OrderController.class) @Command(method = "processPayment", controller = OrderController.class)
public Order processPayment() { public Order processPayment() {
return getAction(ProcessPayment.class) return getAction(ProcessPayment.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "reserveInventory", controller = OrderController.class) @Command(method = "reserveInventory", controller = OrderController.class)
public Order reserveInventory() { public Order reserveInventory() {
return getAction(ReserveInventory.class) return getAction(ReserveInventory.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "addReservation", controller = OrderController.class) @Command(method = "addReservation", controller = OrderController.class)
public Order addReservation(Long reservationId) { public Order addReservation(Long reservationId) {
return getAction(AddReservation.class) return getAction(AddReservation.class)
.getFunction()
.apply(this, reservationId); .apply(this, reservationId);
} }
@Command(method = "completeReservation", controller = OrderController.class) @Command(method = "completeReservation", controller = OrderController.class)
public Order completeReservation() { public Order completeReservation() {
return getAction(CompleteReservation.class) return getAction(CompleteReservation.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "completeOrder", controller = OrderController.class) @Command(method = "completeOrder", controller = OrderController.class)
public Order completeOrder() { public Order completeOrder() {
return getAction(CompleteOrder.class) return getAction(CompleteOrder.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "updateOrderStatus", controller = OrderController.class) @Command(method = "updateOrderStatus", controller = OrderController.class)
public Order updateOrderStatus(OrderStatus orderStatus) { public Order updateOrderStatus(OrderStatus orderStatus) {
return getAction(UpdateOrderStatus.class) return getAction(UpdateOrderStatus.class)
.getFunction()
.apply(this, orderStatus); .apply(this, orderStatus);
} }
public boolean delete() { public boolean delete() {
getAction(DeleteOrder.class) getAction(DeleteOrder.class)
.getConsumer() .apply(this);
.accept(this);
return true; return true;
} }

View File

@@ -23,11 +23,12 @@ spring:
--- ---
spring: spring:
profiles: docker profiles: docker
rabbitmq:
host: ${DOCKER_IP:192.168.99.100}
port: 5672
cloud: cloud:
stream: stream:
kafka:
binder:
brokers: ${DOCKER_IP:192.168.99.100}
zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings: bindings:
output: output:
destination: order destination: order

View File

@@ -37,7 +37,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@@ -375,26 +375,33 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderS
MediaTypes.HAL_JSON MediaTypes.HAL_JSON
); );
Order order = traverson.follow("self", "commands", "completeOrder")
.toEntity(Order.class)
.getBody();
// Release the reservations // Release the reservations
Reservations reservations = traverson.follow("self", "reservations") Reservations reservations = traverson.follow("self", "reservations")
.toObject(Reservations.class); .toObject(Reservations.class);
reservations.getContent().stream() reservations.getContent().stream()
.filter(r -> r.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED) .filter(r -> r.getStatus() != ReservationStatus.RESERVATION_FAILED)
.parallel() .parallel()
.forEach(r -> { .forEach(r -> {
Traverson res = new Traverson( try {
URI.create(r.getLink("self").getHref()), Traverson res = new Traverson(
MediaTypes.HAL_JSON URI.create(r.getLink("self").getHref()),
); MediaTypes.HAL_JSON
);
res.follow("self", "commands", "releaseInventory") res.follow("self", "commands", "releaseInventory")
.toObject(Reservation.class); .toObject(Reservation.class);
} catch (Exception ex) {
log.error("Could not release inventory for reservation", ex);
}
}); });
return traverson.follow("self", "commands", "completeOrder") return order;
.toEntity(Order.class)
.getBody();
})); }));
} }

View File

@@ -22,11 +22,12 @@ spring:
--- ---
spring: spring:
profiles: docker profiles: docker
rabbitmq:
host: ${DOCKER_IP:192.168.99.100}
port: 5672
cloud: cloud:
stream: stream:
kafka:
binder:
brokers: ${DOCKER_IP:192.168.99.100}
zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings: bindings:
input: input:
contentType: 'application/json' contentType: 'application/json'

View File

@@ -41,7 +41,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@@ -9,42 +9,38 @@ import demo.payment.event.PaymentEvent;
import demo.payment.event.PaymentEventType; import demo.payment.event.PaymentEventType;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.function.BiFunction;
@Service @Service
@Transactional
public class ConnectOrder extends Action<Payment> { public class ConnectOrder extends Action<Payment> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
public BiFunction<Payment, Long, Payment> getFunction() { public Payment apply(Payment payment, Long orderId) {
return (payment, orderId) -> { Assert.isTrue(payment
Assert.isTrue(payment .getStatus() == PaymentStatus.PAYMENT_CREATED, "Payment has already been connected to an order");
.getStatus() == PaymentStatus.PAYMENT_CREATED, "Payment has already been connected to an order");
PaymentService paymentService = payment.getModule(PaymentModule.class) PaymentService paymentService = payment.getModule(PaymentModule.class)
.getDefaultService(); .getDefaultService();
// Connect the payment to the order // Connect the payment to the order
payment.setOrderId(orderId); payment.setOrderId(orderId);
payment.setStatus(PaymentStatus.ORDER_CONNECTED); payment.setStatus(PaymentStatus.ORDER_CONNECTED);
payment = paymentService.update(payment);
try {
// Trigger the payment connected
payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.ORDER_CONNECTED, payment));
} catch (IllegalStateException ex) {
log.error("Payment could not be connected to order", ex);
// Rollback operation
payment.setStatus(PaymentStatus.PAYMENT_CREATED);
payment.setOrderId(null);
payment = paymentService.update(payment); payment = paymentService.update(payment);
}
try { return payment;
// Trigger the payment connected
payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.ORDER_CONNECTED, payment));
} catch (IllegalStateException ex) {
log.error("Payment could not be connected to order", ex);
// Rollback operation
payment.setStatus(PaymentStatus.PAYMENT_CREATED);
payment.setOrderId(null);
paymentService.update(payment);
throw ex;
}
return payment;
};
} }
} }

View File

@@ -8,15 +8,16 @@ import demo.payment.event.PaymentEvent;
import demo.payment.event.PaymentEventType; import demo.payment.event.PaymentEventType;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.Arrays; import java.util.Arrays;
import java.util.function.Function;
import static demo.payment.domain.PaymentStatus.PAYMENT_FAILED; import static demo.payment.domain.PaymentStatus.PAYMENT_FAILED;
import static demo.payment.domain.PaymentStatus.PAYMENT_SUCCEEDED; import static demo.payment.domain.PaymentStatus.PAYMENT_SUCCEEDED;
@Service @Service
@Transactional
public class ProcessPayment extends Action<Payment> { public class ProcessPayment extends Action<Payment> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
private final PaymentService paymentService; private final PaymentService paymentService;
@@ -25,32 +26,31 @@ public class ProcessPayment extends Action<Payment> {
this.paymentService = paymentService; this.paymentService = paymentService;
} }
public Function<Payment, Payment> getFunction() { public Payment apply(Payment payment) {
return payment -> { // Validations
// Validations Assert.isTrue(!Arrays.asList(PAYMENT_SUCCEEDED,
Assert.isTrue(!Arrays.asList(PAYMENT_SUCCEEDED, PaymentStatus.PAYMENT_PENDING,
PaymentStatus.PAYMENT_PENDING, PaymentStatus.PAYMENT_FAILED).contains(payment.getStatus()), "Payment has already been processed");
PaymentStatus.PAYMENT_FAILED).contains(payment.getStatus()), "Payment has already been processed"); Assert.isTrue(payment.getStatus() == PaymentStatus.ORDER_CONNECTED,
Assert.isTrue(payment.getStatus() == PaymentStatus.ORDER_CONNECTED, "Payment must be connected to an order");
"Payment must be connected to an order");
payment.setStatus(PaymentStatus.PAYMENT_PROCESSED); payment.setStatus(PaymentStatus.PAYMENT_PROCESSED);
payment = paymentService.update(payment); payment = paymentService.update(payment);
try { try {
// Trigger the payment processed event // Trigger the payment processed event
payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.PAYMENT_PROCESSED, payment)); payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.PAYMENT_PROCESSED, payment));
} catch (Exception ex) { } catch (Exception ex) {
log.error("Payment could not be processed", ex); log.error("Payment could not be processed", ex);
finalizePayment(payment, PAYMENT_FAILED); payment = finalizePayment(payment, PAYMENT_FAILED);
throw ex; } finally {
} finally { if(payment.getStatus() != PAYMENT_FAILED) {
// Handle the result asynchronously // Handle the result asynchronously
finalizePayment(payment, PAYMENT_SUCCEEDED); payment = finalizePayment(payment, PAYMENT_SUCCEEDED);
} }
}
return payment; return payment;
};
} }
private Payment finalizePayment(Payment payment, PaymentStatus paymentStatus) { private Payment finalizePayment(Payment payment, PaymentStatus paymentStatus) {

View File

@@ -91,14 +91,12 @@ public class Payment extends AbstractEntity<PaymentEvent, Long> {
@Command(method = "connectOrder", controller = PaymentController.class) @Command(method = "connectOrder", controller = PaymentController.class)
public Payment connectOrder(Long orderId) { public Payment connectOrder(Long orderId) {
return getAction(ConnectOrder.class) return getAction(ConnectOrder.class)
.getFunction()
.apply(this, orderId); .apply(this, orderId);
} }
@Command(method = "processPayment", controller = PaymentController.class) @Command(method = "processPayment", controller = PaymentController.class)
public Payment processPayment() { public Payment processPayment() {
return getAction(ProcessPayment.class) return getAction(ProcessPayment.class)
.getFunction()
.apply(this); .apply(this);
} }

View File

@@ -23,11 +23,12 @@ spring:
--- ---
spring: spring:
profiles: docker profiles: docker
rabbitmq:
host: ${DOCKER_IP:192.168.99.100}
port: 5672
cloud: cloud:
stream: stream:
kafka:
binder:
brokers: ${DOCKER_IP:192.168.99.100}
zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings: bindings:
output: output:
contentType: 'application/json' contentType: 'application/json'

View File

@@ -37,7 +37,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@@ -22,11 +22,12 @@ spring:
--- ---
spring: spring:
profiles: docker profiles: docker
rabbitmq:
host: ${DOCKER_IP:192.168.99.100}
port: 5672
cloud: cloud:
stream: stream:
kafka:
binder:
brokers: ${DOCKER_IP:192.168.99.100}
zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings: bindings:
input: input:
contentType: 'application/json' contentType: 'application/json'

View File

@@ -51,7 +51,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@@ -12,8 +12,6 @@ import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.function.BiFunction;
import static demo.inventory.domain.InventoryStatus.RESERVATION_CONNECTED; import static demo.inventory.domain.InventoryStatus.RESERVATION_CONNECTED;
/** /**
@@ -33,34 +31,31 @@ public class ReserveInventory extends Action<Inventory> {
this.inventoryService = inventoryService; this.inventoryService = inventoryService;
} }
public BiFunction<Inventory, Long, Inventory> getFunction() { public Inventory apply(Inventory inventory, Long reservationId) {
return (inventory, reservationId) -> { Assert.isTrue(inventory.getStatus() == InventoryStatus.RESERVATION_CONNECTED,
Assert.isTrue(inventory.getStatus() == InventoryStatus.RESERVATION_CONNECTED, "Inventory must be in a reservation connected state");
"Inventory must be in a reservation connected state"); Assert.isTrue(inventory.getReservation() == null,
Assert.isTrue(inventory.getReservation() == null, "There is already a reservation attached to the inventory");
"There is already a reservation attached to the inventory");
Reservation reservation = reservationService.get(reservationId); Reservation reservation = reservationService.get(reservationId);
Assert.notNull(reservation, "Reserve inventory failed, the reservation does not exist"); Assert.notNull(reservation, "Reserve inventory failed, the reservation does not exist");
try { try {
// Trigger the reservation connected event // Trigger the reservation connected event
inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.RESERVATION_CONNECTED, inventory)); inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.RESERVATION_CONNECTED, inventory));
} catch (Exception ex) { } catch (Exception ex) {
log.error("Could not connect reservation to inventory", ex); log.error("Could not connect reservation to inventory", ex);
inventory.setReservation(null); inventory.setReservation(null);
inventory.setStatus(InventoryStatus.RESERVATION_PENDING); inventory.setStatus(InventoryStatus.RESERVATION_PENDING);
inventoryService.update(inventory); inventory = inventoryService.update(inventory);
throw ex; } finally {
} finally { if (inventory.getStatus() == RESERVATION_CONNECTED && inventory.getReservation() != null) {
if (inventory.getStatus() == RESERVATION_CONNECTED && inventory.getReservation() != null) { inventory.setStatus(InventoryStatus.INVENTORY_RESERVED);
inventory.setStatus(InventoryStatus.INVENTORY_RESERVED); inventory = inventoryService.update(inventory);
inventory = inventoryService.update(inventory); inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.INVENTORY_RESERVED, inventory));
inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.INVENTORY_RESERVED, inventory));
}
} }
}
return inventory; return inventory;
};
} }
} }

View File

@@ -7,8 +7,7 @@ import demo.inventory.domain.InventoryStatus;
import demo.reservation.domain.ReservationService; import demo.reservation.domain.ReservationService;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.function.BiFunction;
/** /**
* Updates the status of a {@link Inventory} entity. * Updates the status of a {@link Inventory} entity.
@@ -16,6 +15,7 @@ import java.util.function.BiFunction;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class UpdateInventoryStatus extends Action<Inventory> { public class UpdateInventoryStatus extends Action<Inventory> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
@@ -27,24 +27,20 @@ public class UpdateInventoryStatus extends Action<Inventory> {
this.inventoryService = inventoryService; this.inventoryService = inventoryService;
} }
public BiFunction<Inventory, InventoryStatus, Inventory> getFunction() { public Inventory apply(Inventory inventory, InventoryStatus inventoryStatus) {
return (inventory, inventoryStatus) -> { // Save rollback status
InventoryStatus rollbackStatus = inventory.getStatus();
// Save rollback status try {
InventoryStatus rollbackStatus = inventory.getStatus(); // Update status
inventory.setStatus(inventoryStatus);
inventory = inventoryService.update(inventory);
} catch (Exception ex) {
log.error("Could not update the status", ex);
inventory.setStatus(rollbackStatus);
inventory = inventoryService.update(inventory);
}
try { return inventory;
// Update status
inventory.setStatus(inventoryStatus);
inventory = inventoryService.update(inventory);
} catch (Exception ex) {
log.error("Could not update the status", ex);
inventory.setStatus(rollbackStatus);
inventory = inventoryService.update(inventory);
throw ex;
}
return inventory;
};
} }
} }

View File

@@ -87,14 +87,12 @@ public class Inventory extends AbstractEntity<InventoryEvent, Long> {
@Command(method = "reserve", controller = InventoryController.class) @Command(method = "reserve", controller = InventoryController.class)
public Inventory reserve(Long reservationId) { public Inventory reserve(Long reservationId) {
return getAction(ReserveInventory.class) return getAction(ReserveInventory.class)
.getFunction()
.apply(this, reservationId); .apply(this, reservationId);
} }
@Command(method = "updateInventoryStatus", controller = InventoryController.class) @Command(method = "updateInventoryStatus", controller = InventoryController.class)
public Inventory updateStatus(InventoryStatus status) { public Inventory updateStatus(InventoryStatus status) {
return getAction(UpdateInventoryStatus.class) return getAction(UpdateInventoryStatus.class)
.getFunction()
.apply(this, status); .apply(this, status);
} }

View File

@@ -105,7 +105,7 @@ public class InventoryService extends Service<Inventory, Long> {
Boolean lock = false; Boolean lock = false;
try { try {
lock = inventoryLock.tryLock(30, 5000, TimeUnit.MILLISECONDS); lock = inventoryLock.tryLock(1, 5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("Interrupted while acquiring lock on inventory", e); log.error("Interrupted while acquiring lock on inventory", e);
} }

View File

@@ -21,7 +21,6 @@ import org.springframework.util.Assert;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.function.Function;
import static demo.reservation.event.ReservationEventType.*; import static demo.reservation.event.ReservationEventType.*;
@@ -41,67 +40,73 @@ public class ConnectInventory extends Action<Reservation> {
this.inventoryService = inventoryService; this.inventoryService = inventoryService;
} }
public Function<Reservation, Reservation> getFunction() { public Reservation apply(Reservation reservation) {
return (reservation) -> { Assert.isTrue(reservation.getStatus() == ReservationStatus.ORDER_CONNECTED,
Assert.isTrue(reservation.getStatus() == ReservationStatus.ORDER_CONNECTED, "Reservation must be in an order connected state");
"Reservation must be in an order connected state");
ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService(); ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService();
// Set reservation to pending // Set reservation to pending
reservation.setStatus(ReservationStatus.RESERVATION_PENDING); reservation.setStatus(ReservationStatus.RESERVATION_PENDING);
reservation = reservationService.update(reservation); reservation = reservationService.update(reservation);
// Get available inventory and connect reservation in an atomic transaction // Get available inventory and connect reservation in an atomic transaction
Inventory inventory = inventoryService.findAvailableInventory(reservation); Inventory inventory = inventoryService.findAvailableInventory(reservation);
try { try {
if (inventory == null) { if (inventory == null) {
// Inventory is out of stock, fail the reservation process // Inventory is out of stock, fail the reservation process
reservation.setStatus(ReservationStatus.RESERVATION_FAILED); reservation.setStatus(ReservationStatus.RESERVATION_FAILED);
reservation = reservationService.update(reservation);
// Trigger reservation failed event
reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_FAILED, reservation));
// Throw the out of stock exception
throw new OutOfStockException("Inventory for reservation is unavailable in warehouse: "
.concat(reservation.getId().toString()));
}
// Set inventory on reservation and mark successful
reservation.setInventory(inventory);
reservation.setStatus(ReservationStatus.RESERVATION_SUCCEEDED);
reservation = reservationService.update(reservation); reservation = reservationService.update(reservation);
// Trigger the inventory connected event // Trigger reservation failed event
reservation.sendAsyncEvent(new ReservationEvent(INVENTORY_CONNECTED, reservation), reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_FAILED, reservation));
reservation.getInventory().getId().withRel("inventory"));
} catch (Exception ex) {
log.error("Could not connect reservation to order", ex);
if (reservation.getStatus() != ReservationStatus.RESERVATION_FAILED) {
// Rollback the reservation attempt
if (inventory != null) {
inventory.setReservation(null);
inventory.setStatus(InventoryStatus.RESERVATION_PENDING);
inventory = inventoryService.update(inventory);
}
reservation.setInventory(null); // Throw the out of stock exception
reservation.setStatus(ReservationStatus.ORDER_CONNECTED); throw new OutOfStockException("Inventory for reservation is unavailable in warehouse: "
reservation = reservationService.update(reservation); .concat(reservation.getId().toString()));
}
throw ex;
} finally {
if (reservation.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED) {
Link inventoryLink = reservation.getInventory().getId().withRel("inventory");
Link orderLink = getRemoteLink("order-web", "/v1/orders/{id}", reservation.getOrderId(), "order");
reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_SUCCEEDED, reservation), inventoryLink, orderLink);
}
} }
return reservation; inventory.setReservation(reservation);
}; inventory.setStatus(InventoryStatus.RESERVATION_CONNECTED);
inventory = inventoryService.update(inventory);
// Set inventory on reservation and mark successful
reservation.setInventory(inventory);
reservation.setStatus(ReservationStatus.RESERVATION_SUCCEEDED);
reservation = reservationService.update(reservation);
// Trigger the inventory connected event
reservation.sendAsyncEvent(new ReservationEvent(INVENTORY_CONNECTED, reservation),
reservation.getInventory().getId().withRel("inventory"));
} catch (Exception ex) {
log.error("Could not connect reservation to order", ex);
if (reservation.getStatus() != ReservationStatus.RESERVATION_FAILED) {
// Rollback the reservation attempt
if (inventory != null) {
inventory.setReservation(null);
inventory.setStatus(InventoryStatus.RESERVATION_PENDING);
inventoryService.update(inventory);
}
reservation.setInventory(null);
reservation.setStatus(ReservationStatus.ORDER_CONNECTED);
reservation = reservationService.update(reservation);
}
throw ex;
} finally {
if (reservation.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED) {
Link inventoryLink = reservation.getInventory().getId().withRel("inventory");
Link orderLink = getRemoteLink("order-web", "/v1/orders/{id}", reservation.getOrderId(), "order");
reservation
.sendAsyncEvent(new ReservationEvent(RESERVATION_SUCCEEDED, reservation), inventoryLink,
orderLink);
}
}
return reservation;
} }
private Link getRemoteLink(String service, String relative, Object identifier, String rel) { private Link getRemoteLink(String service, String relative, Object identifier, String rel) {

View File

@@ -9,42 +9,40 @@ import demo.reservation.event.ReservationEvent;
import demo.reservation.event.ReservationEventType; import demo.reservation.event.ReservationEventType;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.function.BiFunction;
/** /**
* Connects an {@link Reservation} to an Order. * Connects an {@link Reservation} to an Order.
* *
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class ConnectOrder extends Action<Reservation> { public class ConnectOrder extends Action<Reservation> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
public BiFunction<Reservation, Long, Reservation> getFunction() { public Reservation apply(Reservation reservation, Long orderId) {
return (reservation, orderId) -> { Assert.isTrue(reservation
Assert.isTrue(reservation.getStatus() == ReservationStatus.RESERVATION_CREATED, "Reservation must be in a created state"); .getStatus() == ReservationStatus.RESERVATION_CREATED, "Reservation must be in a created state");
ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService(); ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService();
// Connect the order // Connect the order
reservation.setOrderId(orderId); reservation.setOrderId(orderId);
reservation.setStatus(ReservationStatus.ORDER_CONNECTED); reservation.setStatus(ReservationStatus.ORDER_CONNECTED);
reservation = reservationService.update(reservation);
try {
// Trigger the order connected event
reservation.sendAsyncEvent(new ReservationEvent(ReservationEventType.ORDER_CONNECTED, reservation));
} catch (Exception ex) {
log.error("Could not connect reservation to order", ex);
reservation.setOrderId(null);
reservation.setStatus(ReservationStatus.RESERVATION_CREATED);
reservation = reservationService.update(reservation); reservation = reservationService.update(reservation);
}
try { return reservation;
// Trigger the order connected event
reservation.sendAsyncEvent(new ReservationEvent(ReservationEventType.ORDER_CONNECTED, reservation));
} catch (Exception ex) {
log.error("Could not connect reservation to order", ex);
reservation.setOrderId(null);
reservation.setStatus(ReservationStatus.RESERVATION_CREATED);
reservationService.update(reservation);
throw ex;
}
return reservation;
};
} }
} }

View File

@@ -12,10 +12,9 @@ import demo.reservation.domain.ReservationStatus;
import demo.reservation.event.ReservationEvent; import demo.reservation.event.ReservationEvent;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.function.Function;
import static demo.inventory.event.InventoryEventType.INVENTORY_RELEASED; import static demo.inventory.event.InventoryEventType.INVENTORY_RELEASED;
import static demo.reservation.event.ReservationEventType.RESERVATION_FAILED; import static demo.reservation.event.ReservationEventType.RESERVATION_FAILED;
@@ -25,6 +24,7 @@ import static demo.reservation.event.ReservationEventType.RESERVATION_FAILED;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class ReleaseInventory extends Action<Reservation> { public class ReleaseInventory extends Action<Reservation> {
private final Logger log = Logger.getLogger(this.getClass()); private final Logger log = Logger.getLogger(this.getClass());
private final InventoryService inventoryService; private final InventoryService inventoryService;
@@ -33,34 +33,32 @@ public class ReleaseInventory extends Action<Reservation> {
this.inventoryService = inventoryService; this.inventoryService = inventoryService;
} }
public Function<Reservation, Reservation> getFunction() { public Reservation apply(Reservation reservation) {
return (reservation) -> { Assert.isTrue(reservation.getStatus() != ReservationStatus.RESERVATION_FAILED,
Assert.isTrue(reservation.getStatus() == ReservationStatus.RESERVATION_SUCCEEDED, "Reservation is already in a failed state");
"Reservation must be in a succeeded state");
Assert.notNull(reservation.getInventory(), "The reservation has no connected inventory");
ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService(); ReservationService reservationService = reservation.getModule(ReservationModule.class).getDefaultService();
Inventory inventory = reservation.getInventory(); Inventory inventory = reservation.getInventory();
try { try {
// Remove the inventory and set the reservation to failed // Remove the inventory and set the reservation to failed
reservation.setInventory(null); reservation.setInventory(null);
reservation.setStatus(ReservationStatus.RESERVATION_FAILED); reservation.setStatus(ReservationStatus.RESERVATION_FAILED);
reservation = reservationService.update(reservation);
// Trigger the reservation failed event
reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_FAILED, reservation));
} catch (Exception ex) {
log.error("Could not release the reservation's inventory", ex);
if (reservation.getStatus() == ReservationStatus.RESERVATION_FAILED) {
// Rollback the attempt
reservation.setInventory(inventory);
reservation.setStatus(ReservationStatus.RESERVATION_SUCCEEDED);
reservation = reservationService.update(reservation); reservation = reservationService.update(reservation);
}
// Trigger the reservation failed event } finally {
reservation.sendAsyncEvent(new ReservationEvent(RESERVATION_FAILED, reservation)); if (inventory != null && reservation.getStatus() != ReservationStatus.RESERVATION_SUCCEEDED) {
} catch (Exception ex) {
log.error("Could not release the reservation's inventory", ex);
if (reservation.getStatus() == ReservationStatus.RESERVATION_FAILED) {
// Rollback the attempt
reservation.setInventory(inventory);
reservation.setStatus(ReservationStatus.RESERVATION_SUCCEEDED);
reservation = reservationService.update(reservation);
}
throw ex;
} finally {
// Release the inventory // Release the inventory
inventory.setReservation(null); inventory.setReservation(null);
inventory.setStatus(InventoryStatus.RESERVATION_PENDING); inventory.setStatus(InventoryStatus.RESERVATION_PENDING);
@@ -69,8 +67,8 @@ public class ReleaseInventory extends Action<Reservation> {
// Trigger the inventory released event // Trigger the inventory released event
inventory.sendAsyncEvent(new InventoryEvent(INVENTORY_RELEASED, inventory)); inventory.sendAsyncEvent(new InventoryEvent(INVENTORY_RELEASED, inventory));
} }
}
return reservation; return reservation;
};
} }
} }

View File

@@ -111,21 +111,18 @@ public class Reservation extends AbstractEntity<ReservationEvent, Long> {
@Command(method = "connectInventory", controller = ReservationController.class) @Command(method = "connectInventory", controller = ReservationController.class)
public Reservation connectInventory() { public Reservation connectInventory() {
return getAction(ConnectInventory.class) return getAction(ConnectInventory.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "releaseInventory", controller = ReservationController.class) @Command(method = "releaseInventory", controller = ReservationController.class)
public Reservation releaseInventory() { public Reservation releaseInventory() {
return getAction(ReleaseInventory.class) return getAction(ReleaseInventory.class)
.getFunction()
.apply(this); .apply(this);
} }
@Command(method = "connectOrder", controller = ReservationController.class) @Command(method = "connectOrder", controller = ReservationController.class)
public Reservation connectOrder(Long orderId) { public Reservation connectOrder(Long orderId) {
return getAction(ConnectOrder.class) return getAction(ConnectOrder.class)
.getFunction()
.apply(this, orderId); .apply(this, orderId);
} }

View File

@@ -9,9 +9,9 @@ import demo.reservation.event.ReservationEventType;
import demo.warehouse.domain.Warehouse; import demo.warehouse.domain.Warehouse;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List; import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@@ -21,6 +21,7 @@ import java.util.stream.IntStream;
* @author Kenny Bastani * @author Kenny Bastani
*/ */
@Service @Service
@Transactional
public class ReserveOrder extends Action<Warehouse> { public class ReserveOrder extends Action<Warehouse> {
private final Logger log = Logger.getLogger(ReserveOrder.class); private final Logger log = Logger.getLogger(ReserveOrder.class);
@@ -30,25 +31,22 @@ public class ReserveOrder extends Action<Warehouse> {
this.reservationService = reservationService; this.reservationService = reservationService;
} }
public BiConsumer<Warehouse, Order> getConsumer() { public void apply(Warehouse warehouse, Order order) {
return (warehouse, order) -> { // Create reservations for each order item
List<Reservation> reservations = order.getLineItems().stream()
.map(item -> IntStream.rangeClosed(1, item.getQuantity())
.mapToObj(a -> new Reservation(item.getProductId(), order.getIdentity(), warehouse)))
.flatMap(a -> a)
.collect(Collectors.toList());
// Create reservations for each order item // Save the reservations
List<Reservation> reservations = order.getLineItems().stream() reservations = reservationService.create(reservations);
.map(item -> IntStream.rangeClosed(1, item.getQuantity())
.mapToObj(a -> new Reservation(item.getProductId(), order.getIdentity(), warehouse)))
.flatMap(a -> a)
.collect(Collectors.toList());
// Save the reservations // Trigger reservation requests for each order item
reservations = reservationService.create(reservations); reservations.forEach(r -> {
ReservationEvent event = new ReservationEvent(ReservationEventType.RESERVATION_REQUESTED, r);
// Trigger reservation requests for each order item event.add(order.getLink("self").withRel("order"));
reservations.forEach(r -> { r.sendAsyncEvent(event);
ReservationEvent event = new ReservationEvent(ReservationEventType.RESERVATION_REQUESTED, r); });
event.add(order.getLink("self").withRel("order"));
r.sendAsyncEvent(event);
});
};
} }
} }

View File

@@ -77,8 +77,7 @@ public class Warehouse extends AbstractEntity<WarehouseEvent, Long> {
@Command(method = "reserveOrder", controller = WarehouseController.class) @Command(method = "reserveOrder", controller = WarehouseController.class)
public Warehouse reserveOrder(Order order) { public Warehouse reserveOrder(Order order) {
getAction(ReserveOrder.class) getAction(ReserveOrder.class)
.getConsumer() .apply(this, order);
.accept(this, order);
return this; return this;
} }

View File

@@ -37,7 +37,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@@ -26,6 +26,8 @@ import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static demo.order.domain.OrderStatus.ORDER_FAILED;
import static demo.order.domain.OrderStatus.RESERVATION_FAILED;
import static demo.order.domain.OrderStatus.RESERVATION_PENDING; import static demo.order.domain.OrderStatus.RESERVATION_PENDING;
/** /**
@@ -243,12 +245,24 @@ public class ReservationStateMachineConfig extends EnumStateMachineConfigurerAda
MediaTypes.HAL_JSON MediaTypes.HAL_JSON
); );
traverson.follow("self", "order", "commands", "completeReservation") Order order = traverson.follow("self", "order").toObject(Order.class);
.toObject(Order.class); Reservation reservation = null;
return traverson.follow("self") // Check order status and release inventory if it has failed
.toEntity(Reservation.class) if (order.getStatus() == RESERVATION_FAILED || order.getStatus() == ORDER_FAILED) {
.getBody(); reservation = traverson.follow("self", "commands", "releaseInventory")
.toObject(Reservation.class);
} else if (order.getStatus() == RESERVATION_PENDING) {
traverson.follow("self", "order", "commands", "completeReservation")
.toObject(Order.class);
}
if (reservation == null)
reservation = traverson.follow("self")
.toEntity(Reservation.class)
.getBody();
return reservation;
})); }));
} }

View File

@@ -33,11 +33,12 @@ spring:
--- ---
spring: spring:
profiles: docker profiles: docker
rabbitmq:
host: ${DOCKER_IP:192.168.99.100}
port: 5672
cloud: cloud:
stream: stream:
kafka:
binder:
brokers: ${DOCKER_IP:192.168.99.100}
zk-nodes: ${DOCKER_IP:192.168.99.100}
bindings: bindings:
warehouse: warehouse:
contentType: 'application/json' contentType: 'application/json'