Fault tolerance

This commit is contained in:
Kenny Bastani
2017-01-03 23:28:24 -05:00
parent 7ea6969792
commit 6529de43d4
25 changed files with 426 additions and 151 deletions

View File

@@ -4,14 +4,15 @@ import demo.account.domain.Account;
import demo.account.domain.AccountModule;
import demo.account.domain.AccountService;
import demo.account.domain.AccountStatus;
import demo.domain.Action;
import demo.account.event.AccountEvent;
import demo.account.event.AccountEventType;
import demo.domain.Action;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.Function;
import static demo.account.domain.AccountStatus.*;
@@ -22,8 +23,9 @@ import static demo.account.domain.AccountStatus.*;
*/
@Service
public class ActivateAccount extends Action<Account> {
private final Logger log = Logger.getLogger(this.getClass());
public Consumer<Account> getConsumer() {
public Function<Account, Account> getFunction() {
return (account) -> {
Assert.isTrue(account.getStatus() != ACCOUNT_ACTIVE, "The account is already active");
Assert.isTrue(Arrays.asList(ACCOUNT_CONFIRMED, ACCOUNT_SUSPENDED, ACCOUNT_ARCHIVED)
@@ -32,12 +34,29 @@ public class ActivateAccount extends Action<Account> {
AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService();
Account result;
AccountStatus status = account.getStatus();
// Activate the account
account.setStatus(AccountStatus.ACCOUNT_ACTIVE);
account = accountService.update(account);
// Trigger the account activated event
account.sendEvent(new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED, account));
try {
// Trigger the account activated event
result = account.sendEvent(new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED, account)).getEntity();
result.setIdentity(account.getIdentity());
} catch (Exception ex) {
log.error("Account could not be activated", ex);
// Rollback the operation
account.setStatus(status);
accountService.update(account);
throw ex;
}
return result;
};
}
}

View File

@@ -4,13 +4,14 @@ import demo.account.domain.Account;
import demo.account.domain.AccountModule;
import demo.account.domain.AccountService;
import demo.account.domain.AccountStatus;
import demo.domain.Action;
import demo.account.event.AccountEvent;
import demo.account.event.AccountEventType;
import demo.domain.Action;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import java.util.function.Consumer;
import java.util.function.Function;
import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE;
import static demo.account.domain.AccountStatus.ACCOUNT_ARCHIVED;
@@ -23,7 +24,9 @@ import static demo.account.domain.AccountStatus.ACCOUNT_ARCHIVED;
@Service
public class ArchiveAccount extends Action<Account> {
public Consumer<Account> getConsumer() {
private final Logger log = Logger.getLogger(this.getClass());
public Function<Account, Account> getFunction() {
return (account) -> {
Assert.isTrue(account.getStatus() != ACCOUNT_ARCHIVED, "The account is already archived");
Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be archived");
@@ -31,12 +34,29 @@ public class ArchiveAccount extends Action<Account> {
AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService();
Account result;
AccountStatus status = account.getStatus();
// Archive the account
account.setStatus(AccountStatus.ACCOUNT_ARCHIVED);
account = accountService.update(account);
// Trigger the account archived event
account.sendEvent(new AccountEvent(AccountEventType.ACCOUNT_ARCHIVED, account));
try {
// Trigger the account archived event
result = account.sendEvent(new AccountEvent(AccountEventType.ACCOUNT_ARCHIVED, account)).getEntity();
result.setIdentity(account.getIdentity());
} catch (Exception ex) {
log.error("Account could not be archived", ex);
// Rollback the operation
account.setStatus(status);
accountService.update(account);
throw ex;
}
return result;
};
}
}

View File

@@ -4,13 +4,14 @@ import demo.account.domain.Account;
import demo.account.domain.AccountModule;
import demo.account.domain.AccountService;
import demo.account.domain.AccountStatus;
import demo.domain.Action;
import demo.account.event.AccountEvent;
import demo.account.event.AccountEventType;
import demo.domain.Action;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import java.util.function.Consumer;
import java.util.function.Function;
import static demo.account.domain.AccountStatus.ACCOUNT_CONFIRMED;
import static demo.account.domain.AccountStatus.ACCOUNT_PENDING;
@@ -23,7 +24,9 @@ import static demo.account.domain.AccountStatus.ACCOUNT_PENDING;
@Service
public class ConfirmAccount extends Action<Account> {
public Consumer<Account> getConsumer() {
private final Logger log = Logger.getLogger(this.getClass());
public Function<Account, Account> getFunction() {
return (account) -> {
Assert.isTrue(account.getStatus() != ACCOUNT_CONFIRMED, "The account has already been confirmed");
Assert.isTrue(account.getStatus() == ACCOUNT_PENDING, "The account has already been confirmed");
@@ -31,12 +34,29 @@ public class ConfirmAccount extends Action<Account> {
AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService();
// Confirm the account
Account result;
AccountStatus status = account.getStatus();
// Activate the account
account.setStatus(AccountStatus.ACCOUNT_CONFIRMED);
account = accountService.update(account);
// Trigger the account confirmed
account.sendEvent(new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED, account));
try {
// Trigger the account confirmed event
result = account.sendEvent(new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED, account)).getEntity();
result.setIdentity(account.getIdentity());
} catch (Exception ex) {
log.error("Account could not be confirmed", ex);
// Rollback the operation
account.setStatus(status);
accountService.update(account);
throw ex;
}
return result;
};
}
}

View File

@@ -1,14 +1,18 @@
package demo.account.action;
import com.fasterxml.jackson.databind.ObjectMapper;
import demo.account.domain.Account;
import demo.domain.Action;
import demo.order.domain.Order;
import org.apache.log4j.Logger;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.web.client.RestClientResponseException;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -24,23 +28,43 @@ import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE;
@Service
public class PostOrder extends Action<Account> {
private final Logger log = Logger.getLogger(this.getClass());
public BiFunction<Account, Order, Order> getFunction() {
return (account, order) -> {
Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "Only active accounts can create an order");
order = order.post();
// Create traverson for the new order
Traverson traverson = new Traverson(URI.create(order.getLink("self")
.getHref()), MediaTypes.HAL_JSON);
try {
// Create traverson for the new order
Traverson traverson = new Traverson(URI.create(order.getLink("self")
.getHref()), MediaTypes.HAL_JSON);
Map<String, Object> params = new HashMap<>();
params.put("accountId", account.getIdentity());
Map<String, Object> params = new HashMap<>();
params.put("accountId", account.getIdentity());
order = traverson.follow("commands", "connectAccount")
.withTemplateParameters(params)
.toObject(Order.class);
order = traverson.follow("commands", "connectAccount")
.withTemplateParameters(params)
.toObject(Order.class);
} catch (RestClientResponseException ex) {
log.error("New order could not be posted for the account", ex);
throw new IllegalStateException(getHttpStatusMessage(ex));
}
return order;
};
}
private String getHttpStatusMessage(RestClientResponseException ex) {
Map<String, String> errorMap = new HashMap<>();
try {
errorMap = new ObjectMapper()
.readValue(ex.getResponseBodyAsString(), errorMap
.getClass());
} catch (IOException e) {
e.printStackTrace();
}
return errorMap.getOrDefault("message", null);
}
}

View File

@@ -4,13 +4,14 @@ import demo.account.domain.Account;
import demo.account.domain.AccountModule;
import demo.account.domain.AccountService;
import demo.account.domain.AccountStatus;
import demo.domain.Action;
import demo.account.event.AccountEvent;
import demo.account.event.AccountEventType;
import demo.domain.Action;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import java.util.function.Consumer;
import java.util.function.Function;
import static demo.account.domain.AccountStatus.ACCOUNT_ACTIVE;
import static demo.account.domain.AccountStatus.ACCOUNT_SUSPENDED;
@@ -23,20 +24,39 @@ import static demo.account.domain.AccountStatus.ACCOUNT_SUSPENDED;
@Service
public class SuspendAccount extends Action<Account> {
public Consumer<Account> getConsumer() {
private final Logger log = Logger.getLogger(this.getClass());
public Function<Account, Account> getFunction() {
return (account) -> {
Assert.isTrue(account.getStatus() != ACCOUNT_SUSPENDED, "The account is already suspended");
Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be suspended");
AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService();
Account result;
AccountStatus status = account.getStatus();
// Suspend the account
account.setStatus(AccountStatus.ACCOUNT_SUSPENDED);
account = accountService.update(account);
// Trigger the account suspended event
account.sendEvent(new AccountEvent(AccountEventType.ACCOUNT_SUSPENDED, account));
try {
// Trigger the account suspended event
result = account.sendEvent(new AccountEvent(AccountEventType.ACCOUNT_SUSPENDED, account)).getEntity();
result.setIdentity(account.getIdentity());
} catch (Exception ex) {
log.error("Account could not be suspended", ex);
// Rollback the operation
account.setStatus(status);
accountService.update(account);
throw ex;
}
return result;
};
}
}

View File

@@ -94,34 +94,30 @@ public class Account extends AbstractEntity<AccountEvent, Long> {
@Command(method = "activate", controller = AccountController.class)
public Account activate() {
getAction(ActivateAccount.class)
.getConsumer()
.accept(this);
return this;
return getAction(ActivateAccount.class)
.getFunction()
.apply(this);
}
@Command(method = "archive", controller = AccountController.class)
public Account archive() {
getAction(ArchiveAccount.class)
.getConsumer()
.accept(this);
return this;
return getAction(ArchiveAccount.class)
.getFunction()
.apply(this);
}
@Command(method = "confirm", controller = AccountController.class)
public Account confirm() {
getAction(ConfirmAccount.class)
.getConsumer()
.accept(this);
return this;
return getAction(ConfirmAccount.class)
.getFunction()
.apply(this);
}
@Command(method = "suspend", controller = AccountController.class)
public Account suspend() {
getAction(SuspendAccount.class)
.getConsumer()
.accept(this);
return this;
return getAction(SuspendAccount.class)
.getFunction()
.apply(this);
}
@Command(method = "postOrder", controller = AccountController.class)

View File

@@ -37,7 +37,7 @@ public class AccountService extends Service<Account, Long> {
log.error("Account registration failed", ex);
// Rollback the account creation
delete(account.getIdentity());
throw new IllegalStateException("Account registration failed", ex);
throw ex;
}
// Return the result

View File

@@ -28,7 +28,7 @@ public class AccountEvent extends Event<Account, AccountEventType, Long> {
@Enumerated(EnumType.STRING)
private AccountEventType type;
@OneToOne(cascade = CascadeType.ALL, fetch = FetchType.LAZY)
@OneToOne(cascade = CascadeType.DETACH, fetch = FetchType.LAZY)
@JsonIgnore
private Account entity;

View File

@@ -16,6 +16,8 @@ import java.util.Set;
public class Order extends Aggregate<OrderEvent, Long> {
private Long id;
private Long createdAt;
private Long lastModified;
private List<OrderEvent> orderEvents = new ArrayList<>();
private OrderStatus status;
private Set<LineItem> lineItems = new HashSet<>();
@@ -35,6 +37,22 @@ public class Order extends Aggregate<OrderEvent, Long> {
this.shippingAddress.setAddressType(AddressType.SHIPPING);
}
public Long getCreatedAt() {
return createdAt;
}
public void setCreatedAt(Long createdAt) {
this.createdAt = createdAt;
}
public Long getLastModified() {
return lastModified;
}
public void setLastModified(Long lastModified) {
this.lastModified = lastModified;
}
public OrderStatus getStatus() {
return status;
}

View File

@@ -1,16 +1,24 @@
package demo.order.domain;
import com.fasterxml.jackson.databind.ObjectMapper;
import demo.domain.Service;
import org.apache.log4j.Logger;
import org.springframework.hateoas.TemplateVariable;
import org.springframework.hateoas.UriTemplate;
import org.springframework.http.HttpMethod;
import org.springframework.http.RequestEntity;
import org.springframework.web.client.RestClientResponseException;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@org.springframework.stereotype.Service
public class OrderService extends Service<Order, Long> {
private RestTemplate restTemplate;
private final Logger log = Logger.getLogger(this.getClass());
private final RestTemplate restTemplate;
public OrderService(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
@@ -18,36 +26,87 @@ public class OrderService extends Service<Order, Long> {
@Override
public Order get(Long orderId) {
return restTemplate.getForObject(new UriTemplate("http://order-web/v1/orders/{id}")
.with("id", TemplateVariable.VariableType.PATH_VARIABLE)
.expand(orderId), Order.class);
Order result;
try {
result = restTemplate.getForObject(new UriTemplate("http://order-web/v1/orders/{id}")
.with("id", TemplateVariable.VariableType.PATH_VARIABLE)
.expand(orderId), Order.class);
} catch (RestClientResponseException ex) {
log.error("Get order failed", ex);
throw new IllegalStateException(getHttpStatusMessage(ex), ex);
}
return result;
}
@Override
public Order create(Order order) {
return restTemplate.postForObject(new UriTemplate("http://order-web/v1/orders").expand(),
order, Order.class);
Order result;
try {
result = restTemplate.postForObject(new UriTemplate("http://order-web/v1/orders").expand(),
order, Order.class);
} catch (RestClientResponseException ex) {
log.error("Create order failed", ex);
throw new IllegalStateException(getHttpStatusMessage(ex), ex);
}
return result;
}
@Override
public Order update(Order order) {
return restTemplate.exchange(new RequestEntity<>(order, HttpMethod.PUT, new UriTemplate
("http://order-web/v1/orders/{id}").with("id", TemplateVariable.VariableType.PATH_VARIABLE)
.expand(order.getIdentity())), Order.class)
.getBody();
Order result;
try {
result = restTemplate.exchange(new RequestEntity<>(order, HttpMethod.PUT,
new UriTemplate("http://order-web/v1/orders/{id}")
.with("id", TemplateVariable.VariableType.PATH_VARIABLE)
.expand(order.getIdentity())), Order.class).getBody();
} catch (RestClientResponseException ex) {
log.error("Update order failed", ex);
throw new IllegalStateException(getHttpStatusMessage(ex), ex);
}
return result;
}
@Override
public boolean delete(Long orderId) {
restTemplate.delete(new UriTemplate("http://order-web/v1/orders/{id}").with("id", TemplateVariable
.VariableType.PATH_VARIABLE)
.expand(orderId));
try {
restTemplate.delete(new UriTemplate("http://order-web/v1/orders/{id}")
.with("id", TemplateVariable.VariableType.PATH_VARIABLE).expand(orderId));
} catch (RestClientResponseException ex) {
log.error("Delete order failed", ex);
throw new IllegalStateException(getHttpStatusMessage(ex), ex);
}
return true;
}
public Orders findOrdersByAccountId(Long accountId) {
return restTemplate.getForObject(new UriTemplate("http://order-web/v1/orders/search/findOrdersByAccountId")
.with("accountId", TemplateVariable.VariableType.REQUEST_PARAM)
.expand(accountId), Orders.class);
Orders result;
try {
result = restTemplate
.getForObject(new UriTemplate("http://order-web/v1/orders/search/findOrdersByAccountId")
.with("accountId", TemplateVariable.VariableType.REQUEST_PARAM)
.expand(accountId), Orders.class);
} catch (RestClientResponseException ex) {
log.error("Delete order failed", ex);
throw new IllegalStateException(getHttpStatusMessage(ex), ex);
}
return result;
}
private String getHttpStatusMessage(RestClientResponseException ex) {
Map<String, String> errorMap = new HashMap<>();
try {
errorMap = new ObjectMapper()
.readValue(ex.getResponseBodyAsString(), errorMap
.getClass());
} catch (IOException e) {
e.printStackTrace();
}
return errorMap.getOrDefault("message", null);
}
}

View File

@@ -42,7 +42,7 @@ public class ConnectAccount extends Action<Order> {
order.setAccountId(null);
order.setStatus(OrderStatus.ORDER_CREATED);
orderService.update(order);
throw new IllegalStateException("Could not connect order to account", ex);
throw ex;
}
return result;

View File

@@ -44,7 +44,7 @@ public class ConnectPayment extends Action<Order> {
order.setPaymentId(null);
order.setStatus(OrderStatus.ORDER_CREATED);
orderService.update(order);
throw new IllegalStateException("Could not connect payment to order", ex);
throw ex;
}
return result;

View File

@@ -68,7 +68,7 @@ public class CreatePayment extends Action<Order> {
order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
orderService.update(order);
throw new IllegalStateException("Payment creation failed", ex);
throw ex;
}
return result;

View File

@@ -103,9 +103,10 @@ public class OrderController {
@RequestMapping(path = "/orders/{id}/commands/createPayment")
public ResponseEntity createPayment(@PathVariable Long id) {
return Optional.ofNullable(orderService.get(id)
.createPayment())
.map(e -> new ResponseEntity<>(getOrderResource(e), HttpStatus.OK))
return Optional.of(orderService.get(id))
.map(Order::createPayment)
.map(this::getOrderResource)
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("The command could not be applied"));
}
@@ -214,25 +215,25 @@ public class OrderController {
private Resource<Order> getOrderResource(Order order) {
if (order == null) return null;
if (order.getLink("commands") == null) {
if (!order.hasLink("commands")) {
// Add command link
order.add(linkBuilder("getCommands", order.getIdentity()).withRel("commands"));
}
if (order.getLink("events") == null) {
if (!order.hasLink("events")) {
// Add get events link
order.add(linkBuilder("getOrderEvents", order.getIdentity()).withRel("events"));
}
// Add remote account link
if (order.getAccountId() != null && order.getLink("account") == null) {
if (order.getAccountId() != null && !order.hasLink("account")) {
Link result = getRemoteLink("account-web", "/v1/accounts/{id}", order.getAccountId(), "account");
if (result != null)
order.add(result);
}
// Add remote payment link
if (order.getPaymentId() != null && order.getLink("payment") == null) {
if (order.getPaymentId() != null && !order.hasLink("payment")) {
Link result = getRemoteLink("payment-web", "/v1/payments/{id}", order.getPaymentId(), "payment");
if (result != null)
order.add(result);

View File

@@ -28,7 +28,7 @@ public class OrderEvent extends Event<Order, OrderEventType, Long> {
@Enumerated(EnumType.STRING)
private OrderEventType type;
@OneToOne(cascade = CascadeType.ALL, fetch = FetchType.LAZY)
@OneToOne(cascade = CascadeType.DETACH, fetch = FetchType.LAZY)
@JsonIgnore
private Order entity;

View File

@@ -1,16 +1,24 @@
package demo.payment.domain;
import com.fasterxml.jackson.databind.ObjectMapper;
import demo.domain.Service;
import org.apache.log4j.Logger;
import org.springframework.hateoas.TemplateVariable;
import org.springframework.hateoas.UriTemplate;
import org.springframework.http.HttpMethod;
import org.springframework.http.RequestEntity;
import org.springframework.web.client.RestClientResponseException;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@org.springframework.stereotype.Service
public class PaymentService extends Service<Payment, Long> {
private RestTemplate restTemplate;
private final Logger log = Logger.getLogger(this.getClass());
private final RestTemplate restTemplate;
public PaymentService(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
@@ -18,30 +26,72 @@ public class PaymentService extends Service<Payment, Long> {
@Override
public Payment get(Long paymentId) {
return restTemplate.getForObject(new UriTemplate("http://payment-web/v1/payments/{id}")
.with("id", TemplateVariable.VariableType.PATH_VARIABLE)
.expand(paymentId), Payment.class);
Payment result;
try {
result = restTemplate.getForObject(new UriTemplate("http://payment-web/v1/payments/{id}")
.with("id", TemplateVariable.VariableType.PATH_VARIABLE)
.expand(paymentId), Payment.class);
} catch (RestClientResponseException ex) {
log.error("Get payment failed", ex);
throw new IllegalStateException(getHttpStatusMessage(ex), ex);
}
return result;
}
@Override
public Payment create(Payment payment) {
return restTemplate.postForObject(new UriTemplate("http://payment-web/v1/payments").expand(),
payment, Payment.class);
Payment result;
try {
result = restTemplate.postForObject(new UriTemplate("http://payment-web/v1/payments").expand(),
payment, Payment.class);
} catch (RestClientResponseException ex) {
log.error("Create payment failed", ex);
throw new IllegalStateException(getHttpStatusMessage(ex), ex);
}
return result;
}
@Override
public Payment update(Payment payment) {
return restTemplate.exchange(new RequestEntity<>(payment, HttpMethod.PUT, new UriTemplate
("http://payment-web/v1/payments/{id}").with("id", TemplateVariable.VariableType.PATH_VARIABLE)
.expand(payment.getIdentity())), Payment.class)
.getBody();
Payment result;
try {
result = restTemplate.exchange(new RequestEntity<>(payment, HttpMethod.PUT,
new UriTemplate("http://payment-web/v1/payments/{id}")
.with("id", TemplateVariable.VariableType.PATH_VARIABLE)
.expand(payment.getIdentity())), Payment.class).getBody();
} catch (RestClientResponseException ex) {
log.error("Update payment failed", ex);
throw new IllegalStateException(getHttpStatusMessage(ex), ex);
}
return result;
}
@Override
public boolean delete(Long paymentId) {
restTemplate.delete(new UriTemplate("http://payment-web/v1/payments/{id}").with("id", TemplateVariable
.VariableType.PATH_VARIABLE)
.expand(paymentId));
try {
restTemplate.delete(new UriTemplate("http://payment-web/v1/payments/{id}")
.with("id", TemplateVariable.VariableType.PATH_VARIABLE).expand(paymentId));
} catch (RestClientResponseException ex) {
log.error("Delete payment failed", ex);
throw new IllegalStateException(getHttpStatusMessage(ex), ex);
}
return true;
}
private String getHttpStatusMessage(RestClientResponseException ex) {
Map<String, String> errorMap = new HashMap<>();
try {
errorMap = new ObjectMapper()
.readValue(ex.getResponseBodyAsString(), errorMap
.getClass());
} catch (IOException e) {
e.printStackTrace();
}
return errorMap.getOrDefault("message", null);
}
}

View File

@@ -322,7 +322,7 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderS
@Bean
public Action<OrderStatus, OrderEventType> accountConnected() {
return context -> applyEvent(context,
new ReservationFailed(context, event -> {
new AccountConnected(context, event -> {
log.info(event.getType() + ": " + event.getLink("order").getHref());
// Get the order resource for the event
Traverson traverson = new Traverson(

View File

@@ -7,6 +7,7 @@ import demo.payment.domain.PaymentService;
import demo.payment.domain.PaymentStatus;
import demo.payment.event.PaymentEvent;
import demo.payment.event.PaymentEventType;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
@@ -14,6 +15,8 @@ import java.util.function.BiFunction;
@Service
public class ConnectOrder extends Action<Payment> {
private final Logger log = Logger.getLogger(this.getClass());
public BiFunction<Payment, Long, Payment> getFunction() {
return (payment, orderId) -> {
Assert.isTrue(payment
@@ -33,9 +36,13 @@ public class ConnectOrder extends Action<Payment> {
// Trigger the payment connected
result = payment.sendEvent(new PaymentEvent(PaymentEventType.ORDER_CONNECTED, payment)).getEntity();
} catch (IllegalStateException ex) {
log.error("Payment could not be connected to order", ex);
// Rollback operation
payment.setStatus(PaymentStatus.PAYMENT_CREATED);
payment.setOrderId(null);
paymentService.update(payment);
throw ex;
}

View File

@@ -2,31 +2,66 @@ package demo.payment.action;
import demo.domain.Action;
import demo.payment.domain.Payment;
import demo.payment.domain.PaymentModule;
import demo.payment.domain.PaymentService;
import demo.payment.domain.PaymentStatus;
import demo.payment.event.PaymentEvent;
import demo.payment.event.PaymentEventType;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.Function;
import static demo.payment.domain.PaymentStatus.PAYMENT_FAILED;
import static demo.payment.domain.PaymentStatus.PAYMENT_SUCCEEDED;
@Service
public class ProcessPayment extends Action<Payment> {
public Consumer<Payment> getConsumer() {
private final Logger log = Logger.getLogger(this.getClass());
private final PaymentService paymentService;
public ProcessPayment(PaymentService paymentService) {
this.paymentService = paymentService;
}
public Function<Payment, Payment> getFunction() {
return payment -> {
// Validations
Assert.isTrue(!Arrays.asList(PaymentStatus.PAYMENT_SUCCEEDED,
Assert.isTrue(!Arrays.asList(PAYMENT_SUCCEEDED,
PaymentStatus.PAYMENT_PENDING,
PaymentStatus.PAYMENT_FAILED).contains(payment.getStatus()), "Payment has already been processed");
Assert.isTrue(payment.getStatus() == PaymentStatus.ORDER_CONNECTED,
"Payment must be connected to an order");
PaymentService paymentService = payment.getModule(PaymentModule.class)
.getDefaultService();
Payment result = null;
payment.setStatus(PaymentStatus.PAYMENT_PROCESSED);
paymentService.update(payment);
payment = paymentService.update(payment);
try {
// Trigger the payment processed event
result = payment.sendEvent(new PaymentEvent(PaymentEventType.PAYMENT_PROCESSED, payment)).getEntity();
} catch (Exception ex) {
log.error("Payment could not be processed", ex);
} finally {
// Handle the result asynchronously
if (result != null && result.getStatus() == PaymentStatus.PAYMENT_PROCESSED) {
payment = finalizePayment(payment, PAYMENT_SUCCEEDED);
} else {
payment = finalizePayment(payment, PAYMENT_FAILED);
}
}
return result;
};
}
private Payment finalizePayment(Payment payment, PaymentStatus paymentStatus) {
payment = paymentService.get(payment.getIdentity());
payment.setStatus(paymentStatus);
payment.sendAsyncEvent(new PaymentEvent(PaymentEventType
.valueOf(paymentStatus.toString()), payment));
return payment;
}
}

View File

@@ -132,8 +132,6 @@ public class PaymentController {
// Create the new payment
payment = paymentService.registerPayment(payment);
payment.getLinks().clear();
return new Resource<>(payment);
}

View File

@@ -97,11 +97,9 @@ public class Payment extends AbstractEntity<PaymentEvent, Long> {
@Command(method = "processPayment", controller = PaymentController.class)
public Payment processPayment() {
getAction(ProcessPayment.class)
.getConsumer()
.accept(this);
return this;
return getAction(ProcessPayment.class)
.getFunction()
.apply(this);
}
/**

View File

@@ -5,7 +5,9 @@ import demo.event.EventService;
import demo.payment.event.PaymentEvent;
import demo.payment.event.PaymentEventType;
import demo.payment.repository.PaymentRepository;
import org.apache.log4j.Logger;
import org.springframework.util.Assert;
import org.springframework.web.client.ResourceAccessException;
/**
* The {@link PaymentService} provides transactional support for managing {@link Payment} entities. This service also
@@ -18,6 +20,7 @@ import org.springframework.util.Assert;
@org.springframework.stereotype.Service
public class PaymentService extends Service<Payment, Long> {
private final Logger log = Logger.getLogger(this.getClass());
private final PaymentRepository paymentRepository;
public PaymentService(PaymentRepository paymentRepository, EventService<PaymentEvent, Long> eventService) {
@@ -25,19 +28,23 @@ public class PaymentService extends Service<Payment, Long> {
}
public Payment registerPayment(Payment payment) {
Payment result;
payment = create(payment);
// Trigger the payment creation event
PaymentEvent event = payment.sendEvent(new PaymentEvent(PaymentEventType.PAYMENT_CREATED, payment));
// Attach payment identifier
event.getEntity()
.setIdentity(payment.getIdentity());
event.getEntity().getLinks().clear();
try {
// Handle a synchronous event flow
result = payment.sendEvent(new PaymentEvent(PaymentEventType.PAYMENT_CREATED, payment)).getEntity();
result.setIdentity(payment.getIdentity());
} catch (Exception ex) {
log.error("Payment creation failed", ex);
// Rollback the payment creation
delete(payment.getIdentity());
throw new ResourceAccessException(ex.getMessage());
}
// Return the result
return event.getEntity();
return result;
}
public Payment get(Long id) {
@@ -62,7 +69,7 @@ public class PaymentService extends Service<Payment, Long> {
currentPayment.setOrderId(payment.getOrderId());
currentPayment.setAmount(payment.getAmount());
return paymentRepository.save(currentPayment);
return paymentRepository.saveAndFlush(currentPayment);
}
public boolean delete(Long id) {

View File

@@ -26,7 +26,7 @@ public class PaymentEvent extends Event<Payment, PaymentEventType, Long> {
@Enumerated(EnumType.STRING)
private PaymentEventType type;
@OneToOne(cascade = CascadeType.MERGE, fetch = FetchType.LAZY)
@OneToOne(cascade = CascadeType.DETACH, fetch = FetchType.LAZY)
@JsonIgnore
private Payment entity;

View File

@@ -1,10 +1,10 @@
package demo.config;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
import demo.event.PaymentEvent;
import demo.event.PaymentEventType;
import demo.function.*;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
import org.apache.log4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -17,6 +17,7 @@ import org.springframework.statemachine.config.EnableStateMachineFactory;
import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter;
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;
import org.springframework.web.client.RestTemplate;
import java.net.URI;
import java.util.EnumSet;
@@ -81,12 +82,6 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Paymen
.and()
.withExternal()
.source(PaymentStatus.ORDER_CONNECTED)
.target(PaymentStatus.PAYMENT_PENDING)
.event(PaymentEventType.PAYMENT_PENDING)
.action(paymentPending())
.and()
.withExternal()
.source(PaymentStatus.PAYMENT_PENDING)
.target(PaymentStatus.PAYMENT_PROCESSED)
.event(PaymentEventType.PAYMENT_PROCESSED)
.action(paymentProcessed())
@@ -95,13 +90,13 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Paymen
.source(PaymentStatus.PAYMENT_PROCESSED)
.target(PaymentStatus.PAYMENT_SUCCEEDED)
.event(PaymentEventType.PAYMENT_SUCCEEDED)
.action(paymentSucceeded())
.action(paymentSucceeded(new RestTemplate()))
.and()
.withExternal()
.source(PaymentStatus.PAYMENT_PROCESSED)
.target(PaymentStatus.PAYMENT_FAILED)
.event(PaymentEventType.PAYMENT_FAILED)
.action(paymentFailed());
.action(paymentFailed(new RestTemplate()));
} catch (Exception e) {
throw new RuntimeException("Could not configure state machine transitions", e);
}
@@ -120,7 +115,8 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Paymen
* @param paymentFunction is the payment function to apply after the state machine has completed replication
* @return an {@link PaymentEvent} only if this event has not yet been processed, otherwise returns null
*/
private PaymentEvent applyEvent(StateContext<PaymentStatus, PaymentEventType> context, PaymentFunction paymentFunction) {
private PaymentEvent applyEvent(StateContext<PaymentStatus, PaymentEventType> context, PaymentFunction
paymentFunction) {
PaymentEvent paymentEvent = null;
// Log out the progress of the state machine replication
@@ -207,37 +203,37 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Paymen
}
@Bean
public Action<PaymentStatus, PaymentEventType> paymentSucceeded() {
public Action<PaymentStatus, PaymentEventType> paymentSucceeded(RestTemplate restTemplate) {
return context -> applyEvent(context,
new PaymentSucceeded(context, event -> {
log.info(event.getType() + ": " + event.getLink("payment").getHref());
// Get the payment resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("payment").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Payment.class)
.getBody();
return updatePaymentStatus(restTemplate, event, PaymentStatus.PAYMENT_SUCCEEDED);
}));
}
@Bean
public Action<PaymentStatus, PaymentEventType> paymentFailed() {
public Action<PaymentStatus, PaymentEventType> paymentFailed(RestTemplate restTemplate) {
return context -> applyEvent(context,
new PaymentFailed(context, event -> {
log.info(event.getType() + ": " + event.getLink("payment").getHref());
// Get the payment resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("payment").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Payment.class)
.getBody();
return updatePaymentStatus(restTemplate, event, PaymentStatus.PAYMENT_FAILED);
}));
}
private Payment updatePaymentStatus(RestTemplate restTemplate, PaymentEvent event, PaymentStatus status) {
// Get the payment resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("payment").getHref()),
MediaTypes.HAL_JSON
);
Payment payment = traverson.follow("self")
.toEntity(Payment.class)
.getBody();
payment.setStatus(status);
restTemplate.put(payment.getLink("self").getHref(), payment);
return payment;
}
}

View File

@@ -11,9 +11,12 @@ import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.Resource;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
@@ -50,13 +53,17 @@ class EventServiceImpl<T extends Event, ID extends Serializable> implements Even
.contentType(MediaTypes.HAL_JSON)
.body(new Resource<T>(event), Resource.class);
// Send the event to the event stream processor
E entity = (E) restTemplate.exchange(requestEntity, event.getEntity()
.getClass())
.getBody();
try {
// Send the event to the event stream processor
ResponseEntity<E> response = restTemplate.exchange(requestEntity, (Class<E>) event.getEntity().getClass());
E entity = response.getBody();
// Set the applied entity reference to the event
event.setEntity(entity);
// Set the applied entity reference to the event
event.setEntity(entity);
} catch (Exception ex) {
log.error(ex);
throw new ResourceAccessException(ex.getMessage(), new IOException(ex));
}
return event;
}