Cleaning up

This commit is contained in:
Kenny Bastani
2016-12-20 17:30:24 -08:00
parent 878e40a2e5
commit 2af2788bc7
154 changed files with 2507 additions and 451 deletions

View File

@@ -0,0 +1,99 @@
# Account Microservice: Worker
The `account-worker` application is a event stream processing application that listens for `Account` domain events as AMQP messages. The domain events that are generated by the `account-web` application are processed in this module.
The worker is responsible for durable transaction processing for work flows that are required to coordinate asynchronously with applications residing in other domain contexts.
The worker is also responsible for automatically remediating state changes in a distributed transactions that encountered a partial failure. The most important goal of the worker module is to keep the state of the system consistent through automated means — to guarantee eventual consistency.
# Usage
The `account-worker` is a _Spring Cloud Stream_ application that drives the state of the `Account` domain resource. The application is completely stateless because it uses hypermedia to drive the state of the application. At the heart of the `account-worker` is a configurable state machine that describes how domain events trigger state transitions on an `Account` resource.
The code snippet below describes a single state machine transition.
```java
// Describe state machine transitions for accounts
transitions.withExternal()
.source(AccountStatus.ACCOUNT_CREATED)
.target(AccountStatus.ACCOUNT_PENDING)
.event(AccountEventType.ACCOUNT_CREATED)
.action(createAccount())
```
The `/src/main/java/demo/config/StateMachineConfig.java` class configures a state machine using the _Spring Statemachine_ project. The snippet above describes the first transition of a state machine for the `Account` resource. Here we see that the source state is `ACCOUNT_CREATED` and the target state is `ACCOUNT_PENDING`. We also see that the state transition is triggered by an `ACCOUNT_CREATED` event. Finally, we see that an action method named `createAccount` is mapped to this state transition.
Each time an `AccountEvent` is received by the stream listener in the `AccountEventStream` class, a state machine is replicated by applying the ordered history of previous account events—we call this history the _Event Log_. Since each `AccountEvent` provides hypermedia links for retrieving the context of the attached `Account` resource, we can traverse to an account's event log and use a technique called _Event Sourcing_ to aggregate the current state of the `Account`.
### Functions
As we saw earlier in the configuration of state machine transitions, an action can be mapped to a function. In the `StateMachineConfig` class we'll find multiple bean definitions that correspond to transition actions. For example, earlier we saw the method triggered for a transition triggered by an `ACCOUNT_CREATED` event that mapped to an action named `createAccount`. Let's see the definition of that method.
```java
@Bean
public Action<AccountStatus, AccountEventType> createAccount() {
return context -> applyEvent(context,
new CreateAccountFunction(context));
}
```
The `createAccount` method returns an executable action that passes the state context to a method named `applyEvent`. The `applyEvent` method is a step function that replicates the current state of an `Account` resource.
Since a state machine is replicated in-memory each time an `AccountEvent` is processed, we'll need to ensure that actions are not executed against the same resource multiple times during replication. The `applyEvent` method will only execute the supplied function—in this case `CreateAccountFunction`—if the state machine is finished replicating.
When the state machine is finished replicating, it will attempt to apply the `AccountEvent` for this context to an action mapped function. We can find each of the function classes for state transitions inside the `/src/main/java/demo/function` package.
```bash
.
├── /src/main/java/demo/function
├── AccountFunction.java
├── ActivateAccountFunction.java
├── ArchiveAccountFunction.java
├── ConfirmAccountFunction.java
├── CreateAccountFunction.java
├── SuspendAccountFunction.java
├── UnarchiveAccountFunction.java
└── UnsuspendAccountFunction.java
```
The `AccountFunction` abstract class is extended by each of the other classes inside of the `function` package. Since we're using hypermedia to drive the state of the application, each function is immutable and stateless. In this reference application we can either define the task inside an `AccountFunction` class, or we can use a `Consumer<T>`, which is a Java 8 lambda expression, to apply an `AccountEvent` to an `Account` resource.
Let's go back to the `StateMachineConfig` class and look at an example of an action mapped function that uses a lambda expression.
```java
@Bean
public Action<AccountStatus, AccountEventType> confirmAccount() {
return context -> {
// Map the account action to a Java 8 lambda function
ConfirmAccountFunction accountFunction;
accountFunction = new ConfirmAccountFunction(context, event -> {
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
// Follow the command resource to activate the account
Account account = traverson.follow("commands")
.follow("activate")
.toEntity(Account.class)
.getBody();
});
applyEvent(context, accountFunction);
};
}
```
The snippet above shows the definition of the `confirmAccount` action. Here we see a stateless function that uses a `Traverson` client to follow hypermedia links of the `AccountEvent` resource in a workflow that activates the `Account`. Since the embedded hypermedia links provide the full context of an `AccountEvent` resource, we can implement this function from anywhere—_even as a serverless function_!
#### Serverless Functions
A _serverless_ function is a unit of cloud deployment in a PaaS (Platform-as-a-Service) that is composed of a stateless function. Serverless was first popularized by _Amazon Web Services_ as a part of their _AWS Lambda_ compute platform.
Serverless—which is also referred to as FaaS (Function-as-a-Service)—allows you to deploy code as functions without needing to setup or manage application servers or containers.
With a serverless function, a cloud platform will take care of when and where a function is scheduled and executed. A cloud platform is also opinionated about the compute resources required to execute and/or scale a function.
In this reference architecture we have two units of deployment per microservice, a web and worker application. Each of the workloads for the deployments are designed to operate in an immutable Linux container. Since the state machine actions that are mapped to the `AccountFunction` classes in the `account-worker` application are both immutable and stateless, we can choose to instead map each of these actions to a serverless function.

