Creating a spring boot starter for event sourcing

This commit is contained in:
Kenny Bastani
2016-12-26 06:12:21 -05:00
parent e1dc702107
commit 6f022b8ee6
49 changed files with 1125 additions and 473 deletions

View File

@@ -2,9 +2,7 @@ package demo.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
@@ -12,8 +10,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import java.util.Arrays;
@Configuration
@EnableCaching
public class CacheConfig {
@Bean

View File

@@ -8,6 +8,7 @@ import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.data.domain.PageRequest;
import org.springframework.hateoas.Link;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.Resource;
import org.springframework.http.RequestEntity;
@@ -68,7 +69,10 @@ public class EventService {
* @param consistencyModel is the consistency model for this request
* @return an {@link OrderEvent} that has been appended to the {@link Order}'s event log
*/
public OrderEvent raiseEvent(OrderEvent event, ConsistencyModel consistencyModel) {
public OrderEvent raiseEvent(OrderEvent event, ConsistencyModel consistencyModel, Link... links) {
// Add embedded links
event.add(links);
switch (consistencyModel) {
case BASE:
asyncRaiseEvent(event);
@@ -189,7 +193,7 @@ public class EventService {
* @return a hypermedia resource for the supplied {@link OrderEvent} entity
*/
private Resource<OrderEvent> getOrderEventResource(OrderEvent event) {
return new Resource<OrderEvent>(event, Arrays.asList(
event.add(Arrays.asList(
linkTo(OrderController.class)
.slash("events")
.slash(event.getEventId())
@@ -198,6 +202,7 @@ public class EventService {
.slash("orders")
.slash(event.getOrder().getOrderId())
.withRel("order")));
return new Resource<OrderEvent>(event, event.getLinks());
}
/**

View File

@@ -16,7 +16,8 @@ public class Order extends BaseEntity {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Long id;
private String accountNumber;
private Long accountId;
private Long paymentId;
@OneToMany(cascade = CascadeType.ALL, fetch = FetchType.LAZY)
private Set<OrderEvent> events = new HashSet<>();
@@ -34,9 +35,9 @@ public class Order extends BaseEntity {
this.status = OrderStatus.ORDER_CREATED;
}
public Order(String accountNumber, Address shippingAddress) {
public Order(Long accountId, Address shippingAddress) {
this();
this.accountNumber = accountNumber;
this.accountId = accountId;
this.shippingAddress = shippingAddress;
if (shippingAddress.getAddressType() == null)
this.shippingAddress.setAddressType(AddressType.SHIPPING);
@@ -51,12 +52,22 @@ public class Order extends BaseEntity {
this.id = id;
}
public String getAccountNumber() {
return accountNumber;
@JsonIgnore
public Long getAccountId() {
return accountId;
}
public void setAccountNumber(String accountNumber) {
this.accountNumber = accountNumber;
public void setAccountId(Long accountId) {
this.accountId = accountId;
}
@JsonIgnore
public Long getPaymentId() {
return paymentId;
}
public void setPaymentId(Long paymentId) {
this.paymentId = paymentId;
}
public OrderStatus getStatus() {
@@ -100,8 +111,8 @@ public class Order extends BaseEntity {
public String toString() {
return "Order{" +
"id=" + id +
", accountNumber='" + accountNumber + '\'' +
", events=" + events +
", accountId=" + accountId +
", paymentId=" + paymentId +
", status=" + status +
", lineItems=" + lineItems +
", shippingAddress=" + shippingAddress +

View File

@@ -1,11 +1,10 @@
package demo.order;
import demo.event.OrderEvent;
import demo.event.OrderEvents;
import demo.event.EventController;
import demo.event.EventService;
import org.springframework.hateoas.LinkBuilder;
import org.springframework.hateoas.Resource;
import demo.event.OrderEvent;
import demo.event.OrderEvents;
import org.springframework.hateoas.*;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.Assert;
@@ -77,17 +76,15 @@ public class OrderController {
}
@GetMapping(path = "/orders/{id}/commands/connectAccount")
public ResponseEntity connectAccount(@PathVariable Long id) {
return Optional.ofNullable(getOrderResource(
orderService.applyCommand(id, OrderCommand.CONNECT_ACCOUNT)))
public ResponseEntity connectAccount(@PathVariable Long id, @RequestParam(value = "accountId") Long accountId) {
return Optional.ofNullable(getOrderResource(orderService.connectAccount(id, accountId)))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("The command could not be applied"));
}
@GetMapping(path = "/orders/{id}/commands/connectPayment")
public ResponseEntity connectPayment(@PathVariable Long id) {
return Optional.ofNullable(getOrderResource(
orderService.applyCommand(id, OrderCommand.CONNECT_PAYMENT)))
public ResponseEntity connectPayment(@PathVariable Long id, @RequestParam(value = "paymentId") Long paymentId) {
return Optional.ofNullable(getOrderResource(orderService.connectPayment(id, paymentId)))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("The command could not be applied"));
}
@@ -95,7 +92,7 @@ public class OrderController {
@GetMapping(path = "/orders/{id}/commands/createPayment")
public ResponseEntity createPayment(@PathVariable Long id) {
return Optional.ofNullable(getOrderResource(
orderService.applyCommand(id, OrderCommand.CREATE_PAYMENT)))
orderService.createPayment(id)))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("The command could not be applied"));
}
@@ -154,7 +151,7 @@ public class OrderController {
/**
* Update a {@link Order} entity for the provided identifier.
*
* @param id is the unique identifier for the {@link Order} update
* @param id is the unique identifier for the {@link Order} update
* @param order is the entity representation containing any updated {@link Order} fields
* @return a hypermedia resource for the updated {@link Order}
*/
@@ -167,7 +164,7 @@ public class OrderController {
* aggregate with the specified orderId.
*
* @param orderId is the unique identifier for the {@link Order}
* @param event is the {@link OrderEvent} that attempts to alter the state of the {@link Order}
* @param event is the {@link OrderEvent} that attempts to alter the state of the {@link Order}
* @return a hypermedia resource for the newly appended {@link OrderEvent}
*/
private Resource<OrderEvent> appendEventResource(Long orderId, OrderEvent event) {
@@ -208,18 +205,28 @@ public class OrderController {
// Add order command hypermedia links
if (orderResource != null) {
commandResource.add(
getCommandLinkBuilder(id)
.slash("connectAccount")
.withRel("connectAccount"),
new Link(new UriTemplate(
getCommandLinkBuilder(id)
.slash("connectAccount")
.toUri()
.toString(),
new TemplateVariables(
new TemplateVariable("accountId",
TemplateVariable.VariableType.REQUEST_PARAM))), "connectAccount"),
getCommandLinkBuilder(id)
.slash("reserveInventory")
.withRel("reserveInventory"),
getCommandLinkBuilder(id)
.slash("createPayment")
.withRel("createPayment"),
getCommandLinkBuilder(id)
.slash("connectPayment")
.withRel("connectPayment"),
new Link(new UriTemplate(
getCommandLinkBuilder(id)
.slash("connectPayment")
.toUri()
.toString(),
new TemplateVariables(
new TemplateVariable("paymentId",
TemplateVariable.VariableType.REQUEST_PARAM))), "connectPayment"),
getCommandLinkBuilder(id)
.slash("processPayment")
.withRel("processPayment")
@@ -276,6 +283,12 @@ public class OrderController {
.withRel("commands")
);
if (order.getAccountId() != null)
orderResource.add(new Link("http://account-service/v1/accounts/" + order.getAccountId(), "account"));
if (order.getPaymentId() != null)
orderResource.add(new Link("http://localhost:8082/v1/payments/" + order.getPaymentId(), "payment"));
return orderResource;
}
}

View File

@@ -4,28 +4,39 @@ import demo.event.ConsistencyModel;
import demo.event.EventService;
import demo.event.OrderEvent;
import demo.event.OrderEventType;
import org.springframework.cache.CacheManager;
import demo.payment.Payment;
import demo.payment.PaymentMethod;
import org.apache.log4j.Logger;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.hateoas.Link;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.Resource;
import org.springframework.http.RequestEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.web.client.RestTemplate;
import java.net.URI;
import java.util.Objects;
import java.util.Optional;
@Service
@CacheConfig(cacheNames = {"orders"})
public class OrderService {
private final Logger log = Logger.getLogger(OrderService.class);
private final OrderRepository orderRepository;
private final EventService eventService;
private final CacheManager cacheManager;
private final RestTemplate restTemplate;
public OrderService(OrderRepository orderRepository, EventService eventService, CacheManager cacheManager) {
public OrderService(OrderRepository orderRepository, EventService eventService, RestTemplate restTemplate) {
this.orderRepository = orderRepository;
this.eventService = eventService;
this.cacheManager = cacheManager;
this.restTemplate = restTemplate;
}
@CacheEvict(cacheNames = "orders", key = "#order.getOrderId().toString()")
@@ -33,8 +44,7 @@ public class OrderService {
order = createOrder(order);
cacheManager.getCache("orders")
.evict(order.getOrderId());
//cacheManager.getCache("orders").evict(order.getOrderId());
// Trigger the order creation event
OrderEvent event = appendEvent(order.getOrderId(),
@@ -76,7 +86,7 @@ public class OrderService {
/**
* Update an {@link Order} entity with the supplied identifier.
*
* @param id is the unique identifier of the {@link Order} entity
* @param id is the unique identifier of the {@link Order} entity
* @param order is the {@link Order} containing updated fields
* @return the updated {@link Order} entity
*/
@@ -96,12 +106,13 @@ public class OrderService {
"The order with the supplied id does not exist");
Order currentOrder = orderRepository.findOne(id);
currentOrder.setAccountNumber(order.getAccountNumber());
currentOrder.setAccountId(order.getAccountId());
currentOrder.setPaymentId(order.getPaymentId());
currentOrder.setLineItems(order.getLineItems());
currentOrder.setShippingAddress(order.getShippingAddress());
currentOrder.setStatus(order.getStatus());
return orderRepository.save(currentOrder);
return orderRepository.saveAndFlush(currentOrder);
}
/**
@@ -121,7 +132,19 @@ public class OrderService {
* Append a new {@link OrderEvent} to the {@link Order} reference for the supplied identifier.
*
* @param orderId is the unique identifier for the {@link Order}
* @param event is the {@link OrderEvent} to append to the {@link Order} entity
* @param event is the {@link OrderEvent} to append to the {@link Order} entity
* @param links is the optional {@link Link} to embed in the {@link org.springframework.hateoas.Resource}
* @return the newly appended {@link OrderEvent}
*/
public OrderEvent appendEvent(Long orderId, OrderEvent event, Link... links) {
return appendEvent(orderId, event, ConsistencyModel.ACID, links);
}
/**
* Append a new {@link OrderEvent} to the {@link Order} reference for the supplied identifier.
*
* @param orderId is the unique identifier for the {@link Order}
* @param event is the {@link OrderEvent} to append to the {@link Order} entity
* @return the newly appended {@link OrderEvent}
*/
public OrderEvent appendEvent(Long orderId, OrderEvent event) {
@@ -132,24 +155,25 @@ public class OrderService {
* Append a new {@link OrderEvent} to the {@link Order} reference for the supplied identifier.
*
* @param orderId is the unique identifier for the {@link Order}
* @param event is the {@link OrderEvent} to append to the {@link Order} entity
* @param event is the {@link OrderEvent} to append to the {@link Order} entity
* @return the newly appended {@link OrderEvent}
*/
public OrderEvent appendEvent(Long orderId, OrderEvent event, ConsistencyModel consistencyModel) {
public OrderEvent appendEvent(Long orderId, OrderEvent event, ConsistencyModel consistencyModel, Link... links) {
Order order = getOrder(orderId);
Assert.notNull(order, "The order with the supplied id does not exist");
event.setOrder(order);
event = eventService.createEvent(orderId, event);
order.getEvents().add(event);
orderRepository.saveAndFlush(order);
eventService.raiseEvent(event, consistencyModel);
order = orderRepository.saveAndFlush(order);
event.setOrder(order);
eventService.raiseEvent(event, consistencyModel, links);
return event;
}
/**
* Apply an {@link OrderCommand} to the {@link Order} with a specified identifier.
*
* @param id is the unique identifier of the {@link Order}
* @param id is the unique identifier of the {@link Order}
* @param orderCommand is the command to apply to the {@link Order}
* @return a hypermedia resource containing the updated {@link Order}
*/
@@ -165,4 +189,105 @@ public class OrderService {
return order;
}
public Order connectAccount(Long id, Long accountId) {
// Get the order
Order order = getOrder(id);
// Connect the account
order.setAccountId(accountId);
order.setStatus(OrderStatus.ACCOUNT_CONNECTED);
order = updateOrder(id, order);
//cacheManager.getCache("orders").evict(id);
// Trigger the account connected event
OrderEvent event = appendEvent(order.getOrderId(),
new OrderEvent(OrderEventType.ACCOUNT_CONNECTED));
// Set non-serializable fields
event.getOrder().setAccountId(order.getAccountId());
event.getOrder().setPaymentId(order.getPaymentId());
event.getOrder().setOrderId(order.getOrderId());
// Return the result
return event.getOrder();
}
public Order connectPayment(Long id, Long paymentId) {
// Get the order
Order order = getOrder(id);
// Connect the account
order.setPaymentId(paymentId);
order.setStatus(OrderStatus.PAYMENT_CONNECTED);
order = updateOrder(id, order);
// cacheManager.getCache("orders").evict(id);
// Trigger the account connected event
OrderEvent event = appendEvent(order.getOrderId(),
new OrderEvent(OrderEventType.PAYMENT_CONNECTED));
// Set non-serializable fields
event.getOrder().setAccountId(order.getAccountId());
event.getOrder().setPaymentId(order.getPaymentId());
event.getOrder().setOrderId(order.getOrderId());
// Return the result
return event.getOrder();
}
public Order createPayment(Long id) {
// Get the order
Order order = getOrder(id);
Payment payment = new Payment();
// Calculate payment amount
payment.setAmount(order.getLineItems()
.stream()
.mapToDouble(a -> (a.getPrice() + a.getTax()) * a.getQuantity())
.sum());
// Set payment method
payment.setPaymentMethod(PaymentMethod.CREDIT_CARD);
// Create a new request entity
RequestEntity<Resource<Payment>> requestEntity = RequestEntity.post(
URI.create("http://localhost:8082/v1/payments"))
.contentType(MediaTypes.HAL_JSON)
.body(new Resource<Payment>(payment), Resource.class);
// Update the order entity's status
payment = restTemplate.exchange(requestEntity, Payment.class)
.getBody();
log.info(payment);
// Update the status
order.setStatus(OrderStatus.PAYMENT_CREATED);
order = updateOrder(id, order);
// cacheManager.getCache("orders").evict(id);
// Trigger the account connected event
OrderEvent event = appendEvent(order.getOrderId(),
new OrderEvent(OrderEventType.PAYMENT_CREATED),
new Link(payment.getId().getHref(), "payment"));
// Set non-serializable fields
event.getOrder()
.setAccountId(Optional.ofNullable(event.getOrder().getAccountId())
.orElse(order.getAccountId()));
event.getOrder()
.setPaymentId(Optional.ofNullable(event.getOrder().getPaymentId())
.orElse(order.getPaymentId()));
event.getOrder().setOrderId(order.getOrderId());
// Return the result
return event.getOrder();
}
}

View File

@@ -0,0 +1,51 @@
package demo.payment;
import demo.domain.BaseEntity;
public class Payment extends BaseEntity {
private Double amount;
private PaymentMethod paymentMethod;
private PaymentStatus status;
public Payment() {
}
public Payment(Double amount, PaymentMethod paymentMethod) {
this.amount = amount;
this.paymentMethod = paymentMethod;
}
public PaymentStatus getStatus() {
return status;
}
public void setStatus(PaymentStatus status) {
this.status = status;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
public PaymentMethod getPaymentMethod() {
return paymentMethod;
}
public void setPaymentMethod(PaymentMethod paymentMethod) {
this.paymentMethod = paymentMethod;
}
@Override
public String toString() {
return "Payment{" +
"amount=" + amount +
", paymentMethod=" + paymentMethod +
", status=" + status +
"} " + super.toString();
}
}

View File

@@ -0,0 +1,5 @@
package demo.payment;
public enum PaymentMethod {
CREDIT_CARD
}

View File

@@ -0,0 +1,9 @@
package demo.payment;
public enum PaymentStatus {
PAYMENT_CREATED,
PAYMENT_PENDING,
PAYMENT_PROCESSED,
PAYMENT_FAILED,
PAYMENT_SUCCEEDED
}

View File

@@ -5,6 +5,7 @@ import demo.event.OrderEventType;
import demo.function.*;
import demo.order.Order;
import demo.order.OrderStatus;
import demo.payment.Payment;
import demo.stream.OrderStream;
import org.apache.log4j.Logger;
import org.springframework.context.annotation.Bean;
@@ -21,6 +22,8 @@ import org.springframework.statemachine.config.builders.StateMachineTransitionCo
import java.net.URI;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
/**
* A configuration adapter for describing a {@link StateMachine} factory that maps actions to functional
@@ -99,7 +102,7 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderS
.action(reservationFailed())
.and()
.withExternal()
.source(OrderStatus.RESERVATION_SUCCEEDED)
.source(OrderStatus.ACCOUNT_CONNECTED)
.target(OrderStatus.PAYMENT_CREATED)
.event(OrderEventType.PAYMENT_CREATED)
.action(paymentCreated())
@@ -240,14 +243,30 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderS
new PaymentCreated(context, event -> {
log.info(event.getType() + ": " + event.getLink("order").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
Traverson paymentResource = new Traverson(
URI.create(event.getLink("payment").getHref()),
MediaTypes.HAL_JSON
);
Traverson orderResource = new Traverson(
URI.create(event.getLink("order").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
Payment payment = paymentResource.follow("self")
.toEntity(Payment.class)
.getBody();
Order order = orderResource.follow("self")
.toEntity(Order.class)
.getBody();
Map<String, Object> template = new HashMap<String, Object>();
template.put("paymentId", payment.getPaymentId());
return orderResource.follow("commands", "connectPayment")
.withTemplateParameters(template)
.toObject(Order.class);
}));
}

View File

@@ -11,7 +11,6 @@ import java.util.Set;
public class Order extends BaseEntity {
private Long orderId;
private String accountNumber;
private OrderStatus status;
@@ -26,7 +25,6 @@ public class Order extends BaseEntity {
public Order(String accountNumber, Address shippingAddress) {
this();
this.accountNumber = accountNumber;
this.shippingAddress = shippingAddress;
if (shippingAddress.getAddressType() == null)
this.shippingAddress.setAddressType(AddressType.SHIPPING);
@@ -40,14 +38,6 @@ public class Order extends BaseEntity {
this.orderId = orderId;
}
public String getAccountNumber() {
return accountNumber;
}
public void setAccountNumber(String accountNumber) {
this.accountNumber = accountNumber;
}
public OrderStatus getStatus() {
return status;
}
@@ -87,9 +77,9 @@ public class Order extends BaseEntity {
@Override
public String toString() {
return "Order{" +
"orderId='" + orderId + '\'' +
", accountNumber='" + accountNumber + '\'' +
"orderId=" + orderId +
", status=" + status +
", events=" + events +
", lineItems=" + lineItems +
", shippingAddress=" + shippingAddress +
"} " + super.toString();

View File

@@ -0,0 +1,56 @@
package demo.payment;
import demo.domain.BaseEntity;
public class Payment extends BaseEntity {
private Long paymentId;
private Double amount;
private PaymentMethod paymentMethod;
private PaymentStatus status;
public Payment() {
}
public Long getPaymentId() {
return paymentId;
}
public void setPaymentId(Long paymentId) {
this.paymentId = paymentId;
}
public PaymentStatus getStatus() {
return status;
}
public void setStatus(PaymentStatus status) {
this.status = status;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
public PaymentMethod getPaymentMethod() {
return paymentMethod;
}
public void setPaymentMethod(PaymentMethod paymentMethod) {
this.paymentMethod = paymentMethod;
}
@Override
public String toString() {
return "Payment{" +
"paymentId=" + paymentId +
", amount=" + amount +
", paymentMethod=" + paymentMethod +
", status=" + status +
"} " + super.toString();
}
}

View File

@@ -0,0 +1,5 @@
package demo.payment;
public enum PaymentMethod {
CREDIT_CARD
}

View File

@@ -0,0 +1,9 @@
package demo.payment;
public enum PaymentStatus {
PAYMENT_CREATED,
PAYMENT_PENDING,
PAYMENT_PROCESSED,
PAYMENT_FAILED,
PAYMENT_SUCCEEDED
}

View File

@@ -27,7 +27,7 @@ public class OrderStream {
}
@StreamListener(Sink.INPUT)
public void streamListerner(OrderEvent orderEvent) {
public void streamListener(OrderEvent orderEvent) {
eventService.apply(orderEvent);
}
}

View File

@@ -51,6 +51,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.kbastani</groupId>
<artifactId>spring-boot-starter-data-events</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>

View File

@@ -2,9 +2,7 @@ package demo.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
@@ -12,8 +10,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import java.util.Arrays;
@Configuration
@EnableCaching
public class CacheConfig {
@Bean

View File

@@ -1,39 +0,0 @@
package demo.event;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.Optional;
@RestController
@RequestMapping("/v1")
public class EventController {
private final EventService eventService;
public EventController(EventService eventService) {
this.eventService = eventService;
}
@PostMapping(path = "/events/{id}")
public ResponseEntity createEvent(@RequestBody PaymentEvent event, @PathVariable Long id) {
return Optional.ofNullable(eventService.createEvent(id, event, ConsistencyModel.ACID))
.map(e -> new ResponseEntity<>(e, HttpStatus.CREATED))
.orElseThrow(() -> new IllegalArgumentException("Event creation failed"));
}
@PutMapping(path = "/events/{id}")
public ResponseEntity updateEvent(@RequestBody PaymentEvent event, @PathVariable Long id) {
return Optional.ofNullable(eventService.updateEvent(id, event))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new IllegalArgumentException("Event update failed"));
}
@GetMapping(path = "/events/{id}")
public ResponseEntity getEvent(@PathVariable Long id) {
return Optional.ofNullable(eventService.getEvent(id))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElse(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
}

View File

@@ -1,10 +0,0 @@
package demo.event;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.query.Param;
public interface EventRepository extends JpaRepository<PaymentEvent, Long> {
Page<PaymentEvent> findPaymentEventsByPaymentId(@Param("paymentId") Long paymentId, Pageable pageable);
}

View File

@@ -1,214 +0,0 @@
package demo.event;
import demo.payment.Payment;
import demo.payment.PaymentController;
import org.apache.log4j.Logger;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.data.domain.PageRequest;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.Resource;
import org.springframework.http.RequestEntity;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.web.client.RestTemplate;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo;
/**
* The {@link EventService} provides transactional service methods for {@link PaymentEvent}
* entities of the Payment Service. Payment domain events are generated with a {@link PaymentEventType},
* and action logs are appended to the {@link PaymentEvent}.
*
* @author kbastani
*/
@Service
@CacheConfig(cacheNames = {"payment-events"})
public class EventService {
private final Logger log = Logger.getLogger(EventService.class);
private final EventRepository eventRepository;
private final Source paymentStreamSource;
private final RestTemplate restTemplate;
public EventService(EventRepository eventRepository, Source paymentStreamSource, RestTemplate restTemplate) {
this.eventRepository = eventRepository;
this.paymentStreamSource = paymentStreamSource;
this.restTemplate = restTemplate;
}
/**
* Create a new {@link PaymentEvent} and append it to the event log of the referenced {@link Payment}.
* After the {@link PaymentEvent} has been persisted, send the event to the payment stream. Events can
* be raised as a blocking or non-blocking operation depending on the {@link ConsistencyModel}.
*
* @param paymentId is the unique identifier for the {@link Payment}
* @param event is the {@link PaymentEvent} to create
* @param consistencyModel is the desired consistency model for the response
* @return an {@link PaymentEvent} that has been appended to the {@link Payment}'s event log
*/
public PaymentEvent createEvent(Long paymentId, PaymentEvent event, ConsistencyModel consistencyModel) {
event = createEvent(paymentId, event);
return raiseEvent(event, consistencyModel);
}
/**
* Raise an {@link PaymentEvent} that attempts to transition the state of an {@link Payment}.
*
* @param event is an {@link PaymentEvent} that will be raised
* @param consistencyModel is the consistency model for this request
* @return an {@link PaymentEvent} that has been appended to the {@link Payment}'s event log
*/
public PaymentEvent raiseEvent(PaymentEvent event, ConsistencyModel consistencyModel) {
switch (consistencyModel) {
case BASE:
asyncRaiseEvent(event);
break;
case ACID:
event = raiseEvent(event);
break;
}
return event;
}
/**
* Raise an asynchronous {@link PaymentEvent} by sending an AMQP message to the payment stream. Any
* state changes will be applied to the {@link Payment} outside of the current HTTP request context.
* <p>
* Use this operation when a workflow can be processed asynchronously outside of the current HTTP
* request context.
*
* @param event is an {@link PaymentEvent} that will be raised
*/
private void asyncRaiseEvent(PaymentEvent event) {
// Append the payment event to the stream
paymentStreamSource.output()
.send(MessageBuilder
.withPayload(getPaymentEventResource(event))
.build());
}
/**
* Raise a synchronous {@link PaymentEvent} by sending a HTTP request to the payment stream. The response
* is a blocking operation, which ensures that the result of a multi-step workflow will not return until
* the transaction reaches a consistent state.
* <p>
* Use this operation when the result of a workflow must be returned within the current HTTP request context.
*
* @param event is an {@link PaymentEvent} that will be raised
* @return an {@link PaymentEvent} which contains the consistent state of an {@link Payment}
*/
private PaymentEvent raiseEvent(PaymentEvent event) {
try {
// Create a new request entity
RequestEntity<Resource<PaymentEvent>> requestEntity = RequestEntity.post(
URI.create("http://localhost:8081/v1/events"))
.contentType(MediaTypes.HAL_JSON)
.body(getPaymentEventResource(event), Resource.class);
// Update the payment entity's status
Payment result = restTemplate.exchange(requestEntity, Payment.class)
.getBody();
log.info(result);
event.setPayment(result);
} catch (Exception ex) {
log.error(ex);
}
return event;
}
/**
* Create a new {@link PaymentEvent} and publish it to the payment stream.
*
* @param event is the {@link PaymentEvent} to publish to the payment stream
* @return a hypermedia {@link PaymentEvent} resource
*/
@CacheEvict(cacheNames = "payment-events", key = "#id.toString()")
public PaymentEvent createEvent(Long id, PaymentEvent event) {
// Save new event
event = addEvent(event);
Assert.notNull(event, "The event could not be appended to the payment");
return event;
}
/**
* Get an {@link PaymentEvent} with the supplied identifier.
*
* @param id is the unique identifier for the {@link PaymentEvent}
* @return an {@link PaymentEvent}
*/
public Resource<PaymentEvent> getEvent(Long id) {
return getPaymentEventResource(eventRepository.findOne(id));
}
/**
* Update an {@link PaymentEvent} with the supplied identifier.
*
* @param id is the unique identifier for the {@link PaymentEvent}
* @param event is the {@link PaymentEvent} to update
* @return the updated {@link PaymentEvent}
*/
@CacheEvict(cacheNames = "payment-events", key = "#event.getPayment().getPaymentId().toString()")
public PaymentEvent updateEvent(Long id, PaymentEvent event) {
Assert.notNull(id);
Assert.isTrue(event.getId() == null || Objects.equals(id, event.getId()));
return eventRepository.save(event);
}
/**
* Get {@link PaymentEvents} for the supplied {@link Payment} identifier.
*
* @param id is the unique identifier of the {@link Payment}
* @return a list of {@link PaymentEvent} wrapped in a hypermedia {@link PaymentEvents} resource
*/
@Cacheable(cacheNames = "payment-events", key = "#id.toString()")
public List<PaymentEvent> getPaymentEvents(Long id) {
return eventRepository.findPaymentEventsByPaymentId(id,
new PageRequest(0, Integer.MAX_VALUE)).getContent();
}
/**
* Gets a hypermedia resource for a {@link PaymentEvent} entity.
*
* @param event is the {@link PaymentEvent} to enrich with hypermedia
* @return a hypermedia resource for the supplied {@link PaymentEvent} entity
*/
private Resource<PaymentEvent> getPaymentEventResource(PaymentEvent event) {
return new Resource<PaymentEvent>(event, Arrays.asList(
linkTo(PaymentController.class)
.slash("events")
.slash(event.getEventId())
.withSelfRel(),
linkTo(PaymentController.class)
.slash("payments")
.slash(event.getPayment().getPaymentId())
.withRel("payment")));
}
/**
* Add a {@link PaymentEvent} to an {@link Payment} entity.
*
* @param event is the {@link PaymentEvent} to append to an {@link Payment} entity
* @return the newly appended {@link PaymentEvent} entity
*/
@CacheEvict(cacheNames = "payment-events", key = "#event.getPayment().getPaymentId().toString()")
private PaymentEvent addEvent(PaymentEvent event) {
event = eventRepository.saveAndFlush(event);
return event;
}
}

View File

@@ -2,33 +2,40 @@ package demo.event;
import com.fasterxml.jackson.annotation.JsonIgnore;
import demo.payment.Payment;
import demo.domain.BaseEntity;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.LastModifiedDate;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;
import javax.persistence.*;
/**
* The domain event {@link PaymentEvent} tracks the type and state of events as
* applied to the {@link Payment} domain object. This event resource can be used
* to event source the aggregate state of {@link Payment}.
* The domain event {@link PaymentEvent} tracks the type and state of events as applied to the {@link Payment} domain
* object. This event resource can be used to event source the aggregate state of {@link Payment}.
* <p>
* This event resource also provides a transaction log that can be used to append
* actions to the event.
* This event resource also provides a transaction log that can be used to append actions to the event.
*
* @author kbastani
*/
@Entity
public class PaymentEvent extends BaseEntity {
@EntityListeners(AuditingEntityListener.class)
public class PaymentEvent extends Event<Payment, PaymentEventType, Long> {
@Id
@GeneratedValue
private Long id;
@GeneratedValue(strategy = GenerationType.AUTO)
private Long eventId;
@Enumerated(EnumType.STRING)
private PaymentEventType type;
@OneToOne(cascade = CascadeType.ALL, fetch = FetchType.LAZY)
@OneToOne(cascade = CascadeType.MERGE, fetch = FetchType.LAZY)
@JsonIgnore
private Payment payment;
private Payment entity;
@CreatedDate
private Long createdAt;
@LastModifiedDate
private Long lastModified;
public PaymentEvent() {
}
@@ -37,37 +44,59 @@ public class PaymentEvent extends BaseEntity {
this.type = type;
}
@JsonIgnore
public PaymentEvent(PaymentEventType type, Payment entity) {
this.type = type;
this.entity = entity;
}
@Override
public Long getEventId() {
return id;
return eventId;
}
@Override
public void setEventId(Long id) {
this.id = id;
eventId = id;
}
@Override
public PaymentEventType getType() {
return type;
}
@Override
public void setType(PaymentEventType type) {
this.type = type;
}
public Payment getPayment() {
return payment;
}
public void setPayment(Payment payment) {
this.payment = payment;
@Override
public Payment getEntity() {
return entity;
}
@Override
public String toString() {
return "PaymentEvent{" +
"id=" + id +
", type=" + type +
", payment=" + payment +
"} " + super.toString();
public void setEntity(Payment entity) {
this.entity = entity;
}
@Override
public Long getCreatedAt() {
return createdAt;
}
@Override
public void setCreatedAt(Long createdAt) {
this.createdAt = createdAt;
}
@Override
public Long getLastModified() {
return lastModified;
}
@Override
public void setLastModified(Long lastModified) {
this.lastModified = lastModified;
}
}

View File

@@ -0,0 +1,4 @@
package demo.event;
public interface PaymentEventRepository extends EventRepository<PaymentEvent, Long> {
}

View File

@@ -1,74 +0,0 @@
package demo.event;
import com.fasterxml.jackson.annotation.JsonIgnore;
import demo.payment.Payment;
import demo.payment.PaymentController;
import org.springframework.hateoas.Link;
import org.springframework.hateoas.LinkBuilder;
import org.springframework.hateoas.Resources;
import java.io.Serializable;
import java.util.List;
import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo;
/**
* The {@link PaymentEvents} is a hypermedia collection of {@link PaymentEvent} resources.
*
* @author kbastani
*/
public class PaymentEvents extends Resources<PaymentEvent> implements Serializable {
private Long paymentId;
/**
* Create a new {@link PaymentEvents} hypermedia resources collection for an {@link Payment}.
*
* @param paymentId is the unique identifier for the {@link Payment}
* @param content is the collection of {@link PaymentEvents} attached to the {@link Payment}
*/
public PaymentEvents(Long paymentId, List<PaymentEvent> content) {
this(content);
this.paymentId = paymentId;
// Add hypermedia links to resources parent
add(linkTo(PaymentController.class)
.slash("payments")
.slash(paymentId)
.slash("events")
.withSelfRel(),
linkTo(PaymentController.class)
.slash("payments")
.slash(paymentId)
.withRel("payment"));
LinkBuilder linkBuilder = linkTo(EventController.class);
// Add hypermedia links to each item of the collection
content.stream().parallel().forEach(event -> event.add(
linkBuilder.slash("events")
.slash(event.getEventId())
.withSelfRel()
));
}
/**
* Creates a {@link Resources} instance with the given content and {@link Link}s (optional).
*
* @param content must not be {@literal null}.
* @param links the links to be added to the {@link Resources}.
*/
private PaymentEvents(Iterable<PaymentEvent> content, Link... links) {
super(content, links);
}
/**
* Get the {@link Payment} identifier that the {@link PaymentEvents} apply to.
*
* @return the payment identifier
*/
@JsonIgnore
public Long getPaymentId() {
return paymentId;
}
}

View File

@@ -3,11 +3,14 @@ package demo.payment;
import com.fasterxml.jackson.annotation.JsonIgnore;
import demo.domain.BaseEntity;
import demo.event.PaymentEvent;
import org.springframework.hateoas.Link;
import javax.persistence.*;
import java.util.HashSet;
import java.util.Set;
import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo;
/**
* The {@link Payment} domain object contains information related to
* a user's payment. The status of an payment is event sourced using
@@ -40,11 +43,11 @@ public class Payment extends BaseEntity {
}
public Payment(Double amount, PaymentMethod paymentMethod) {
this();
this.amount = amount;
this.paymentMethod = paymentMethod;
}
@JsonIgnore
public Long getPaymentId() {
return id;
}
@@ -95,6 +98,17 @@ public class Payment extends BaseEntity {
this.orderId = orderId;
}
/**
* Returns the {@link Link} with a rel of {@link Link#REL_SELF}.
*/
@Override
public Link getId() {
return linkTo(PaymentController.class)
.slash("payments")
.slash(getPaymentId())
.withSelfRel();
}
@Override
public String toString() {
return "Payment{" +

View File

@@ -1,9 +1,7 @@
package demo.payment;
import demo.event.PaymentEvent;
import demo.event.PaymentEvents;
import demo.event.EventController;
import demo.event.EventService;
import demo.event.*;
import org.springframework.hateoas.ExposesResourceFor;
import org.springframework.hateoas.LinkBuilder;
import org.springframework.hateoas.Resource;
import org.springframework.http.HttpStatus;
@@ -17,12 +15,13 @@ import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo;
@RestController
@RequestMapping("/v1")
@ExposesResourceFor(Payment.class)
public class PaymentController {
private final PaymentService paymentService;
private final EventService eventService;
private final EventService<PaymentEvent, Long> eventService;
public PaymentController(PaymentService paymentService, EventService eventService) {
public PaymentController(PaymentService paymentService, EventService<PaymentEvent, Long> eventService) {
this.paymentService = paymentService;
this.eventService = eventService;
}
@@ -196,14 +195,8 @@ public class PaymentController {
return commandResource;
}
/**
* Get {@link PaymentEvents} for the supplied {@link Payment} identifier.
*
* @param id is the unique identifier of the {@link Payment}
* @return a list of {@link PaymentEvent} wrapped in a hypermedia {@link PaymentEvents} resource
*/
private PaymentEvents getPaymentEventResources(Long id) {
return new PaymentEvents(id, eventService.getPaymentEvents(id));
private Events getPaymentEventResources(Long id) {
return eventService.find(id);
}
/**

View File

@@ -4,6 +4,5 @@ import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.query.Param;
public interface PaymentRepository extends JpaRepository<Payment, Long> {
Payment findPaymentByOrderId(@Param("orderId") Long orderId);
}

View File

@@ -1,10 +1,9 @@
package demo.payment;
import demo.event.ConsistencyModel;
import demo.event.EventService;
import demo.util.ConsistencyModel;
import demo.event.PaymentEvent;
import demo.event.PaymentEventType;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
@@ -28,13 +27,11 @@ import java.util.Objects;
public class PaymentService {
private final PaymentRepository paymentRepository;
private final EventService eventService;
private final CacheManager cacheManager;
private final EventService<PaymentEvent, Long> eventService;
public PaymentService(PaymentRepository paymentRepository, EventService eventService, CacheManager cacheManager) {
public PaymentService(PaymentRepository paymentRepository, EventService<PaymentEvent, Long> eventService) {
this.paymentRepository = paymentRepository;
this.eventService = eventService;
this.cacheManager = cacheManager;
}
@CacheEvict(cacheNames = "payments", key = "#payment.getPaymentId().toString()")
@@ -42,18 +39,18 @@ public class PaymentService {
payment = createPayment(payment);
cacheManager.getCache("payments")
.evict(payment.getPaymentId());
// cacheManager.getCache("payments")
// .evict(payment.getPaymentId());
// Trigger the payment creation event
PaymentEvent event = appendEvent(payment.getPaymentId(),
new PaymentEvent(PaymentEventType.PAYMENT_CREATED));
// Attach payment identifier
event.getPayment().setPaymentId(payment.getPaymentId());
event.getEntity().setPaymentId(payment.getPaymentId());
// Return the result
return event.getPayment();
return event.getEntity();
}
/**
@@ -131,7 +128,7 @@ public class PaymentService {
* @return the newly appended {@link PaymentEvent}
*/
public PaymentEvent appendEvent(Long paymentId, PaymentEvent event) {
return appendEvent(paymentId, event, ConsistencyModel.ACID);
return appendEvent(paymentId, event, ConsistencyModel.BASE);
}
/**
@@ -142,13 +139,27 @@ public class PaymentService {
* @return the newly appended {@link PaymentEvent}
*/
public PaymentEvent appendEvent(Long paymentId, PaymentEvent event, ConsistencyModel consistencyModel) {
// Get the entity
Payment payment = getPayment(paymentId);
Assert.notNull(payment, "The payment with the supplied id does not exist");
event.setPayment(payment);
event = eventService.createEvent(paymentId, event);
event.setEntity(payment);
event = eventService.save(paymentId, event);
payment.getEvents().add(event);
paymentRepository.saveAndFlush(payment);
eventService.raiseEvent(event, consistencyModel);
// Raise the event using the supplied consistency model
switch (consistencyModel) {
case BASE:
eventService.sendAsync(event);
break;
case ACID:
event = eventService.send(event);
break;
}
return event;
}

View File

@@ -1,4 +1,4 @@
package demo.event;
package demo.util;
public enum ConsistencyModel {
BASE,

View File

@@ -14,4 +14,4 @@ spring:
host: localhost
port: 6379
server:
port: 8080
port: 8082

View File

@@ -0,0 +1,29 @@
package demo.payment;
import demo.event.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.Assert;
@RunWith(SpringRunner.class)
@SpringBootTest
public class EventServiceTests {
@Autowired
private PaymentRepository paymentRepository;
@Autowired
private EventService<PaymentEvent, Long> eventService;
@Test
public void getPaymentReturnsPayment() throws Exception {
Payment payment = new Payment(11.0, PaymentMethod.CREDIT_CARD);
payment = paymentRepository.saveAndFlush(payment);
eventService.save(new PaymentEvent(PaymentEventType.PAYMENT_CREATED, payment));
Events events = eventService.find(payment.getPaymentId());
Assert.notNull(events);
}
}

View File

@@ -1,6 +1,7 @@
package demo.payment;
import demo.event.EventService;
import demo.event.PaymentEvent;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
@@ -26,7 +27,7 @@ public class PaymentControllerTest {
private PaymentService paymentService;
@MockBean
private EventService eventService;
private EventService<PaymentEvent, Long> eventService;
@Test
public void getUserPaymentResourceShouldReturnPayment() throws Exception {

View File

@@ -1,11 +1,11 @@
package demo.payment;
import demo.event.EventService;
import demo.event.PaymentEvent;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.cache.CacheManager;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
@@ -15,19 +15,16 @@ import static org.mockito.BDDMockito.given;
public class PaymentServiceTests {
@MockBean
private EventService eventService;
private EventService<PaymentEvent, Long> eventService;
@MockBean
private PaymentRepository paymentRepository;
@MockBean
private CacheManager cacheManager;
private PaymentService paymentService;
@Before
public void before() {
paymentService = new PaymentService(paymentRepository, eventService, cacheManager);
paymentService = new PaymentService(paymentRepository, eventService);
}
@Test

View File

@@ -34,7 +34,7 @@ public class PaymentEventStream {
* @param paymentEvent is the {@link Payment} domain event to process
*/
@StreamListener(Sink.INPUT)
public void streamListerner(PaymentEvent paymentEvent) {
public void streamListener(PaymentEvent paymentEvent) {
eventService.apply(paymentEvent);
}
}

View File

@@ -4,6 +4,7 @@ import demo.domain.BaseEntity;
public class Payment extends BaseEntity {
private Long paymentId;
private Double amount;
private PaymentMethod paymentMethod;
private PaymentStatus status;
@@ -11,6 +12,14 @@ public class Payment extends BaseEntity {
public Payment() {
}
public Long getPaymentId() {
return paymentId;
}
public void setPaymentId(Long paymentId) {
this.paymentId = paymentId;
}
public PaymentStatus getStatus() {
return status;
}

View File

@@ -14,7 +14,7 @@ spring:
consumer:
durableSubscription: true
server:
port: 8081
port: 8083
amazon:
aws:
access-key-id: replace

View File

@@ -17,7 +17,14 @@
<relativePath>../</relativePath>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<modules>
<module>spring-boot-starter-aws-lambda</module>
<module>spring-boot-starter-data-events</module>
</modules>
</project>

View File

@@ -0,0 +1,121 @@
# Spring Boot Starter Data Events
This starter project provides auto-configuration support classes for building event-driven Spring Data applications.
* Uses a familiar _Spring Data_ repository pattern for creating an `EventRepository<T, ID>`
* The `EventRepository` provides trait specific features for managing an event log that is attached to an existing domain entity
* Provides a set of event abstractions that can be extended to use any Spring Data repository (JPA, Mongo, Neo4j, Redis..)
* Provides an `EventService` bean that can be used to publish events to a _Spring Cloud Stream_ output channel
## Usage
In your Spring Boot project, add the starter project dependency to your class path. For Maven, add the following dependency to your `pom.xml`.
```xml
<dependencies>
<dependency>
<groupId>org.kbastani</groupId>
<artifactId>spring-boot-starter-data-events</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
...
</dependencies>
```
Next, configure your _Spring Cloud Stream_ output bindings. Add the following snippet to the `application.properties|yaml` file of your Spring Boot application. Replace the destination value with the name of your message channel for the event stream.
```yaml
spring:
cloud:
stream:
bindings:
output:
destination: payment
```
Next, you'll need to create a custom `Event` entity. The snippet below extends the provided `Event<T, E, ID>` interface. This example uses Spring Data JPA, but you can use any Spring Data project for implementing your event entities.
```java
@Entity
@EntityListeners(AuditingEntityListener.class)
public class PaymentEvent extends Event<Payment, PaymentEventType, Long> {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Long eventId;
@Enumerated(EnumType.STRING)
private PaymentEventType type;
@OneToOne(cascade = CascadeType.MERGE, fetch = FetchType.LAZY)
@JsonIgnore
private Payment entity;
@CreatedDate
private Long createdAt;
@LastModifiedDate
private Long lastModified;
...
}
```
To start managing events you'll need to extend the `EventRepository<T, ID>` interface. The `PaymentEvent` is the JPA entity we defined in the last snippet.
```java
public interface PaymentEventRepository extends EventRepository<PaymentEvent, Long> {
}
```
That's it! You're ready to start sending domain events to the stream binding's output channel using the auto-configured `EventService`. The example snippet below shows how to create and append a new `PaymentEvent` to a `Payment` entity before publishing the event over AMQP to the configured event stream's output channel.
```java
@Service
public class PaymentService {
private final EventService<PaymentEvent, Long> eventService;
public PaymentController(EventService<PaymentEvent, Long> eventService) {
this.eventService = eventService;
}
public PaymentEvent appendCreateEvent(Payment payment) {
PaymentEvent paymentEvent = new PaymentEvent(PaymentEventType.PAYMENT_CREATED);
paymentEvent.setEntity(payment);
paymentEvent = eventService.save(event);
// Send the event to the Spring Cloud stream binding
eventService.sendAsync(paymentEvent);
}
...
}
```
A default `EventController` is also provided with the starter project. The `EventController` provides a basic REST API with hypermedia resource support for managing the `Event` log of a domain entity over HTTP. The following cURL snippet gets the `PaymentEvent` we created in the last example from the `EventController`.
```bash
curl -X GET "http://localhost:8082/v1/events/1"
```
Response:
```json
{
"eventId": 1,
"type": "PAYMENT_CREATED",
"createdAt": 1482749707006,
"lastModified": 1482749707006,
"_links": {
"self": {
"href": "http://localhost:8082/v1/events/1"
},
"payment": {
"href": "http://localhost:8082/v1/payments/1"
}
}
}
```
In the snippet above we can see the `EventController` responded with a `hal+json` formatted resource. Since the `PaymentEvent` has a reference to the `Payment` entity, we see a _payment_ link is available to fetch the related resource.

View File

@@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-boot-starter-data-events</artifactId>
<packaging>jar</packaging>
<parent>
<groupId>org.kbastani</groupId>
<artifactId>spring-boot-starters</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hateoas</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-commons</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,69 @@
package demo.event;
import org.springframework.hateoas.Link;
import org.springframework.hateoas.ResourceSupport;
import org.springframework.hateoas.core.EvoInflectorRelProvider;
import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo;
/**
* Abstract implementation of the {@link Event} entity.
*
* @param <T> is the entity this {@link Event} applies to
* @param <E> is the type of event, typically an {@link Enum}
* @param <ID> is the unique identifier type used to persist the {@link Event}
* @author Kenny Bastani
* @see org.springframework.stereotype.Repository
* @see ResourceSupport
*/
public abstract class Event<T extends ResourceSupport, E, ID extends Serializable> extends ResourceSupport {
public Event() {
}
public abstract ID getEventId();
public abstract void setEventId(ID eventId);
public abstract E getType();
public abstract void setType(E type);
public abstract T getEntity();
public abstract void setEntity(T entity);
public abstract Long getCreatedAt();
public abstract void setCreatedAt(Long createdAt);
public abstract Long getLastModified();
public abstract void setLastModified(Long lastModified);
@Override
@SuppressWarnings("unchecked")
public List<Link> getLinks() {
List<Link> links = super.getLinks().stream().collect(Collectors.toList());
links.add(getId());
Class<T> clazz = (Class<T>) ((ParameterizedTypeImpl)
this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
links.add(getEntity().getId().withRel(new EvoInflectorRelProvider().getItemResourceRelFor(clazz)));
return links;
}
@Override
public String toString() {
return String.format("links: %s", getLinks().toString());
}
@Override
public Link getId() {
return linkTo(EventController.class).slash("events").slash(getEventId()).withSelfRel();
}
}

View File

@@ -0,0 +1,35 @@
package demo.event;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
/**
* This class auto-configures a {@link EventServiceImpl} bean.
*
* @author Kenny Bastani
*/
@Configuration
@ConditionalOnClass({ EventRepository.class, Source.class, RestTemplate.class })
@EnableConfigurationProperties(EventProperties.class)
public class EventAutoConfig {
private EventRepository eventRepository;
private Source source;
private RestTemplate restTemplate;
public EventAutoConfig(EventRepository eventRepository, Source source, RestTemplate restTemplate) {
this.eventRepository = eventRepository;
this.source = source;
this.restTemplate = restTemplate;
}
@SuppressWarnings("unchecked")
@Bean
public EventService eventService() {
return new EventServiceImpl(eventRepository, source, restTemplate);
}
}

View File

@@ -0,0 +1,45 @@
package demo.event;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.io.Serializable;
import java.util.Optional;
/**
* The default controller for managing {@link Event} entities.
*
* @author Kenny Bastani
*/
@RestController
@RequestMapping("/v1")
public class EventController<T extends Event, ID extends Serializable> {
private final EventService<T, Long> eventService;
public EventController(EventService<T, Long> eventService) {
this.eventService = eventService;
}
@PostMapping(path = "/events/{id}")
public ResponseEntity createEvent(@RequestBody T event, @PathVariable Long id) {
return Optional.ofNullable(eventService.save(id, event))
.map(e -> new ResponseEntity<>(e, HttpStatus.CREATED))
.orElseThrow(() -> new RuntimeException("Event creation failed"));
}
@PutMapping(path = "/events/{id}")
public ResponseEntity updateEvent(@RequestBody T event, @PathVariable Long id) {
return Optional.ofNullable(eventService.save(id, event))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElseThrow(() -> new RuntimeException("Event update failed"));
}
@GetMapping(path = "/events/{id}")
public ResponseEntity getEvent(@PathVariable Long id) {
return Optional.ofNullable(eventService.findOne(id))
.map(e -> new ResponseEntity<>(e, HttpStatus.OK))
.orElse(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
}

View File

@@ -0,0 +1,25 @@
package demo.event;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "event")
public class EventProperties {
@NestedConfigurationProperty
private Props props;
public Props getProps() {
return props;
}
public void setProps(Props props) {
this.props = props;
}
public static class Props {
// TODO: Implement
}
}

View File

@@ -0,0 +1,21 @@
package demo.event;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.data.repository.query.Param;
import java.io.Serializable;
/**
* Extension of {@link PagingAndSortingRepository} to provide additional support for persisting event logs to entities.
*
* @author Kenny Bastani
* @see Event
* @see EventService
*/
@NoRepositoryBean
public interface EventRepository<E extends Event, ID extends Serializable> extends PagingAndSortingRepository<E, ID> {
Page<E> findEventsByEntityId(@Param("entityId") ID entityId, Pageable pageable);
}

View File

@@ -0,0 +1,72 @@
package demo.event;
import org.springframework.hateoas.Link;
import org.springframework.hateoas.ResourceSupport;
import java.io.Serializable;
/**
* Service interface for managing {@link Event} entities.
*
* @author Kenny Bastani
* @see Event
* @see Events
* @see EventServiceImpl
*/
public interface EventService<T extends Event, ID extends Serializable> {
/**
* Raises a synchronous domain event. An {@link Event} will be applied to an entity through a chain of HTTP
* requests/responses.
*
* @param event
* @param links
* @return the applied {@link Event}
*/
<E extends ResourceSupport, S extends T> S send(S event, Link... links);
/**
* Raises an asynchronous domain event. An {@link Event} will be applied to an entity through a chain of AMQP
* messages.
*
* @param event
* @param links
* @return a flag indicating if the {@link Event} message was sent successfully
*/
<S extends T> Boolean sendAsync(S event, Link... links);
/**
* Saves a given event entity. Use the returned instance for further operations as the save operation might have
* changed the entity instance completely.
*
* @param event
* @return the saved event entity
*/
<S extends T> S save(S event);
/**
* Saves a given event entity. Use the returned instance for further operations as the save operation might have
* changed the entity instance completely. The {@link ID} parameter is the unique {@link Event} identifier.
*
* @param id
* @param event
* @return the saved event entity
*/
<S extends T> S save(ID id, S event);
/**
* Retrieves an {@link Event} entity by its id.
*
* @param id
* @return the {@link Event} entity with the given id or {@literal null} if none found
*/
<EID extends ID> T findOne(EID id);
/**
* Retrieves an entity's {@link Event}s by its id.
*
* @param entityId
* @return a {@link Events} containing a collection of {@link Event}s
*/
<E extends Events> E find(ID entityId);
}

View File

@@ -0,0 +1,80 @@
package demo.event;
import org.apache.log4j.Logger;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.data.domain.PageRequest;
import org.springframework.hateoas.Link;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.Resource;
import org.springframework.hateoas.ResourceSupport;
import org.springframework.http.RequestEntity;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.client.RestTemplate;
import java.io.Serializable;
import java.net.URI;
/**
* Event service implementation of {@link EventService} for managing {@link Event} entities.
*
* @author Kenny Bastani
* @see Event
* @see Events
* @see EventService
*/
@SuppressWarnings("unchecked")
class EventServiceImpl<T extends Event, ID extends Serializable> implements EventService<T, ID> {
private static final Logger log = Logger.getLogger(EventServiceImpl.class);
private static final String EVENT_PROCESSOR_URL = "http://localhost:8083/v1/events";
private final EventRepository<T, ID> eventRepository;
private final Source eventStream;
private final RestTemplate restTemplate;
EventServiceImpl(EventRepository<T, ID> eventRepository, Source eventStream, RestTemplate restTemplate) {
this.eventRepository = eventRepository;
this.eventStream = eventStream;
this.restTemplate = restTemplate;
}
public <E extends ResourceSupport, S extends T> S send(S event, Link... links) {
// Assemble request to the event stream processor
RequestEntity<Resource<T>> requestEntity = RequestEntity.post(URI.create(EVENT_PROCESSOR_URL))
.contentType(MediaTypes.HAL_JSON).body(new Resource<T>(event), Resource.class);
try {
// Send the event to the event stream processor
E entity = (E) restTemplate.exchange(requestEntity, event.getEntity().getClass()).getBody();
// Set the applied entity reference to the event
event.setEntity(entity);
} catch (Exception ex) {
log.error(ex);
}
return event;
}
public <S extends T> Boolean sendAsync(S event, Link... links) {
return eventStream.output().send(MessageBuilder.withPayload(event).build());
}
public <S extends T> S save(S event) {
event = eventRepository.save(event);
return event;
}
public <S extends T> S save(ID id, S event) {
event.setEventId(id);
return save(event);
}
public <S extends ID> T findOne(S id) {
return eventRepository.findOne(id);
}
public <E extends Events> E find(ID entityId) {
return (E) new Events(entityId, eventRepository.findEventsByEntityId(entityId,
new PageRequest(0, Integer.MAX_VALUE)).getContent());
}
}

View File

@@ -0,0 +1,33 @@
package demo.event;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.springframework.hateoas.Link;
import org.springframework.hateoas.ResourceSupport;
import org.springframework.hateoas.Resources;
import java.io.Serializable;
import java.util.List;
/**
* General helper to easily create a wrapper for a collection of {@link Event} entities.
*
* @author Kenny Bastani
*/
public class Events<T extends ResourceSupport, E, ID extends Serializable> extends Resources<Event<T, E, ID>> {
private ID entityId;
public Events(ID entityId, List<Event<T, E, ID>> content) {
this(content);
this.entityId = entityId;
}
public Events(Iterable<Event<T, E, ID>> content, Link... links) {
super(content, links);
}
@JsonIgnore
public ID getEntityId() {
return entityId;
}
}

View File

@@ -0,0 +1,9 @@
{
"groups": [
{
"name": "event",
"type": "demo.event.EventProperties",
"sourceType": "demo.event.EventProperties"
}
]
}

View File

@@ -0,0 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=demo.event.EventAutoConfig

View File

@@ -0,0 +1,39 @@
package demo.event;
import org.junit.After;
import org.junit.Test;
import org.springframework.boot.test.util.EnvironmentTestUtils;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import static junit.framework.TestCase.assertNotNull;
public class ConfigurationTest {
private AnnotationConfigApplicationContext context;
@After
public void tearDown() {
if (this.context != null) {
this.context.close();
}
}
@Test
public void contextLoads() {
load(EmptyConfiguration.class);
assertNotNull(context);
}
@Configuration
static class EmptyConfiguration {
}
private void load(Class<?> config, String... environment) {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
EnvironmentTestUtils.addEnvironment(applicationContext, environment);
applicationContext.register(config);
applicationContext.refresh();
this.context = applicationContext;
}
}