Distributed lock for inventory
This commit is contained in:
@@ -27,6 +27,7 @@ eureka:
|
||||
registryFetchIntervalSeconds: 5
|
||||
instance:
|
||||
hostname: ${DOCKER_IP:192.168.99.100}
|
||||
instance-id: ${spring.application.name}:${random.int}
|
||||
leaseRenewalIntervalInSeconds: 5
|
||||
---
|
||||
spring:
|
||||
|
||||
@@ -38,6 +38,7 @@ eureka:
|
||||
registryFetchIntervalSeconds: 5
|
||||
instance:
|
||||
hostname: ${DOCKER_IP:192.168.99.100}
|
||||
instance-id: ${spring.application.name}:${random.int}
|
||||
leaseRenewalIntervalInSeconds: 5
|
||||
---
|
||||
spring:
|
||||
|
||||
@@ -6,6 +6,10 @@ mysql:
|
||||
- MYSQL_ROOT_PASSWORD=dbpass
|
||||
- MYSQL_DATABASE=dev]
|
||||
net: host
|
||||
redis:
|
||||
container_name: redis
|
||||
image: redis:latest
|
||||
net: host
|
||||
rabbit:
|
||||
container_name: rabbit
|
||||
image: rabbitmq:3-management
|
||||
|
||||
@@ -27,6 +27,7 @@ eureka:
|
||||
registryFetchIntervalSeconds: 5
|
||||
instance:
|
||||
hostname: ${DOCKER_IP:192.168.99.100}
|
||||
instance-id: ${spring.application.name}:${random.int}
|
||||
leaseRenewalIntervalInSeconds: 5
|
||||
---
|
||||
spring:
|
||||
|
||||
@@ -32,6 +32,7 @@ eureka:
|
||||
registryFetchIntervalSeconds: 5
|
||||
instance:
|
||||
hostname: ${DOCKER_IP:192.168.99.100}
|
||||
instance-id: ${spring.application.name}:${random.int}
|
||||
leaseRenewalIntervalInSeconds: 5
|
||||
---
|
||||
spring:
|
||||
|
||||
@@ -27,6 +27,7 @@ eureka:
|
||||
registryFetchIntervalSeconds: 5
|
||||
instance:
|
||||
hostname: ${DOCKER_IP:192.168.99.100}
|
||||
instance-id: ${spring.application.name}:${random.int}
|
||||
leaseRenewalIntervalInSeconds: 5
|
||||
---
|
||||
spring:
|
||||
|
||||
@@ -32,6 +32,7 @@ eureka:
|
||||
registryFetchIntervalSeconds: 5
|
||||
instance:
|
||||
hostname: ${DOCKER_IP:192.168.99.100}
|
||||
instance-id: ${spring.application.name}:${random.int}
|
||||
leaseRenewalIntervalInSeconds: 5
|
||||
---
|
||||
spring:
|
||||
|
||||
@@ -31,6 +31,15 @@
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-jpa</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-redis</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.redisson</groupId>
|
||||
<artifactId>redisson</artifactId>
|
||||
<version>3.2.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
package demo.config;
|
||||
|
||||
import org.redisson.Redisson;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class RedisConfig {
|
||||
|
||||
@Bean
|
||||
public RedissonClient redissonClient() {
|
||||
return Redisson.create();
|
||||
}
|
||||
}
|
||||
@@ -35,19 +35,14 @@ public class ReserveInventory extends Action<Inventory> {
|
||||
|
||||
public BiFunction<Inventory, Long, Inventory> getFunction() {
|
||||
return (inventory, reservationId) -> {
|
||||
Assert.isTrue(inventory.getStatus() == InventoryStatus.RESERVATION_PENDING,
|
||||
"Inventory must be in a reservation pending state");
|
||||
Assert.isTrue(inventory.getStatus() == InventoryStatus.RESERVATION_CONNECTED,
|
||||
"Inventory must be in a reservation connected state");
|
||||
Assert.isTrue(inventory.getReservation() == null,
|
||||
"There is already a reservation attached to the inventory");
|
||||
|
||||
Reservation reservation = reservationService.get(reservationId);
|
||||
|
||||
Assert.notNull(reservation, "Reserve inventory failed, the reservation does not exist");
|
||||
|
||||
inventory.setReservation(reservation);
|
||||
inventory.setStatus(RESERVATION_CONNECTED);
|
||||
inventory = inventoryService.update(inventory);
|
||||
|
||||
try {
|
||||
// Trigger the reservation connected event
|
||||
inventory.sendAsyncEvent(new InventoryEvent(InventoryEventType.RESERVATION_CONNECTED, inventory));
|
||||
|
||||
@@ -34,7 +34,7 @@ public class Inventory extends AbstractEntity<InventoryEvent, Long> {
|
||||
private Reservation reservation;
|
||||
|
||||
@JsonIgnore
|
||||
@ManyToOne(cascade = CascadeType.PERSIST, fetch = FetchType.LAZY)
|
||||
@ManyToOne(cascade = CascadeType.ALL, fetch = FetchType.EAGER)
|
||||
private Warehouse warehouse;
|
||||
|
||||
public Inventory() {
|
||||
@@ -111,9 +111,16 @@ public class Inventory extends AbstractEntity<InventoryEvent, Long> {
|
||||
*/
|
||||
@Override
|
||||
public Link getId() {
|
||||
return linkTo(InventoryController.class)
|
||||
.slash("inventory")
|
||||
.slash(getIdentity())
|
||||
.withSelfRel();
|
||||
Link link;
|
||||
try {
|
||||
link = linkTo(InventoryController.class)
|
||||
.slash("inventory")
|
||||
.slash(getIdentity())
|
||||
.withSelfRel();
|
||||
} catch (Exception ex) {
|
||||
link = new Link("http://warehouse-service/v1/inventory/" + id, "self");
|
||||
}
|
||||
|
||||
return link;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,16 +3,23 @@ package demo.inventory.domain;
|
||||
import demo.domain.Service;
|
||||
import demo.inventory.repository.InventoryRepository;
|
||||
import demo.reservation.domain.Reservation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@org.springframework.stereotype.Service
|
||||
public class InventoryService extends Service<Inventory, Long> {
|
||||
|
||||
private final Logger log = Logger.getLogger(InventoryService.class);
|
||||
private final InventoryRepository inventoryRepository;
|
||||
private final RedissonClient redissonClient;
|
||||
|
||||
public InventoryService(InventoryRepository inventoryRepository) {
|
||||
public InventoryService(InventoryRepository inventoryRepository, RedissonClient redissonClient) {
|
||||
this.inventoryRepository = inventoryRepository;
|
||||
this.redissonClient = redissonClient;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -79,19 +86,44 @@ public class InventoryService extends Service<Inventory, Long> {
|
||||
* @param reservation is the reservation to connect to the inventory
|
||||
* @return the first available inventory in the warehouse or null
|
||||
*/
|
||||
@Transactional
|
||||
public Inventory findAvailableInventory(Reservation reservation) {
|
||||
Assert.notNull(reservation.getWarehouse(), "Reservation must be connected to a warehouse");
|
||||
Assert.notNull(reservation.getProductId(), "Reservation must contain a valid product identifier");
|
||||
|
||||
Inventory inventory = inventoryRepository
|
||||
.findFirstInventoryByWarehouseIdAndProductIdAndStatus(reservation.getWarehouse()
|
||||
.getIdentity(), reservation.getProductId(), InventoryStatus.RESERVATION_PENDING)
|
||||
.orElse(null);
|
||||
Boolean reserved = false;
|
||||
Inventory inventory = null;
|
||||
|
||||
if (inventory != null) {
|
||||
// Reserve the inventory
|
||||
inventory = inventory.reserve(reservation.getIdentity());
|
||||
while (!reserved) {
|
||||
inventory = inventoryRepository
|
||||
.findFirstInventoryByWarehouseIdAndProductIdAndStatus(reservation.getWarehouse()
|
||||
.getIdentity(), reservation.getProductId(), InventoryStatus.RESERVATION_PENDING);
|
||||
if (inventory != null) {
|
||||
// Acquire lock
|
||||
RLock inventoryLock = redissonClient
|
||||
.getLock(String.format("inventory_%s", inventory.getIdentity().toString()));
|
||||
|
||||
Boolean lock = false;
|
||||
|
||||
try {
|
||||
lock = inventoryLock.tryLock(30, 5000, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Interrupted while acquiring lock on inventory", e);
|
||||
}
|
||||
|
||||
if (lock) {
|
||||
inventory.setStatus(InventoryStatus.RESERVATION_CONNECTED);
|
||||
inventory = update(inventory);
|
||||
|
||||
// Reserve the inventory
|
||||
inventory = inventory.reserve(reservation.getIdentity());
|
||||
|
||||
inventoryLock.unlock();
|
||||
}
|
||||
|
||||
reserved = lock;
|
||||
} else {
|
||||
reserved = true;
|
||||
}
|
||||
}
|
||||
|
||||
return inventory;
|
||||
|
||||
@@ -5,9 +5,7 @@ import demo.inventory.domain.InventoryStatus;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface InventoryRepository extends JpaRepository<Inventory, Long> {
|
||||
Optional<Inventory> findFirstInventoryByWarehouseIdAndProductIdAndStatus(@Param("warehouseId") Long warehouseId,
|
||||
Inventory findFirstInventoryByWarehouseIdAndProductIdAndStatus(@Param("warehouseId") Long warehouseId,
|
||||
@Param("productId") String productId, @Param("status") InventoryStatus status);
|
||||
}
|
||||
|
||||
@@ -1,6 +1,13 @@
|
||||
spring:
|
||||
profiles:
|
||||
active: development
|
||||
server:
|
||||
port: 0
|
||||
events:
|
||||
worker: http://warehouse-worker/v1/events
|
||||
---
|
||||
spring:
|
||||
profiles: development
|
||||
cloud:
|
||||
stream:
|
||||
bindings:
|
||||
@@ -13,19 +20,29 @@ spring:
|
||||
inventory:
|
||||
contentType: 'application/json'
|
||||
destination: inventory
|
||||
server:
|
||||
port: 0
|
||||
events:
|
||||
worker: http://warehouse-worker/v1/events
|
||||
---
|
||||
spring:
|
||||
profiles: development
|
||||
---
|
||||
spring:
|
||||
profiles: docker
|
||||
rabbitmq:
|
||||
host: ${DOCKER_IP:192.168.99.100}
|
||||
port: 5672
|
||||
datasource:
|
||||
url: jdbc:h2:mem:AZ;MVCC=TRUE
|
||||
redis:
|
||||
host: ${DOCKER_IP:192.168.99.100}
|
||||
port: 6379
|
||||
cloud:
|
||||
stream:
|
||||
bindings:
|
||||
warehouse:
|
||||
contentType: 'application/json'
|
||||
destination: warehouse
|
||||
reservation:
|
||||
contentType: 'application/json'
|
||||
destination: reservation
|
||||
inventory:
|
||||
contentType: 'application/json'
|
||||
destination: inventory
|
||||
eureka:
|
||||
client:
|
||||
service-url:
|
||||
@@ -33,16 +50,32 @@ eureka:
|
||||
registryFetchIntervalSeconds: 5
|
||||
instance:
|
||||
hostname: ${DOCKER_IP:192.168.99.100}
|
||||
instance-id: ${spring.application.name}:${random.int}
|
||||
leaseRenewalIntervalInSeconds: 5
|
||||
---
|
||||
spring:
|
||||
profiles: test
|
||||
redis:
|
||||
host: localhost
|
||||
port: 6379
|
||||
eureka:
|
||||
client:
|
||||
enabled: false
|
||||
---
|
||||
spring:
|
||||
profiles: cloud
|
||||
cloud:
|
||||
stream:
|
||||
bindings:
|
||||
warehouse:
|
||||
contentType: 'application/json'
|
||||
destination: warehouse
|
||||
reservation:
|
||||
contentType: 'application/json'
|
||||
destination: reservation
|
||||
inventory:
|
||||
contentType: 'application/json'
|
||||
destination: inventory
|
||||
eureka:
|
||||
instance:
|
||||
hostname: ${vcap.application.uris[0]:localhost}
|
||||
|
||||
@@ -2,6 +2,7 @@ package demo;
|
||||
|
||||
import demo.inventory.domain.Inventory;
|
||||
import demo.inventory.domain.InventoryService;
|
||||
import demo.inventory.domain.InventoryStatus;
|
||||
import demo.inventory.event.InventoryEventService;
|
||||
import demo.inventory.repository.InventoryRepository;
|
||||
import demo.reservation.domain.Reservation;
|
||||
@@ -22,6 +23,7 @@ import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -95,7 +97,7 @@ public class WarehouseServiceTests {
|
||||
actualInventory = inventoryService.get(1L);
|
||||
assertThat(actualInventory.getWarehouse()).isNotNull();
|
||||
|
||||
List<Warehouse> warehouseFound = warehouseRepository.findAllWithInventory(3L ,Arrays
|
||||
List<Warehouse> warehouseFound = warehouseRepository.findAllWithInventory(3L, Arrays
|
||||
.asList("SKU-001", "SKU-002", "SKU-003"));
|
||||
|
||||
List<Warehouse> warehouseNotFound = warehouseRepository.findAllWithInventory(2L, Arrays
|
||||
@@ -103,4 +105,37 @@ public class WarehouseServiceTests {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void asyncInventoryReservationTest() throws Exception {
|
||||
|
||||
Warehouse warehouse = new Warehouse();
|
||||
warehouse = warehouseRepository.save(warehouse);
|
||||
|
||||
Inventory inventory1 = new Inventory();
|
||||
inventory1.setProductId("SKU-001");
|
||||
inventory1.setWarehouse(warehouse);
|
||||
inventory1.setStatus(InventoryStatus.RESERVATION_PENDING);
|
||||
Inventory inventory2 = new Inventory();
|
||||
inventory2.setProductId("SKU-001");
|
||||
inventory2.setWarehouse(warehouse);
|
||||
inventory2.setStatus(InventoryStatus.RESERVATION_PENDING);
|
||||
List<Inventory> inventories = inventoryRepository.save(Arrays.asList(inventory1, inventory2));
|
||||
|
||||
Reservation reservation1 = new Reservation();
|
||||
reservation1.setProductId("SKU-001");
|
||||
reservation1.setWarehouse(warehouse);
|
||||
Reservation reservation2 = new Reservation();
|
||||
reservation2.setProductId("SKU-001");
|
||||
reservation2.setWarehouse(warehouse);
|
||||
List<Reservation> reservations = reservationRepository.save(Arrays.asList(reservation1, reservation2));
|
||||
|
||||
List<Inventory> reservedInventory = reservations.parallelStream()
|
||||
.map(a -> inventoryService.findAvailableInventory(a)).collect(Collectors
|
||||
.toList());
|
||||
|
||||
assertThat(reservedInventory).isNotEmpty();
|
||||
assertThat(reservedInventory.size()).isEqualTo(2);
|
||||
assertThat(reservedInventory.get(0)).isNotSameAs(reservedInventory.get(1));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -10,18 +10,21 @@ spring:
|
||||
group: warehouse-group
|
||||
consumer:
|
||||
durableSubscription: true
|
||||
concurrency: 5
|
||||
reservation:
|
||||
contentType: 'application/json'
|
||||
destination: reservation
|
||||
group: reservation-group
|
||||
consumer:
|
||||
durableSubscription: true
|
||||
concurrency: 5
|
||||
inventory:
|
||||
contentType: 'application/json'
|
||||
destination: inventory
|
||||
group: inventory-group
|
||||
consumer:
|
||||
durableSubscription: true
|
||||
concurrency: 5
|
||||
server:
|
||||
port: 0
|
||||
---
|
||||
@@ -40,6 +43,7 @@ eureka:
|
||||
registryFetchIntervalSeconds: 5
|
||||
instance:
|
||||
hostname: ${DOCKER_IP:192.168.99.100}
|
||||
instance-id: ${spring.application.name}:${random.int}
|
||||
leaseRenewalIntervalInSeconds: 5
|
||||
---
|
||||
spring:
|
||||
|
||||
Reference in New Issue
Block a user