Order and Payment

This commit is contained in:
Kenny Bastani
2016-12-23 13:39:53 -05:00
parent 03c18c0fe3
commit e1dc702107
78 changed files with 3059 additions and 51 deletions

View File

@@ -10,6 +10,15 @@ import demo.order.OrderStatus;
* @author kbastani
*/
public enum OrderEventType {
// TODO: Implement
ORDER_CREATED
ORDER_CREATED,
ACCOUNT_CONNECTED,
RESERVATION_PENDING,
INVENTORY_RESERVED,
RESERVATION_SUCCEEDED,
RESERVATION_FAILED,
PAYMENT_CREATED,
PAYMENT_CONNECTED,
PAYMENT_PENDING,
PAYMENT_SUCCEEDED,
PAYMENT_FAILED
}

View File

@@ -31,7 +31,7 @@ public class Order extends BaseEntity {
private Address shippingAddress;
public Order() {
this.status = OrderStatus.PURCHASED;
this.status = OrderStatus.ORDER_CREATED;
}
public Order(String accountNumber, Address shippingAddress) {

View File

@@ -1,6 +1,9 @@
package demo.order;
public enum OrderCommand {
// TODO: Create commands
TODO
CONNECT_ACCOUNT,
RESERVE_INVENTORY,
CREATE_PAYMENT,
CONNECT_PAYMENT,
PROCESS_PAYMENT
}

View File

@@ -76,10 +76,42 @@ public class OrderController {
.orElseThrow(() -> new RuntimeException("The order could not be found"));
}
@GetMapping(path = "/orders/{id}/commands/todo")
public ResponseEntity confirmOrder(@PathVariable Long id) {
@GetMapping(path = "/orders/{id}/commands/connectAccount")
public ResponseEntity connectAccount(@PathVariable Long id) {
return Optional.ofNullable(getOrderResource(
orderService.applyCommand(id, OrderCommand.TODO)))
orderService.applyCommand(id, OrderCommand.CONNECT_ACCOUNT)))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("The command could not be applied"));
}
@GetMapping(path = "/orders/{id}/commands/connectPayment")
public ResponseEntity connectPayment(@PathVariable Long id) {
return Optional.ofNullable(getOrderResource(
orderService.applyCommand(id, OrderCommand.CONNECT_PAYMENT)))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("The command could not be applied"));
}
@GetMapping(path = "/orders/{id}/commands/createPayment")
public ResponseEntity createPayment(@PathVariable Long id) {
return Optional.ofNullable(getOrderResource(
orderService.applyCommand(id, OrderCommand.CREATE_PAYMENT)))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("The command could not be applied"));
}
@GetMapping(path = "/orders/{id}/commands/processPayment")
public ResponseEntity processPayment(@PathVariable Long id) {
return Optional.ofNullable(getOrderResource(
orderService.applyCommand(id, OrderCommand.PROCESS_PAYMENT)))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("The command could not be applied"));
}
@GetMapping(path = "/orders/{id}/commands/reserveInventory")
public ResponseEntity reserveInventory(@PathVariable Long id) {
return Optional.ofNullable(getOrderResource(
orderService.applyCommand(id, OrderCommand.RESERVE_INVENTORY)))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("The command could not be applied"));
}
@@ -177,17 +209,20 @@ public class OrderController {
if (orderResource != null) {
commandResource.add(
getCommandLinkBuilder(id)
.slash("confirm")
.withRel("confirm"),
.slash("connectAccount")
.withRel("connectAccount"),
getCommandLinkBuilder(id)
.slash("activate")
.withRel("activate"),
.slash("reserveInventory")
.withRel("reserveInventory"),
getCommandLinkBuilder(id)
.slash("suspend")
.withRel("suspend"),
.slash("createPayment")
.withRel("createPayment"),
getCommandLinkBuilder(id)
.slash("archive")
.withRel("archive")
.slash("connectPayment")
.withRel("connectPayment"),
getCommandLinkBuilder(id)
.slash("processPayment")
.withRel("processPayment")
);
}

View File

@@ -1,9 +1,15 @@
package demo.order;
public enum OrderStatus {
PURCHASED,
PENDING,
CONFIRMED,
SHIPPED,
DELIVERED
ORDER_CREATED,
ACCOUNT_CONNECTED,
RESERVATION_PENDING,
INVENTORY_RESERVED,
RESERVATION_SUCCEEDED,
RESERVATION_FAILED,
PAYMENT_CREATED,
PAYMENT_CONNECTED,
PAYMENT_PENDING,
PAYMENT_SUCCEEDED,
PAYMENT_FAILED
}

View File

@@ -2,11 +2,15 @@ package demo.config;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.function.OrderFunction;
import demo.function.*;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.stream.OrderStream;
import org.apache.log4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.action.Action;
@@ -15,6 +19,7 @@ import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;
import java.net.URI;
import java.util.EnumSet;
/**
@@ -23,7 +28,7 @@ import java.util.EnumSet;
* <p>
* A state machine provides a robust declarative language for describing the state of an {@link Order}
* resource given a sequence of ordered {@link demo.event.OrderEvents}. When an event is received
* in {@link demo.event.OrderEventStream}, an in-memory state machine is fully replicated given the
* in {@link OrderStream}, an in-memory state machine is fully replicated given the
* {@link demo.event.OrderEvents} attached to an {@link Order} resource.
*
* @author kbastani
@@ -44,7 +49,7 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderS
try {
// Describe the initial condition of the order state machine
states.withStates()
.initial(OrderStatus.PENDING)
.initial(OrderStatus.ORDER_CREATED)
.states(EnumSet.allOf(OrderStatus.class));
} catch (Exception e) {
throw new RuntimeException("State machine configuration failed", e);
@@ -63,12 +68,240 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderS
public void configure(StateMachineTransitionConfigurer<OrderStatus, OrderEventType> transitions) {
try {
// Describe state machine transitions for orders
// TODO: Configure state machine
transitions.withExternal()
.source(OrderStatus.ORDER_CREATED)
.target(OrderStatus.ORDER_CREATED)
.event(OrderEventType.ORDER_CREATED)
.action(orderCreated())
.and()
.withExternal()
.source(OrderStatus.ORDER_CREATED)
.target(OrderStatus.ACCOUNT_CONNECTED)
.event(OrderEventType.ACCOUNT_CONNECTED)
.action(accountConnected())
.and()
.withExternal()
.source(OrderStatus.ACCOUNT_CONNECTED)
.target(OrderStatus.RESERVATION_PENDING)
.event(OrderEventType.RESERVATION_PENDING)
.action(reservationPending())
.and()
.withExternal()
.source(OrderStatus.RESERVATION_PENDING)
.target(OrderStatus.RESERVATION_SUCCEEDED)
.event(OrderEventType.RESERVATION_SUCCEEDED)
.action(reservationSucceeded())
.and()
.withExternal()
.source(OrderStatus.RESERVATION_PENDING)
.target(OrderStatus.RESERVATION_FAILED)
.event(OrderEventType.RESERVATION_FAILED)
.action(reservationFailed())
.and()
.withExternal()
.source(OrderStatus.RESERVATION_SUCCEEDED)
.target(OrderStatus.PAYMENT_CREATED)
.event(OrderEventType.PAYMENT_CREATED)
.action(paymentCreated())
.and()
.withExternal()
.source(OrderStatus.PAYMENT_CREATED)
.target(OrderStatus.PAYMENT_CONNECTED)
.event(OrderEventType.PAYMENT_CONNECTED)
.action(paymentConnected())
.and()
.withExternal()
.source(OrderStatus.PAYMENT_CONNECTED)
.target(OrderStatus.PAYMENT_PENDING)
.event(OrderEventType.PAYMENT_PENDING)
.action(paymentPending())
.and()
.withExternal()
.source(OrderStatus.PAYMENT_PENDING)
.target(OrderStatus.PAYMENT_SUCCEEDED)
.event(OrderEventType.PAYMENT_SUCCEEDED)
.action(paymentSucceeded())
.and()
.withExternal()
.source(OrderStatus.PAYMENT_PENDING)
.target(OrderStatus.PAYMENT_FAILED)
.event(OrderEventType.PAYMENT_FAILED)
.action(paymentFailed());
} catch (Exception e) {
throw new RuntimeException("Could not configure state machine transitions", e);
}
}
@Bean
public Action<OrderStatus, OrderEventType> orderCreated() {
return context -> applyEvent(context,
new OrderCreated(context, event -> {
log.info(event.getType() + ": " + event.getLink("order").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("order").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Order.class)
.getBody();
}));
}
@Bean
public Action<OrderStatus, OrderEventType> paymentPending() {
return context -> applyEvent(context,
new PaymentPending(context, event -> {
log.info(event.getType() + ": " + event.getLink("order").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("order").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Order.class)
.getBody();
}));
}
@Bean
public Action<OrderStatus, OrderEventType> reservationPending() {
return context -> applyEvent(context,
new ReservationPending(context, event -> {
log.info(event.getType() + ": " + event.getLink("order").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("order").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Order.class)
.getBody();
}));
}
@Bean
public Action<OrderStatus, OrderEventType> paymentFailed() {
return context -> applyEvent(context,
new PaymentFailed(context, event -> {
log.info(event.getType() + ": " + event.getLink("order").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("order").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Order.class)
.getBody();
}));
}
@Bean
public Action<OrderStatus, OrderEventType> paymentSucceeded() {
return context -> applyEvent(context,
new PaymentSucceeded(context, event -> {
log.info(event.getType() + ": " + event.getLink("order").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("order").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Order.class)
.getBody();
}));
}
@Bean
public Action<OrderStatus, OrderEventType> paymentConnected() {
return context -> applyEvent(context,
new PaymentConnected(context, event -> {
log.info(event.getType() + ": " + event.getLink("order").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("order").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Order.class)
.getBody();
}));
}
@Bean
public Action<OrderStatus, OrderEventType> paymentCreated() {
return context -> applyEvent(context,
new PaymentCreated(context, event -> {
log.info(event.getType() + ": " + event.getLink("order").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("order").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Order.class)
.getBody();
}));
}
@Bean
public Action<OrderStatus, OrderEventType> reservationSucceeded() {
return context -> applyEvent(context,
new ReservationSucceeded(context, event -> {
log.info(event.getType() + ": " + event.getLink("order").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("order").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Order.class)
.getBody();
}));
}
@Bean
public Action<OrderStatus, OrderEventType> reservationFailed() {
return context -> applyEvent(context,
new ReservationSucceeded(context, event -> {
log.info(event.getType() + ": " + event.getLink("order").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("order").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Order.class)
.getBody();
}));
}
@Bean
public Action<OrderStatus, OrderEventType> accountConnected() {
return context -> applyEvent(context,
new ReservationFailed(context, event -> {
log.info(event.getType() + ": " + event.getLink("order").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("order").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Order.class)
.getBody();
}));
}
/**
* Functions are mapped to actions that are triggered during the replication of a state machine. Functions
* should only be executed after the state machine has completed replication. This method checks the state
@@ -78,12 +311,12 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderS
* The {@link OrderFunction} argument is only applied if an {@link OrderEvent} is provided as a
* message header in the {@link StateContext}.
*
* @param context is the state machine context that may include an {@link OrderEvent}
* @param context is the state machine context that may include an {@link OrderEvent}
* @param orderFunction is the order function to apply after the state machine has completed replication
* @return an {@link OrderEvent} only if this event has not yet been processed, otherwise returns null
*/
private OrderEvent applyEvent(StateContext<OrderStatus, OrderEventType> context,
OrderFunction orderFunction) {
OrderFunction orderFunction) {
OrderEvent orderEvent = null;
// Log out the progress of the state machine replication

View File

@@ -1,5 +1,15 @@
package demo.event;
public enum OrderEventType {
// TODO: Add event types
ORDER_CREATED,
ACCOUNT_CONNECTED,
RESERVATION_PENDING,
INVENTORY_RESERVED,
RESERVATION_SUCCEEDED,
RESERVATION_FAILED,
PAYMENT_CREATED,
PAYMENT_CONNECTED,
PAYMENT_PENDING,
PAYMENT_SUCCEEDED,
PAYMENT_FAILED
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class AccountConnected extends OrderFunction {
final private Logger log = Logger.getLogger(AccountConnected.class);
public AccountConnected(StateContext<OrderStatus, OrderEventType> context, Function<OrderEvent, Order> lambda) {
super(context, lambda);
}
/**
* Apply an {@link OrderEvent} to the lambda function that was provided through the
* constructor of this {@link OrderFunction}.
*
* @param event is the {@link OrderEvent} to apply to the lambda function
*/
@Override
public Order apply(OrderEvent event) {
log.info("Executing workflow for account connected...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class OrderCreated extends OrderFunction {
final private Logger log = Logger.getLogger(OrderCreated.class);
public OrderCreated(StateContext<OrderStatus, OrderEventType> context, Function<OrderEvent, Order> lambda) {
super(context, lambda);
}
/**
* Apply an {@link OrderEvent} to the lambda function that was provided through the
* constructor of this {@link OrderFunction}.
*
* @param event is the {@link OrderEvent} to apply to the lambda function
*/
@Override
public Order apply(OrderEvent event) {
log.info("Executing workflow for order created...");
return super.apply(event);
}
}

View File

@@ -46,6 +46,7 @@ public abstract class OrderFunction {
// Execute the lambda function
Order result = lambda.apply(event);
context.getExtendedState().getVariables().put("order", result);
log.info("Order function: " + event.getType());
return result;
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class PaymentConnected extends OrderFunction {
final private Logger log = Logger.getLogger(PaymentConnected.class);
public PaymentConnected(StateContext<OrderStatus, OrderEventType> context, Function<OrderEvent, Order> lambda) {
super(context, lambda);
}
/**
* Apply an {@link OrderEvent} to the lambda function that was provided through the
* constructor of this {@link OrderFunction}.
*
* @param event is the {@link OrderEvent} to apply to the lambda function
*/
@Override
public Order apply(OrderEvent event) {
log.info("Executing workflow for payment connected...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class PaymentCreated extends OrderFunction {
final private Logger log = Logger.getLogger(PaymentCreated.class);
public PaymentCreated(StateContext<OrderStatus, OrderEventType> context, Function<OrderEvent, Order> lambda) {
super(context, lambda);
}
/**
* Apply an {@link OrderEvent} to the lambda function that was provided through the
* constructor of this {@link OrderFunction}.
*
* @param event is the {@link OrderEvent} to apply to the lambda function
*/
@Override
public Order apply(OrderEvent event) {
log.info("Executing workflow for payment created...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class PaymentFailed extends OrderFunction {
final private Logger log = Logger.getLogger(PaymentFailed.class);
public PaymentFailed(StateContext<OrderStatus, OrderEventType> context, Function<OrderEvent, Order> lambda) {
super(context, lambda);
}
/**
* Apply an {@link OrderEvent} to the lambda function that was provided through the
* constructor of this {@link OrderFunction}.
*
* @param event is the {@link OrderEvent} to apply to the lambda function
*/
@Override
public Order apply(OrderEvent event) {
log.info("Executing workflow for payment failed...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class PaymentPending extends OrderFunction {
final private Logger log = Logger.getLogger(PaymentPending.class);
public PaymentPending(StateContext<OrderStatus, OrderEventType> context, Function<OrderEvent, Order> lambda) {
super(context, lambda);
}
/**
* Apply an {@link OrderEvent} to the lambda function that was provided through the
* constructor of this {@link OrderFunction}.
*
* @param event is the {@link OrderEvent} to apply to the lambda function
*/
@Override
public Order apply(OrderEvent event) {
log.info("Executing workflow for payment pending...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class PaymentSucceeded extends OrderFunction {
final private Logger log = Logger.getLogger(PaymentSucceeded.class);
public PaymentSucceeded(StateContext<OrderStatus, OrderEventType> context, Function<OrderEvent, Order> lambda) {
super(context, lambda);
}
/**
* Apply an {@link OrderEvent} to the lambda function that was provided through the
* constructor of this {@link OrderFunction}.
*
* @param event is the {@link OrderEvent} to apply to the lambda function
*/
@Override
public Order apply(OrderEvent event) {
log.info("Executing workflow for payment succeeded...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class ReservationFailed extends OrderFunction {
final private Logger log = Logger.getLogger(ReservationFailed.class);
public ReservationFailed(StateContext<OrderStatus, OrderEventType> context, Function<OrderEvent, Order> lambda) {
super(context, lambda);
}
/**
* Apply an {@link OrderEvent} to the lambda function that was provided through the
* constructor of this {@link OrderFunction}.
*
* @param event is the {@link OrderEvent} to apply to the lambda function
*/
@Override
public Order apply(OrderEvent event) {
log.info("Executing workflow for reservation failed...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class ReservationPending extends OrderFunction {
final private Logger log = Logger.getLogger(ReservationPending.class);
public ReservationPending(StateContext<OrderStatus, OrderEventType> context, Function<OrderEvent, Order> lambda) {
super(context, lambda);
}
/**
* Apply an {@link OrderEvent} to the lambda function that was provided through the
* constructor of this {@link OrderFunction}.
*
* @param event is the {@link OrderEvent} to apply to the lambda function
*/
@Override
public Order apply(OrderEvent event) {
log.info("Executing workflow for reservation pending...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class ReservationSucceeded extends OrderFunction {
final private Logger log = Logger.getLogger(ReservationSucceeded.class);
public ReservationSucceeded(StateContext<OrderStatus, OrderEventType> context, Function<OrderEvent, Order> lambda) {
super(context, lambda);
}
/**
* Apply an {@link OrderEvent} to the lambda function that was provided through the
* constructor of this {@link OrderFunction}.
*
* @param event is the {@link OrderEvent} to apply to the lambda function
*/
@Override
public Order apply(OrderEvent event) {
log.info("Executing workflow for reservation succeeded...");
return super.apply(event);
}
}

View File

@@ -1,17 +1,16 @@
package demo.order;
/**
* A simple domain class for the {@link LineItem} concept in the order context.
*
* @author Kenny Bastani
* @author Josh Long
*/
public class LineItem {
import java.io.Serializable;
public class LineItem implements Serializable {
private String name, productId;
private Integer quantity;
private Double price, tax;
public LineItem() {
}
public LineItem(String name, String productId, Integer quantity,
Double price, Double tax) {
this.name = name;

View File

@@ -1,14 +1,14 @@
package demo.order;
/**
* Describes the state of an {@link Order}.
*
* @author Kenny Bastani
* @author Josh Long
*/
public enum OrderStatus {
PENDING,
CONFIRMED,
SHIPPED,
DELIVERED
ORDER_CREATED,
ACCOUNT_CONNECTED,
RESERVATION_PENDING,
RESERVATION_SUCCEEDED,
RESERVATION_FAILED,
PAYMENT_CREATED,
PAYMENT_CONNECTED,
PAYMENT_PENDING,
PAYMENT_SUCCEEDED,
PAYMENT_FAILED
}

View File

@@ -1,5 +1,7 @@
package demo.event;
package demo.stream;
import demo.event.EventService;
import demo.event.OrderEvent;
import demo.order.Order;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
@@ -8,7 +10,7 @@ import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Profile;
/**
* The {@link OrderEventStream} monitors for a variety of {@link OrderEvent} domain
* The {@link OrderStream} monitors for a variety of {@link OrderEvent} domain
* events for an {@link Order}.
*
* @author kbastani
@@ -16,11 +18,11 @@ import org.springframework.context.annotation.Profile;
@EnableAutoConfiguration
@EnableBinding(Sink.class)
@Profile({ "cloud", "development" })
public class OrderEventStream {
public class OrderStream {
private EventService eventService;
public OrderEventStream(EventService eventService) {
public OrderStream(EventService eventService) {
this.eventService = eventService;
}

View File

@@ -8,8 +8,8 @@ spring:
stream:
bindings:
input:
destination: account
group: account-group
destination: order
group: order-group
contentType: 'application/json'
consumer:
durableSubscription: true

3
payment/README.md Normal file
View File

@@ -0,0 +1,3 @@
# Payment Microservice
This is the parent project that contains modules of a microservice deployment for the _Payment_ domain context. The two modules contained in this project are separated into separate deployment artifacts, one for synchronous HTTP-based interactions and one for asynchronous AMQP-based messaging.

View File

@@ -0,0 +1,71 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>payment-web</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>payment-web</name>
<parent>
<groupId>org.kbastani</groupId>
<artifactId>payment</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hateoas</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,14 @@
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.hateoas.config.EnableHypermediaSupport;
@SpringBootApplication
@EnableHypermediaSupport(type = {EnableHypermediaSupport.HypermediaType.HAL})
public class PaymentServiceApplication {
public static void main(String[] args) {
SpringApplication.run(PaymentServiceApplication.class, args);
}
}

View File

@@ -0,0 +1,46 @@
package demo.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.Arrays;
@Configuration
@EnableCaching
public class CacheConfig {
@Bean
public JedisConnectionFactory redisConnectionFactory(
@Value("${spring.redis.port}") Integer redisPort,
@Value("${spring.redis.host}") String redisHost) {
JedisConnectionFactory redisConnectionFactory = new JedisConnectionFactory();
redisConnectionFactory.setHostName(redisHost);
redisConnectionFactory.setPort(redisPort);
return redisConnectionFactory;
}
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory cf) {
RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(cf);
return redisTemplate;
}
@Bean
public CacheManager cacheManager(RedisTemplate redisTemplate) {
RedisCacheManager cacheManager = new RedisCacheManager(redisTemplate);
cacheManager.setDefaultExpiration(50000);
cacheManager.setCacheNames(Arrays.asList("payments", "payment-events"));
cacheManager.setUsePrefix(true);
return cacheManager;
}
}

View File

@@ -0,0 +1,13 @@
package demo.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
/**
* Enable JPA auditing on an empty configuration class to disable auditing on
*
*/
@Configuration
@EnableJpaAuditing
public class JpaConfig {
}

View File

@@ -0,0 +1,10 @@
package demo.config;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBinding(Source.class)
public class StreamConfig {
}

View File

@@ -0,0 +1,36 @@
package demo.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import java.util.Collections;
import java.util.List;
@Configuration
public class WebMvcConfig extends WebMvcConfigurerAdapter {
private ObjectMapper objectMapper;
public WebMvcConfig(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public void configureMessageConverters(List<HttpMessageConverter<?>> converters) {
final MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
converter.setObjectMapper(objectMapper);
converters.add(converter);
}
@Bean
protected RestTemplate restTemplate(ObjectMapper objectMapper) {
MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
converter.setObjectMapper(objectMapper);
return new RestTemplate(Collections.singletonList(converter));
}
}

View File

@@ -0,0 +1,48 @@
package demo.domain;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.LastModifiedDate;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;
import org.springframework.hateoas.ResourceSupport;
import javax.persistence.EntityListeners;
import javax.persistence.MappedSuperclass;
import java.io.Serializable;
@MappedSuperclass
@EntityListeners(AuditingEntityListener.class)
public class BaseEntity extends ResourceSupport implements Serializable {
@CreatedDate
private Long createdAt;
@LastModifiedDate
private Long lastModified;
public BaseEntity() {
}
public Long getCreatedAt() {
return createdAt;
}
public void setCreatedAt(Long createdAt) {
this.createdAt = createdAt;
}
public Long getLastModified() {
return lastModified;
}
public void setLastModified(Long lastModified) {
this.lastModified = lastModified;
}
@Override
public String toString() {
return "BaseEntity{" +
"createdAt=" + createdAt +
", lastModified=" + lastModified +
'}';
}
}

View File

@@ -0,0 +1,6 @@
package demo.event;
public enum ConsistencyModel {
BASE,
ACID
}

View File

@@ -0,0 +1,39 @@
package demo.event;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.Optional;
@RestController
@RequestMapping("/v1")
public class EventController {
private final EventService eventService;
public EventController(EventService eventService) {
this.eventService = eventService;
}
@PostMapping(path = "/events/{id}")
public ResponseEntity createEvent(@RequestBody PaymentEvent event, @PathVariable Long id) {
return Optional.ofNullable(eventService.createEvent(id, event, ConsistencyModel.ACID))
.map(e -> new ResponseEntity<>(e, HttpStatus.CREATED))
.orElseThrow(() -> new IllegalArgumentException("Event creation failed"));
}
@PutMapping(path = "/events/{id}")
public ResponseEntity updateEvent(@RequestBody PaymentEvent event, @PathVariable Long id) {
return Optional.ofNullable(eventService.updateEvent(id, event))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new IllegalArgumentException("Event update failed"));
}
@GetMapping(path = "/events/{id}")
public ResponseEntity getEvent(@PathVariable Long id) {
return Optional.ofNullable(eventService.getEvent(id))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElse(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
}

View File

@@ -0,0 +1,10 @@
package demo.event;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.query.Param;
public interface EventRepository extends JpaRepository<PaymentEvent, Long> {
Page<PaymentEvent> findPaymentEventsByPaymentId(@Param("paymentId") Long paymentId, Pageable pageable);
}

View File

@@ -0,0 +1,214 @@
package demo.event;
import demo.payment.Payment;
import demo.payment.PaymentController;
import org.apache.log4j.Logger;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.data.domain.PageRequest;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.Resource;
import org.springframework.http.RequestEntity;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.web.client.RestTemplate;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo;
/**
* The {@link EventService} provides transactional service methods for {@link PaymentEvent}
* entities of the Payment Service. Payment domain events are generated with a {@link PaymentEventType},
* and action logs are appended to the {@link PaymentEvent}.
*
* @author kbastani
*/
@Service
@CacheConfig(cacheNames = {"payment-events"})
public class EventService {
private final Logger log = Logger.getLogger(EventService.class);
private final EventRepository eventRepository;
private final Source paymentStreamSource;
private final RestTemplate restTemplate;
public EventService(EventRepository eventRepository, Source paymentStreamSource, RestTemplate restTemplate) {
this.eventRepository = eventRepository;
this.paymentStreamSource = paymentStreamSource;
this.restTemplate = restTemplate;
}
/**
* Create a new {@link PaymentEvent} and append it to the event log of the referenced {@link Payment}.
* After the {@link PaymentEvent} has been persisted, send the event to the payment stream. Events can
* be raised as a blocking or non-blocking operation depending on the {@link ConsistencyModel}.
*
* @param paymentId is the unique identifier for the {@link Payment}
* @param event is the {@link PaymentEvent} to create
* @param consistencyModel is the desired consistency model for the response
* @return an {@link PaymentEvent} that has been appended to the {@link Payment}'s event log
*/
public PaymentEvent createEvent(Long paymentId, PaymentEvent event, ConsistencyModel consistencyModel) {
event = createEvent(paymentId, event);
return raiseEvent(event, consistencyModel);
}
/**
* Raise an {@link PaymentEvent} that attempts to transition the state of an {@link Payment}.
*
* @param event is an {@link PaymentEvent} that will be raised
* @param consistencyModel is the consistency model for this request
* @return an {@link PaymentEvent} that has been appended to the {@link Payment}'s event log
*/
public PaymentEvent raiseEvent(PaymentEvent event, ConsistencyModel consistencyModel) {
switch (consistencyModel) {
case BASE:
asyncRaiseEvent(event);
break;
case ACID:
event = raiseEvent(event);
break;
}
return event;
}
/**
* Raise an asynchronous {@link PaymentEvent} by sending an AMQP message to the payment stream. Any
* state changes will be applied to the {@link Payment} outside of the current HTTP request context.
* <p>
* Use this operation when a workflow can be processed asynchronously outside of the current HTTP
* request context.
*
* @param event is an {@link PaymentEvent} that will be raised
*/
private void asyncRaiseEvent(PaymentEvent event) {
// Append the payment event to the stream
paymentStreamSource.output()
.send(MessageBuilder
.withPayload(getPaymentEventResource(event))
.build());
}
/**
* Raise a synchronous {@link PaymentEvent} by sending a HTTP request to the payment stream. The response
* is a blocking operation, which ensures that the result of a multi-step workflow will not return until
* the transaction reaches a consistent state.
* <p>
* Use this operation when the result of a workflow must be returned within the current HTTP request context.
*
* @param event is an {@link PaymentEvent} that will be raised
* @return an {@link PaymentEvent} which contains the consistent state of an {@link Payment}
*/
private PaymentEvent raiseEvent(PaymentEvent event) {
try {
// Create a new request entity
RequestEntity<Resource<PaymentEvent>> requestEntity = RequestEntity.post(
URI.create("http://localhost:8081/v1/events"))
.contentType(MediaTypes.HAL_JSON)
.body(getPaymentEventResource(event), Resource.class);
// Update the payment entity's status
Payment result = restTemplate.exchange(requestEntity, Payment.class)
.getBody();
log.info(result);
event.setPayment(result);
} catch (Exception ex) {
log.error(ex);
}
return event;
}
/**
* Create a new {@link PaymentEvent} and publish it to the payment stream.
*
* @param event is the {@link PaymentEvent} to publish to the payment stream
* @return a hypermedia {@link PaymentEvent} resource
*/
@CacheEvict(cacheNames = "payment-events", key = "#id.toString()")
public PaymentEvent createEvent(Long id, PaymentEvent event) {
// Save new event
event = addEvent(event);
Assert.notNull(event, "The event could not be appended to the payment");
return event;
}
/**
* Get an {@link PaymentEvent} with the supplied identifier.
*
* @param id is the unique identifier for the {@link PaymentEvent}
* @return an {@link PaymentEvent}
*/
public Resource<PaymentEvent> getEvent(Long id) {
return getPaymentEventResource(eventRepository.findOne(id));
}
/**
* Update an {@link PaymentEvent} with the supplied identifier.
*
* @param id is the unique identifier for the {@link PaymentEvent}
* @param event is the {@link PaymentEvent} to update
* @return the updated {@link PaymentEvent}
*/
@CacheEvict(cacheNames = "payment-events", key = "#event.getPayment().getPaymentId().toString()")
public PaymentEvent updateEvent(Long id, PaymentEvent event) {
Assert.notNull(id);
Assert.isTrue(event.getId() == null || Objects.equals(id, event.getId()));
return eventRepository.save(event);
}
/**
* Get {@link PaymentEvents} for the supplied {@link Payment} identifier.
*
* @param id is the unique identifier of the {@link Payment}
* @return a list of {@link PaymentEvent} wrapped in a hypermedia {@link PaymentEvents} resource
*/
@Cacheable(cacheNames = "payment-events", key = "#id.toString()")
public List<PaymentEvent> getPaymentEvents(Long id) {
return eventRepository.findPaymentEventsByPaymentId(id,
new PageRequest(0, Integer.MAX_VALUE)).getContent();
}
/**
* Gets a hypermedia resource for a {@link PaymentEvent} entity.
*
* @param event is the {@link PaymentEvent} to enrich with hypermedia
* @return a hypermedia resource for the supplied {@link PaymentEvent} entity
*/
private Resource<PaymentEvent> getPaymentEventResource(PaymentEvent event) {
return new Resource<PaymentEvent>(event, Arrays.asList(
linkTo(PaymentController.class)
.slash("events")
.slash(event.getEventId())
.withSelfRel(),
linkTo(PaymentController.class)
.slash("payments")
.slash(event.getPayment().getPaymentId())
.withRel("payment")));
}
/**
* Add a {@link PaymentEvent} to an {@link Payment} entity.
*
* @param event is the {@link PaymentEvent} to append to an {@link Payment} entity
* @return the newly appended {@link PaymentEvent} entity
*/
@CacheEvict(cacheNames = "payment-events", key = "#event.getPayment().getPaymentId().toString()")
private PaymentEvent addEvent(PaymentEvent event) {
event = eventRepository.saveAndFlush(event);
return event;
}
}

View File

@@ -0,0 +1,73 @@
package demo.event;
import com.fasterxml.jackson.annotation.JsonIgnore;
import demo.payment.Payment;
import demo.domain.BaseEntity;
import javax.persistence.*;
/**
* The domain event {@link PaymentEvent} tracks the type and state of events as
* applied to the {@link Payment} domain object. This event resource can be used
* to event source the aggregate state of {@link Payment}.
* <p>
* This event resource also provides a transaction log that can be used to append
* actions to the event.
*
* @author kbastani
*/
@Entity
public class PaymentEvent extends BaseEntity {
@Id
@GeneratedValue
private Long id;
@Enumerated(EnumType.STRING)
private PaymentEventType type;
@OneToOne(cascade = CascadeType.ALL, fetch = FetchType.LAZY)
@JsonIgnore
private Payment payment;
public PaymentEvent() {
}
public PaymentEvent(PaymentEventType type) {
this.type = type;
}
@JsonIgnore
public Long getEventId() {
return id;
}
public void setEventId(Long id) {
this.id = id;
}
public PaymentEventType getType() {
return type;
}
public void setType(PaymentEventType type) {
this.type = type;
}
public Payment getPayment() {
return payment;
}
public void setPayment(Payment payment) {
this.payment = payment;
}
@Override
public String toString() {
return "PaymentEvent{" +
"id=" + id +
", type=" + type +
", payment=" + payment +
"} " + super.toString();
}
}

View File

@@ -0,0 +1,18 @@
package demo.event;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
/**
* The {@link PaymentEventType} represents a collection of possible events that describe
* state transitions of {@link PaymentStatus} on the {@link Payment} aggregate.
*
* @author kbastani
*/
public enum PaymentEventType {
PAYMENT_CREATED,
PAYMENT_PENDING,
PAYMENT_PROCESSED,
PAYMENT_FAILED,
PAYMENT_SUCCEEDED
}

View File

@@ -0,0 +1,74 @@
package demo.event;
import com.fasterxml.jackson.annotation.JsonIgnore;
import demo.payment.Payment;
import demo.payment.PaymentController;
import org.springframework.hateoas.Link;
import org.springframework.hateoas.LinkBuilder;
import org.springframework.hateoas.Resources;
import java.io.Serializable;
import java.util.List;
import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo;
/**
* The {@link PaymentEvents} is a hypermedia collection of {@link PaymentEvent} resources.
*
* @author kbastani
*/
public class PaymentEvents extends Resources<PaymentEvent> implements Serializable {
private Long paymentId;
/**
* Create a new {@link PaymentEvents} hypermedia resources collection for an {@link Payment}.
*
* @param paymentId is the unique identifier for the {@link Payment}
* @param content is the collection of {@link PaymentEvents} attached to the {@link Payment}
*/
public PaymentEvents(Long paymentId, List<PaymentEvent> content) {
this(content);
this.paymentId = paymentId;
// Add hypermedia links to resources parent
add(linkTo(PaymentController.class)
.slash("payments")
.slash(paymentId)
.slash("events")
.withSelfRel(),
linkTo(PaymentController.class)
.slash("payments")
.slash(paymentId)
.withRel("payment"));
LinkBuilder linkBuilder = linkTo(EventController.class);
// Add hypermedia links to each item of the collection
content.stream().parallel().forEach(event -> event.add(
linkBuilder.slash("events")
.slash(event.getEventId())
.withSelfRel()
));
}
/**
* Creates a {@link Resources} instance with the given content and {@link Link}s (optional).
*
* @param content must not be {@literal null}.
* @param links the links to be added to the {@link Resources}.
*/
private PaymentEvents(Iterable<PaymentEvent> content, Link... links) {
super(content, links);
}
/**
* Get the {@link Payment} identifier that the {@link PaymentEvents} apply to.
*
* @return the payment identifier
*/
@JsonIgnore
public Long getPaymentId() {
return paymentId;
}
}

View File

@@ -0,0 +1,106 @@
package demo.payment;
import com.fasterxml.jackson.annotation.JsonIgnore;
import demo.domain.BaseEntity;
import demo.event.PaymentEvent;
import javax.persistence.*;
import java.util.HashSet;
import java.util.Set;
/**
* The {@link Payment} domain object contains information related to
* a user's payment. The status of an payment is event sourced using
* events logged to the {@link PaymentEvent} collection attached to
* this resource.
*
* @author kbastani
*/
@Entity
public class Payment extends BaseEntity {
@Id
@GeneratedValue
private Long id;
@OneToMany(cascade = CascadeType.ALL, fetch = FetchType.LAZY)
private Set<PaymentEvent> events = new HashSet<>();
@Enumerated(value = EnumType.STRING)
private PaymentStatus status;
private Double amount;
private Long orderId;
@Enumerated(value = EnumType.STRING)
private PaymentMethod paymentMethod;
public Payment() {
status = PaymentStatus.PAYMENT_CREATED;
}
public Payment(Double amount, PaymentMethod paymentMethod) {
this.amount = amount;
this.paymentMethod = paymentMethod;
}
@JsonIgnore
public Long getPaymentId() {
return id;
}
public void setPaymentId(Long id) {
this.id = id;
}
@JsonIgnore
public Set<PaymentEvent> getEvents() {
return events;
}
public void setEvents(Set<PaymentEvent> events) {
this.events = events;
}
public PaymentStatus getStatus() {
return status;
}
public void setStatus(PaymentStatus status) {
this.status = status;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
public PaymentMethod getPaymentMethod() {
return paymentMethod;
}
public void setPaymentMethod(PaymentMethod paymentMethod) {
this.paymentMethod = paymentMethod;
}
@JsonIgnore
public Long getOrderId() {
return orderId;
}
public void setOrderId(Long orderId) {
this.orderId = orderId;
}
@Override
public String toString() {
return "Payment{" +
"id=" + id +
", events=" + events +
", status=" + status +
"} " + super.toString();
}
}

View File

@@ -0,0 +1,13 @@
package demo.payment;
/**
* The {@link PaymentCommand} represents an action that can be performed to an
* {@link Payment} aggregate. Commands initiate an action that can mutate the state of
* an payment entity as it transitions between {@link PaymentStatus} values.
*
* @author kbastani
*/
public enum PaymentCommand {
CONNECT_ORDER,
PROCESS_PAYMENT
}

View File

@@ -0,0 +1,12 @@
package demo.payment;
import org.springframework.hateoas.ResourceSupport;
/**
* A hypermedia resource that describes the collection of commands that
* can be applied to a {@link Payment} aggregate.
*
* @author kbastani
*/
public class PaymentCommands extends ResourceSupport {
}

View File

@@ -0,0 +1,248 @@
package demo.payment;
import demo.event.PaymentEvent;
import demo.event.PaymentEvents;
import demo.event.EventController;
import demo.event.EventService;
import org.springframework.hateoas.LinkBuilder;
import org.springframework.hateoas.Resource;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.Assert;
import org.springframework.web.bind.annotation.*;
import java.util.Optional;
import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo;
@RestController
@RequestMapping("/v1")
public class PaymentController {
private final PaymentService paymentService;
private final EventService eventService;
public PaymentController(PaymentService paymentService, EventService eventService) {
this.paymentService = paymentService;
this.eventService = eventService;
}
@PostMapping(path = "/payments")
public ResponseEntity createPayment(@RequestBody Payment payment) {
return Optional.ofNullable(createPaymentResource(payment))
.map(e -> new ResponseEntity<>(e, HttpStatus.CREATED))
.orElseThrow(() -> new RuntimeException("Payment creation failed"));
}
@PutMapping(path = "/payments/{id}")
public ResponseEntity updatePayment(@RequestBody Payment payment, @PathVariable Long id) {
return Optional.ofNullable(updatePaymentResource(id, payment))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("Payment update failed"));
}
@GetMapping(path = "/payments/{id}")
public ResponseEntity getPayment(@PathVariable Long id) {
return Optional.ofNullable(getPaymentResource(id))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElse(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
@DeleteMapping(path = "/payments/{id}")
public ResponseEntity deletePayment(@PathVariable Long id) {
return Optional.ofNullable(paymentService.deletePayment(id))
.map(e -> new ResponseEntity<>(HttpStatus.NO_CONTENT))
.orElseThrow(() -> new RuntimeException("Payment deletion failed"));
}
@GetMapping(path = "/payments/{id}/events")
public ResponseEntity getPaymentEvents(@PathVariable Long id) {
return Optional.of(getPaymentEventResources(id))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("Could not get payment events"));
}
@PostMapping(path = "/payments/{id}/events")
public ResponseEntity createPayment(@PathVariable Long id, @RequestBody PaymentEvent event) {
return Optional.ofNullable(appendEventResource(id, event))
.map(e -> new ResponseEntity<>(e, HttpStatus.CREATED))
.orElseThrow(() -> new RuntimeException("Append payment event failed"));
}
@GetMapping(path = "/payments/{id}/commands")
public ResponseEntity getPaymentCommands(@PathVariable Long id) {
return Optional.ofNullable(getCommandsResource(id))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("The payment could not be found"));
}
@GetMapping(path = "/payments/{id}/commands/connectOrder")
public ResponseEntity connectOrder(@PathVariable Long id) {
return Optional.ofNullable(getPaymentResource(
paymentService.applyCommand(id, PaymentCommand.CONNECT_ORDER)))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("The command could not be applied"));
}
@GetMapping(path = "/payments/{id}/commands/processPayment")
public ResponseEntity processPayment(@PathVariable Long id) {
return Optional.ofNullable(getPaymentResource(
paymentService.applyCommand(id, PaymentCommand.PROCESS_PAYMENT)))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("The command could not be applied"));
}
/**
* Retrieves a hypermedia resource for {@link Payment} with the specified identifier.
*
* @param id is the unique identifier for looking up the {@link Payment} entity
* @return a hypermedia resource for the fetched {@link Payment}
*/
private Resource<Payment> getPaymentResource(Long id) {
Resource<Payment> paymentResource = null;
// Get the payment for the provided id
Payment payment = paymentService.getPayment(id);
// If the payment exists, wrap the hypermedia response
if (payment != null)
paymentResource = getPaymentResource(payment);
return paymentResource;
}
/**
* Creates a new {@link Payment} entity and persists the result to the repository.
*
* @param payment is the {@link Payment} model used to create a new payment
* @return a hypermedia resource for the newly created {@link Payment}
*/
private Resource<Payment> createPaymentResource(Payment payment) {
Assert.notNull(payment, "Payment body must not be null");
// Create the new payment
payment = paymentService.registerPayment(payment);
return getPaymentResource(payment);
}
/**
* Update a {@link Payment} entity for the provided identifier.
*
* @param id is the unique identifier for the {@link Payment} update
* @param payment is the entity representation containing any updated {@link Payment} fields
* @return a hypermedia resource for the updated {@link Payment}
*/
private Resource<Payment> updatePaymentResource(Long id, Payment payment) {
return getPaymentResource(paymentService.updatePayment(id, payment));
}
/**
* Appends an {@link PaymentEvent} domain event to the event log of the {@link Payment}
* aggregate with the specified paymentId.
*
* @param paymentId is the unique identifier for the {@link Payment}
* @param event is the {@link PaymentEvent} that attempts to alter the state of the {@link Payment}
* @return a hypermedia resource for the newly appended {@link PaymentEvent}
*/
private Resource<PaymentEvent> appendEventResource(Long paymentId, PaymentEvent event) {
Resource<PaymentEvent> eventResource = null;
event = paymentService.appendEvent(paymentId, event);
if (event != null) {
eventResource = new Resource<>(event,
linkTo(EventController.class)
.slash("events")
.slash(event.getEventId())
.withSelfRel(),
linkTo(PaymentController.class)
.slash("payments")
.slash(paymentId)
.withRel("payment")
);
}
return eventResource;
}
/**
* Get the {@link PaymentCommand} hypermedia resource that lists the available commands that can be applied
* to an {@link Payment} entity.
*
* @param id is the {@link Payment} identifier to provide command links for
* @return an {@link PaymentCommands} with a collection of embedded command links
*/
private PaymentCommands getCommandsResource(Long id) {
// Get the payment resource for the identifier
Resource<Payment> paymentResource = getPaymentResource(id);
// Create a new payment commands hypermedia resource
PaymentCommands commandResource = new PaymentCommands();
// Add payment command hypermedia links
if (paymentResource != null) {
commandResource.add(
getCommandLinkBuilder(id)
.slash("connectOrder")
.withRel("connectOrder"),
getCommandLinkBuilder(id)
.slash("processPayment")
.withRel("processPayment")
);
}
return commandResource;
}
/**
* Get {@link PaymentEvents} for the supplied {@link Payment} identifier.
*
* @param id is the unique identifier of the {@link Payment}
* @return a list of {@link PaymentEvent} wrapped in a hypermedia {@link PaymentEvents} resource
*/
private PaymentEvents getPaymentEventResources(Long id) {
return new PaymentEvents(id, eventService.getPaymentEvents(id));
}
/**
* Generate a {@link LinkBuilder} for generating the {@link PaymentCommands}.
*
* @param id is the unique identifier for a {@link Payment}
* @return a {@link LinkBuilder} for the {@link PaymentCommands}
*/
private LinkBuilder getCommandLinkBuilder(Long id) {
return linkTo(PaymentController.class)
.slash("payments")
.slash(id)
.slash("commands");
}
/**
* Get a hypermedia enriched {@link Payment} entity.
*
* @param payment is the {@link Payment} to enrich with hypermedia links
* @return is a hypermedia enriched resource for the supplied {@link Payment} entity
*/
private Resource<Payment> getPaymentResource(Payment payment) {
Resource<Payment> paymentResource;
// Prepare hypermedia response
paymentResource = new Resource<>(payment,
linkTo(PaymentController.class)
.slash("payments")
.slash(payment.getPaymentId())
.withSelfRel(),
linkTo(PaymentController.class)
.slash("payments")
.slash(payment.getPaymentId())
.slash("events")
.withRel("events"),
getCommandLinkBuilder(payment.getPaymentId())
.withRel("commands")
);
return paymentResource;
}
}

View File

@@ -0,0 +1,5 @@
package demo.payment;
public enum PaymentMethod {
CREDIT_CARD
}

View File

@@ -0,0 +1,9 @@
package demo.payment;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.query.Param;
public interface PaymentRepository extends JpaRepository<Payment, Long> {
Payment findPaymentByOrderId(@Param("orderId") Long orderId);
}

View File

@@ -0,0 +1,174 @@
package demo.payment;
import demo.event.ConsistencyModel;
import demo.event.EventService;
import demo.event.PaymentEvent;
import demo.event.PaymentEventType;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import java.util.Objects;
/**
* The {@link PaymentService} provides transactional support for managing {@link Payment}
* entities. This service also provides event sourcing support for {@link PaymentEvent}.
* Events can be appended to an {@link Payment}, which contains a append-only log of
* actions that can be used to support remediation for distributed transactions that encountered
* a partial failure.
*
* @author kbastani
*/
@Service
@CacheConfig(cacheNames = {"payments"})
public class PaymentService {
private final PaymentRepository paymentRepository;
private final EventService eventService;
private final CacheManager cacheManager;
public PaymentService(PaymentRepository paymentRepository, EventService eventService, CacheManager cacheManager) {
this.paymentRepository = paymentRepository;
this.eventService = eventService;
this.cacheManager = cacheManager;
}
@CacheEvict(cacheNames = "payments", key = "#payment.getPaymentId().toString()")
public Payment registerPayment(Payment payment) {
payment = createPayment(payment);
cacheManager.getCache("payments")
.evict(payment.getPaymentId());
// Trigger the payment creation event
PaymentEvent event = appendEvent(payment.getPaymentId(),
new PaymentEvent(PaymentEventType.PAYMENT_CREATED));
// Attach payment identifier
event.getPayment().setPaymentId(payment.getPaymentId());
// Return the result
return event.getPayment();
}
/**
* Create a new {@link Payment} entity.
*
* @param payment is the {@link Payment} to create
* @return the newly created {@link Payment}
*/
@CacheEvict(cacheNames = "payments", key = "#payment.getPaymentId().toString()")
public Payment createPayment(Payment payment) {
// Save the payment to the repository
payment = paymentRepository.saveAndFlush(payment);
return payment;
}
/**
* Get an {@link Payment} entity for the supplied identifier.
*
* @param id is the unique identifier of a {@link Payment} entity
* @return an {@link Payment} entity
*/
@Cacheable(cacheNames = "payments", key = "#id.toString()")
public Payment getPayment(Long id) {
return paymentRepository.findOne(id);
}
/**
* Update an {@link Payment} entity with the supplied identifier.
*
* @param id is the unique identifier of the {@link Payment} entity
* @param payment is the {@link Payment} containing updated fields
* @return the updated {@link Payment} entity
*/
@CachePut(cacheNames = "payments", key = "#id.toString()")
public Payment updatePayment(Long id, Payment payment) {
Assert.notNull(id, "Payment id must be present in the resource URL");
Assert.notNull(payment, "Payment request body cannot be null");
if (payment.getPaymentId() != null) {
Assert.isTrue(Objects.equals(id, payment.getPaymentId()),
"The payment id in the request body must match the resource URL");
} else {
payment.setPaymentId(id);
}
Assert.state(paymentRepository.exists(id),
"The payment with the supplied id does not exist");
Payment currentPayment = paymentRepository.findOne(id);
currentPayment.setStatus(payment.getStatus());
return paymentRepository.save(currentPayment);
}
/**
* Delete the {@link Payment} with the supplied identifier.
*
* @param id is the unique identifier for the {@link Payment}
*/
@CacheEvict(cacheNames = "payments", key = "#id.toString()")
public Boolean deletePayment(Long id) {
Assert.state(paymentRepository.exists(id),
"The payment with the supplied id does not exist");
this.paymentRepository.delete(id);
return true;
}
/**
* Append a new {@link PaymentEvent} to the {@link Payment} reference for the supplied identifier.
*
* @param paymentId is the unique identifier for the {@link Payment}
* @param event is the {@link PaymentEvent} to append to the {@link Payment} entity
* @return the newly appended {@link PaymentEvent}
*/
public PaymentEvent appendEvent(Long paymentId, PaymentEvent event) {
return appendEvent(paymentId, event, ConsistencyModel.ACID);
}
/**
* Append a new {@link PaymentEvent} to the {@link Payment} reference for the supplied identifier.
*
* @param paymentId is the unique identifier for the {@link Payment}
* @param event is the {@link PaymentEvent} to append to the {@link Payment} entity
* @return the newly appended {@link PaymentEvent}
*/
public PaymentEvent appendEvent(Long paymentId, PaymentEvent event, ConsistencyModel consistencyModel) {
Payment payment = getPayment(paymentId);
Assert.notNull(payment, "The payment with the supplied id does not exist");
event.setPayment(payment);
event = eventService.createEvent(paymentId, event);
payment.getEvents().add(event);
paymentRepository.saveAndFlush(payment);
eventService.raiseEvent(event, consistencyModel);
return event;
}
/**
* Apply an {@link PaymentCommand} to the {@link Payment} with a specified identifier.
*
* @param id is the unique identifier of the {@link Payment}
* @param paymentCommand is the command to apply to the {@link Payment}
* @return a hypermedia resource containing the updated {@link Payment}
*/
@CachePut(cacheNames = "payments", key = "#id.toString()")
public Payment applyCommand(Long id, PaymentCommand paymentCommand) {
Payment payment = getPayment(id);
Assert.notNull(payment, "The payment for the supplied id could not be found");
PaymentStatus status = payment.getStatus();
// TODO: Implement
return payment;
}
}

View File

@@ -0,0 +1,16 @@
package demo.payment;
/**
* The {@link PaymentStatus} describes the state of an {@link Payment}.
* The aggregate state of a {@link Payment} is sourced from attached domain
* events in the form of {@link demo.event.PaymentEvent}.
*
* @author kbastani
*/
public enum PaymentStatus {
PAYMENT_CREATED,
PAYMENT_PENDING,
PAYMENT_PROCESSED,
PAYMENT_FAILED,
PAYMENT_SUCCEEDED
}

View File

@@ -0,0 +1,17 @@
spring:
profiles:
active: development
---
spring:
profiles: development
cloud:
stream:
bindings:
output:
destination: payment
contentType: 'application/json'
redis:
host: localhost
port: 6379
server:
port: 8080

View File

@@ -0,0 +1,4 @@
spring:
application:
name: payment-web
---

View File

@@ -0,0 +1,43 @@
package demo.payment;
import demo.event.EventService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import static org.mockito.BDDMockito.given;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@RunWith(SpringRunner.class)
@WebMvcTest(PaymentController.class)
public class PaymentControllerTest {
@Autowired
private MockMvc mvc;
@MockBean
private PaymentService paymentService;
@MockBean
private EventService eventService;
@Test
public void getUserPaymentResourceShouldReturnPayment() throws Exception {
String content = "{\"paymentMethod\": \"CREDIT_CARD\", \"amount\": 42.0 }";
Payment payment = new Payment(42.0, PaymentMethod.CREDIT_CARD);
given(this.paymentService.getPayment(1L))
.willReturn(payment);
this.mvc.perform(get("/v1/payments/1").accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk()).andExpect(content().json(content));
}
}

View File

@@ -0,0 +1,45 @@
package demo.payment;
import demo.event.EventService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.cache.CacheManager;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
@RunWith(SpringRunner.class)
public class PaymentServiceTests {
@MockBean
private EventService eventService;
@MockBean
private PaymentRepository paymentRepository;
@MockBean
private CacheManager cacheManager;
private PaymentService paymentService;
@Before
public void before() {
paymentService = new PaymentService(paymentRepository, eventService, cacheManager);
}
@Test
public void getPaymentReturnsPayment() throws Exception {
Payment expected = new Payment(42.0, PaymentMethod.CREDIT_CARD);
given(this.paymentRepository.findOne(1L)).willReturn(expected);
Payment actual = paymentService.getPayment(1L);
assertThat(actual).isNotNull();
assertThat(actual.getPaymentMethod()).isEqualTo(PaymentMethod.CREDIT_CARD);
assertThat(actual.getAmount()).isEqualTo(42.0);
}
}

View File

@@ -0,0 +1 @@
INSERT INTO PAYMENT(ID, FIRST_NAME, LAST_NAME, EMAIL) values (1, 'John', 'Doe', 'john.doe@example.com');

View File

@@ -0,0 +1,95 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>payment-worker</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>payment-worker</name>
<parent>
<groupId>org.kbastani</groupId>
<artifactId>payment</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hateoas</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-core</artifactId>
<version>${spring-statemachine-core.version}</version>
</dependency>
<dependency>
<groupId>org.kbastani</groupId>
<artifactId>spring-boot-starter-aws-lambda</artifactId>
<version>${spring-boot-starter-aws-lambda.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws-java-sdk-sts.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-lambda</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>${json-path.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>${aws-java-sdk-sts.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,16 @@
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.hateoas.config.EnableHypermediaSupport;
import org.springframework.hateoas.config.EnableHypermediaSupport.HypermediaType;
@SpringBootApplication
@EnableHypermediaSupport(type = {HypermediaType.HAL})
public class PaymentStreamModuleApplication {
public static void main(String[] args) {
SpringApplication.run(PaymentStreamModuleApplication.class, args);
}
}

View File

@@ -0,0 +1,25 @@
package demo.config;
import amazon.aws.AWSLambdaConfigurerAdapter;
import com.fasterxml.jackson.databind.ObjectMapper;
import demo.function.LambdaFunctions;
import demo.util.LambdaUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Configuration
@Profile("cloud")
public class AwsLambdaConfig {
@Bean
public LambdaFunctions lambdaInvoker(AWSLambdaConfigurerAdapter configurerAdapter) {
return configurerAdapter
.getFunctionInstance(LambdaFunctions.class);
}
@Bean
public LambdaUtil lambdaUtil(ObjectMapper objectMapper) {
return new LambdaUtil(objectMapper);
}
}

View File

@@ -0,0 +1,220 @@
package demo.config;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
import demo.event.PaymentEvent;
import demo.event.PaymentEventType;
import demo.function.*;
import org.apache.log4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.action.Action;
import org.springframework.statemachine.config.EnableStateMachineFactory;
import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter;
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;
import java.net.URI;
import java.util.EnumSet;
/**
* A configuration adapter for describing a {@link StateMachine} factory that maps actions to functional
* expressions. Actions are executed during transitions between a source state and a target state.
* <p>
* A state machine provides a robust declarative language for describing the state of an {@link Payment}
* resource given a sequence of ordered {@link demo.event.PaymentEvents}. When an event is received
* in {@link demo.event.PaymentEventStream}, an in-memory state machine is fully replicated given the
* {@link demo.event.PaymentEvents} attached to an {@link Payment} resource.
*
* @author kbastani
*/
@Configuration
@EnableStateMachineFactory
public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<PaymentStatus, PaymentEventType> {
final private Logger log = Logger.getLogger(StateMachineConfig.class);
/**
* Configures the initial conditions of a new in-memory {@link StateMachine} for {@link Payment}.
*
* @param states is the {@link StateMachineStateConfigurer} used to describe the initial condition
*/
@Override
public void configure(StateMachineStateConfigurer<PaymentStatus, PaymentEventType> states) {
try {
// Describe the initial condition of the payment state machine
states.withStates()
.initial(PaymentStatus.PAYMENT_CREATED)
.states(EnumSet.allOf(PaymentStatus.class));
} catch (Exception e) {
throw new RuntimeException("State machine configuration failed", e);
}
}
/**
* Configures the {@link StateMachine} that describes how {@link PaymentEventType} drives the state
* of an {@link Payment}. Events are applied as transitions from a source {@link PaymentStatus} to
* a target {@link PaymentStatus}. An {@link Action} is attached to each transition, which maps to a
* function that is executed in the context of an {@link PaymentEvent}.
*
* @param transitions is the {@link StateMachineTransitionConfigurer} used to describe state transitions
*/
@Override
public void configure(StateMachineTransitionConfigurer<PaymentStatus, PaymentEventType> transitions) {
try {
// Describe state machine transitions for payments
transitions.withExternal()
.source(PaymentStatus.PAYMENT_CREATED)
.target(PaymentStatus.PAYMENT_CREATED)
.event(PaymentEventType.PAYMENT_CREATED)
.action(paymentCreated())
.and()
.withExternal()
.source(PaymentStatus.PAYMENT_CREATED)
.target(PaymentStatus.PAYMENT_PENDING)
.event(PaymentEventType.PAYMENT_PENDING)
.action(paymentPending())
.and()
.withExternal()
.source(PaymentStatus.PAYMENT_PENDING)
.target(PaymentStatus.PAYMENT_PROCESSED)
.event(PaymentEventType.PAYMENT_PROCESSED)
.action(paymentProcessed())
.and()
.withExternal()
.source(PaymentStatus.PAYMENT_PROCESSED)
.target(PaymentStatus.PAYMENT_SUCCEEDED)
.event(PaymentEventType.PAYMENT_SUCCEEDED)
.action(paymentSucceeded())
.and()
.withExternal()
.source(PaymentStatus.PAYMENT_PROCESSED)
.target(PaymentStatus.PAYMENT_FAILED)
.event(PaymentEventType.PAYMENT_FAILED)
.action(paymentFailed());
} catch (Exception e) {
throw new RuntimeException("Could not configure state machine transitions", e);
}
}
/**
* Functions are mapped to actions that are triggered during the replication of a state machine. Functions
* should only be executed after the state machine has completed replication. This method checks the state
* context of the machine for an {@link PaymentEvent}, which signals that the state machine is finished
* replication.
* <p>
* The {@link PaymentFunction} argument is only applied if an {@link PaymentEvent} is provided as a
* message header in the {@link StateContext}.
*
* @param context is the state machine context that may include an {@link PaymentEvent}
* @param paymentFunction is the payment function to apply after the state machine has completed replication
* @return an {@link PaymentEvent} only if this event has not yet been processed, otherwise returns null
*/
private PaymentEvent applyEvent(StateContext<PaymentStatus, PaymentEventType> context, PaymentFunction paymentFunction) {
PaymentEvent paymentEvent = null;
// Log out the progress of the state machine replication
log.info("Replicate event: " + context.getMessage().getPayload());
// The machine is finished replicating when an PaymentEvent is found in the message header
if (context.getMessageHeader("event") != null) {
paymentEvent = (PaymentEvent) context.getMessageHeader("event");
log.info("State machine replicated: " + paymentEvent.getType());
// Apply the provided function to the PaymentEvent
paymentFunction.apply(paymentEvent);
}
return paymentEvent;
}
@Bean
public Action<PaymentStatus, PaymentEventType> paymentCreated() {
return context -> applyEvent(context,
new PaymentCreated(context, event -> {
log.info(event.getType() + ": " + event.getLink("payment").getHref());
// Get the payment resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("payment").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Payment.class)
.getBody();
}));
}
@Bean
public Action<PaymentStatus, PaymentEventType> paymentPending() {
return context -> applyEvent(context,
new PaymentPending(context, event -> {
log.info(event.getType() + ": " + event.getLink("payment").getHref());
// Get the payment resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("payment").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Payment.class)
.getBody();
}));
}
@Bean
public Action<PaymentStatus, PaymentEventType> paymentProcessed() {
return context -> applyEvent(context,
new PaymentProcessed(context, event -> {
log.info(event.getType() + ": " + event.getLink("payment").getHref());
// Get the payment resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("payment").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Payment.class)
.getBody();
}));
}
@Bean
public Action<PaymentStatus, PaymentEventType> paymentSucceeded() {
return context -> applyEvent(context,
new PaymentSucceeded(context, event -> {
log.info(event.getType() + ": " + event.getLink("payment").getHref());
// Get the payment resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("payment").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Payment.class)
.getBody();
}));
}
@Bean
public Action<PaymentStatus, PaymentEventType> paymentFailed() {
return context -> applyEvent(context,
new PaymentFailed(context, event -> {
log.info(event.getType() + ": " + event.getLink("payment").getHref());
// Get the payment resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("payment").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Payment.class)
.getBody();
}));
}
}

View File

@@ -0,0 +1,36 @@
package demo.domain;
import org.springframework.hateoas.ResourceSupport;
public class BaseEntity extends ResourceSupport {
private Long createdAt;
private Long lastModified;
public BaseEntity() {
}
public Long getCreatedAt() {
return createdAt;
}
public void setCreatedAt(Long createdAt) {
this.createdAt = createdAt;
}
public Long getLastModified() {
return lastModified;
}
public void setLastModified(Long lastModified) {
this.lastModified = lastModified;
}
@Override
public String toString() {
return "BaseEntity{" +
"createdAt=" + createdAt +
", lastModified=" + lastModified +
"} " + super.toString();
}
}

View File

@@ -0,0 +1,28 @@
package demo.event;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Optional;
@RestController
@RequestMapping("/v1")
public class EventController {
private EventService eventService;
public EventController(EventService eventService) {
this.eventService = eventService;
}
@PostMapping(path = "/events")
public ResponseEntity handleEvent(@RequestBody PaymentEvent event) {
return Optional.ofNullable(eventService.apply(event))
.map(e -> new ResponseEntity<>(e, HttpStatus.CREATED))
.orElseThrow(() -> new RuntimeException("Apply event failed"));
}
}

View File

@@ -0,0 +1,82 @@
package demo.event;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
import demo.state.StateMachineService;
import org.apache.log4j.Logger;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.stereotype.Service;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@Service
public class EventService {
final private Logger log = Logger.getLogger(EventService.class);
final private StateMachineService stateMachineService;
public EventService(StateMachineService stateMachineService) {
this.stateMachineService = stateMachineService;
}
public Payment apply(PaymentEvent paymentEvent) {
Payment result;
log.info("Payment event received: " + paymentEvent.getLink("self").getHref());
// Generate a state machine for computing the state of the payment resource
StateMachine<PaymentStatus, PaymentEventType> stateMachine =
stateMachineService.getStateMachine();
// Follow the hypermedia link to fetch the attached payment
Traverson traverson = new Traverson(
URI.create(paymentEvent.getLink("payment").getHref()),
MediaTypes.HAL_JSON
);
// Get the event log for the attached payment resource
PaymentEvents events = traverson.follow("events")
.toEntity(PaymentEvents.class)
.getBody();
// Prepare payment event message headers
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("event", paymentEvent);
// Replicate the current state of the payment resource
events.getContent()
.stream()
.sorted((a1, a2) -> a1.getCreatedAt().compareTo(a2.getCreatedAt()))
.forEach(e -> {
MessageHeaders headers = new MessageHeaders(null);
// Check to see if this is the current event
if (e.getLink("self").equals(paymentEvent.getLink("self"))) {
headers = new MessageHeaders(headerMap);
}
// Send the event to the state machine
stateMachine.sendEvent(MessageBuilder.createMessage(e.getType(), headers));
});
// Get result
Map<Object, Object> context = stateMachine.getExtendedState()
.getVariables();
// Get the payment result
result = (Payment) context.getOrDefault("payment", null);
// Destroy the state machine
stateMachine.stop();
return result;
}
}

View File

@@ -0,0 +1,41 @@
package demo.event;
import demo.payment.Payment;
import demo.domain.BaseEntity;
/**
* The domain event {@link PaymentEvent} tracks the type and state of events as
* applied to the {@link Payment} domain object. This event resource can be used
* to event source the aggregate state of {@link Payment}.
* <p>
* This event resource also provides a transaction log that can be used to append
* actions to the event.
*
* @author kbastani
*/
public class PaymentEvent extends BaseEntity {
private PaymentEventType type;
public PaymentEvent() {
}
public PaymentEvent(PaymentEventType type) {
this.type = type;
}
public PaymentEventType getType() {
return type;
}
public void setType(PaymentEventType type) {
this.type = type;
}
@Override
public String toString() {
return "PaymentEvent{" +
"type=" + type +
"} " + super.toString();
}
}

View File

@@ -0,0 +1,40 @@
package demo.event;
import demo.payment.Payment;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Profile;
import org.springframework.statemachine.StateMachine;
/**
* The {@link PaymentEventStream} monitors for a variety of {@link PaymentEvent} domain
* events for an {@link Payment}.
*
* @author kbastani
*/
@EnableAutoConfiguration
@EnableBinding(Sink.class)
@Profile({ "cloud", "development" })
public class PaymentEventStream {
private EventService eventService;
public PaymentEventStream(EventService eventService) {
this.eventService = eventService;
}
/**
* Listens to a stream of incoming {@link PaymentEvent} messages. For each
* new message received, replicate an in-memory {@link StateMachine} that
* reproduces the current state of the {@link Payment} resource that is the
* subject of the {@link PaymentEvent}.
*
* @param paymentEvent is the {@link Payment} domain event to process
*/
@StreamListener(Sink.INPUT)
public void streamListerner(PaymentEvent paymentEvent) {
eventService.apply(paymentEvent);
}
}

View File

@@ -0,0 +1,18 @@
package demo.event;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
/**
* The {@link PaymentEventType} represents a collection of possible events that describe
* state transitions of {@link PaymentStatus} on the {@link Payment} aggregate.
*
* @author kbastani
*/
public enum PaymentEventType {
PAYMENT_CREATED,
PAYMENT_PENDING,
PAYMENT_PROCESSED,
PAYMENT_FAILED,
PAYMENT_SUCCEEDED
}

View File

@@ -0,0 +1,11 @@
package demo.event;
import org.springframework.hateoas.Resources;
/**
* The {@link PaymentEvents} is a hypermedia collection of {@link PaymentEvent} resources.
*
* @author kbastani
*/
public class PaymentEvents extends Resources<PaymentEvent> {
}

View File

@@ -0,0 +1,5 @@
package demo.function;
public interface LambdaFunctions {
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.PaymentEvent;
import demo.event.PaymentEventType;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class PaymentCreated extends PaymentFunction {
final private Logger log = Logger.getLogger(PaymentCreated.class);
public PaymentCreated(StateContext<PaymentStatus, PaymentEventType> context, Function<PaymentEvent, Payment> lambda) {
super(context, lambda);
}
/**
* Apply an {@link PaymentEvent} to the lambda function that was provided through the
* constructor of this {@link PaymentFunction}.
*
* @param event is the {@link PaymentEvent} to apply to the lambda function
*/
@Override
public Payment apply(PaymentEvent event) {
log.info("Executing workflow for payment created...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.PaymentEvent;
import demo.event.PaymentEventType;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class PaymentFailed extends PaymentFunction {
final private Logger log = Logger.getLogger(PaymentFailed.class);
public PaymentFailed(StateContext<PaymentStatus, PaymentEventType> context, Function<PaymentEvent, Payment> lambda) {
super(context, lambda);
}
/**
* Apply an {@link PaymentEvent} to the lambda function that was provided through the
* constructor of this {@link PaymentFunction}.
*
* @param event is the {@link PaymentEvent} to apply to the lambda function
*/
@Override
public Payment apply(PaymentEvent event) {
log.info("Executing workflow for payment failed...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,51 @@
package demo.function;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
import demo.event.PaymentEvent;
import demo.event.PaymentEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
/**
* The {@link PaymentFunction} is an abstraction used to map actions that are triggered by
* state transitions on a {@link demo.payment.Payment} resource on to a function. Mapped functions
* can take multiple forms and reside either remotely or locally on the classpath of this application.
*
* @author kbastani
*/
public abstract class PaymentFunction {
final private Logger log = Logger.getLogger(PaymentFunction.class);
final protected StateContext<PaymentStatus, PaymentEventType> context;
final protected Function<PaymentEvent, Payment> lambda;
/**
* Create a new instance of a class that extends {@link PaymentFunction}, supplying
* a state context and a lambda function used to apply {@link PaymentEvent} to a provided
* action.
*
* @param context is the {@link StateContext} for a replicated state machine
* @param lambda is the lambda function describing an action that consumes an {@link PaymentEvent}
*/
public PaymentFunction(StateContext<PaymentStatus, PaymentEventType> context,
Function<PaymentEvent, Payment> lambda) {
this.context = context;
this.lambda = lambda;
}
/**
* Apply an {@link PaymentEvent} to the lambda function that was provided through the
* constructor of this {@link PaymentFunction}.
*
* @param event is the {@link PaymentEvent} to apply to the lambda function
*/
public Payment apply(PaymentEvent event) {
// Execute the lambda function
Payment result = lambda.apply(event);
context.getExtendedState().getVariables().put("payment", result);
return result;
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.PaymentEvent;
import demo.event.PaymentEventType;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class PaymentPending extends PaymentFunction {
final private Logger log = Logger.getLogger(PaymentPending.class);
public PaymentPending(StateContext<PaymentStatus, PaymentEventType> context, Function<PaymentEvent, Payment> lambda) {
super(context, lambda);
}
/**
* Apply an {@link PaymentEvent} to the lambda function that was provided through the
* constructor of this {@link PaymentFunction}.
*
* @param event is the {@link PaymentEvent} to apply to the lambda function
*/
@Override
public Payment apply(PaymentEvent event) {
log.info("Executing workflow for payment pending...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.PaymentEvent;
import demo.event.PaymentEventType;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class PaymentProcessed extends PaymentFunction {
final private Logger log = Logger.getLogger(PaymentProcessed.class);
public PaymentProcessed(StateContext<PaymentStatus, PaymentEventType> context, Function<PaymentEvent, Payment> lambda) {
super(context, lambda);
}
/**
* Apply an {@link PaymentEvent} to the lambda function that was provided through the
* constructor of this {@link PaymentFunction}.
*
* @param event is the {@link PaymentEvent} to apply to the lambda function
*/
@Override
public Payment apply(PaymentEvent event) {
log.info("Executing workflow for payment processed...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.PaymentEvent;
import demo.event.PaymentEventType;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class PaymentSucceeded extends PaymentFunction {
final private Logger log = Logger.getLogger(PaymentSucceeded.class);
public PaymentSucceeded(StateContext<PaymentStatus, PaymentEventType> context, Function<PaymentEvent, Payment> lambda) {
super(context, lambda);
}
/**
* Apply an {@link PaymentEvent} to the lambda function that was provided through the
* constructor of this {@link PaymentFunction}.
*
* @param event is the {@link PaymentEvent} to apply to the lambda function
*/
@Override
public Payment apply(PaymentEvent event) {
log.info("Executing workflow for payment succeeded...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,46 @@
package demo.payment;
import demo.domain.BaseEntity;
public class Payment extends BaseEntity {
private Double amount;
private PaymentMethod paymentMethod;
private PaymentStatus status;
public Payment() {
}
public PaymentStatus getStatus() {
return status;
}
public void setStatus(PaymentStatus status) {
this.status = status;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
public PaymentMethod getPaymentMethod() {
return paymentMethod;
}
public void setPaymentMethod(PaymentMethod paymentMethod) {
this.paymentMethod = paymentMethod;
}
@Override
public String toString() {
return "Payment{" +
"amount=" + amount +
", paymentMethod=" + paymentMethod +
", status=" + status +
"} " + super.toString();
}
}

View File

@@ -0,0 +1,5 @@
package demo.payment;
public enum PaymentMethod {
CREDIT_CARD
}

View File

@@ -0,0 +1,16 @@
package demo.payment;
/**
* The {@link PaymentStatus} describes the state of an {@link Payment}.
* The aggregate state of a {@link Payment} is sourced from attached domain
* events in the form of {@link demo.event.PaymentEvent}.
*
* @author kbastani
*/
public enum PaymentStatus {
PAYMENT_CREATED,
PAYMENT_PENDING,
PAYMENT_PROCESSED,
PAYMENT_FAILED,
PAYMENT_SUCCEEDED
}

View File

@@ -0,0 +1,42 @@
package demo.state;
import demo.payment.PaymentStatus;
import demo.event.PaymentEventType;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.config.StateMachineFactory;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* The {@link StateMachineService} provides factory access to get new state machines for
* replicating the state of an {@link demo.payment.Payment} from {@link demo.event.PaymentEvents}.
*
* @author kbastani
*/
@Service
public class StateMachineService {
private final StateMachineFactory<PaymentStatus, PaymentEventType> factory;
public StateMachineService(StateMachineFactory<PaymentStatus, PaymentEventType> factory) {
this.factory = factory;
}
/**
* Create a new state machine that is initially configured and ready for replicating
* the state of an {@link demo.payment.Payment} from a sequence of {@link demo.event.PaymentEvent}.
*
* @return a new instance of {@link StateMachine}
*/
public StateMachine<PaymentStatus, PaymentEventType> getStateMachine() {
// Create a new state machine in its initial state
StateMachine<PaymentStatus, PaymentEventType> stateMachine =
factory.getStateMachine(UUID.randomUUID().toString());
// Start the new state machine
stateMachine.start();
return stateMachine;
}
}

View File

@@ -0,0 +1,29 @@
package demo.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
@Component
public class LambdaUtil {
private ObjectMapper objectMapper;
public LambdaUtil(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
public HashMap objectToMap(Object object) {
HashMap result = null;
try {
result = objectMapper.readValue(objectMapper.writeValueAsString(object), HashMap.class);
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
}

View File

@@ -0,0 +1,24 @@
spring:
profiles:
active: development
---
spring:
profiles: development
cloud:
stream:
bindings:
input:
destination: payment
group: payment-group
contentType: 'application/json'
consumer:
durableSubscription: true
server:
port: 8081
amazon:
aws:
access-key-id: replace
access-key-secret: replace
---
spring:
profiles: test

View File

@@ -0,0 +1,4 @@
spring:
application:
name: payment-worker
---

View File

@@ -0,0 +1,18 @@
package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
@ActiveProfiles("test")
public class PaymentStreamModuleApplicationTests {
@Test
public void contextLoads() {
}
}

24
payment/pom.xml Normal file
View File

@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>payment</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>payment</name>
<parent>
<groupId>org.kbastani</groupId>
<artifactId>event-stream-processing-microservices</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<modules>
<module>payment-web</module>
<module>payment-worker</module>
</modules>
</project>

View File

@@ -29,6 +29,7 @@
<module>spring-boot-starters</module>
<module>account</module>
<module>order</module>
<module>payment</module>
</modules>
<dependencies>