AWS Lambda Starter & Consistency Models

This commit is contained in:
Kenny Bastani
2016-12-20 11:47:44 -08:00
parent 58c0a03e05
commit 9f4535b986
49 changed files with 949 additions and 313 deletions

View File

@@ -1,5 +1,5 @@
name: account-web
memory: 512M
memory: 1024M
instances: 1
path: ./target/account-web-0.0.1-SNAPSHOT.jar
buildpack: java_buildpack

View File

@@ -2,8 +2,10 @@ package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.hateoas.config.EnableHypermediaSupport;
@SpringBootApplication
@EnableHypermediaSupport(type = {EnableHypermediaSupport.HypermediaType.HAL})
public class AccountServiceApplication {
public static void main(String[] args) {

View File

@@ -22,11 +22,12 @@ public class Account extends BaseEntity {
@Id
@GeneratedValue
private Long id;
private Long userId;
private String accountNumber;
private Boolean defaultAccount;
@OneToMany(cascade = CascadeType.MERGE, fetch = FetchType.LAZY)
private String firstName;
private String lastName;
private String email;
@OneToMany(cascade = CascadeType.ALL, fetch = FetchType.LAZY)
private Set<AccountEvent> events = new HashSet<>();
@Enumerated(value = EnumType.STRING)
@@ -36,17 +37,11 @@ public class Account extends BaseEntity {
status = AccountStatus.ACCOUNT_CREATED;
}
public Account(Long userId, String accountNumber, Boolean defaultAccount) {
public Account(String firstName, String lastName, String email) {
this();
this.accountNumber = accountNumber;
this.defaultAccount = defaultAccount;
this.userId = userId;
}
public Account(String accountNumber, Boolean defaultAccount, AccountStatus status) {
this.accountNumber = accountNumber;
this.defaultAccount = defaultAccount;
this.status = status;
this.firstName = firstName;
this.lastName = lastName;
this.email = email;
}
@JsonIgnore
@@ -58,28 +53,28 @@ public class Account extends BaseEntity {
this.id = id;
}
public Long getUserId() {
return userId;
public String getFirstName() {
return firstName;
}
public void setUserId(Long userId) {
this.userId = userId;
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getAccountNumber() {
return accountNumber;
public String getLastName() {
return lastName;
}
public void setAccountNumber(String accountNumber) {
this.accountNumber = accountNumber;
public void setLastName(String lastName) {
this.lastName = lastName;
}
public Boolean getDefaultAccount() {
return defaultAccount;
public String getEmail() {
return email;
}
public void setDefaultAccount(Boolean defaultAccount) {
this.defaultAccount = defaultAccount;
public void setEmail(String email) {
this.email = email;
}
@JsonIgnore
@@ -103,9 +98,10 @@ public class Account extends BaseEntity {
public String toString() {
return "Account{" +
"id=" + id +
", userId=" + userId +
", accountNumber='" + accountNumber + '\'' +
", defaultAccount=" + defaultAccount +
", firstName='" + firstName + '\'' +
", lastName='" + lastName + '\'' +
", email='" + email + '\'' +
", events=" + events +
", status=" + status +
"} " + super.toString();
}

View File

@@ -136,12 +136,12 @@ public class AccountController {
*/
private Resource<Account> createAccountResource(Account account) {
Assert.notNull(account, "Account body must not be null");
Assert.notNull(account.getUserId(), "UserId is required");
Assert.notNull(account.getAccountNumber(), "AccountNumber is required");
Assert.notNull(account.getDefaultAccount(), "DefaultAccount is required");
Assert.notNull(account.getEmail(), "Email is required");
Assert.notNull(account.getFirstName(), "First name is required");
Assert.notNull(account.getLastName(), "Last name is required");
// Create the new account
account = accountService.createAccount(account);
account = accountService.registerAccount(account);
return getAccountResource(account);
}

View File

@@ -5,6 +5,5 @@ import org.springframework.data.repository.query.Param;
public interface AccountRepository extends JpaRepository<Account, Long> {
Account findAccountByUserId(@Param("userId") Long userId);
Account findAccountByAccountNumber(@Param("accountNumber") String accountNumber);
Account findAccountByEmail(@Param("email") String email);
}

View File

@@ -3,20 +3,19 @@ package demo.account;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import demo.event.EventService;
import demo.event.ConsistencyModel;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import java.util.Arrays;
import java.util.Objects;
import static demo.account.AccountStatus.*;
import static demo.account.AccountStatus.ACCOUNT_ACTIVE;
import static demo.account.AccountStatus.ACCOUNT_ARCHIVED;
/**
* The {@link AccountService} provides transactional support for managing {@link Account}
@@ -28,16 +27,36 @@ import static demo.account.AccountStatus.ACCOUNT_ARCHIVED;
* @author kbastani
*/
@Service
@Transactional
@CacheConfig(cacheNames = {"accounts"})
public class AccountService {
private final AccountRepository accountRepository;
private final EventService eventService;
private final CacheManager cacheManager;
public AccountService(AccountRepository accountRepository, EventService eventService) {
public AccountService(AccountRepository accountRepository, EventService eventService, CacheManager cacheManager) {
this.accountRepository = accountRepository;
this.eventService = eventService;
this.cacheManager = cacheManager;
}
@CacheEvict(cacheNames = "accounts", key = "#account.getAccountId().toString()")
public Account registerAccount(Account account) {
account = createAccount(account);
cacheManager.getCache("accounts")
.evict(account.getAccountId());
// Trigger the account creation event
AccountEvent event = appendEvent(account.getAccountId(),
new AccountEvent(AccountEventType.ACCOUNT_CREATED));
// Attach account identifier
event.getAccount().setAccountId(account.getAccountId());
// Return the result
return event.getAccount();
}
/**
@@ -48,18 +67,13 @@ public class AccountService {
*/
@CacheEvict(cacheNames = "accounts", key = "#account.getAccountId().toString()")
public Account createAccount(Account account) {
// Assert for uniqueness constraint
Assert.isNull(accountRepository.findAccountByUserId(account.getUserId()),
"An account with the supplied userId already exists");
Assert.isNull(accountRepository.findAccountByAccountNumber(account.getAccountNumber()),
"An account with the supplied account number already exists");
Assert.isNull(accountRepository.findAccountByEmail(account.getEmail()),
"An account with the supplied email already exists");
// Save the account to the repository
account = accountRepository.save(account);
// Trigger the account creation event
appendEvent(account.getAccountId(),
new AccountEvent(AccountEventType.ACCOUNT_CREATED));
account = accountRepository.saveAndFlush(account);
return account;
}
@@ -98,9 +112,9 @@ public class AccountService {
"The account with the supplied id does not exist");
Account currentAccount = accountRepository.findOne(id);
currentAccount.setUserId(account.getUserId());
currentAccount.setAccountNumber(account.getAccountNumber());
currentAccount.setDefaultAccount(account.getDefaultAccount());
currentAccount.setEmail(account.getEmail());
currentAccount.setFirstName(account.getFirstName());
currentAccount.setLastName(account.getLastName());
currentAccount.setStatus(account.getStatus());
return accountRepository.save(currentAccount);
@@ -127,12 +141,24 @@ public class AccountService {
* @return the newly appended {@link AccountEvent}
*/
public AccountEvent appendEvent(Long accountId, AccountEvent event) {
return appendEvent(accountId, event, ConsistencyModel.ACID);
}
/**
* Append a new {@link AccountEvent} to the {@link Account} reference for the supplied identifier.
*
* @param accountId is the unique identifier for the {@link Account}
* @param event is the {@link AccountEvent} to append to the {@link Account} entity
* @return the newly appended {@link AccountEvent}
*/
public AccountEvent appendEvent(Long accountId, AccountEvent event, ConsistencyModel consistencyModel) {
Account account = getAccount(accountId);
Assert.notNull(account, "The account with the supplied id does not exist");
event.setAccount(account);
event = eventService.createEvent(event);
event = eventService.createEvent(accountId, event);
account.getEvents().add(event);
accountRepository.save(account);
accountRepository.saveAndFlush(account);
eventService.raiseEvent(event, consistencyModel);
return event;
}
@@ -158,8 +184,9 @@ public class AccountService {
// Confirm the account
Account updateAccount = account;
updateAccount.setStatus(ACCOUNT_CONFIRMED);
account = this.updateAccount(id, updateAccount);
this.appendEvent(id, new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED));
this.updateAccount(id, updateAccount);
this.appendEvent(id, new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED))
.getAccount();
break;
case ACTIVATE_ACCOUNT:
Assert.isTrue(status != ACCOUNT_ACTIVE, "The account is already active");
@@ -168,8 +195,9 @@ public class AccountService {
// Activate the account
account.setStatus(ACCOUNT_ACTIVE);
account = this.updateAccount(id, account);
this.appendEvent(id, new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED));
this.updateAccount(id, account);
this.appendEvent(id, new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED))
.getAccount();
break;
case SUSPEND_ACCOUNT:
Assert.isTrue(status == ACCOUNT_ACTIVE, "An inactive account cannot be suspended");

View File

@@ -14,7 +14,7 @@ import java.util.Arrays;
@Configuration
@EnableCaching
public class CacheConfiguration {
public class CacheConfig {
@Bean
public JedisConnectionFactory redisConnectionFactory(

View File

@@ -9,5 +9,5 @@ import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
*/
@Configuration
@EnableJpaAuditing
public class JpaConfiguration {
public class JpaConfig {
}

View File

@@ -1,36 +0,0 @@
package demo.config;
import demo.account.Account;
import demo.account.AccountController;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.hateoas.Resource;
import org.springframework.hateoas.ResourceProcessor;
import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo;
@Configuration
public class ResourceConfiguration {
/**
* Enriches the {@link Account} resource with hypermedia links.
*
* @return a hypermedia processor for the {@link Account} resource
*/
@Bean
public ResourceProcessor<Resource<Account>> accountProcessor() {
return new ResourceProcessor<Resource<Account>>() {
@Override
public Resource<Account> process(Resource<Account> resource) {
resource.add(
linkTo(AccountController.class)
.slash("accounts")
.slash(resource.getContent().getAccountId())
.slash("commands")
.withRel("commands"));
return resource;
}
};
}
}

View File

@@ -6,5 +6,5 @@ import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBinding(Source.class)
public class StreamConfiguration {
public class StreamConfig {
}

View File

@@ -0,0 +1,36 @@
package demo.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import java.util.Collections;
import java.util.List;
@Configuration
public class WebMvcConfig extends WebMvcConfigurerAdapter {
private ObjectMapper objectMapper;
public WebMvcConfig(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public void configureMessageConverters(List<HttpMessageConverter<?>> converters) {
final MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
converter.setObjectMapper(objectMapper);
converters.add(converter);
}
@Bean
protected RestTemplate restTemplate(ObjectMapper objectMapper) {
MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
converter.setObjectMapper(objectMapper);
return new RestTemplate(Collections.singletonList(converter));
}
}

View File

@@ -26,7 +26,7 @@ public class AccountEvent extends BaseEntity {
@Enumerated(EnumType.STRING)
private AccountEventType type;
@OneToOne(cascade = CascadeType.MERGE, fetch = FetchType.LAZY)
@OneToOne(cascade = CascadeType.ALL, fetch = FetchType.LAZY)
@JsonIgnore
private Account account;

View File

@@ -0,0 +1,6 @@
package demo.event;
public enum ConsistencyModel {
BASE,
ACID
}

View File

@@ -16,9 +16,9 @@ public class EventController {
this.eventService = eventService;
}
@PostMapping(path = "/events")
public ResponseEntity createEvent(@RequestBody AccountEvent event) {
return Optional.ofNullable(eventService.createEvent(event))
@PostMapping(path = "/events/{id}")
public ResponseEntity createEvent(@RequestBody AccountEvent event, @PathVariable Long id) {
return Optional.ofNullable(eventService.createEvent(id, event, ConsistencyModel.ACID))
.map(e -> new ResponseEntity<>(e, HttpStatus.CREATED))
.orElseThrow(() -> new IllegalArgumentException("Event creation failed"));
}

View File

@@ -2,17 +2,21 @@ package demo.event;
import demo.account.Account;
import demo.account.AccountController;
import org.apache.log4j.Logger;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.data.domain.PageRequest;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.Resource;
import org.springframework.http.RequestEntity;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.web.client.RestTemplate;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@@ -22,42 +26,122 @@ import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo;
/**
* The {@link EventService} provides transactional service methods for {@link AccountEvent}
* entities of the Account Service. Account domain events are generated with a {@link AccountEventType},
* and action logs are appended to the {@link AccountEvent}. The logs resource provides an append-only transaction
* log that can be used to source the state of the {@link Account}
* and action logs are appended to the {@link AccountEvent}.
*
* @author kbastani
*/
@Service
@Transactional
@CacheConfig(cacheNames = {"events"})
public class EventService {
private final Logger log = Logger.getLogger(EventService.class);
private final EventRepository eventRepository;
private final Source accountStreamSource;
private final RestTemplate restTemplate;
public EventService(EventRepository eventRepository, Source accountStreamSource) {
public EventService(EventRepository eventRepository, Source accountStreamSource, RestTemplate restTemplate) {
this.eventRepository = eventRepository;
this.accountStreamSource = accountStreamSource;
this.restTemplate = restTemplate;
}
/**
* Create a new {@link AccountEvent} and append it to the event log of the referenced {@link Account}.
* After the {@link AccountEvent} has been persisted, send the event to the account stream. Events can
* be raised as a blocking or non-blocking operation depending on the {@link ConsistencyModel}.
*
* @param accountId is the unique identifier for the {@link Account}
* @param event is the {@link AccountEvent} to create
* @param consistencyModel is the desired consistency model for the response
* @return an {@link AccountEvent} that has been appended to the {@link Account}'s event log
*/
public AccountEvent createEvent(Long accountId, AccountEvent event, ConsistencyModel consistencyModel) {
event = createEvent(accountId, event);
return raiseEvent(event, consistencyModel);
}
/**
* Raise an {@link AccountEvent} that attempts to transition the state of an {@link Account}.
*
* @param event is an {@link AccountEvent} that will be raised
* @param consistencyModel is the consistency model for this request
* @return an {@link AccountEvent} that has been appended to the {@link Account}'s event log
*/
public AccountEvent raiseEvent(AccountEvent event, ConsistencyModel consistencyModel) {
switch (consistencyModel) {
case BASE:
asyncRaiseEvent(event);
break;
case ACID:
event = raiseEvent(event);
break;
}
return event;
}
/**
* Raise an asynchronous {@link AccountEvent} by sending an AMQP message to the account stream. Any
* state changes will be applied to the {@link Account} outside of the current HTTP request context.
* <p>
* Use this operation when a workflow can be processed asynchronously outside of the current HTTP
* request context.
*
* @param event is an {@link AccountEvent} that will be raised
*/
private void asyncRaiseEvent(AccountEvent event) {
// Append the account event to the stream
accountStreamSource.output()
.send(MessageBuilder
.withPayload(getAccountEventResource(event))
.build());
}
/**
* Raise a synchronous {@link AccountEvent} by sending a HTTP request to the account stream. The response
* is a blocking operation, which ensures that the result of a multi-step workflow will not return until
* the transaction reaches a consistent state.
* <p>
* Use this operation when the result of a workflow must be returned within the current HTTP request context.
*
* @param event is an {@link AccountEvent} that will be raised
* @return an {@link AccountEvent} which contains the consistent state of an {@link Account}
*/
private AccountEvent raiseEvent(AccountEvent event) {
try {
// Create a new request entity
RequestEntity<Resource<AccountEvent>> requestEntity = RequestEntity.post(
URI.create("http://localhost:8081/v1/events"))
.contentType(MediaTypes.HAL_JSON)
.body(getAccountEventResource(event), Resource.class);
// Update the account entity's status
Account result = restTemplate.exchange(requestEntity, Account.class)
.getBody();
log.info(result);
event.setAccount(result);
} catch (Exception ex) {
log.error(ex);
}
return event;
}
/**
* Create a new {@link AccountEvent} and publish it to the account stream.
*
* @param event is the {@link AccountEvent} to publish to the account stream
* @return a hypermedia {@link AccountEvent} resource
*/
@CacheEvict(cacheNames = "events", key = "#event.getAccount().getAccountId().toString()")
public AccountEvent createEvent(AccountEvent event) {
@CacheEvict(cacheNames = "events", key = "#id.toString()")
public AccountEvent createEvent(Long id, AccountEvent event) {
// Save new event
event = addEvent(event);
Assert.notNull(event, "The event could not be appended to the account");
// Append the account event to the stream
accountStreamSource.output()
.send(MessageBuilder
.withPayload(getAccountEventResource(event))
.build());
return event;
}
@@ -105,7 +189,7 @@ public class EventService {
* @return a hypermedia resource for the supplied {@link AccountEvent} entity
*/
private Resource<AccountEvent> getAccountEventResource(AccountEvent event) {
return new Resource<>(event, Arrays.asList(
return new Resource<AccountEvent>(event, Arrays.asList(
linkTo(AccountController.class)
.slash("events")
.slash(event.getEventId())
@@ -113,8 +197,7 @@ public class EventService {
linkTo(AccountController.class)
.slash("accounts")
.slash(event.getAccount().getAccountId())
.withRel("account"))
);
.withRel("account")));
}
/**
@@ -125,7 +208,7 @@ public class EventService {
*/
@CacheEvict(cacheNames = "events", key = "#event.getAccount().getAccountId().toString()")
private AccountEvent addEvent(AccountEvent event) {
event = eventRepository.save(event);
event = eventRepository.saveAndFlush(event);
return event;
}
}

View File

@@ -14,4 +14,4 @@ spring:
host: localhost
port: 6379
server:
port: 0
port: 8080

View File

@@ -30,9 +30,9 @@ public class AccountControllerTest {
@Test
public void getUserAccountResourceShouldReturnAccount() throws Exception {
String content = "{\"userId\": 1, \"accountNumber\": \"123456789\", \"defaultAccount\": true}";
String content = "{\"firstName\": \"Jane\", \"lastName\": \"Doe\", \"email\": \"jane.doe@example.com\"}";
Account account = new Account(1L, "123456789", true);
Account account = new Account("Jane", "Doe", "jane.doe@example.com");
given(this.accountService.getAccount(1L))
.willReturn(account);

View File

@@ -7,6 +7,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.cache.CacheManager;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
@@ -21,49 +22,51 @@ public class AccountServiceTests {
@MockBean
private AccountRepository accountRepository;
@MockBean
private CacheManager cacheManager;
private AccountService accountService;
@Before
public void before() {
accountService = new AccountService(accountRepository, eventService);
accountService = new AccountService(accountRepository, eventService, cacheManager);
}
@Test
public void getAccountReturnsAccount() throws Exception {
Account expected = new Account(1L, "123456789", true);
expected.setUserId(1L);
Account expected = new Account("Jane", "Doe", "jane.doe@example.com");
given(this.accountRepository.findOne(1L)).willReturn(expected);
Account actual = accountService.getAccount(1L);
assertThat(actual).isNotNull();
assertThat(actual.getUserId()).isEqualTo(1L);
assertThat(actual.getAccountNumber()).isEqualTo("123456789");
assertThat(actual.getEmail()).isEqualTo("jane.doe@example.com");
assertThat(actual.getFirstName()).isEqualTo("Jane");
assertThat(actual.getLastName()).isEqualTo("Doe");
}
@Test
public void createAccountReturnsAccount() throws Exception {
Account account = new Account(1L, "123456789", true);
account.setUserId(1L);
Account account = new Account("Jane", "Doe", "jane.doe@example.com");
account.setAccountId(1L);
given(this.accountRepository.findOne(1L)).willReturn(account);
given(this.accountRepository.save(account)).willReturn(account);
given(this.accountRepository.saveAndFlush(account)).willReturn(account);
Account actual = accountService.createAccount(account);
assertThat(actual).isNotNull();
assertThat(actual.getStatus()).isEqualTo(AccountStatus.ACCOUNT_CREATED);
assertThat(actual.getUserId()).isEqualTo(1L);
assertThat(actual.getAccountNumber()).isEqualTo("123456789");
assertThat(actual.getEmail()).isEqualTo("jane.doe@example.com");
assertThat(actual.getFirstName()).isEqualTo("Jane");
assertThat(actual.getLastName()).isEqualTo("Doe");
}
@Test
public void applyCommandSuspendsAccount() throws Exception {
Account account = new Account(1L, "123456789", true);
Account account = new Account("Jane", "Doe", "jane.doe@example.com");
account.setStatus(AccountStatus.ACCOUNT_ACTIVE);
account.setUserId(1L);
AccountEvent accountEvent = new AccountEvent(AccountEventType.ACCOUNT_SUSPENDED);
accountEvent.setAccount(account);
@@ -72,7 +75,7 @@ public class AccountServiceTests {
given(this.accountRepository.findOne(1L)).willReturn(account);
given(this.accountRepository.exists(1L)).willReturn(true);
given(this.accountRepository.save(account)).willReturn(account);
given(this.eventService.createEvent(new AccountEvent(AccountEventType.ACCOUNT_SUSPENDED)))
given(this.eventService.createEvent(1L, new AccountEvent(AccountEventType.ACCOUNT_SUSPENDED)))
.willReturn(accountEvent);
Account actual = accountService.applyCommand(1L, AccountCommand.SUSPEND_ACCOUNT);
@@ -83,9 +86,8 @@ public class AccountServiceTests {
@Test
public void applyCommandUnsuspendsAccount() throws Exception {
Account account = new Account(1L, "123456789", true);
Account account = new Account("Jane", "Doe", "jane.doe@example.com");
account.setStatus(AccountStatus.ACCOUNT_SUSPENDED);
account.setUserId(1L);
AccountEvent accountEvent = new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED);
accountEvent.setAccount(account);
@@ -94,7 +96,7 @@ public class AccountServiceTests {
given(this.accountRepository.findOne(1L)).willReturn(account);
given(this.accountRepository.exists(1L)).willReturn(true);
given(this.accountRepository.save(account)).willReturn(account);
given(this.eventService.createEvent(new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED)))
given(this.eventService.createEvent(1L, new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED)))
.willReturn(accountEvent);
Account actual = accountService.applyCommand(1L, AccountCommand.ACTIVATE_ACCOUNT);
@@ -105,9 +107,8 @@ public class AccountServiceTests {
@Test
public void applyCommandArchivesAccount() throws Exception {
Account account = new Account(1L, "123456789", true);
Account account = new Account("Jane", "Doe", "jane.doe@example.com");
account.setStatus(AccountStatus.ACCOUNT_ACTIVE);
account.setUserId(1L);
AccountEvent accountEvent = new AccountEvent(AccountEventType.ACCOUNT_ARCHIVED);
accountEvent.setAccount(account);
@@ -116,7 +117,7 @@ public class AccountServiceTests {
given(this.accountRepository.findOne(1L)).willReturn(account);
given(this.accountRepository.exists(1L)).willReturn(true);
given(this.accountRepository.save(account)).willReturn(account);
given(this.eventService.createEvent(new AccountEvent(AccountEventType.ACCOUNT_ARCHIVED)))
given(this.eventService.createEvent(1L, new AccountEvent(AccountEventType.ACCOUNT_ARCHIVED)))
.willReturn(accountEvent);
Account actual = accountService.applyCommand(1L, AccountCommand.ARCHIVE_ACCOUNT);
@@ -126,10 +127,9 @@ public class AccountServiceTests {
}
@Test
public void applyCommmandUnarchivesAccount() throws Exception {
Account account = new Account(1L, "123456789", true);
public void applyCommandUnarchivesAccount() throws Exception {
Account account = new Account("Jane", "Doe", "jane.doe@example.com");
account.setStatus(AccountStatus.ACCOUNT_ARCHIVED);
account.setUserId(1L);
AccountEvent accountEvent = new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED);
accountEvent.setAccount(account);
@@ -138,7 +138,7 @@ public class AccountServiceTests {
given(this.accountRepository.findOne(1L)).willReturn(account);
given(this.accountRepository.exists(1L)).willReturn(true);
given(this.accountRepository.save(account)).willReturn(account);
given(this.eventService.createEvent(new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED)))
given(this.eventService.createEvent(1L, new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED)))
.willReturn(accountEvent);
Account actual = accountService.applyCommand(1L, AccountCommand.ACTIVATE_ACCOUNT);
@@ -148,10 +148,9 @@ public class AccountServiceTests {
}
@Test
public void applyCommmandConfirmsAccount() throws Exception {
Account account = new Account(1L, "123456789", true);
public void applyCommandConfirmsAccount() throws Exception {
Account account = new Account("Jane", "Doe", "jane.doe@example.com");
account.setStatus(AccountStatus.ACCOUNT_PENDING);
account.setUserId(1L);
AccountEvent accountEvent = new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED);
accountEvent.setAccount(account);
@@ -160,7 +159,7 @@ public class AccountServiceTests {
given(this.accountRepository.findOne(1L)).willReturn(account);
given(this.accountRepository.exists(1L)).willReturn(true);
given(this.accountRepository.save(account)).willReturn(account);
given(this.eventService.createEvent(new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED)))
given(this.eventService.createEvent(1L, new AccountEvent(AccountEventType.ACCOUNT_CONFIRMED)))
.willReturn(accountEvent);
Account actual = accountService.applyCommand(1L, AccountCommand.CONFIRM_ACCOUNT);
@@ -170,10 +169,10 @@ public class AccountServiceTests {
}
@Test
public void applyCommmandActivatesAccount() throws Exception {
Account account = new Account(1L, "123456789", true);
public void applyCommandActivatesAccount() throws Exception {
Account account = new Account("Jane", "Doe", "jane.doe@example.com");
account.setStatus(AccountStatus.ACCOUNT_CONFIRMED);
account.setUserId(1L);
AccountEvent accountEvent = new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED);
accountEvent.setAccount(account);
@@ -182,7 +181,7 @@ public class AccountServiceTests {
given(this.accountRepository.findOne(1L)).willReturn(account);
given(this.accountRepository.exists(1L)).willReturn(true);
given(this.accountRepository.save(account)).willReturn(account);
given(this.eventService.createEvent(new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED)))
given(this.eventService.createEvent(1L, new AccountEvent(AccountEventType.ACCOUNT_ACTIVATED)))
.willReturn(accountEvent);
Account actual = accountService.applyCommand(1L, AccountCommand.ACTIVATE_ACCOUNT);

View File

@@ -1 +1 @@
INSERT INTO ACCOUNT(ID, USER_ID, ACCOUNT_NUMBER, DEFAULT_ACCOUNT) values (1, 1, '123456789', FALSE);
INSERT INTO ACCOUNT(ID, FIRST_NAME, LAST_NAME, EMAIL) values (1, 'John', 'Doe', 'john.doe@example.com');

View File

@@ -1,11 +1,10 @@
name: account-web
memory: 512M
name: account-worker
memory: 1024M
instances: 1
path: ./target/account-web-0.0.1-SNAPSHOT.jar
path: ./target/account-worker-0.0.1-SNAPSHOT.jar
buildpack: java_buildpack
services:
- rabbit-events
- redis-cache
disk_quota: 1024M
host: account-event-web
host: account-event-worker
domain: cfapps.io

View File

@@ -48,6 +48,16 @@
<artifactId>spring-statemachine-core</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.kbastani</groupId>
<artifactId>spring-boot-starter-aws-lambda</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>1.11.67</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-lambda</artifactId>

View File

@@ -2,8 +2,11 @@ package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.hateoas.config.EnableHypermediaSupport;
import org.springframework.hateoas.config.EnableHypermediaSupport.HypermediaType;
@SpringBootApplication
@EnableHypermediaSupport(type = {HypermediaType.HAL})
public class AccountStreamModuleApplication {
public static void main(String[] args) {
SpringApplication.run(AccountStreamModuleApplication.class, args);

View File

@@ -12,36 +12,36 @@ import demo.event.AccountEvent;
* @author kbastani
*/
public class Account extends BaseEntity {
private Long userId;
private String accountNumber;
private Boolean defaultAccount;
private String firstName;
private String lastName;
private String email;
private AccountStatus status;
public Account() {
}
public Long getUserId() {
return userId;
public String getFirstName() {
return firstName;
}
public void setUserId(Long userId) {
this.userId = userId;
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getAccountNumber() {
return accountNumber;
public String getLastName() {
return lastName;
}
public void setAccountNumber(String accountNumber) {
this.accountNumber = accountNumber;
public void setLastName(String lastName) {
this.lastName = lastName;
}
public Boolean getDefaultAccount() {
return defaultAccount;
public String getEmail() {
return email;
}
public void setDefaultAccount(Boolean defaultAccount) {
this.defaultAccount = defaultAccount;
public void setEmail(String email) {
this.email = email;
}
public AccountStatus getStatus() {
@@ -51,13 +51,4 @@ public class Account extends BaseEntity {
public void setStatus(AccountStatus status) {
this.status = status;
}
@Override
public String toString() {
return "Account{" +
"userId=" + userId +
", accountNumber='" + accountNumber + '\'' +
", defaultAccount=" + defaultAccount +
"} " + super.toString();
}
}

View File

@@ -0,0 +1,23 @@
package demo.config;
import amazon.aws.AWSLambdaConfigurerAdapter;
import com.fasterxml.jackson.databind.ObjectMapper;
import demo.function.LambdaFunctions;
import demo.util.LambdaUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AwsLambdaConfig {
@Bean
public LambdaFunctions lambdaInvoker(AWSLambdaConfigurerAdapter configurerAdapter) {
return configurerAdapter
.getFunctionInstance(LambdaFunctions.class);
}
@Bean
public LambdaUtil lambdaUtil(ObjectMapper objectMapper) {
return new LambdaUtil(objectMapper);
}
}

View File

@@ -150,13 +150,13 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Accoun
* to ACCOUNT_PENDING.
* <p>
* The body of this method shows an example of how to map an {@link AccountFunction} to a function
* defined in a class method of {@link CreateAccountFunction}.
* defined in a class method of {@link CreateAccount}.
*
* @return an implementation of {@link Action} that includes a function to execute
*/
@Bean
public Action<AccountStatus, AccountEventType> createAccount() {
return context -> applyEvent(context, new CreateAccountFunction(context));
return context -> applyEvent(context, new CreateAccount(context));
}
/**
@@ -164,7 +164,7 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Accoun
* to ACCOUNT_CONFIRMED.
* <p>
* The body of this method shows an example of how to use a {@link java.util.function.Consumer} Java 8
* lambda function instead of the class method definition that was shown in {@link CreateAccountFunction}.
* lambda function instead of the class method definition that was shown in {@link CreateAccount}.
*
* @return an implementation of {@link Action} that includes a function to execute
*/
@@ -172,9 +172,9 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Accoun
public Action<AccountStatus, AccountEventType> confirmAccount() {
return context -> {
// Map the account action to a Java 8 lambda function
ConfirmAccountFunction accountFunction;
ConfirmAccount accountFunction;
accountFunction = new ConfirmAccountFunction(context, event -> {
accountFunction = new ConfirmAccount(context, event -> {
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
@@ -188,6 +188,8 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Accoun
.getBody();
log.info(event.getType() + ": " + event.getLink("account").getHref());
return account;
});
applyEvent(context, accountFunction);
@@ -203,9 +205,18 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Accoun
@Bean
public Action<AccountStatus, AccountEventType> activateAccount() {
return context -> applyEvent(context,
new ActivateAccountFunction(context,
event -> log.info(event.getType() + ": " +
event.getLink("account").getHref())));
new ActivateAccount(context, event -> {
log.info(event.getType() + ": " + event.getLink("account").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Account.class)
.getBody();
}));
}
/**
@@ -217,9 +228,18 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Accoun
@Bean
public Action<AccountStatus, AccountEventType> archiveAccount() {
return context -> applyEvent(context,
new ArchiveAccountFunction(context,
event -> log.info(event.getType() + ": " +
event.getLink("account").getHref())));
new ArchiveAccount(context, event -> {
log.info(event.getType() + ": " + event.getLink("account").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Account.class)
.getBody();
}));
}
/**
@@ -231,9 +251,18 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Accoun
@Bean
public Action<AccountStatus, AccountEventType> suspendAccount() {
return context -> applyEvent(context,
new SuspendAccountFunction(context,
event -> log.info(event.getType() + ": " +
event.getLink("account").getHref())));
new SuspendAccount(context, event -> {
log.info(event.getType() + ": " + event.getLink("account").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Account.class)
.getBody();
}));
}
/**
@@ -245,9 +274,18 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Accoun
@Bean
public Action<AccountStatus, AccountEventType> unarchiveAccount() {
return context -> applyEvent(context,
new UnarchiveAccountFunction(context,
event -> log.info(event.getType() + ": " +
event.getLink("account").getHref())));
new UnarchiveAccount(context, event -> {
log.info(event.getType() + ": " + event.getLink("account").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Account.class)
.getBody();
}));
}
/**
@@ -259,9 +297,18 @@ public class StateMachineConfig extends EnumStateMachineConfigurerAdapter<Accoun
@Bean
public Action<AccountStatus, AccountEventType> unsuspendAccount() {
return context -> applyEvent(context,
new UnsuspendAccountFunction(context,
event -> log.info(event.getType() + ": " +
event.getLink("account").getHref())));
new UnsuspendAccount(context, event -> {
log.info(event.getType() + ": " + event.getLink("account").getHref());
// Get the account resource for the event
Traverson traverson = new Traverson(
URI.create(event.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
return traverson.follow("self")
.toEntity(Account.class)
.getBody();
}));
}
}

View File

@@ -1,23 +1,12 @@
package demo.event;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.state.StateMachineService;
import org.apache.log4j.Logger;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
/**
* The {@link AccountEventStream} monitors for a variety of {@link AccountEvent} domain
* events for an {@link Account}.
@@ -28,11 +17,10 @@ import java.util.Map;
@EnableBinding(Sink.class)
public class AccountEventStream {
final private Logger log = Logger.getLogger(AccountEventStream.class);
final private StateMachineService stateMachineService;
private EventService eventService;
public AccountEventStream(StateMachineService stateMachineService) {
this.stateMachineService = stateMachineService;
public AccountEventStream(EventService eventService) {
this.eventService = eventService;
}
/**
@@ -41,48 +29,10 @@ public class AccountEventStream {
* reproduces the current state of the {@link Account} resource that is the
* subject of the {@link AccountEvent}.
*
* @param accountEvent
* @param accountEvent is the {@link Account} domain event to process
*/
@StreamListener(Sink.INPUT)
public void streamListerner(AccountEvent accountEvent) {
log.info("Account event received: " + accountEvent.getLink("self").getHref());
// Generate a state machine for computing the state of the account resource
StateMachine<AccountStatus, AccountEventType> stateMachine =
stateMachineService.getStateMachine();
// Follow the hypermedia link to fetch the attached account
Traverson traverson = new Traverson(
URI.create(accountEvent.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
// Get the event log for the attached account resource
AccountEvents events = traverson.follow("events")
.toEntity(AccountEvents.class)
.getBody();
// Prepare account event message headers
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("event", accountEvent);
// Replicate the current state of the account resource
events.getContent()
.stream()
.sorted((a1, a2) -> a1.getCreatedAt().compareTo(a2.getCreatedAt()))
.forEach(e -> {
MessageHeaders headers = new MessageHeaders(null);
// Check to see if this is the current event
if (e.getLink("self").equals(accountEvent.getLink("self"))) {
headers = new MessageHeaders(headerMap);
}
// Send the event to the state machine
stateMachine.sendEvent(MessageBuilder.createMessage(e.getType(), headers));
});
// Destroy the state machine
stateMachine.stop();
eventService.apply(accountEvent);
}
}

View File

@@ -0,0 +1,28 @@
package demo.event;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Optional;
@RestController
@RequestMapping("/v1")
public class EventController {
private EventService eventService;
public EventController(EventService eventService) {
this.eventService = eventService;
}
@PostMapping(path = "/events")
public ResponseEntity handleEvent(@RequestBody AccountEvent event) {
return Optional.ofNullable(eventService.apply(event))
.map(e -> new ResponseEntity<>(e, HttpStatus.CREATED))
.orElseThrow(() -> new RuntimeException("Apply event failed"));
}
}

View File

@@ -0,0 +1,82 @@
package demo.event;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.state.StateMachineService;
import org.apache.log4j.Logger;
import org.springframework.hateoas.MediaTypes;
import org.springframework.hateoas.client.Traverson;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.stereotype.Service;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@Service
public class EventService {
final private Logger log = Logger.getLogger(EventService.class);
final private StateMachineService stateMachineService;
public EventService(StateMachineService stateMachineService) {
this.stateMachineService = stateMachineService;
}
public Account apply(AccountEvent accountEvent) {
Account result;
log.info("Account event received: " + accountEvent.getLink("self").getHref());
// Generate a state machine for computing the state of the account resource
StateMachine<AccountStatus, AccountEventType> stateMachine =
stateMachineService.getStateMachine();
// Follow the hypermedia link to fetch the attached account
Traverson traverson = new Traverson(
URI.create(accountEvent.getLink("account").getHref()),
MediaTypes.HAL_JSON
);
// Get the event log for the attached account resource
AccountEvents events = traverson.follow("events")
.toEntity(AccountEvents.class)
.getBody();
// Prepare account event message headers
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("event", accountEvent);
// Replicate the current state of the account resource
events.getContent()
.stream()
.sorted((a1, a2) -> a1.getCreatedAt().compareTo(a2.getCreatedAt()))
.forEach(e -> {
MessageHeaders headers = new MessageHeaders(null);
// Check to see if this is the current event
if (e.getLink("self").equals(accountEvent.getLink("self"))) {
headers = new MessageHeaders(headerMap);
}
// Send the event to the state machine
stateMachine.sendEvent(MessageBuilder.createMessage(e.getType(), headers));
});
// Get result
Map<Object, Object> context = stateMachine.getExtendedState()
.getVariables();
// Get the account result
result = (Account) context.getOrDefault("account", null);
// Destroy the state machine
stateMachine.stop();
return result;
}
}

View File

@@ -1,12 +1,13 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
@@ -18,8 +19,8 @@ import java.util.function.Consumer;
public abstract class AccountFunction {
final private Logger log = Logger.getLogger(AccountFunction.class);
final private StateContext<AccountStatus, AccountEventType> context;
final private Consumer<AccountEvent> lambda;
final protected StateContext<AccountStatus, AccountEventType> context;
final protected Function<AccountEvent, Account> lambda;
/**
* Create a new instance of a class that extends {@link AccountFunction}, supplying
@@ -30,7 +31,7 @@ public abstract class AccountFunction {
* @param lambda is the lambda function describing an action that consumes an {@link AccountEvent}
*/
public AccountFunction(StateContext<AccountStatus, AccountEventType> context,
Consumer<AccountEvent> lambda) {
Function<AccountEvent, Account> lambda) {
this.context = context;
this.lambda = lambda;
}
@@ -41,8 +42,10 @@ public abstract class AccountFunction {
*
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
public void apply(AccountEvent event) {
public Account apply(AccountEvent event) {
// Execute the lambda function
lambda.accept(event);
Account result = lambda.apply(event);
context.getExtendedState().getVariables().put("account", result);
return result;
}
}

View File

@@ -1,4 +0,0 @@
package demo.function;
public class AccountService {
}

View File

@@ -1,12 +1,13 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
@@ -15,11 +16,11 @@ import java.util.function.Consumer;
*
* @author kbastani
*/
public class ActivateAccountFunction extends AccountFunction {
public class ActivateAccount extends AccountFunction {
final private Logger log = Logger.getLogger(ActivateAccountFunction.class);
final private Logger log = Logger.getLogger(ActivateAccount.class);
public ActivateAccountFunction(StateContext<AccountStatus, AccountEventType> context, Consumer<AccountEvent> lambda) {
public ActivateAccount(StateContext<AccountStatus, AccountEventType> context, Function<AccountEvent, Account> lambda) {
super(context, lambda);
}
@@ -30,8 +31,8 @@ public class ActivateAccountFunction extends AccountFunction {
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
@Override
public void apply(AccountEvent event) {
public Account apply(AccountEvent event) {
log.info("Executing workflow for an activated account...");
super.apply(event);
return super.apply(event);
}
}

View File

@@ -1,12 +1,13 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
@@ -15,11 +16,11 @@ import java.util.function.Consumer;
*
* @author kbastani
*/
public class ArchiveAccountFunction extends AccountFunction {
public class ArchiveAccount extends AccountFunction {
final private Logger log = Logger.getLogger(ArchiveAccountFunction.class);
final private Logger log = Logger.getLogger(ArchiveAccount.class);
public ArchiveAccountFunction(StateContext<AccountStatus, AccountEventType> context, Consumer<AccountEvent> lambda) {
public ArchiveAccount(StateContext<AccountStatus, AccountEventType> context, Function<AccountEvent, Account> lambda) {
super(context, lambda);
}
@@ -30,8 +31,8 @@ public class ArchiveAccountFunction extends AccountFunction {
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
@Override
public void apply(AccountEvent event) {
public Account apply(AccountEvent event) {
log.info("Executing workflow for an archived account...");
super.apply(event);
return super.apply(event);
}
}

View File

@@ -1,12 +1,13 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
@@ -15,11 +16,11 @@ import java.util.function.Consumer;
*
* @author kbastani
*/
public class ConfirmAccountFunction extends AccountFunction {
public class ConfirmAccount extends AccountFunction {
final private Logger log = Logger.getLogger(ConfirmAccountFunction.class);
final private Logger log = Logger.getLogger(ConfirmAccount.class);
public ConfirmAccountFunction(StateContext<AccountStatus, AccountEventType> context, Consumer<AccountEvent> lambda) {
public ConfirmAccount(StateContext<AccountStatus, AccountEventType> context, Function<AccountEvent, Account> lambda) {
super(context, lambda);
}
@@ -30,8 +31,8 @@ public class ConfirmAccountFunction extends AccountFunction {
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
@Override
public void apply(AccountEvent event) {
public Account apply(AccountEvent event) {
log.info("Executing workflow for a confirmed account...");
super.apply(event);
return super.apply(event);
}
}

View File

@@ -12,7 +12,7 @@ import org.springframework.statemachine.StateContext;
import org.springframework.web.client.RestTemplate;
import java.net.URI;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
@@ -21,16 +21,16 @@ import java.util.function.Consumer;
*
* @author kbastani
*/
public class CreateAccountFunction extends AccountFunction {
public class CreateAccount extends AccountFunction {
final private Logger log = Logger.getLogger(CreateAccountFunction.class);
final private Logger log = Logger.getLogger(CreateAccount.class);
public CreateAccountFunction(StateContext<AccountStatus, AccountEventType> context) {
public CreateAccount(StateContext<AccountStatus, AccountEventType> context) {
this(context, null);
}
public CreateAccountFunction(StateContext<AccountStatus, AccountEventType> context,
Consumer<AccountEvent> function) {
public CreateAccount(StateContext<AccountStatus, AccountEventType> context,
Function<AccountEvent, Account> function) {
super(context, function);
}
@@ -40,7 +40,10 @@ public class CreateAccountFunction extends AccountFunction {
* @param event is the {@link AccountEvent} for this context
*/
@Override
public void apply(AccountEvent event) {
public Account apply(AccountEvent event) {
Account account;
log.info("Executing workflow for a created account...");
// Create a traverson for the root account
@@ -50,7 +53,7 @@ public class CreateAccountFunction extends AccountFunction {
);
// Get the account resource attached to the event
Account account = traverson.follow("self")
account = traverson.follow("self")
.toEntity(Account.class)
.getBody();
@@ -68,6 +71,10 @@ public class CreateAccountFunction extends AccountFunction {
log.info(event.getType() + ": " +
event.getLink("account").getHref());
}
context.getExtendedState().getVariables().put("account", account);
return account;
}
/**

View File

@@ -0,0 +1,31 @@
package demo.function;
import com.amazonaws.services.lambda.invoke.LambdaFunction;
import com.amazonaws.services.lambda.model.LogType;
import demo.account.Account;
import java.util.Map;
public interface LambdaFunctions {
@LambdaFunction(functionName="account-created-accountCreated-13P0EDGLDE399", logType = LogType.Tail)
Account accountCreated(Map event);
@LambdaFunction(functionName="accountConfirmed", logType = LogType.Tail)
Account accountConfirmed(Map event);
@LambdaFunction(functionName="account-activated-accountActivated-1P0I6FTFCMHKH", logType = LogType.Tail)
Account accountActivated(Map event);
@LambdaFunction(functionName="accountSuspended", logType = LogType.Tail)
Account accountSuspended(Map event);
@LambdaFunction(functionName="accountArchived", logType = LogType.Tail)
Account accountArchived(Map event);
@LambdaFunction(functionName="accountUnsuspended", logType = LogType.Tail)
Account accountUnsuspended(Map event);
@LambdaFunction(functionName="accountUnarchived", logType = LogType.Tail)
Account accountUnarchived(Map event);
}

View File

@@ -1,12 +1,13 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
@@ -15,11 +16,11 @@ import java.util.function.Consumer;
*
* @author kbastani
*/
public class SuspendAccountFunction extends AccountFunction {
public class SuspendAccount extends AccountFunction {
final private Logger log = Logger.getLogger(SuspendAccountFunction.class);
final private Logger log = Logger.getLogger(SuspendAccount.class);
public SuspendAccountFunction(StateContext<AccountStatus, AccountEventType> context, Consumer<AccountEvent> lambda) {
public SuspendAccount(StateContext<AccountStatus, AccountEventType> context, Function<AccountEvent, Account> lambda) {
super(context, lambda);
}
@@ -30,8 +31,8 @@ public class SuspendAccountFunction extends AccountFunction {
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
@Override
public void apply(AccountEvent event) {
public Account apply(AccountEvent event) {
log.info("Executing workflow for a suspended account...");
super.apply(event);
return super.apply(event);
}
}

View File

@@ -1,12 +1,13 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
@@ -15,11 +16,11 @@ import java.util.function.Consumer;
*
* @author kbastani
*/
public class UnarchiveAccountFunction extends AccountFunction {
public class UnarchiveAccount extends AccountFunction {
final private Logger log = Logger.getLogger(UnarchiveAccountFunction.class);
final private Logger log = Logger.getLogger(UnarchiveAccount.class);
public UnarchiveAccountFunction(StateContext<AccountStatus, AccountEventType> context, Consumer<AccountEvent> lambda) {
public UnarchiveAccount(StateContext<AccountStatus, AccountEventType> context, Function<AccountEvent, Account> lambda) {
super(context, lambda);
}
@@ -30,8 +31,8 @@ public class UnarchiveAccountFunction extends AccountFunction {
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
@Override
public void apply(AccountEvent event) {
public Account apply(AccountEvent event) {
log.info("Executing workflow for an unarchived account...");
super.apply(event);
return super.apply(event);
}
}

View File

@@ -1,12 +1,13 @@
package demo.function;
import demo.account.Account;
import demo.account.AccountStatus;
import demo.event.AccountEvent;
import demo.event.AccountEventType;
import org.apache.log4j.Logger;
import org.springframework.statemachine.StateContext;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* The {@link AccountFunction} is an abstraction used to map actions that are triggered by
@@ -15,11 +16,11 @@ import java.util.function.Consumer;
*
* @author kbastani
*/
public class UnsuspendAccountFunction extends AccountFunction {
public class UnsuspendAccount extends AccountFunction {
final private Logger log = Logger.getLogger(UnsuspendAccountFunction.class);
final private Logger log = Logger.getLogger(UnsuspendAccount.class);
public UnsuspendAccountFunction(StateContext<AccountStatus, AccountEventType> context, Consumer<AccountEvent> lambda) {
public UnsuspendAccount(StateContext<AccountStatus, AccountEventType> context, Function<AccountEvent, Account> lambda) {
super(context, lambda);
}
@@ -30,8 +31,8 @@ public class UnsuspendAccountFunction extends AccountFunction {
* @param event is the {@link AccountEvent} to apply to the lambda function
*/
@Override
public void apply(AccountEvent event) {
public Account apply(AccountEvent event) {
log.info("Executing workflow for a unsuspended account...");
super.apply(event);
return super.apply(event);
}
}

View File

@@ -0,0 +1,29 @@
package demo.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
@Component
public class LambdaUtil {
private ObjectMapper objectMapper;
public LambdaUtil(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
public HashMap objectToMap(Object object) {
HashMap result = null;
try {
result = objectMapper.readValue(objectMapper.writeValueAsString(object), HashMap.class);
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
}

View File

@@ -14,4 +14,8 @@ spring:
consumer:
durableSubscription: true
server:
port: 0
port: 8081
amazon:
aws:
access-key-id: replace
access-key-secret: replace

View File

@@ -25,6 +25,7 @@
<module>order-parent</module>
<module>user-parent</module>
<module>warehouse-parent</module>
<module>spring-boot-starter-aws-lambda</module>
</modules>
<dependencies>

View File

@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-boot-starter-aws-lambda</artifactId>
<packaging>jar</packaging>
<parent>
<groupId>org.kbastani</groupId>
<artifactId>event-stream-processing-parent</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-lambda</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>1.11.67</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>1.11.67</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@@ -0,0 +1,88 @@
package amazon.aws;
import com.amazonaws.auth.*;
import com.amazonaws.services.lambda.AWSLambdaClientBuilder;
import com.amazonaws.services.lambda.invoke.LambdaInvokerFactory;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* This class is a client for interacting with Amazon S3 bucket resources.
*
* @author kbastani
*/
@Component
public class AWSLambdaConfigurerAdapter {
private String accessKeyId;
private String accessKeySecret;
private Credentials sessionCredentials;
/**
* Create a new instance of the {@link AWSLambdaConfigurerAdapter} with the bucket name and access credentials
*
* @param accessKeyId is the access key id credential for the specified bucket name
* @param accessKeySecret is the access key secret for the specified bucket name
*/
public AWSLambdaConfigurerAdapter(String accessKeyId,
String accessKeySecret) {
this.accessKeyId = accessKeyId;
this.accessKeySecret = accessKeySecret;
}
/**
* Gets an instance of a function interface
* @param type
* @param <T>
* @return
*/
public <T> T getFunctionInstance(Class<T> type) {
return LambdaInvokerFactory.builder()
.lambdaClient(AWSLambdaClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(
getBasicSessionCredentials()))
.build())
.build(type);
}
/**
* Get the basic session credentials for the template's configured IAM authentication keys
*
* @return a {@link BasicSessionCredentials} instance with a valid authenticated session token
*/
private BasicSessionCredentials getBasicSessionCredentials() {
// Create a new session token if the session is expired or not initialized
if (sessionCredentials == null || sessionCredentials.getExpiration().before(new Date()))
sessionCredentials = getSessionCredentials();
// Create basic session credentials using the generated session token
return new BasicSessionCredentials(sessionCredentials.getAccessKeyId(),
sessionCredentials.getSecretAccessKey(),
sessionCredentials.getSessionToken());
}
/**
* Creates a new session credential that is valid for 12 hours
*
* @return an authenticated {@link Credentials} for the new session token
*/
private Credentials getSessionCredentials() {
// Create a new session with the user credentials for the service instance
AWSSecurityTokenServiceClient stsClient =
new AWSSecurityTokenServiceClient(new BasicAWSCredentials(accessKeyId, accessKeySecret));
// Start a new session for managing a service instance's bucket
GetSessionTokenRequest getSessionTokenRequest =
new GetSessionTokenRequest().withDurationSeconds(43200);
// Get the session token for the service instance's bucket
sessionCredentials = stsClient.getSessionToken(getSessionTokenRequest).getCredentials();
return sessionCredentials;
}
}

View File

@@ -0,0 +1,28 @@
package amazon.aws;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* This class auto-configures a {@link AWSLambdaConfigurerAdapter} bean.
*
* @author kbastani
*/
@Configuration
@ConditionalOnMissingBean(AWSLambdaConfigurerAdapter.class)
@EnableConfigurationProperties(AmazonProperties.class)
public class AmazonAutoConfiguration {
@Autowired
private AmazonProperties amazonProperties;
@Bean
protected AWSLambdaConfigurerAdapter lambdaAdapter() {
return new AWSLambdaConfigurerAdapter(
amazonProperties.getAws().getAccessKeyId(),
amazonProperties.getAws().getAccessKeySecret());
}
}

View File

@@ -0,0 +1,89 @@
package amazon.aws;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.context.annotation.Configuration;
/**
* Configuration property group for Amazon S3 and AWS
*
* @author kbastani
*/
@Configuration
@ConfigurationProperties(prefix = "amazon")
public class AmazonProperties {
@NestedConfigurationProperty
private Aws aws;
/**
* A property group for Amazon Web Service (AWS) configurations
*
* @return a property group for AWS configurations
*/
public Aws getAws() {
return aws;
}
/**
* A property group for Amazon Web Service (AWS) configurations
*
* @param aws is a property group for AWS configurations
*/
public void setAws(Aws aws) {
this.aws = aws;
}
/**
* A property group for Amazon Web Service (AWS) configurations
*/
public static class Aws {
private String accessKeyId;
private String accessKeySecret;
/**
* A valid AWS account's access key id.
*
* @return an AWS access key id
*/
public String getAccessKeyId() {
return accessKeyId;
}
/**
* A valid AWS account's access key id.
*
* @param accessKeyId is a valid AWS account's access key id.
*/
public void setAccessKeyId(String accessKeyId) {
this.accessKeyId = accessKeyId;
}
/**
* A valid AWS account's secret access token.
*
* @return an AWS account's secret access key
*/
public String getAccessKeySecret() {
return accessKeySecret;
}
/**
* A valid AWS account's secret access token.
*
* @param accessKeySecret is a valid AWS account's secret access token.
*/
public void setAccessKeySecret(String accessKeySecret) {
this.accessKeySecret = accessKeySecret;
}
@Override
public String toString() {
return "Aws{" +
"accessKeyId='" + accessKeyId + '\'' +
", accessKeySecret='" + accessKeySecret + '\'' +
'}';
}
}
}

View File

@@ -0,0 +1,9 @@
{
"groups": [
{
"name": "amazon",
"type": "amazon.aws.AmazonProperties",
"sourceType": "amazon.aws.AmazonProperties"
}
]
}

View File

@@ -0,0 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=amazon.aws.AmazonAutoConfiguration

View File

@@ -0,0 +1,2 @@
amazon.aws.access-key-id=replace
amazon.aws.access-key-secret=replace

View File

@@ -0,0 +1,44 @@
package amazon.aws;
import org.junit.After;
import org.junit.Test;
import org.springframework.boot.test.util.EnvironmentTestUtils;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import static junit.framework.TestCase.assertNotNull;
public class AmazonConfigurationTest {
private AnnotationConfigApplicationContext context;
@After
public void tearDown() {
if (this.context != null) {
this.context.close();
}
}
@Test
public void defaultAdapter() {
load(EmptyConfiguration.class,
"amazon.aws.access-key-id=AJGLDLSXKDFLS",
"amazon.aws.access-key-secret=XSDFSDFLKKHASDFJALASDF");
AWSLambdaConfigurerAdapter amazonS3Template = this.context.getBean(AWSLambdaConfigurerAdapter.class);
assertNotNull(amazonS3Template);
}
@Configuration
static class EmptyConfiguration {
}
private void load(Class<?> config, String... environment) {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
EnvironmentTestUtils.addEnvironment(applicationContext, environment);
applicationContext.register(config);
applicationContext.register(AmazonAutoConfiguration.class);
applicationContext.refresh();
this.context = applicationContext;
}
}