From 9d13169d80fad235cf8a17bf43cf2d95b621fabf Mon Sep 17 00:00:00 2001 From: Kenny Bastani Date: Wed, 11 Jan 2017 20:35:15 -0800 Subject: [PATCH] Distributed lock for inventory --- .../src/main/resources/application.yml | 1 + .../src/main/resources/application.yml | 1 + docker-compose.yml | 4 ++ .../src/main/resources/application.yml | 1 + .../src/main/resources/application.yml | 1 + .../src/main/resources/application.yml | 1 + .../src/main/resources/application.yml | 1 + warehouse/warehouse-web/pom.xml | 9 ++++ .../main/java/demo/config/RedisConfig.java | 15 ++++++ .../inventory/action/ReserveInventory.java | 9 +--- .../java/demo/inventory/domain/Inventory.java | 17 ++++-- .../inventory/domain/InventoryService.java | 52 +++++++++++++++---- .../repository/InventoryRepository.java | 4 +- .../src/main/resources/application.yml | 47 ++++++++++++++--- .../test/java/demo/WarehouseServiceTests.java | 37 ++++++++++++- .../src/main/resources/application.yml | 4 ++ 16 files changed, 171 insertions(+), 33 deletions(-) create mode 100644 warehouse/warehouse-web/src/main/java/demo/config/RedisConfig.java diff --git a/account/account-web/src/main/resources/application.yml b/account/account-web/src/main/resources/application.yml index 93c12dd..2fe6a17 100644 --- a/account/account-web/src/main/resources/application.yml +++ b/account/account-web/src/main/resources/application.yml @@ -27,6 +27,7 @@ eureka: registryFetchIntervalSeconds: 5 instance: hostname: ${DOCKER_IP:192.168.99.100} + instance-id: ${spring.application.name}:${random.int} leaseRenewalIntervalInSeconds: 5 --- spring: diff --git a/account/account-worker/src/main/resources/application.yml b/account/account-worker/src/main/resources/application.yml index 885fc78..474e20d 100644 --- a/account/account-worker/src/main/resources/application.yml +++ b/account/account-worker/src/main/resources/application.yml @@ -38,6 +38,7 @@ eureka: registryFetchIntervalSeconds: 5 instance: hostname: ${DOCKER_IP:192.168.99.100} + instance-id: ${spring.application.name}:${random.int} leaseRenewalIntervalInSeconds: 5 --- spring: diff --git a/docker-compose.yml b/docker-compose.yml index 5dffa3a..bf30bbd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,6 +6,10 @@ mysql: - MYSQL_ROOT_PASSWORD=dbpass - MYSQL_DATABASE=dev] net: host +redis: + container_name: redis + image: redis:latest + net: host rabbit: container_name: rabbit image: rabbitmq:3-management diff --git a/order/order-web/src/main/resources/application.yml b/order/order-web/src/main/resources/application.yml index 029f1a9..b90be1d 100644 --- a/order/order-web/src/main/resources/application.yml +++ b/order/order-web/src/main/resources/application.yml @@ -27,6 +27,7 @@ eureka: registryFetchIntervalSeconds: 5 instance: hostname: ${DOCKER_IP:192.168.99.100} + instance-id: ${spring.application.name}:${random.int} leaseRenewalIntervalInSeconds: 5 --- spring: diff --git a/order/order-worker/src/main/resources/application.yml b/order/order-worker/src/main/resources/application.yml index 40f0b0d..392474b 100644 --- a/order/order-worker/src/main/resources/application.yml +++ b/order/order-worker/src/main/resources/application.yml @@ -32,6 +32,7 @@ eureka: registryFetchIntervalSeconds: 5 instance: hostname: ${DOCKER_IP:192.168.99.100} + instance-id: ${spring.application.name}:${random.int} leaseRenewalIntervalInSeconds: 5 --- spring: diff --git a/payment/payment-web/src/main/resources/application.yml b/payment/payment-web/src/main/resources/application.yml index bcfb834..23778cc 100644 --- a/payment/payment-web/src/main/resources/application.yml +++ b/payment/payment-web/src/main/resources/application.yml @@ -27,6 +27,7 @@ eureka: registryFetchIntervalSeconds: 5 instance: hostname: ${DOCKER_IP:192.168.99.100} + instance-id: ${spring.application.name}:${random.int} leaseRenewalIntervalInSeconds: 5 --- spring: diff --git a/payment/payment-worker/src/main/resources/application.yml b/payment/payment-worker/src/main/resources/application.yml index b690963..3e0286b 100644 --- a/payment/payment-worker/src/main/resources/application.yml +++ b/payment/payment-worker/src/main/resources/application.yml @@ -32,6 +32,7 @@ eureka: registryFetchIntervalSeconds: 5 instance: hostname: ${DOCKER_IP:192.168.99.100} + instance-id: ${spring.application.name}:${random.int} leaseRenewalIntervalInSeconds: 5 --- spring: diff --git a/warehouse/warehouse-web/pom.xml b/warehouse/warehouse-web/pom.xml index 31f50ba..79eebca 100644 --- a/warehouse/warehouse-web/pom.xml +++ b/warehouse/warehouse-web/pom.xml @@ -31,6 +31,15 @@ org.springframework.boot spring-boot-starter-data-jpa + + org.springframework.boot + spring-boot-starter-redis + + + org.redisson + redisson + 3.2.2 + org.springframework.boot spring-boot-starter-actuator diff --git a/warehouse/warehouse-web/src/main/java/demo/config/RedisConfig.java b/warehouse/warehouse-web/src/main/java/demo/config/RedisConfig.java new file mode 100644 index 0000000..9edda1f --- /dev/null +++ b/warehouse/warehouse-web/src/main/java/demo/config/RedisConfig.java @@ -0,0 +1,15 @@ +package demo.config; + +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RedisConfig { + + @Bean + public RedissonClient redissonClient() { + return Redisson.create(); + } +} diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/action/ReserveInventory.java b/warehouse/warehouse-web/src/main/java/demo/inventory/action/ReserveInventory.java index a37b11a..31887d8 100644 --- a/warehouse/warehouse-web/src/main/java/demo/inventory/action/ReserveInventory.java +++ b/warehouse/warehouse-web/src/main/java/demo/inventory/action/ReserveInventory.java @@ -35,19 +35,14 @@ public class ReserveInventory extends Action { public BiFunction getFunction() { return (inventory, reservationId) -> { - Assert.isTrue(inventory.getStatus() == InventoryStatus.RESERVATION_PENDING, - "Inventory must be in a reservation pending state"); + Assert.isTrue(inventory.getStatus() == InventoryStatus.RESERVATION_CONNECTED, + "Inventory must be in a reservation connected state"); Assert.isTrue(inventory.getReservation() == null, "There is already a reservation attached to the inventory"); Reservation reservation = reservationService.get(reservationId); - Assert.notNull(reservation, "Reserve inventory failed, the reservation does not exist"); - inventory.setReservation(reservation); - inventory.setStatus(RESERVATION_CONNECTED); - inventory = inventoryService.update(inventory); - try { // Trigger the reservation connected event inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.RESERVATION_CONNECTED, inventory)); diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/domain/Inventory.java b/warehouse/warehouse-web/src/main/java/demo/inventory/domain/Inventory.java index bcf865c..55001a6 100644 --- a/warehouse/warehouse-web/src/main/java/demo/inventory/domain/Inventory.java +++ b/warehouse/warehouse-web/src/main/java/demo/inventory/domain/Inventory.java @@ -34,7 +34,7 @@ public class Inventory extends AbstractEntity { private Reservation reservation; @JsonIgnore - @ManyToOne(cascade = CascadeType.PERSIST, fetch = FetchType.LAZY) + @ManyToOne(cascade = CascadeType.ALL, fetch = FetchType.EAGER) private Warehouse warehouse; public Inventory() { @@ -111,9 +111,16 @@ public class Inventory extends AbstractEntity { */ @Override public Link getId() { - return linkTo(InventoryController.class) - .slash("inventory") - .slash(getIdentity()) - .withSelfRel(); + Link link; + try { + link = linkTo(InventoryController.class) + .slash("inventory") + .slash(getIdentity()) + .withSelfRel(); + } catch (Exception ex) { + link = new Link("http://warehouse-service/v1/inventory/" + id, "self"); + } + + return link; } } diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/domain/InventoryService.java b/warehouse/warehouse-web/src/main/java/demo/inventory/domain/InventoryService.java index 3159cc6..c98342c 100644 --- a/warehouse/warehouse-web/src/main/java/demo/inventory/domain/InventoryService.java +++ b/warehouse/warehouse-web/src/main/java/demo/inventory/domain/InventoryService.java @@ -3,16 +3,23 @@ package demo.inventory.domain; import demo.domain.Service; import demo.inventory.repository.InventoryRepository; import demo.reservation.domain.Reservation; -import org.springframework.transaction.annotation.Transactional; +import org.apache.log4j.Logger; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; import org.springframework.util.Assert; +import java.util.concurrent.TimeUnit; + @org.springframework.stereotype.Service public class InventoryService extends Service { + private final Logger log = Logger.getLogger(InventoryService.class); private final InventoryRepository inventoryRepository; + private final RedissonClient redissonClient; - public InventoryService(InventoryRepository inventoryRepository) { + public InventoryService(InventoryRepository inventoryRepository, RedissonClient redissonClient) { this.inventoryRepository = inventoryRepository; + this.redissonClient = redissonClient; } /** @@ -79,19 +86,44 @@ public class InventoryService extends Service { * @param reservation is the reservation to connect to the inventory * @return the first available inventory in the warehouse or null */ - @Transactional public Inventory findAvailableInventory(Reservation reservation) { Assert.notNull(reservation.getWarehouse(), "Reservation must be connected to a warehouse"); Assert.notNull(reservation.getProductId(), "Reservation must contain a valid product identifier"); - Inventory inventory = inventoryRepository - .findFirstInventoryByWarehouseIdAndProductIdAndStatus(reservation.getWarehouse() - .getIdentity(), reservation.getProductId(), InventoryStatus.RESERVATION_PENDING) - .orElse(null); + Boolean reserved = false; + Inventory inventory = null; - if (inventory != null) { - // Reserve the inventory - inventory = inventory.reserve(reservation.getIdentity()); + while (!reserved) { + inventory = inventoryRepository + .findFirstInventoryByWarehouseIdAndProductIdAndStatus(reservation.getWarehouse() + .getIdentity(), reservation.getProductId(), InventoryStatus.RESERVATION_PENDING); + if (inventory != null) { + // Acquire lock + RLock inventoryLock = redissonClient + .getLock(String.format("inventory_%s", inventory.getIdentity().toString())); + + Boolean lock = false; + + try { + lock = inventoryLock.tryLock(30, 5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.error("Interrupted while acquiring lock on inventory", e); + } + + if (lock) { + inventory.setStatus(InventoryStatus.RESERVATION_CONNECTED); + inventory = update(inventory); + + // Reserve the inventory + inventory = inventory.reserve(reservation.getIdentity()); + + inventoryLock.unlock(); + } + + reserved = lock; + } else { + reserved = true; + } } return inventory; diff --git a/warehouse/warehouse-web/src/main/java/demo/inventory/repository/InventoryRepository.java b/warehouse/warehouse-web/src/main/java/demo/inventory/repository/InventoryRepository.java index bac8c8a..676efcd 100644 --- a/warehouse/warehouse-web/src/main/java/demo/inventory/repository/InventoryRepository.java +++ b/warehouse/warehouse-web/src/main/java/demo/inventory/repository/InventoryRepository.java @@ -5,9 +5,7 @@ import demo.inventory.domain.InventoryStatus; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.repository.query.Param; -import java.util.Optional; - public interface InventoryRepository extends JpaRepository { - Optional findFirstInventoryByWarehouseIdAndProductIdAndStatus(@Param("warehouseId") Long warehouseId, + Inventory findFirstInventoryByWarehouseIdAndProductIdAndStatus(@Param("warehouseId") Long warehouseId, @Param("productId") String productId, @Param("status") InventoryStatus status); } diff --git a/warehouse/warehouse-web/src/main/resources/application.yml b/warehouse/warehouse-web/src/main/resources/application.yml index 736d2dd..f2d4843 100644 --- a/warehouse/warehouse-web/src/main/resources/application.yml +++ b/warehouse/warehouse-web/src/main/resources/application.yml @@ -1,6 +1,13 @@ spring: profiles: active: development +server: + port: 0 +events: + worker: http://warehouse-worker/v1/events +--- +spring: + profiles: development cloud: stream: bindings: @@ -13,19 +20,29 @@ spring: inventory: contentType: 'application/json' destination: inventory -server: - port: 0 -events: - worker: http://warehouse-worker/v1/events ---- -spring: - profiles: development --- spring: profiles: docker rabbitmq: host: ${DOCKER_IP:192.168.99.100} port: 5672 + datasource: + url: jdbc:h2:mem:AZ;MVCC=TRUE + redis: + host: ${DOCKER_IP:192.168.99.100} + port: 6379 + cloud: + stream: + bindings: + warehouse: + contentType: 'application/json' + destination: warehouse + reservation: + contentType: 'application/json' + destination: reservation + inventory: + contentType: 'application/json' + destination: inventory eureka: client: service-url: @@ -33,16 +50,32 @@ eureka: registryFetchIntervalSeconds: 5 instance: hostname: ${DOCKER_IP:192.168.99.100} + instance-id: ${spring.application.name}:${random.int} leaseRenewalIntervalInSeconds: 5 --- spring: profiles: test + redis: + host: localhost + port: 6379 eureka: client: enabled: false --- spring: profiles: cloud + cloud: + stream: + bindings: + warehouse: + contentType: 'application/json' + destination: warehouse + reservation: + contentType: 'application/json' + destination: reservation + inventory: + contentType: 'application/json' + destination: inventory eureka: instance: hostname: ${vcap.application.uris[0]:localhost} diff --git a/warehouse/warehouse-web/src/test/java/demo/WarehouseServiceTests.java b/warehouse/warehouse-web/src/test/java/demo/WarehouseServiceTests.java index 66ae575..bf9e2bf 100644 --- a/warehouse/warehouse-web/src/test/java/demo/WarehouseServiceTests.java +++ b/warehouse/warehouse-web/src/test/java/demo/WarehouseServiceTests.java @@ -2,6 +2,7 @@ package demo; import demo.inventory.domain.Inventory; import demo.inventory.domain.InventoryService; +import demo.inventory.domain.InventoryStatus; import demo.inventory.event.InventoryEventService; import demo.inventory.repository.InventoryRepository; import demo.reservation.domain.Reservation; @@ -22,6 +23,7 @@ import org.springframework.test.context.junit4.SpringRunner; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -95,7 +97,7 @@ public class WarehouseServiceTests { actualInventory = inventoryService.get(1L); assertThat(actualInventory.getWarehouse()).isNotNull(); - List warehouseFound = warehouseRepository.findAllWithInventory(3L ,Arrays + List warehouseFound = warehouseRepository.findAllWithInventory(3L, Arrays .asList("SKU-001", "SKU-002", "SKU-003")); List warehouseNotFound = warehouseRepository.findAllWithInventory(2L, Arrays @@ -103,4 +105,37 @@ public class WarehouseServiceTests { } + @Test + public void asyncInventoryReservationTest() throws Exception { + + Warehouse warehouse = new Warehouse(); + warehouse = warehouseRepository.save(warehouse); + + Inventory inventory1 = new Inventory(); + inventory1.setProductId("SKU-001"); + inventory1.setWarehouse(warehouse); + inventory1.setStatus(InventoryStatus.RESERVATION_PENDING); + Inventory inventory2 = new Inventory(); + inventory2.setProductId("SKU-001"); + inventory2.setWarehouse(warehouse); + inventory2.setStatus(InventoryStatus.RESERVATION_PENDING); + List inventories = inventoryRepository.save(Arrays.asList(inventory1, inventory2)); + + Reservation reservation1 = new Reservation(); + reservation1.setProductId("SKU-001"); + reservation1.setWarehouse(warehouse); + Reservation reservation2 = new Reservation(); + reservation2.setProductId("SKU-001"); + reservation2.setWarehouse(warehouse); + List reservations = reservationRepository.save(Arrays.asList(reservation1, reservation2)); + + List reservedInventory = reservations.parallelStream() + .map(a -> inventoryService.findAvailableInventory(a)).collect(Collectors + .toList()); + + assertThat(reservedInventory).isNotEmpty(); + assertThat(reservedInventory.size()).isEqualTo(2); + assertThat(reservedInventory.get(0)).isNotSameAs(reservedInventory.get(1)); + } + } \ No newline at end of file diff --git a/warehouse/warehouse-worker/src/main/resources/application.yml b/warehouse/warehouse-worker/src/main/resources/application.yml index 5948fe2..9bdb950 100644 --- a/warehouse/warehouse-worker/src/main/resources/application.yml +++ b/warehouse/warehouse-worker/src/main/resources/application.yml @@ -10,18 +10,21 @@ spring: group: warehouse-group consumer: durableSubscription: true + concurrency: 5 reservation: contentType: 'application/json' destination: reservation group: reservation-group consumer: durableSubscription: true + concurrency: 5 inventory: contentType: 'application/json' destination: inventory group: inventory-group consumer: durableSubscription: true + concurrency: 5 server: port: 0 --- @@ -40,6 +43,7 @@ eureka: registryFetchIntervalSeconds: 5 instance: hostname: ${DOCKER_IP:192.168.99.100} + instance-id: ${spring.application.name}:${random.int} leaseRenewalIntervalInSeconds: 5 --- spring: