Last git commit of 2016

This commit is contained in:
Kenny Bastani
2016-12-31 15:06:11 -05:00
parent 9b00cb9f8e
commit 3f42f42368
67 changed files with 469 additions and 297 deletions

View File

@@ -29,7 +29,7 @@ public class ActivateAccount extends Action<Account> {
Assert.isTrue(Arrays.asList(ACCOUNT_CONFIRMED, ACCOUNT_SUSPENDED, ACCOUNT_ARCHIVED)
.contains(account.getStatus()), "The account cannot be activated");
AccountService accountService = account.getProvider(AccountModule.class)
AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService();
// Activate the account

View File

@@ -26,7 +26,7 @@ public class ArchiveAccount extends Action<Account> {
return (account) -> {
Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be archived");
AccountService accountService = account.getProvider(AccountModule.class)
AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService();
// Archive the account

View File

@@ -26,7 +26,7 @@ public class ConfirmAccount extends Action<Account> {
return (account) -> {
Assert.isTrue(account.getStatus() == ACCOUNT_PENDING, "The account has already been confirmed");
AccountService accountService = account.getProvider(AccountModule.class)
AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService();
// Confirm the account

View File

@@ -26,7 +26,7 @@ public class SuspendAccount extends Action<Account> {
return (account) -> {
Assert.isTrue(account.getStatus() == ACCOUNT_ACTIVE, "An inactive account cannot be suspended");
AccountService accountService = account.getProvider(AccountModule.class)
AccountService accountService = account.getModule(AccountModule.class)
.getDefaultService();
// Suspend the account

View File

@@ -140,9 +140,9 @@ public class Account extends AbstractEntity<AccountEvent, Long> {
*/
@Override
@SuppressWarnings("unchecked")
public <T extends Module<A>, A extends Aggregate<AccountEvent, Long>> T getProvider() throws
public <T extends Module<A>, A extends Aggregate<AccountEvent, Long>> T getModule() throws
IllegalArgumentException {
AccountModule accountProvider = getProvider(AccountModule.class);
AccountModule accountProvider = getModule(AccountModule.class);
return (T) accountProvider;
}

View File

@@ -1,5 +1,6 @@
package demo.config;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
@@ -24,6 +25,7 @@ public class WebMvcConfig extends WebMvcConfigurerAdapter {
@Override
public void configureMessageConverters(List<HttpMessageConverter<?>> converters) {
final MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
converter.setObjectMapper(objectMapper);
converters.add(converter);
}

View File

@@ -76,7 +76,7 @@ public class Order extends Aggregate<OrderEvent, Long> {
}
public Order post() {
OrderModule orderProvider = getProvider();
OrderModule orderProvider = getModule();
return orderProvider.getDefaultService()
.create(this);
}
@@ -115,9 +115,9 @@ public class Order extends Aggregate<OrderEvent, Long> {
*/
@Override
@SuppressWarnings("unchecked")
public <T extends Module<A>, A extends Aggregate<OrderEvent, Long>> T getProvider() throws
public <T extends Module<A>, A extends Aggregate<OrderEvent, Long>> T getModule() throws
IllegalArgumentException {
OrderModule orderProvider = getProvider(OrderModule.class);
OrderModule orderProvider = getModule(OrderModule.class);
return (T) orderProvider;
}

View File

@@ -4,6 +4,8 @@ spring:
---
spring:
profiles: development
jackson:
default-property-inclusion: non_null
cloud:
stream:
bindings:

View File

@@ -13,8 +13,10 @@ spring:
contentType: 'application/json'
consumer:
durableSubscription: true
jackson:
default-property-inclusion: non_null
server:
port: 8081
port: 0
amazon:
aws:
access-key-id: replace

View File

@@ -1,5 +1,6 @@
package demo.config;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
@@ -24,6 +25,7 @@ public class WebMvcConfig extends WebMvcConfigurerAdapter {
@Override
public void configureMessageConverters(List<HttpMessageConverter<?>> converters) {
final MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
converter.setObjectMapper(objectMapper);
converters.add(converter);
}

View File

@@ -21,7 +21,7 @@ public class ConnectAccount extends Action<Order> {
public BiConsumer<Order, Long> getConsumer() {
return (order, accountId) -> {
OrderService orderService = order.getProvider(OrderModule.class)
OrderService orderService = order.getModule(OrderModule.class)
.getDefaultService();
// Connect the account

View File

@@ -22,7 +22,7 @@ public class ConnectPayment extends Action<Order> {
public BiConsumer<Order, Long> getConsumer() {
return (order, paymentId) -> {
OrderService orderService = order.getProvider(OrderModule.class)
OrderService orderService = order.getModule(OrderModule.class)
.getDefaultService();
// Connect the account

View File

@@ -42,7 +42,7 @@ public class CreatePayment extends Action<Order> {
Assert.isTrue(order.getPaymentId() == null, "Payment has already been created");
Assert.isTrue(order.getStatus() == OrderStatus.ACCOUNT_CONNECTED, "Account must be connected first");
OrderService orderService = order.getProvider(OrderModule.class)
OrderService orderService = order.getModule(OrderModule.class)
.getDefaultService();
Payment payment = new Payment();

View File

@@ -0,0 +1,40 @@
package demo.order.action;
import demo.domain.Action;
import demo.order.domain.Order;
import demo.order.domain.OrderModule;
import demo.payment.domain.Payment;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.function.Consumer;
/**
* Processes a {@link Payment} for an {@link Order}.
*
* @author Kenny Bastani
*/
@Service
public class DeleteOrder extends Action<Order> {
private RestTemplate restTemplate;
public DeleteOrder(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}
public Consumer<Order> getConsumer() {
return (order) -> {
// Delete payment
if (order.getPaymentId() != null) {
String href = "http://payment-web/v1/payments/" + order.getPaymentId();
restTemplate.delete(href);
}
// Delete order
order.getModule(OrderModule.class)
.getDefaultService()
.delete(order.getIdentity());
};
}
}

View File

@@ -52,14 +52,14 @@ public class OrderController {
@RequestMapping(path = "/orders/{id}")
public ResponseEntity getOrder(@PathVariable Long id) {
return Optional.ofNullable(getOrderResource(id))
return Optional.ofNullable(getOrderResource(orderService.get(id)))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElse(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
@DeleteMapping(path = "/orders/{id}")
public ResponseEntity deleteOrder(@PathVariable Long id) {
return Optional.ofNullable(orderService.delete(id))
return Optional.ofNullable(orderService.get(id).delete())
.map(e -> new ResponseEntity<>(HttpStatus.NO_CONTENT))
.orElseThrow(() -> new RuntimeException("Order deletion failed"));
}
@@ -132,19 +132,6 @@ public class OrderController {
.orElseThrow(() -> new RuntimeException("The command could not be applied"));
}
/**
* Retrieves a hypermedia resource for {@link Order} with the specified identifier.
*
* @param id is the unique identifier for looking up the {@link Order} entity
* @return a hypermedia resource for the fetched {@link Order}
*/
private Resource<Order> getOrderResource(Long id) {
// Get the order for the provided id
Order order = orderService.get(id);
return getOrderResource(order);
}
/**
* Creates a new {@link Order} entity and persists the result to the repository.
*
@@ -225,7 +212,7 @@ public class OrderController {
* @return is a hypermedia enriched resource for the supplied {@link Order} entity
*/
private Resource<Order> getOrderResource(Order order) {
Assert.notNull(order, "Order must not be null");
if(order == null) return null;
// Add command link
order.add(linkBuilder("getCommands", order.getIdentity()).withRel("commands"));

View File

@@ -135,6 +135,14 @@ public class Order extends AbstractEntity<OrderEvent, Long> {
return this;
}
public boolean delete() {
getAction(DeleteOrder.class)
.getConsumer()
.accept(this);
return true;
}
@JsonIgnore
public Double calculateTotal() {
return getLineItems()

View File

@@ -10,6 +10,8 @@ spring:
output:
destination: order
contentType: 'application/json'
jackson:
default-property-inclusion: non_null
server:
port: 0
events:

View File

@@ -1,12 +1,13 @@
package demo.config;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import demo.function.*;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.payment.Payment;
import demo.stream.OrderStream;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import demo.order.event.OrderEvents;
import demo.payment.domain.Payment;
import demo.order.event.OrderEventProcessor;
import org.apache.log4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -30,9 +31,9 @@ import java.util.Map;
* expressions. Actions are executed during transitions between a source state and a target state.
* <p>
* A state machine provides a robust declarative language for describing the state of an {@link Order}
* resource given a sequence of ordered {@link demo.event.OrderEvents}. When an event is received
* in {@link OrderStream}, an in-memory state machine is fully replicated given the
* {@link demo.event.OrderEvents} attached to an {@link Order} resource.
* resource given a sequence of ordered {@link OrderEvents}. When an event is received
* in {@link OrderEventProcessor}, an in-memory state machine is fully replicated given the
* {@link OrderEvents} attached to an {@link Order} resource.
*
* @author kbastani
*/
@@ -50,7 +51,6 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderS
@Override
public void configure(StateMachineStateConfigurer<OrderStatus, OrderEventType> states) {
try {
// Describe the initial condition of the order state machine
states.withStates()
.initial(OrderStatus.ORDER_CREATED)
.states(EnumSet.allOf(OrderStatus.class));
@@ -59,6 +59,32 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderS
}
}
/**
* Functions are mapped to actions that are triggered during the replication of a state machine. Functions
* should only be executed after the state machine has completed replication. This method checks the state
* context of the machine for an {@link OrderEvent}, which signals that the state machine is finished
* replication.
* <p>
* The {@link OrderFunction} argument is only applied if an {@link OrderEvent} is provided as a
* message header in the {@link StateContext}.
*
* @param context is the state machine context that may include an {@link OrderEvent}
* @param orderFunction is the order function to apply after the state machine has completed replication
* @return an {@link OrderEvent} only if this event has not yet been processed, otherwise returns null
*/
private OrderEvent applyEvent(StateContext<OrderStatus, OrderEventType> context, OrderFunction orderFunction) {
OrderEvent event = null;
log.info(String.format("Replicate event: %s", context.getMessage().getPayload()));
if (context.getMessageHeader("event") != null) {
event = context.getMessageHeaders().get("event", OrderEvent.class);
log.info(String.format("State replication complete: %s", event.getType()));
orderFunction.apply(event);
}
return event;
}
/**
* Configures the {@link StateMachine} that describes how {@link OrderEventType} drives the state
* of an {@link Order}. Events are applied as transitions from a source {@link OrderStatus} to
@@ -253,20 +279,26 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderS
MediaTypes.HAL_JSON
);
Payment payment = paymentResource.follow("self")
.toEntity(Payment.class)
.getBody();
Order order = orderResource.follow("self")
.toEntity(Order.class)
.getBody();
.toObject(Order.class);
Map<String, Object> template = new HashMap<String, Object>();
Map<String, Object> template = new HashMap<>();
template.put("orderId", order.getIdentity());
// Connect payment to order
Payment payment = paymentResource.follow("self", "commands", "connectOrder")
.withTemplateParameters(template)
.toObject(Payment.class);
template = new HashMap<>();
template.put("paymentId", payment.getPaymentId());
return orderResource.follow("commands", "connectPayment")
// Connect order to payment
order = orderResource.follow("commands", "connectPayment")
.withTemplateParameters(template)
.toObject(Order.class);
return order;
}));
}
@@ -321,36 +353,6 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderS
}));
}
/**
* Functions are mapped to actions that are triggered during the replication of a state machine. Functions
* should only be executed after the state machine has completed replication. This method checks the state
* context of the machine for an {@link OrderEvent}, which signals that the state machine is finished
* replication.
* <p>
* The {@link OrderFunction} argument is only applied if an {@link OrderEvent} is provided as a
* message header in the {@link StateContext}.
*
* @param context is the state machine context that may include an {@link OrderEvent}
* @param orderFunction is the order function to apply after the state machine has completed replication
* @return an {@link OrderEvent} only if this event has not yet been processed, otherwise returns null
*/
private OrderEvent applyEvent(StateContext<OrderStatus, OrderEventType> context,
OrderFunction orderFunction) {
OrderEvent orderEvent = null;
// Log out the progress of the state machine replication
log.info("Replicate event: " + context.getMessage().getPayload());
// The machine is finished replicating when an OrderEvent is found in the message header
if (context.getMessageHeader("event") != null) {
orderEvent = (OrderEvent) context.getMessageHeader("event");
log.info("State machine replicated: " + orderEvent.getType());
// Apply the provided function to the OrderEvent
orderFunction.apply(orderEvent);
}
return orderEvent;
}
}

View File

@@ -2,12 +2,12 @@ package demo.domain;
import org.springframework.hateoas.ResourceSupport;
public class BaseEntity extends ResourceSupport {
public class AbstractEntity extends ResourceSupport {
private Long createdAt;
private Long lastModified;
public BaseEntity() {
public AbstractEntity() {
}
public Long getCreatedAt() {

View File

@@ -1,84 +0,0 @@
package demo.event;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.state.StateMachineService;
import org.apache.log4j.Logger;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.stereotype.Service;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@Service
public class EventService {
final private Logger log = Logger.getLogger(EventService.class);
final private StateMachineService stateMachineService;
public EventService(StateMachineService stateMachineService) {
this.stateMachineService = stateMachineService;
}
public Order apply(OrderEvent orderEvent) {
Order result;
log.info(orderEvent);
log.info("Order event received: " + orderEvent.getLink("self").getHref());
// Generate a state machine for computing the state of the order resource
StateMachine<OrderStatus, OrderEventType> stateMachine =
stateMachineService.getStateMachine();
// Follow the hypermedia link to fetch the attached order
Traverson traverson = new Traverson(
URI.create(orderEvent.getLink("order").getHref()),
MediaTypes.HAL_JSON
);
// Get the event log for the attached order resource
OrderEvents events = traverson.follow("events")
.toEntity(OrderEvents.class)
.getBody();
// Prepare order event message headers
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("event", orderEvent);
// Replicate the current state of the order resource
events.getContent()
.stream()
.sorted((a1, a2) -> a1.getCreatedAt().compareTo(a2.getCreatedAt()))
.forEach(e -> {
MessageHeaders headers = new MessageHeaders(null);
// Check to see if this is the current event
if (e.getLink("self").equals(orderEvent.getLink("self"))) {
headers = new MessageHeaders(headerMap);
}
// Send the event to the state machine
stateMachine.sendEvent(MessageBuilder.createMessage(e.getType(), headers));
});
// Get result
Map<Object, Object> context = stateMachine.getExtendedState()
.getVariables();
// Get the order result
result = (Order) context.getOrDefault("order", null);
// Destroy the state machine
stateMachine.stop();
return result;
}
}

View File

@@ -1,9 +1,9 @@
package demo.function;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;

View File

@@ -1,9 +1,9 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;

View File

@@ -1,9 +1,9 @@
package demo.function;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
@@ -11,7 +11,7 @@ import java.util.function.Function;
/**
* The {@link OrderFunction} is an abstraction used to map actions that are triggered by
* state transitions on a {@link demo.order.Order} resource on to a function. Mapped functions
* state transitions on a {@link Order} resource on to a function. Mapped functions
* can take multiple forms and reside either remotely or locally on the classpath of this application.
*
* @author kbastani

View File

@@ -1,9 +1,9 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;

View File

@@ -1,9 +1,9 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;

View File

@@ -1,9 +1,9 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;

View File

@@ -1,9 +1,9 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;

View File

@@ -1,9 +1,9 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;

View File

@@ -1,9 +1,9 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;

View File

@@ -1,9 +1,9 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;

View File

@@ -1,9 +1,9 @@
package demo.function;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;

View File

@@ -0,0 +1,82 @@
package demo.order;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEventType;
import demo.order.event.OrderEvents;
import org.apache.log4j.Logger;
import org.springframework.hateoas.Link;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@Service
public class StateFactory {
final private Logger log = Logger.getLogger(StateFactory.class);
final private StateService stateService;
public StateFactory(StateService stateService) {
this.stateService = stateService;
}
public Order apply(OrderEvent orderEvent) {
Assert.notNull(orderEvent, "Cannot apply a null event");
Assert.notNull(orderEvent.getId(), "The event payload's identity link was not found");
StateMachine<OrderStatus, OrderEventType> stateMachine = getStateMachine(orderEvent);
stateMachine.stop();
return stateMachine.getExtendedState().get("order", Order.class);
}
private StateMachine<OrderStatus, OrderEventType> getStateMachine(OrderEvent orderEvent) {
Link eventId = orderEvent.getId();
log.info(String.format("Order event received: %s", eventId));
StateMachine<OrderStatus, OrderEventType> stateMachine;
Map<String, Object> contextMap;
OrderEvents eventLog;
eventLog = getEventLog(orderEvent);
contextMap = getEventHeaders(orderEvent);
stateMachine = stateService.newStateMachine();
// Replicate the aggregate state
eventLog.getContent().stream()
.sorted((a, b) -> a.getCreatedAt().compareTo(b.getCreatedAt()))
.forEach(e -> stateMachine.sendEvent(MessageBuilder.createMessage(e.getType(), e.getId()
.equals(eventId) ? new MessageHeaders(contextMap) : new MessageHeaders(null))));
return stateMachine;
}
private Map<String, Object> getEventHeaders(OrderEvent orderEvent) {
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("event", orderEvent);
return headerMap;
}
private OrderEvents getEventLog(OrderEvent event) {
// Follow the hypermedia link to fetch the attached order
Traverson traverson = new Traverson(
URI.create(event.getLink("order")
.getHref()),
MediaTypes.HAL_JSON
);
// Get the event log for the attached order resource
return traverson.follow("events")
.toEntity(OrderEvents.class)
.getBody();
}
}

View File

@@ -1,7 +1,10 @@
package demo.state;
package demo.order;
import demo.order.OrderStatus;
import demo.event.OrderEventType;
import demo.order.domain.Order;
import demo.order.domain.OrderStatus;
import demo.order.event.OrderEventType;
import demo.order.event.OrderEvent;
import demo.order.event.OrderEvents;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.config.StateMachineFactory;
import org.springframework.stereotype.Service;
@@ -9,27 +12,27 @@ import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* The {@link StateMachineService} provides factory access to get new state machines for
* replicating the state of an {@link demo.order.Order} from {@link demo.event.OrderEvents}.
* The {@link StateService} provides factory access to get new state machines for
* replicating the state of an {@link Order} from {@link OrderEvents}.
*
* @author kbastani
*/
@Service
public class StateMachineService {
public class StateService {
private final StateMachineFactory<OrderStatus, OrderEventType> factory;
public StateMachineService(StateMachineFactory<OrderStatus, OrderEventType> factory) {
public StateService(StateMachineFactory<OrderStatus, OrderEventType> factory) {
this.factory = factory;
}
/**
* Create a new state machine that is initially configured and ready for replicating
* the state of an {@link demo.order.Order} from a sequence of {@link demo.event.OrderEvent}.
* the state of an {@link Order} from a sequence of {@link OrderEvent}.
*
* @return a new instance of {@link StateMachine}
*/
public StateMachine<OrderStatus, OrderEventType> getStateMachine() {
public StateMachine<OrderStatus, OrderEventType> newStateMachine() {
// Create a new state machine in its initial state
StateMachine<OrderStatus, OrderEventType> stateMachine =
factory.getStateMachine(UUID.randomUUID().toString());

View File

@@ -1,5 +1,7 @@
package demo.event;
package demo.order.controller;
import demo.order.StateFactory;
import demo.order.event.OrderEvent;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
@@ -13,9 +15,9 @@ import java.util.Optional;
@RequestMapping("/v1")
public class EventController {
private EventService eventService;
private StateFactory eventService;
public EventController(EventService eventService) {
public EventController(StateFactory eventService) {
this.eventService = eventService;
}

View File

@@ -1,4 +1,4 @@
package demo.address;
package demo.order.domain;
import java.io.Serializable;

View File

@@ -1,4 +1,4 @@
package demo.address;
package demo.order.domain;
public enum AddressType {
SHIPPING,

View File

@@ -1,4 +1,4 @@
package demo.order;
package demo.order.domain;
import java.io.Serializable;

View File

@@ -1,16 +1,16 @@
package demo.order;
package demo.order.domain;
import demo.address.Address;
import demo.address.AddressType;
import demo.domain.BaseEntity;
import demo.event.OrderEvent;
import com.fasterxml.jackson.annotation.JsonProperty;
import demo.domain.AbstractEntity;
import demo.order.event.OrderEvent;
import org.springframework.hateoas.Link;
import java.util.HashSet;
import java.util.Set;
public class Order extends BaseEntity {
public class Order extends AbstractEntity {
private Long orderId;
private Long id;
private OrderStatus status;
@@ -23,19 +23,13 @@ public class Order extends BaseEntity {
public Order() {
}
public Order(String accountNumber, Address shippingAddress) {
this();
this.shippingAddress = shippingAddress;
if (shippingAddress.getAddressType() == null)
this.shippingAddress.setAddressType(AddressType.SHIPPING);
@JsonProperty("orderId")
public Long getIdentity() {
return this.id;
}
public Long getOrderId() {
return orderId;
}
public void setOrderId(Long id) {
this.orderId = orderId;
public void setIdentity(Long id) {
this.id = id;
}
public OrderStatus getStatus() {
@@ -74,14 +68,13 @@ public class Order extends BaseEntity {
lineItems.add(lineItem);
}
/**
* Returns the {@link Link} with a rel of {@link Link#REL_SELF}.
*/
@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
", status=" + status +
", events=" + events +
", lineItems=" + lineItems +
", shippingAddress=" + shippingAddress +
"} " + super.toString();
public Link getId() {
return getLink("self");
}
}

View File

@@ -1,4 +1,4 @@
package demo.order;
package demo.order.domain;
public enum OrderStatus {
ORDER_CREATED,

View File

@@ -1,8 +1,8 @@
package demo.event;
package demo.order.event;
import demo.domain.BaseEntity;
import demo.domain.AbstractEntity;
public class OrderEvent extends BaseEntity {
public class OrderEvent extends AbstractEntity {
private OrderEventType type;

View File

@@ -1,8 +1,7 @@
package demo.stream;
package demo.order.event;
import demo.event.EventService;
import demo.event.OrderEvent;
import demo.order.Order;
import demo.order.StateFactory;
import demo.order.domain.Order;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
@@ -10,24 +9,24 @@ import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Profile;
/**
* The {@link OrderStream} monitors for a variety of {@link OrderEvent} domain
* The {@link OrderEventProcessor} monitors for a variety of {@link OrderEvent} domain
* events for an {@link Order}.
*
* @author kbastani
*/
@EnableAutoConfiguration
@EnableBinding(Sink.class)
@Profile({ "cloud", "development" })
public class OrderStream {
@Profile({"cloud", "development"})
public class OrderEventProcessor {
private EventService eventService;
private StateFactory stateFactory;
public OrderStream(EventService eventService) {
this.eventService = eventService;
public OrderEventProcessor(StateFactory stateFactory) {
this.stateFactory = stateFactory;
}
@StreamListener(Sink.INPUT)
public void streamListener(OrderEvent orderEvent) {
eventService.apply(orderEvent);
stateFactory.apply(orderEvent);
}
}

View File

@@ -1,4 +1,4 @@
package demo.event;
package demo.order.event;
public enum OrderEventType {
ORDER_CREATED,

View File

@@ -1,4 +1,4 @@
package demo.event;
package demo.order.event;
import org.springframework.hateoas.Resources;

View File

@@ -1,8 +1,8 @@
package demo.payment;
package demo.payment.domain;
import demo.domain.BaseEntity;
import demo.domain.AbstractEntity;
public class Payment extends BaseEntity {
public class Payment extends AbstractEntity {
private Long paymentId;
private Double amount;

View File

@@ -1,4 +1,4 @@
package demo.payment;
package demo.payment.domain;
public enum PaymentMethod {
CREDIT_CARD

View File

@@ -1,7 +1,8 @@
package demo.payment;
package demo.payment.domain;
public enum PaymentStatus {
PAYMENT_CREATED,
ORDER_CONNECTED,
PAYMENT_PENDING,
PAYMENT_PROCESSED,
PAYMENT_FAILED,

View File

@@ -13,6 +13,8 @@ spring:
contentType: 'application/json'
consumer:
durableSubscription: true
jackson:
default-property-inclusion: non_null
server:
port: 0
amazon:
@@ -24,4 +26,6 @@ spring:
profiles: test
eureka:
client:
enabled: false
enabled: false
server:
port: 0

View File

@@ -1,4 +1,4 @@
spring:
application:
name: account-worker
name: order-worker
---

View File

@@ -1,5 +1,6 @@
package demo.config;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
@@ -24,6 +25,7 @@ public class WebMvcConfig extends WebMvcConfigurerAdapter {
@Override
public void configureMessageConverters(List<HttpMessageConverter<?>> converters) {
final MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
converter.setObjectMapper(objectMapper);
converters.add(converter);
}

View File

@@ -2,6 +2,11 @@ package demo.payment.action;
import demo.domain.Action;
import demo.payment.domain.Payment;
import demo.payment.domain.PaymentModule;
import demo.payment.domain.PaymentService;
import demo.payment.domain.PaymentStatus;
import demo.payment.event.PaymentEvent;
import demo.payment.event.PaymentEventType;
import org.springframework.stereotype.Service;
import java.util.function.BiConsumer;
@@ -9,6 +14,17 @@ import java.util.function.BiConsumer;
@Service
public class ConnectOrder extends Action<Payment> {
public BiConsumer<Payment, Long> getConsumer() {
return Payment::setOrderId;
return (payment, orderId) -> {
PaymentService paymentService = payment.getModule(PaymentModule.class)
.getDefaultService();
// Connect the payment to the order
payment.setOrderId(orderId);
payment.setStatus(PaymentStatus.ORDER_CONNECTED);
payment = paymentService.update(payment);
// Trigger the payment connected
payment.sendAsyncEvent(new PaymentEvent(PaymentEventType.ORDER_CONNECTED, payment));
};
}
}

View File

@@ -6,17 +6,18 @@ import demo.event.Events;
import demo.payment.event.PaymentEvent;
import demo.payment.domain.Payment;
import demo.payment.domain.PaymentService;
import org.springframework.hateoas.ExposesResourceFor;
import org.springframework.hateoas.LinkBuilder;
import org.springframework.hateoas.Resource;
import org.springframework.hateoas.ResourceSupport;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.hateoas.*;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.Assert;
import org.springframework.web.bind.annotation.*;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo;
@@ -32,10 +33,13 @@ public class PaymentController {
private final PaymentService paymentService;
private final EventService<PaymentEvent, Long> eventService;
private final DiscoveryClient discoveryClient;
public PaymentController(PaymentService paymentService, EventService<PaymentEvent, Long> eventService) {
public PaymentController(PaymentService paymentService, EventService<PaymentEvent, Long> eventService,
DiscoveryClient discoveryClient) {
this.paymentService = paymentService;
this.eventService = eventService;
this.discoveryClient = discoveryClient;
}
@RequestMapping(path = "/payments", method = RequestMethod.POST)
@@ -199,16 +203,23 @@ public class PaymentController {
private Resource<Payment> getPaymentResource(Payment payment) {
Assert.notNull(payment, "Payment must not be null");
if(!payment.hasLink("commands")) {
if (!payment.hasLink("commands")) {
// Add command link
payment.add(linkBuilder("getCommands", payment.getIdentity()).withRel("commands"));
}
if(!payment.hasLink("events")) {
if (!payment.hasLink("events")) {
// Add get events link
payment.add(linkBuilder("getPaymentEvents", payment.getIdentity()).withRel("events"));
}
// Add remote payment link
if (payment.getOrderId() != null) {
Link result = getRemoteLink("order-web", "/v1/orders/{id}", payment.getOrderId(), "order ");
if (result != null)
payment.add(result);
}
return new Resource<>(payment);
}
@@ -217,4 +228,19 @@ public class PaymentController {
payment.setIdentity(id);
return new Resource<>(payment.getCommands());
}
private Link getRemoteLink(String service, String relative, Object identifier, String rel) {
Link result = null;
List<ServiceInstance> serviceInstances = discoveryClient.getInstances(service);
if (serviceInstances.size() > 0) {
ServiceInstance serviceInstance = serviceInstances.get(new Random().nextInt(serviceInstances.size()));
result = new Link(new UriTemplate(serviceInstance.getUri()
.toString()
.concat(relative)).with("id", TemplateVariable.VariableType.PATH_VARIABLE)
.expand(identifier)
.toString())
.withRel(rel);
}
return result;
}
}

View File

@@ -1,13 +1,12 @@
package demo.payment.domain;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import demo.domain.AbstractEntity;
import demo.domain.Command;
import demo.payment.event.PaymentEvent;
import demo.payment.action.ConnectOrder;
import demo.payment.action.ProcessPayment;
import demo.payment.controller.PaymentController;
import demo.payment.event.PaymentEvent;
import org.springframework.hateoas.Link;
import javax.persistence.*;
@@ -41,6 +40,7 @@ public class Payment extends AbstractEntity<PaymentEvent, Long> {
}
public Payment(Double amount, PaymentMethod paymentMethod) {
this();
this.amount = amount;
this.paymentMethod = paymentMethod;
}
@@ -80,7 +80,6 @@ public class Payment extends AbstractEntity<PaymentEvent, Long> {
this.paymentMethod = paymentMethod;
}
@JsonIgnore
public Long getOrderId() {
return orderId;
}

View File

@@ -19,11 +19,9 @@ import org.springframework.util.Assert;
public class PaymentService extends Service<Payment, Long> {
private final PaymentRepository paymentRepository;
private final EventService<PaymentEvent, Long> eventService;
public PaymentService(PaymentRepository paymentRepository, EventService<PaymentEvent, Long> eventService) {
this.paymentRepository = paymentRepository;
this.eventService = eventService;
}
public Payment registerPayment(Payment payment) {

View File

@@ -10,6 +10,7 @@ import demo.payment.event.PaymentEvent;
*/
public enum PaymentStatus {
PAYMENT_CREATED,
ORDER_CONNECTED,
PAYMENT_PENDING,
PAYMENT_PROCESSED,
PAYMENT_FAILED,

View File

@@ -11,6 +11,7 @@ import demo.payment.domain.PaymentStatus;
*/
public enum PaymentEventType {
PAYMENT_CREATED,
ORDER_CONNECTED,
PAYMENT_PENDING,
PAYMENT_PROCESSED,
PAYMENT_FAILED,

View File

@@ -10,6 +10,8 @@ spring:
output:
destination: payment
contentType: 'application/json'
jackson:
default-property-inclusion: non_null
redis:
host: localhost
port: 6379

View File

@@ -10,6 +10,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.Assert;
@@ -25,6 +27,9 @@ public class EventServiceTests {
@Autowired
private EventService<PaymentEvent, Long> eventService;
@MockBean
private DiscoveryClient discoveryClient;
@Test
public void getPaymentReturnsPayment() throws Exception {
Payment payment = new Payment(11.0, PaymentMethod.CREDIT_CARD);

View File

@@ -13,6 +13,7 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.http.MediaType;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
@@ -39,6 +40,9 @@ public class PaymentControllerTest {
@MockBean
private EventService<PaymentEvent, Long> eventService;
@MockBean
private DiscoveryClient discoveryClient;
@Test
public void getUserPaymentResourceShouldReturnPayment() throws Exception {
String content = "{\"paymentMethod\": \"CREDIT_CARD\", \"amount\": 42.0 }";

View File

@@ -10,6 +10,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
@@ -26,6 +27,9 @@ public class PaymentServiceTests {
@MockBean
private PaymentRepository paymentRepository;
@MockBean
private DiscoveryClient discoveryClient;
private PaymentService paymentService;
@Before

View File

@@ -75,6 +75,12 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Paymen
.and()
.withExternal()
.source(PaymentStatus.PAYMENT_CREATED)
.target(PaymentStatus.ORDER_CONNECTED)
.event(PaymentEventType.ORDER_CONNECTED)
.action(orderConnected())
.and()
.withExternal()
.source(PaymentStatus.ORDER_CONNECTED)
.target(PaymentStatus.PAYMENT_PENDING)
.event(PaymentEventType.PAYMENT_PENDING)
.action(paymentPending())
@@ -149,6 +155,23 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Paymen
}));
}
@Bean
public Action<PaymentStatus, PaymentEventType> orderConnected() {
return context -> applyEvent(context,
new OrderConnected(context, event -> {
log.info(event.getType() + ": " + event.getLink("payment").getHref());
// Get the payment resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("payment").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Payment.class)
.getBody();
}));
}
@Bean
public Action<PaymentStatus, PaymentEventType> paymentPending() {
return context -> applyEvent(context,

View File

@@ -11,6 +11,7 @@ import demo.payment.PaymentStatus;
*/
public enum PaymentEventType {
PAYMENT_CREATED,
ORDER_CONNECTED,
PAYMENT_PENDING,
PAYMENT_PROCESSED,
PAYMENT_FAILED,

View File

@@ -0,0 +1,31 @@
package demo.function;
import demo.event.PaymentEvent;
import demo.event.PaymentEventType;
import demo.payment.Payment;
import demo.payment.PaymentStatus;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
public class OrderConnected extends PaymentFunction {
final private Logger log = Logger.getLogger(OrderConnected.class);
public OrderConnected(StateContext<PaymentStatus, PaymentEventType> context, Function<PaymentEvent, Payment> lambda) {
super(context, lambda);
}
/**
* Apply an {@link PaymentEvent} to the lambda function that was provided through the
* constructor of this {@link PaymentFunction}.
*
* @param event is the {@link PaymentEvent} to apply to the lambda function
*/
@Override
public Payment apply(PaymentEvent event) {
log.info("Executing workflow for order connected...");
return super.apply(event);
}
}

View File

@@ -8,6 +8,7 @@ public class Payment extends BaseEntity {
private Double amount;
private PaymentMethod paymentMethod;
private PaymentStatus status;
private Long orderId;
public Payment() {
}
@@ -44,6 +45,14 @@ public class Payment extends BaseEntity {
this.paymentMethod = paymentMethod;
}
public Long getOrderId() {
return orderId;
}
public void setOrderId(Long orderId) {
this.orderId = orderId;
}
@Override
public String toString() {
return "Payment{" +

View File

@@ -9,6 +9,7 @@ package demo.payment;
*/
public enum PaymentStatus {
PAYMENT_CREATED,
ORDER_CONNECTED,
PAYMENT_PENDING,
PAYMENT_PROCESSED,
PAYMENT_FAILED,

View File

@@ -13,6 +13,8 @@ spring:
contentType: 'application/json'
consumer:
durableSubscription: true
jackson:
default-property-inclusion: non_null
server:
port: 0
amazon:

View File

@@ -46,7 +46,7 @@ public abstract class Aggregate<E extends Event, ID extends Serializable> extend
@JsonIgnore
protected <T extends Action<A>, A extends Aggregate> T getAction(
Class<T> actionType) throws IllegalArgumentException {
Module provider = getProvider();
Module provider = getModule();
Service service = provider.getDefaultService();
return (T) service.getAction(actionType);
}
@@ -59,8 +59,8 @@ public abstract class Aggregate<E extends Event, ID extends Serializable> extend
*/
@SuppressWarnings("unchecked")
@JsonIgnore
public <T extends Module<A>, A extends Aggregate<E, ID>> T getProvider() throws IllegalArgumentException {
return getProvider((Class<T>) ResolvableType
public <T extends Module<A>, A extends Aggregate<E, ID>> T getModule() throws IllegalArgumentException {
return getModule((Class<T>) ResolvableType
.forClassWithGenerics(Module.class, ResolvableType.forInstance(this))
.getRawClass());
}
@@ -72,7 +72,7 @@ public abstract class Aggregate<E extends Event, ID extends Serializable> extend
* @throws IllegalArgumentException if the application context is unavailable or the provider does not exist
*/
@JsonIgnore
public <T extends Module<A>, A extends Aggregate<E, ID>> T getProvider(Class<T> providerType) throws
public <T extends Module<A>, A extends Aggregate<E, ID>> T getModule(Class<T> providerType) throws
IllegalArgumentException {
Assert.notNull(applicationContext, "The application context is unavailable");
T provider = applicationContext.getBean(providerType);
@@ -167,13 +167,13 @@ public abstract class Aggregate<E extends Event, ID extends Serializable> extend
@SuppressWarnings("unchecked")
@JsonIgnore
protected Service<Aggregate<E, ID>, ID> getEntityService() {
return (Service<Aggregate<E, ID>, ID>) getProvider().getDefaultService();
return (Service<Aggregate<E, ID>, ID>) getModule().getDefaultService();
}
@SuppressWarnings("unchecked")
@JsonIgnore
protected EventService<E, ID> getEventService() {
return (EventService<E, ID>) getProvider().getDefaultEventService();
return (EventService<E, ID>) getModule().getDefaultEventService();
}
public static class CommandResources extends ResourceSupport {

View File

@@ -42,18 +42,18 @@ public class ProviderTests {
@Test
public void testGetProviderReturnsProvider() {
assertNotNull(new EmptyAggregate().getProvider(EmptyProvider.class));
assertNotNull(new EmptyAggregate().getModule(EmptyProvider.class));
}
@Test
public void testGetServiceReturnsService() {
EmptyProvider provider = new EmptyAggregate().getProvider(EmptyProvider.class);
EmptyProvider provider = new EmptyAggregate().getModule(EmptyProvider.class);
assertNotNull(provider.getEmptyService());
}
@Test
public void testGetActionReturnsAction() {
EmptyProvider provider = new EmptyAggregate().getProvider(EmptyProvider.class);
EmptyProvider provider = new EmptyAggregate().getModule(EmptyProvider.class);
EmptyService service = provider.getEmptyService();
assertNotNull(service.getAction(EmptyAction.class));
}
@@ -61,7 +61,7 @@ public class ProviderTests {
@Test
public void testProcessCommandChangesStatus() {
EmptyAggregate aggregate = new EmptyAggregate(0L, AggregateStatus.CREATED);
EmptyProvider provider = new EmptyAggregate().getProvider(EmptyProvider.class);
EmptyProvider provider = new EmptyAggregate().getModule(EmptyProvider.class);
EmptyService service = provider.getEmptyService();
EmptyAction emptyAction = service.getAction(EmptyAction.class);
emptyAction.getConsumer().accept(aggregate);
@@ -107,7 +107,7 @@ public class ProviderTests {
@Command(controller = EmptyController.class, method = "emptyAction")
public void emptyAction() {
EmptyProvider emptyProvider = this.getProvider();
EmptyProvider emptyProvider = this.getModule();
emptyProvider.getEmptyService()
.getAction(EmptyAction.class)
.getConsumer()