View File

@@ -0,0 +1,10 @@
name: account-worker
memory: 1024M
instances: 1
path: ./target/account-worker-0.0.1-SNAPSHOT.jar
buildpack: java_buildpack
services:
- rabbit-events
disk_quota: 1024M
host: account-event-worker
domain: cfapps.io

View File

@@ -0,0 +1,95 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>account-worker</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>account-worker</name>
<parent>
<groupId>org.kbastani</groupId>
<artifactId>account</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hateoas</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-core</artifactId>
<version>${spring-statemachine-core.version}</version>
</dependency>
<dependency>
<groupId>org.kbastani</groupId>
<artifactId>spring-boot-starter-aws-lambda</artifactId>
<version>${spring-boot-starter-aws-lambda.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws-java-sdk-sts.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-lambda</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>${json-path.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>${aws-java-sdk-sts.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,16 @@
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.hateoas.config.EnableHypermediaSupport;
import org.springframework.hateoas.config.EnableHypermediaSupport.HypermediaType;
@SpringBootApplication
@EnableHypermediaSupport(type = {HypermediaType.HAL})
public class AccountStreamModuleApplication {
public static void main(String[] args) {
SpringApplication.run(AccountStreamModuleApplication.class, args);
}
}

View File

@@ -0,0 +1,54 @@
package demo.account;
import demo.domain.BaseEntity;
import demo.event.AccountEvent;
/**
* The {@link Account} domain object contains information related to
* a user's account. The status of an account is event sourced using
* events logged to the {@link AccountEvent} collection attached to
* this resource.
*
* @author kbastani
*/
public class Account extends BaseEntity {
private String firstName;
private String lastName;
private String email;
private AccountStatus status;
public Account() {
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public AccountStatus getStatus() {
return status;
}
public void setStatus(AccountStatus status) {
this.status = status;
}
}

View File

@@ -0,0 +1,17 @@
package demo.account;
/**
* The {@link AccountStatus} describes the state of an {@link Account}.
* The aggregate state of a {@link Account} is sourced from attached domain
* events in the form of {@link demo.event.AccountEvent}.
*
* @author kbastani
*/
public enum AccountStatus {
ACCOUNT_CREATED,
ACCOUNT_PENDING,
ACCOUNT_CONFIRMED,
ACCOUNT_ACTIVE,
ACCOUNT_SUSPENDED,
ACCOUNT_ARCHIVED
}

View File

@@ -0,0 +1,25 @@
package demo.config;
import amazon.aws.AWSLambdaConfigurerAdapter;
import com.fasterxml.jackson.databind.ObjectMapper;
import demo.function.LambdaFunctions;
import demo.util.LambdaUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Configuration
@Profile("cloud")
public class AwsLambdaConfig {
@Bean
public LambdaFunctions lambdaInvoker(AWSLambdaConfigurerAdapter configurerAdapter) {
return configurerAdapter
.getFunctionInstance(LambdaFunctions.class);
}
@Bean
public LambdaUtil lambdaUtil(ObjectMapper objectMapper) {
return new LambdaUtil(objectMapper);
}
}

View File

@@ -0,0 +1,314 @@
package demo.config;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import demo.function.*;
import org.apache.log4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.action.Action;
import org.springframework.statemachine.config.EnableStateMachineFactory;
import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter;
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;
import java.net.URI;
import java.util.EnumSet;
/**
* A configuration adapter for describing a {@link StateMachine} factory that maps actions to functional
* expressions. Actions are executed during transitions between a source state and a target state.
* <p>
* A state machine provides a robust declarative language for describing the state of an {@link Account}
* resource given a sequence of ordered {@link demo.event.AccountEvents}. When an event is received
* in {@link demo.event.AccountEventStream}, an in-memory state machine is fully replicated given the
* {@link demo.event.AccountEvents} attached to an {@link Account} resource.
*
* @author kbastani
*/
@Configuration
@EnableStateMachineFactory
public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<AccountStatus, AccountEventType> {
final private Logger log = Logger.getLogger(StateMachineConfig.class);
/**
* Configures the initial conditions of a new in-memory {@link StateMachine} for {@link Account}.
*
* @param states is the {@link StateMachineStateConfigurer} used to describe the initial condition
*/
@Override
public void configure(StateMachineStateConfigurer<AccountStatus, AccountEventType> states) {
try {
// Describe the initial condition of the account state machine
states.withStates()
.initial(AccountStatus.ACCOUNT_CREATED)
.states(EnumSet.allOf(AccountStatus.class));
} catch (Exception e) {
throw new RuntimeException("State machine configuration failed", e);
}
}
/**
* Configures the {@link StateMachine} that describes how {@link AccountEventType} drives the state
* of an {@link Account}. Events are applied as transitions from a source {@link AccountStatus} to
* a target {@link AccountStatus}. An {@link Action} is attached to each transition, which maps to a
* function that is executed in the context of an {@link AccountEvent}.
*
* @param transitions is the {@link StateMachineTransitionConfigurer} used to describe state transitions
*/
@Override
public void configure(StateMachineTransitionConfigurer<AccountStatus, AccountEventType> transitions) {
try {
// Describe state machine transitions for accounts
transitions.withExternal()
.source(AccountStatus.ACCOUNT_CREATED)
.target(AccountStatus.ACCOUNT_PENDING)
.event(AccountEventType.ACCOUNT_CREATED)
.action(createAccount())
.and()
.withExternal()
.source(AccountStatus.ACCOUNT_PENDING)
.target(AccountStatus.ACCOUNT_CONFIRMED)
.event(AccountEventType.ACCOUNT_CONFIRMED)
.action(confirmAccount())
.and()
.withExternal()
.source(AccountStatus.ACCOUNT_CONFIRMED)
.target(AccountStatus.ACCOUNT_ACTIVE)
.event(AccountEventType.ACCOUNT_ACTIVATED)
.action(activateAccount())
.and()
.withExternal()
.source(AccountStatus.ACCOUNT_ACTIVE)
.target(AccountStatus.ACCOUNT_ARCHIVED)
.event(AccountEventType.ACCOUNT_ARCHIVED)
.action(archiveAccount())
.and()
.withExternal()
.source(AccountStatus.ACCOUNT_ACTIVE)
.target(AccountStatus.ACCOUNT_SUSPENDED)
.event(AccountEventType.ACCOUNT_SUSPENDED)
.action(suspendAccount())
.and()
.withExternal()
.source(AccountStatus.ACCOUNT_ARCHIVED)
.target(AccountStatus.ACCOUNT_ACTIVE)
.event(AccountEventType.ACCOUNT_ACTIVATED)
.action(unarchiveAccount())
.and()
.withExternal()
.source(AccountStatus.ACCOUNT_SUSPENDED)
.target(AccountStatus.ACCOUNT_ACTIVE)
.event(AccountEventType.ACCOUNT_ACTIVATED)
.action(unsuspendAccount());
} catch (Exception e) {
throw new RuntimeException("Could not configure state machine transitions", e);
}
}
/**
* Functions are mapped to actions that are triggered during the replication of a state machine. Functions
* should only be executed after the state machine has completed replication. This method checks the state
* context of the machine for an {@link AccountEvent}, which signals that the state machine is finished
* replication.
* <p>
* The {@link AccountFunction} argument is only applied if an {@link AccountEvent} is provided as a
* message header in the {@link StateContext}.
*
* @param context is the state machine context that may include an {@link AccountEvent}
* @param accountFunction is the account function to apply after the state machine has completed replication
* @return an {@link AccountEvent} only if this event has not yet been processed, otherwise returns null
*/
private AccountEvent applyEvent(StateContext<AccountStatus, AccountEventType> context,
AccountFunction accountFunction) {
AccountEvent accountEvent = null;
// Log out the progress of the state machine replication
log.info("Replicate event: " + context.getMessage().getPayload());
// The machine is finished replicating when an AccountEvent is found in the message header
if (context.getMessageHeader("event") != null) {
accountEvent = (AccountEvent) context.getMessageHeader("event");
log.info("State machine replicated: " + accountEvent.getType());
// Apply the provided function to the AccountEvent
accountFunction.apply(accountEvent);
}
return accountEvent;
}
/**
* The action that is triggered in response to an account transitioning from ACCOUNT_CREATED
* to ACCOUNT_PENDING.
* <p>
* The body of this method shows an example of how to map an {@link AccountFunction} to a function
* defined in a class method of {@link CreateAccount}.
*
* @return an implementation of {@link Action} that includes a function to execute
*/
@Bean
public Action<AccountStatus, AccountEventType> createAccount() {
return context -> applyEvent(context, new CreateAccount(context));
}
/**
* The action that is triggered in response to an account transitioning from ACCOUNT_PENDING
* to ACCOUNT_CONFIRMED.
* <p>
* The body of this method shows an example of how to use a {@link java.util.function.Consumer} Java 8
* lambda function instead of the class method definition that was shown in {@link CreateAccount}.
*
* @return an implementation of {@link Action} that includes a function to execute
*/
@Bean
public Action<AccountStatus, AccountEventType> confirmAccount() {
return context -> {
// Map the account action to a Java 8 lambda function
ConfirmAccount accountFunction;
accountFunction = new ConfirmAccount(context, event -> {
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
// Follow the command resource to activate the account
Account account = traverson.follow("commands")
.follow("activate")
.toEntity(Account.class)
.getBody();
log.info(event.getType() + ": " + event.getLink("account").getHref());
return account;
});
applyEvent(context, accountFunction);
};
}
/**
* The action that is triggered in response to an account transitioning from ACCOUNT_CONFIRMED
* to ACCOUNT_ACTIVE.
*
* @return an implementation of {@link Action} that includes a function to execute
*/
@Bean
public Action<AccountStatus, AccountEventType> activateAccount() {
return context -> applyEvent(context,
new ActivateAccount(context, event -> {
log.info(event.getType() + ": " + event.getLink("account").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Account.class)
.getBody();
}));
}
/**
* The action that is triggered in response to an account transitioning from ACCOUNT_ACTIVE
* to ACCOUNT_ARCHIVED.
*
* @return an implementation of {@link Action} that includes a function to execute
*/
@Bean
public Action<AccountStatus, AccountEventType> archiveAccount() {
return context -> applyEvent(context,
new ArchiveAccount(context, event -> {
log.info(event.getType() + ": " + event.getLink("account").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Account.class)
.getBody();
}));
}
/**
* The action that is triggered in response to an account transitioning from ACCOUNT_ACTIVE
* to ACCOUNT_SUSPENDED.
*
* @return an implementation of {@link Action} that includes a function to execute
*/
@Bean
public Action<AccountStatus, AccountEventType> suspendAccount() {
return context -> applyEvent(context,
new SuspendAccount(context, event -> {
log.info(event.getType() + ": " + event.getLink("account").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Account.class)
.getBody();
}));
}
/**
* The action that is triggered in response to an account transitioning from ACCOUNT_ARCHIVED
* to ACCOUNT_ACTIVE.
*
* @return an implementation of {@link Action} that includes a function to execute
*/
@Bean
public Action<AccountStatus, AccountEventType> unarchiveAccount() {
return context -> applyEvent(context,
new UnarchiveAccount(context, event -> {
log.info(event.getType() + ": " + event.getLink("account").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Account.class)
.getBody();
}));
}
/**
* The action that is triggered in response to an account transitioning from ACCOUNT_SUSPENDED
* to ACCOUNT_ACTIVE.
*
* @return an implementation of {@link Action} that includes a function to execute
*/
@Bean
public Action<AccountStatus, AccountEventType> unsuspendAccount() {
return context -> applyEvent(context,
new UnsuspendAccount(context, event -> {
log.info(event.getType() + ": " + event.getLink("account").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Account.class)
.getBody();
}));
}
}

View File

@@ -0,0 +1,36 @@
package demo.domain;
import org.springframework.hateoas.ResourceSupport;
public class BaseEntity extends ResourceSupport {
private Long createdAt;
private Long lastModified;
public BaseEntity() {
}
public Long getCreatedAt() {
return createdAt;
}
public void setCreatedAt(Long createdAt) {
this.createdAt = createdAt;
}
public Long getLastModified() {
return lastModified;
}
public void setLastModified(Long lastModified) {
this.lastModified = lastModified;
}
@Override
public String toString() {
return "BaseEntity{" +
"createdAt=" + createdAt +
", lastModified=" + lastModified +
"} " + super.toString();
}
}

View File

@@ -0,0 +1,41 @@
package demo.event;
import demo.account.Account;
import demo.domain.BaseEntity;
/**
* The domain event {@link AccountEvent} tracks the type and state of events as
* applied to the {@link Account} domain object. This event resource can be used
* to event source the aggregate state of {@link Account}.
* <p>
* This event resource also provides a transaction log that can be used to append
* actions to the event.
*
* @author kbastani
*/
public class AccountEvent extends BaseEntity {
private AccountEventType type;
public AccountEvent() {
}
public AccountEvent(AccountEventType type) {
this.type = type;
}
public AccountEventType getType() {
return type;
}
public void setType(AccountEventType type) {
this.type = type;
}
@Override
public String toString() {
return "AccountEvent{" +
"type=" + type +
"} " + super.toString();
}
}

View File

@@ -0,0 +1,40 @@
package demo.event;
import demo.account.Account;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Profile;
import org.springframework.statemachine.StateMachine;
/**
* The {@link AccountEventStream} monitors for a variety of {@link AccountEvent} domain
* events for an {@link Account}.
*
* @author kbastani
*/
@EnableAutoConfiguration
@EnableBinding(Sink.class)
@Profile({ "cloud", "development" })
public class AccountEventStream {
private EventService eventService;
public AccountEventStream(EventService eventService) {
this.eventService = eventService;
}
/**
* Listens to a stream of incoming {@link AccountEvent} messages. For each
* new message received, replicate an in-memory {@link StateMachine} that
* reproduces the current state of the {@link Account} resource that is the
* subject of the {@link AccountEvent}.
*
* @param accountEvent is the {@link Account} domain event to process
*/
@StreamListener(Sink.INPUT)
public void streamListerner(AccountEvent accountEvent) {
eventService.apply(accountEvent);
}
}

View File

@@ -0,0 +1,18 @@
package demo.event;
import demo.account.Account;
import demo.account.AccountStatus;
/**
* The {@link AccountEventType} represents a collection of possible events that describe
* state transitions of {@link AccountStatus} on the {@link Account} aggregate.
*
* @author kbastani
*/
public enum AccountEventType {
ACCOUNT_CREATED,
ACCOUNT_CONFIRMED,
ACCOUNT_ACTIVATED,
ACCOUNT_SUSPENDED,
ACCOUNT_ARCHIVED
}

View File

@@ -0,0 +1,11 @@
package demo.event;
import org.springframework.hateoas.Resources;
/**
* The {@link AccountEvents} is a hypermedia collection of {@link AccountEvent} resources.
*
* @author kbastani
*/
public class AccountEvents extends Resources<AccountEvent> {
}

View File

@@ -0,0 +1,28 @@
package demo.event;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Optional;
@RestController
@RequestMapping("/v1")
public class EventController {
private EventService eventService;
public EventController(EventService eventService) {
this.eventService = eventService;
}
@PostMapping(path = "/events")
public ResponseEntity handleEvent(@RequestBody AccountEvent event) {
return Optional.ofNullable(eventService.apply(event))
.map(e -> new ResponseEntity<>(e, HttpStatus.CREATED))
.orElseThrow(() -> new RuntimeException("Apply event failed"));
}
}

View File

@@ -0,0 +1,82 @@
package demo.event;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.state.StateMachineService;
import org.apache.log4j.Logger;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.stereotype.Service;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@Service
public class EventService {
final private Logger log = Logger.getLogger(EventService.class);
final private StateMachineService stateMachineService;
public EventService(StateMachineService stateMachineService) {
this.stateMachineService = stateMachineService;
}
public Account apply(AccountEvent accountEvent) {
Account result;
log.info("Account event received: " + accountEvent.getLink("self").getHref());
// Generate a state machine for computing the state of the account resource
StateMachine<AccountStatus, AccountEventType> stateMachine =
stateMachineService.getStateMachine();
// Follow the hypermedia link to fetch the attached account
Traverson traverson = new Traverson(
URI.create(accountEvent.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
// Get the event log for the attached account resource
AccountEvents events = traverson.follow("events")
.toEntity(AccountEvents.class)
.getBody();
// Prepare account event message headers
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("event", accountEvent);
// Replicate the current state of the account resource
events.getContent()
.stream()
.sorted((a1, a2) -> a1.getCreatedAt().compareTo(a2.getCreatedAt()))
.forEach(e -> {
MessageHeaders headers = new MessageHeaders(null);
// Check to see if this is the current event
if (e.getLink("self").equals(accountEvent.getLink("self"))) {
headers = new MessageHeaders(headerMap);
}
// Send the event to the state machine
stateMachine.sendEvent(MessageBuilder.createMessage(e.getType(), headers));
});
// Get result
Map<Object, Object> context = stateMachine.getExtendedState()
.getVariables();
// Get the account result
result = (Account) context.getOrDefault("account", null);
// Destroy the state machine
stateMachine.stop();
return result;
}
}

View File

@@ -0,0 +1,51 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
* state transitions on a {@link demo.account.Account} resource on to a function. Mapped functions
* can take multiple forms and reside either remotely or locally on the classpath of this application.
*
* @author kbastani
*/
public abstract class AccountFunction {
final private Logger log = Logger.getLogger(AccountFunction.class);
final protected StateContext<AccountStatus, AccountEventType> context;
final protected Function<AccountEvent, Account> lambda;
/**
* Create a new instance of a class that extends {@link AccountFunction}, supplying
* a state context and a lambda function used to apply {@link AccountEvent} to a provided
* action.
*
* @param context is the {@link StateContext} for a replicated state machine
* @param lambda is the lambda function describing an action that consumes an {@link AccountEvent}
*/
public AccountFunction(StateContext<AccountStatus, AccountEventType> context,
Function<AccountEvent, Account> lambda) {
this.context = context;
this.lambda = lambda;
}
/**
* Apply an {@link AccountEvent} to the lambda function that was provided through the
* constructor of this {@link AccountFunction}.
*
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
public Account apply(AccountEvent event) {
// Execute the lambda function
Account result = lambda.apply(event);
context.getExtendedState().getVariables().put("account", result);
return result;
}
}

View File

@@ -0,0 +1,38 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
* state transitions on a {@link demo.account.Account} resource on to a function. Mapped functions
* can take multiple forms and reside either remotely or locally on the classpath of this application.
*
* @author kbastani
*/
public class ActivateAccount extends AccountFunction {
final private Logger log = Logger.getLogger(ActivateAccount.class);
public ActivateAccount(StateContext<AccountStatus, AccountEventType> context, Function<AccountEvent, Account> lambda) {
super(context, lambda);
}
/**
* Apply an {@link AccountEvent} to the lambda function that was provided through the
* constructor of this {@link AccountFunction}.
*
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
@Override
public Account apply(AccountEvent event) {
log.info("Executing workflow for an activated account...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,38 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
* state transitions on a {@link demo.account.Account} resource on to a function. Mapped functions
* can take multiple forms and reside either remotely or locally on the classpath of this application.
*
* @author kbastani
*/
public class ArchiveAccount extends AccountFunction {
final private Logger log = Logger.getLogger(ArchiveAccount.class);
public ArchiveAccount(StateContext<AccountStatus, AccountEventType> context, Function<AccountEvent, Account> lambda) {
super(context, lambda);
}
/**
* Apply an {@link AccountEvent} to the lambda function that was provided through the
* constructor of this {@link AccountFunction}.
*
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
@Override
public Account apply(AccountEvent event) {
log.info("Executing workflow for an archived account...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,38 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
* state transitions on a {@link demo.account.Account} resource on to a function. Mapped functions
* can take multiple forms and reside either remotely or locally on the classpath of this application.
*
* @author kbastani
*/
public class ConfirmAccount extends AccountFunction {
final private Logger log = Logger.getLogger(ConfirmAccount.class);
public ConfirmAccount(StateContext<AccountStatus, AccountEventType> context, Function<AccountEvent, Account> lambda) {
super(context, lambda);
}
/**
* Apply an {@link AccountEvent} to the lambda function that was provided through the
* constructor of this {@link AccountFunction}.
*
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
@Override
public Account apply(AccountEvent event) {
log.info("Executing workflow for a confirmed account...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,103 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.http.RequestEntity;
import org.springframework.statemachine.StateContext;
import org.springframework.web.client.RestTemplate;
import java.net.URI;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
* state transitions on a {@link demo.account.Account} resource on to a function. Mapped functions
* can take multiple forms and reside either remotely or locally on the classpath of this application.
*
* @author kbastani
*/
public class CreateAccount extends AccountFunction {
final private Logger log = Logger.getLogger(CreateAccount.class);
public CreateAccount(StateContext<AccountStatus, AccountEventType> context) {
this(context, null);
}
public CreateAccount(StateContext<AccountStatus, AccountEventType> context,
Function<AccountEvent, Account> function) {
super(context, function);
}
/**
* Applies the {@link AccountEvent} to the {@link Account} aggregate.
*
* @param event is the {@link AccountEvent} for this context
*/
@Override
public Account apply(AccountEvent event) {
Account account;
log.info("Executing workflow for a created account...");
// Create a traverson for the root account
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
// Get the account resource attached to the event
account = traverson.follow("self")
.toEntity(Account.class)
.getBody();
// Set the account to a pending state
account = setAccountPendingStatus(event, account);
// The account can only be confirmed if it is in a pending state
if (account.getStatus() == AccountStatus.ACCOUNT_PENDING) {
// Traverse to the confirm account command
account = traverson.follow("commands")
.follow("confirm")
.toEntity(Account.class)
.getBody();
log.info(event.getType() + ": " +
event.getLink("account").getHref());
}
context.getExtendedState().getVariables().put("account", account);
return account;
}
/**
* Set the {@link Account} resource to a pending state.
*
* @param event is the {@link AccountEvent} for this context
* @param account is the {@link Account} attached to the {@link AccountEvent} resource
* @return an {@link Account} with its updated state set to pending
*/
private Account setAccountPendingStatus(AccountEvent event, Account account) {
// Set the account status to pending
account.setStatus(AccountStatus.ACCOUNT_PENDING);
RestTemplate restTemplate = new RestTemplate();
// Create a new request entity
RequestEntity<Account> requestEntity = RequestEntity.put(
URI.create(event.getLink("account").getHref()))
.contentType(MediaTypes.HAL_JSON)
.body(account);
// Update the account entity's status
account = restTemplate.exchange(requestEntity, Account.class).getBody();
return account;
}
}

View File

@@ -0,0 +1,31 @@
package demo.function;
import com.amazonaws.services.lambda.invoke.LambdaFunction;
import com.amazonaws.services.lambda.model.LogType;
import demo.account.Account;
import java.util.Map;
public interface LambdaFunctions {
@LambdaFunction(functionName="account-created-accountCreated-13P0EDGLDE399", logType = LogType.Tail)
Account accountCreated(Map event);
@LambdaFunction(functionName="accountConfirmed", logType = LogType.Tail)
Account accountConfirmed(Map event);
@LambdaFunction(functionName="account-activated-accountActivated-1P0I6FTFCMHKH", logType = LogType.Tail)
Account accountActivated(Map event);
@LambdaFunction(functionName="accountSuspended", logType = LogType.Tail)
Account accountSuspended(Map event);
@LambdaFunction(functionName="accountArchived", logType = LogType.Tail)
Account accountArchived(Map event);
@LambdaFunction(functionName="accountUnsuspended", logType = LogType.Tail)
Account accountUnsuspended(Map event);
@LambdaFunction(functionName="accountUnarchived", logType = LogType.Tail)
Account accountUnarchived(Map event);
}

View File

@@ -0,0 +1,38 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
* state transitions on a {@link demo.account.Account} resource on to a function. Mapped functions
* can take multiple forms and reside either remotely or locally on the classpath of this application.
*
* @author kbastani
*/
public class SuspendAccount extends AccountFunction {
final private Logger log = Logger.getLogger(SuspendAccount.class);
public SuspendAccount(StateContext<AccountStatus, AccountEventType> context, Function<AccountEvent, Account> lambda) {
super(context, lambda);
}
/**
* Apply an {@link AccountEvent} to the lambda function that was provided through the
* constructor of this {@link AccountFunction}.
*
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
@Override
public Account apply(AccountEvent event) {
log.info("Executing workflow for a suspended account...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,38 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
* state transitions on a {@link demo.account.Account} resource on to a function. Mapped functions
* can take multiple forms and reside either remotely or locally on the classpath of this application.
*
* @author kbastani
*/
public class UnarchiveAccount extends AccountFunction {
final private Logger log = Logger.getLogger(UnarchiveAccount.class);
public UnarchiveAccount(StateContext<AccountStatus, AccountEventType> context, Function<AccountEvent, Account> lambda) {
super(context, lambda);
}
/**
* Apply an {@link AccountEvent} to the lambda function that was provided through the
* constructor of this {@link AccountFunction}.
*
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
@Override
public Account apply(AccountEvent event) {
log.info("Executing workflow for an unarchived account...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,38 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
* state transitions on a {@link demo.account.Account} resource on to a function. Mapped functions
* can take multiple forms and reside either remotely or locally on the classpath of this application.
*
* @author kbastani
*/
public class UnsuspendAccount extends AccountFunction {
final private Logger log = Logger.getLogger(UnsuspendAccount.class);
public UnsuspendAccount(StateContext<AccountStatus, AccountEventType> context, Function<AccountEvent, Account> lambda) {
super(context, lambda);
}
/**
* Apply an {@link AccountEvent} to the lambda function that was provided through the
* constructor of this {@link AccountFunction}.
*
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
@Override
public Account apply(AccountEvent event) {
log.info("Executing workflow for a unsuspended account...");
return super.apply(event);
}
}

View File

@@ -0,0 +1,42 @@
package demo.state;
import demo.account.AccountStatus;
import demo.event.AccountEventType;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.config.StateMachineFactory;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* The {@link StateMachineService} provides factory access to get new state machines for
* replicating the state of an {@link demo.account.Account} from {@link demo.event.AccountEvents}.
*
* @author kbastani
*/
@Service
public class StateMachineService {
private final StateMachineFactory<AccountStatus, AccountEventType> factory;
public StateMachineService(StateMachineFactory<AccountStatus, AccountEventType> factory) {
this.factory = factory;
}
/**
* Create a new state machine that is initially configured and ready for replicating
* the state of an {@link demo.account.Account} from a sequence of {@link demo.event.AccountEvent}.
*
* @return a new instance of {@link StateMachine}
*/
public StateMachine<AccountStatus, AccountEventType> getStateMachine() {
// Create a new state machine in its initial state
StateMachine<AccountStatus, AccountEventType> stateMachine =
factory.getStateMachine(UUID.randomUUID().toString());
// Start the new state machine
stateMachine.start();
return stateMachine;
}
}

View File

@@ -0,0 +1,29 @@
package demo.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
@Component
public class LambdaUtil {
private ObjectMapper objectMapper;
public LambdaUtil(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
public HashMap objectToMap(Object object) {
HashMap result = null;
try {
result = objectMapper.readValue(objectMapper.writeValueAsString(object), HashMap.class);
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
}

View File

@@ -0,0 +1,24 @@
spring:
profiles:
active: development
---
spring:
profiles: development
cloud:
stream:
bindings:
input:
destination: account
group: account-group
contentType: 'application/json'
consumer:
durableSubscription: true
server:
port: 8081
amazon:
aws:
access-key-id: replace
access-key-secret: replace
---
spring:
profiles: test

View File

@@ -0,0 +1,4 @@
spring:
application:
name: account-worker
---

View File

@@ -0,0 +1,18 @@
package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
@ActiveProfiles("test")
public class AccountStreamModuleApplicationTests {
@Test
public void contextLoads() {
}
}