diff --git a/spring-boot/request-response/client/build.gradle b/spring-boot/request-response/client/build.gradle index b257ce3..bcc6093 100644 --- a/spring-boot/request-response/client/build.gradle +++ b/spring-boot/request-response/client/build.gradle @@ -20,18 +20,24 @@ repositories { dependencies { implementation 'org.springframework.boot:spring-boot-starter-amqp' + implementation 'org.springframework.boot:spring-boot-starter-data-jpa' + compile group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '2.3.1.RELEASE' + + implementation 'org.flywaydb:flyway-core' + compile 'com.fasterxml.jackson.core:jackson-core' + compile 'com.fasterxml.jackson.core:jackson-annotations' + compile 'com.fasterxml.jackson.core:jackson-databind' + + compileOnly 'org.projectlombok:lombok' + runtimeOnly 'com.h2database:h2' + testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } testImplementation 'org.springframework.amqp:spring-rabbit-test' - compileOnly 'org.projectlombok:lombok' + testImplementation "org.testcontainers:rabbitmq:1.14.3" + annotationProcessor 'org.projectlombok:lombok' - - implementation 'com.fasterxml.jackson.core:jackson-core:2.9.6' - implementation 'com.fasterxml.jackson.core:jackson-annotations:2.9.6' - implementation 'com.fasterxml.jackson.core:jackson-databind:2.9.6' - - testCompile "org.testcontainers:rabbitmq:1.14.3" } test { diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/rpc/PublisherConfiguration.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/config/PublisherConfiguration.java similarity index 84% rename from spring-boot/request-response/client/src/main/java/io/reflectoring/client/rpc/PublisherConfiguration.java rename to spring-boot/request-response/client/src/main/java/io/reflectoring/client/config/PublisherConfiguration.java index 0857895..ac837fa 100644 --- a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/rpc/PublisherConfiguration.java +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/config/PublisherConfiguration.java @@ -1,6 +1,7 @@ -package io.reflectoring.client.rpc; +package io.reflectoring.client.config; import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.AsyncRabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; @@ -26,4 +27,9 @@ public class PublisherConfiguration { public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){ return new AsyncRabbitTemplate(rabbitTemplate); } + + @Bean + public Queue response(){ + return new Queue("response"); + } } diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/dto/Car.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/dto/CarDto.java similarity index 93% rename from spring-boot/request-response/client/src/main/java/io/reflectoring/client/dto/Car.java rename to spring-boot/request-response/client/src/main/java/io/reflectoring/client/dto/CarDto.java index 20f5b53..eaca0d6 100644 --- a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/dto/Car.java +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/dto/CarDto.java @@ -11,7 +11,7 @@ import java.util.UUID; @NoArgsConstructor @AllArgsConstructor @Builder -public class Car { +public class CarDto { private UUID id; private String name; diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/dto/Registration.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/dto/RegistrationDto.java similarity index 91% rename from spring-boot/request-response/client/src/main/java/io/reflectoring/client/dto/Registration.java rename to spring-boot/request-response/client/src/main/java/io/reflectoring/client/dto/RegistrationDto.java index 8631acd..7d04e67 100644 --- a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/dto/Registration.java +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/dto/RegistrationDto.java @@ -12,7 +12,7 @@ import java.util.UUID; @NoArgsConstructor @AllArgsConstructor @Builder -public class Registration { +public class RegistrationDto { private UUID id; private Date date; diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/async/ReplyConsumer.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/async/ReplyConsumer.java new file mode 100644 index 0000000..566144d --- /dev/null +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/async/ReplyConsumer.java @@ -0,0 +1,34 @@ +package io.reflectoring.client.registration.async; + + +import io.reflectoring.client.dto.RegistrationDto; +import io.reflectoring.client.registration.service.RegistrationService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.UUID; + + +@Component +@Transactional +public class ReplyConsumer { + + private final RegistrationService registrationService; + + private static final Logger LOGGER = LoggerFactory.getLogger(ReplyConsumer.class); + + public ReplyConsumer(RegistrationService registrationService) { + this.registrationService = registrationService; + } + + @RabbitListener(queues = "#{response.name}", concurrency = "10") + public void receive(RegistrationDto registrationDto, Message message){ + String correlationId = message.getMessageProperties().getCorrelationId(); + registrationService.saveRegistration(UUID.fromString(correlationId), registrationDto); + LOGGER.info("Registration {} with correlation id {} received", registrationDto, correlationId); + } +} diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/async/StatelessClient.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/async/StatelessClient.java new file mode 100644 index 0000000..a8417aa --- /dev/null +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/async/StatelessClient.java @@ -0,0 +1,62 @@ +package io.reflectoring.client.registration.async; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.reflectoring.client.dto.CarDto; +import io.reflectoring.client.registration.service.RegistrationService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.UUID; + +@Component +@Transactional +public class StatelessClient { + + public static final Logger LOGGER = LoggerFactory.getLogger(StatelessClient.class); + + private final RabbitTemplate template; + + private final DirectExchange directExchange; + + private final Queue replyQueue; + + + private final RegistrationService registrationService; + + public StatelessClient(RabbitTemplate template, DirectExchange directExchange, Queue replyQueue, RegistrationService registrationService) { + this.template = template; + this.directExchange = directExchange; + this.replyQueue = replyQueue; + this.registrationService = registrationService; + } + + @Scheduled(fixedDelay = 3000) + public void sendAndForget() throws JsonProcessingException { + + CarDto carDto = CarDto.builder() + .id(UUID.randomUUID()) + .color("white") + .name("vw") + .build(); + LOGGER.info("Sending message with routing key {} and id {}", "old.car", carDto.getId()); + + UUID correlationId = UUID.randomUUID(); + + registrationService.saveCar(carDto, correlationId); + + byte[] body = new ObjectMapper().writeValueAsBytes(carDto); + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setReplyTo(replyQueue.getName()); + messageProperties.setCorrelationId(correlationId.toString()); + Message message = MessageBuilder.withBody(body) + .andProperties(messageProperties).build(); + + template.send(directExchange.getName(), "old.car", message); + } +} diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/persistance/Car.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/persistance/Car.java new file mode 100644 index 0000000..1b86024 --- /dev/null +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/persistance/Car.java @@ -0,0 +1,28 @@ +package io.reflectoring.client.registration.persistance; + + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import java.util.UUID; + +@Entity +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class Car { + + @Id + @GeneratedValue(strategy = GenerationType.AUTO) + private UUID id; + private String name; + private String color; + private UUID correlationId; +} diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/persistance/CarRepository.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/persistance/CarRepository.java new file mode 100644 index 0000000..1a723de --- /dev/null +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/persistance/CarRepository.java @@ -0,0 +1,10 @@ +package io.reflectoring.client.registration.persistance; + +import org.springframework.data.jpa.repository.JpaRepository; + +import java.util.UUID; + +public interface CarRepository extends JpaRepository { + + Car findByCorrelationId(UUID correlationId); +} diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/persistance/Registration.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/persistance/Registration.java new file mode 100644 index 0000000..869a5e4 --- /dev/null +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/persistance/Registration.java @@ -0,0 +1,27 @@ +package io.reflectoring.client.registration.persistance; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.persistence.*; +import java.util.Date; +import java.util.UUID; + +@Entity +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class Registration { + + @Id + @GeneratedValue(strategy = GenerationType.AUTO) + private UUID id; + private Date date; + private String owner; + private String signature; + @ManyToOne + private Car car; +} diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/persistance/RegistrationRepository.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/persistance/RegistrationRepository.java new file mode 100644 index 0000000..2cb353d --- /dev/null +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/persistance/RegistrationRepository.java @@ -0,0 +1,8 @@ +package io.reflectoring.client.registration.persistance; + +import org.springframework.data.jpa.repository.JpaRepository; + +import java.util.UUID; + +public interface RegistrationRepository extends JpaRepository { +} diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/rpc/SchedulingConfiguration.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/rpc/SchedulingConfiguration.java similarity index 89% rename from spring-boot/request-response/client/src/main/java/io/reflectoring/client/rpc/SchedulingConfiguration.java rename to spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/rpc/SchedulingConfiguration.java index 16c27cc..63af413 100644 --- a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/rpc/SchedulingConfiguration.java +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/rpc/SchedulingConfiguration.java @@ -1,4 +1,4 @@ -package io.reflectoring.client.rpc; +package io.reflectoring.client.registration.rpc; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/rpc/StatefulClient.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/rpc/StatefulClient.java new file mode 100644 index 0000000..53cb4f8 --- /dev/null +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/rpc/StatefulClient.java @@ -0,0 +1,113 @@ +package io.reflectoring.client.registration.rpc; + +import io.reflectoring.client.dto.CarDto; +import io.reflectoring.client.dto.RegistrationDto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.rabbit.AsyncRabbitTemplate; +import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitConverterFuture; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +@Component +public class StatefulClient { + + public static final Logger LOGGER = LoggerFactory.getLogger(StatefulClient.class); + + private final RabbitTemplate template; + + private final AsyncRabbitTemplate asyncRabbitTemplate; + + private final DirectExchange directExchange; + public static final String ROUTING_KEY = "old.car"; + + public StatefulClient(DirectExchange directExchange, RabbitTemplate template, + AsyncRabbitTemplate asyncRabbitTemplate) { + this.directExchange = directExchange; + this.template = template; + this.asyncRabbitTemplate = asyncRabbitTemplate; + } + + @Scheduled(fixedDelay = 3000) + public void send() { + CarDto carDto = CarDto.builder() + .id(UUID.randomUUID()) + .color("white") + .name("vw") + .build(); + LOGGER.info("Sending message with routing key {} and id {}", ROUTING_KEY, carDto.getId()); + + ParameterizedTypeReference responseType + = new ParameterizedTypeReference<>() { + }; + RegistrationDto registrationDto = template.convertSendAndReceiveAsType( + directExchange.getName(), ROUTING_KEY, carDto, responseType); + LOGGER.info("Message received: {}", registrationDto); + } + + + + + @Scheduled(fixedDelay = 3000, initialDelay = 1500) + public void sendAsynchronously() { + CarDto carDto = CarDto.builder() + .id(UUID.randomUUID()) + .color("black") + .name("bmw") + .build(); + LOGGER.info("Sending message with routing key {} and id {}", ROUTING_KEY, carDto.getId()); + + ListenableFuture listenableFuture = + asyncRabbitTemplate.convertSendAndReceiveAsType( + directExchange.getName(), + ROUTING_KEY, + carDto, + new ParameterizedTypeReference<>() { + }); + // non blocking part + try { + RegistrationDto registrationDto = listenableFuture.get(); + LOGGER.info("Asynchronous message received: {}", registrationDto); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("Cannot get response.", e); + } + } + + @Scheduled(fixedDelay = 3000, initialDelay = 1500) + public void sendAsynchronouslyWithCallback() { + CarDto carDto = CarDto.builder() + .id(UUID.randomUUID()) + .color("black") + .name("bmw") + .build(); + LOGGER.info("Sending message with routing key {} and id {}", ROUTING_KEY, carDto.getId()); + + ParameterizedTypeReference responseType + = new ParameterizedTypeReference<>() { + }; + RabbitConverterFuture rabbitConverterFuture = + asyncRabbitTemplate.convertSendAndReceiveAsType( + directExchange.getName(), ROUTING_KEY, carDto, responseType); + + rabbitConverterFuture.addCallback(new ListenableFutureCallback<>() { + @Override + public void onFailure(Throwable ex) { + LOGGER.error("Cannot get response for: {}", carDto.getId(), ex); + } + + @Override + public void onSuccess(RegistrationDto registrationDto) { + LOGGER.info("Registration received {}", registrationDto); + } + }); + LOGGER.info("Message {} sent", carDto); + } +} diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/service/CarMapper.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/service/CarMapper.java new file mode 100644 index 0000000..30520c9 --- /dev/null +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/service/CarMapper.java @@ -0,0 +1,25 @@ +package io.reflectoring.client.registration.service; + +import io.reflectoring.client.dto.CarDto; +import io.reflectoring.client.registration.persistance.Car; +import org.springframework.stereotype.Component; + +@Component +public class CarMapper { + + Car toCar(CarDto carDto) { + return Car.builder() + .id(carDto.getId()) + .color(carDto.getColor()) + .name(carDto.getName()) + .build(); + } + + CarDto toCarDto(Car car) { + return CarDto.builder() + .id(car.getId()) + .color(car.getColor()) + .name(car.getName()) + .build(); + } +} diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/service/RegistrationMapper.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/service/RegistrationMapper.java new file mode 100644 index 0000000..cb10210 --- /dev/null +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/service/RegistrationMapper.java @@ -0,0 +1,18 @@ +package io.reflectoring.client.registration.service; + +import io.reflectoring.client.dto.RegistrationDto; +import io.reflectoring.client.registration.persistance.Registration; +import org.springframework.stereotype.Component; + +@Component +public class RegistrationMapper { + + public Registration toRegistration(RegistrationDto registrationDto){ + return Registration.builder() + .id(registrationDto.getId()) + .date(registrationDto.getDate()) + .owner(registrationDto.getOwner()) + .signature(registrationDto.getSignature()) + .build(); + } +} diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/service/RegistrationService.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/service/RegistrationService.java new file mode 100644 index 0000000..d2ea58f --- /dev/null +++ b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/registration/service/RegistrationService.java @@ -0,0 +1,40 @@ +package io.reflectoring.client.registration.service; + +import io.reflectoring.client.dto.CarDto; +import io.reflectoring.client.dto.RegistrationDto; +import io.reflectoring.client.registration.persistance.Car; +import io.reflectoring.client.registration.persistance.CarRepository; +import io.reflectoring.client.registration.persistance.Registration; +import io.reflectoring.client.registration.persistance.RegistrationRepository; +import org.springframework.stereotype.Service; + +import java.util.UUID; + +@Service +public class RegistrationService { + + private final CarMapper carMapper; + private final RegistrationMapper registrationMapper; + private final CarRepository carRepository; + private final RegistrationRepository registrationRepository; + + public RegistrationService(CarMapper carMapper, RegistrationMapper registrationMapper, CarRepository carRepository, RegistrationRepository registrationRepository) { + this.carMapper = carMapper; + this.registrationMapper = registrationMapper; + this.carRepository = carRepository; + this.registrationRepository = registrationRepository; + } + + public void saveCar(CarDto carDto, UUID correlationId) { + Car car = carMapper.toCar(carDto); + car.setCorrelationId(correlationId); + carRepository.save(car); + } + + public void saveRegistration(UUID correlationId, RegistrationDto registrationDto){ + Registration registration = registrationMapper.toRegistration(registrationDto); + Car car = carRepository.findByCorrelationId(correlationId); + registration.setCar(car); + registrationRepository.save(registration); + } +} diff --git a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/rpc/EventPublisher.java b/spring-boot/request-response/client/src/main/java/io/reflectoring/client/rpc/EventPublisher.java deleted file mode 100644 index b4fe95c..0000000 --- a/spring-boot/request-response/client/src/main/java/io/reflectoring/client/rpc/EventPublisher.java +++ /dev/null @@ -1,77 +0,0 @@ -package io.reflectoring.client.rpc; - -import io.reflectoring.client.dto.Car; -import io.reflectoring.client.dto.Registration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.amqp.core.DirectExchange; -import org.springframework.amqp.rabbit.AsyncRabbitTemplate; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.UUID; -import java.util.concurrent.ExecutionException; - -@Component -public class EventPublisher { - - public static final Logger LOGGER = LoggerFactory.getLogger(EventPublisher.class); - - private final RabbitTemplate template; - - private final AsyncRabbitTemplate asyncRabbitTemplate; - - private final DirectExchange directExchange; - - public EventPublisher(DirectExchange directExchange, RabbitTemplate template, - AsyncRabbitTemplate asyncRabbitTemplate) { - this.directExchange = directExchange; - this.template = template; - this.asyncRabbitTemplate = asyncRabbitTemplate; - } - - @Scheduled(fixedDelay = 3000) - public void send() { - String key = "old.car"; - Car car = Car.builder() - .id(UUID.randomUUID()) - .color("white") - .name("vw") - .build(); - LOGGER.info("Sending message with routing key {} and id {}", key, car.getId()); - - ParameterizedTypeReference responseType - = new ParameterizedTypeReference<>() { - }; - Registration registration = template.convertSendAndReceiveAsType( - directExchange.getName(), key, car, responseType); - LOGGER.info("Message received: {}", registration); - } - - - @Scheduled(fixedDelay = 3000, initialDelay = 1500) - public void sendAsynchronously() { - String key = "old.car"; - Car car = Car.builder() - .id(UUID.randomUUID()) - .color("black") - .name("bmw") - .build(); - LOGGER.info("Sending message with routing key {} and id {}", key, car.getId()); - - ParameterizedTypeReference responseType - = new ParameterizedTypeReference<>() { - }; - AsyncRabbitTemplate.RabbitConverterFuture future = - asyncRabbitTemplate.convertSendAndReceiveAsType( - directExchange.getName(), key, car, responseType); - try { - Registration registration = future.get(); - LOGGER.info("Asynchronous message received: {}", registration); - } catch (InterruptedException | ExecutionException e) { - LOGGER.error("Cannot get response.", e); - } - } -} diff --git a/spring-boot/request-response/client/src/main/resources/application.properties b/spring-boot/request-response/client/src/main/resources/application.properties deleted file mode 100644 index 8b13789..0000000 --- a/spring-boot/request-response/client/src/main/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ - diff --git a/spring-boot/request-response/client/src/main/resources/application.yaml b/spring-boot/request-response/client/src/main/resources/application.yaml new file mode 100644 index 0000000..7f6d815 --- /dev/null +++ b/spring-boot/request-response/client/src/main/resources/application.yaml @@ -0,0 +1,13 @@ +spring: + h2: + console: + enabled: true + path: /h2-console + + datasource: + url: jdbc:h2:mem:testdb + driverClassName: org.h2.Driver + username: sa + password: password + jpa: + database-platform: org.hibernate.dialect.H2Dialect diff --git a/spring-boot/request-response/client/src/main/resources/db/migration/V1_1__init.sql b/spring-boot/request-response/client/src/main/resources/db/migration/V1_1__init.sql new file mode 100644 index 0000000..bd804ee --- /dev/null +++ b/spring-boot/request-response/client/src/main/resources/db/migration/V1_1__init.sql @@ -0,0 +1,16 @@ +CREATE TABLE car +( + id UUID PRIMARY KEY, + name VARCHAR(512), + color VARCHAR(512), + correlation_id UUID +); + +CREATE TABLE registration +( + id UUID PRIMARY KEY, + date DATE, + owner VARCHAR(512), + signature VARCHAR(1024), + car_id UUID +) \ No newline at end of file diff --git a/spring-boot/request-response/client/src/test/java/io/reflectoring/client/registration/AbstractIntegrationTest.java b/spring-boot/request-response/client/src/test/java/io/reflectoring/client/registration/AbstractIntegrationTest.java new file mode 100644 index 0000000..c7eec5b --- /dev/null +++ b/spring-boot/request-response/client/src/test/java/io/reflectoring/client/registration/AbstractIntegrationTest.java @@ -0,0 +1,48 @@ +package io.reflectoring.client.registration; + +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.MapPropertySource; +import org.springframework.test.context.ContextConfiguration; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.lifecycle.Startables; + +import java.util.Map; +import java.util.stream.Stream; + +@ContextConfiguration(initializers = AbstractIntegrationTest.Initializer.class) +public class AbstractIntegrationTest { + + static class Initializer + implements ApplicationContextInitializer { + + static RabbitMQContainer rabbitMQContainer = new RabbitMQContainer(); + + private static void startContainers() { + Startables.deepStart(Stream.of(rabbitMQContainer)).join(); + // we can add further containers here like rabbitmq or other database + } + + private static Map createConnectionConfiguration() { + return Map.of( + "spring.rabbitmq.host", rabbitMQContainer.getHost(), + "spring.rabbitmq.port", rabbitMQContainer.getAmqpPort().toString(), + "spring.rabbitmq.username", rabbitMQContainer.getAdminUsername(), + "spring.rabbitmq.password", rabbitMQContainer.getAdminPassword() + ); + } + + + @Override + public void initialize(ConfigurableApplicationContext applicationContext) { + startContainers(); + ConfigurableEnvironment environment = applicationContext.getEnvironment(); + MapPropertySource testcontainers = new MapPropertySource( + "testcontainers", + (Map) createConnectionConfiguration() + ); + environment.getPropertySources().addFirst(testcontainers); + } + } +} diff --git a/spring-boot/request-response/client/src/test/java/io/reflectoring/client/registration/async/StatelessClientTest.java b/spring-boot/request-response/client/src/test/java/io/reflectoring/client/registration/async/StatelessClientTest.java new file mode 100644 index 0000000..fb066e5 --- /dev/null +++ b/spring-boot/request-response/client/src/test/java/io/reflectoring/client/registration/async/StatelessClientTest.java @@ -0,0 +1,29 @@ +package io.reflectoring.client.registration.async; + +import io.reflectoring.client.registration.AbstractIntegrationTest; +import org.assertj.core.api.ThrowableAssert; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.TestPropertySource; + +import static org.assertj.core.api.Assertions.assertThatCode; + +@SpringBootTest +@TestPropertySource(properties = "scheduling.enable=false") +class StatelessClientTest extends AbstractIntegrationTest { + + @Autowired + private StatelessClient statelessClient; + + @Test + void sendAndForget() { + // given + + // when + ThrowableAssert.ThrowingCallable send = () -> statelessClient.sendAndForget(); + + // then + assertThatCode(send).doesNotThrowAnyException(); + } +} \ No newline at end of file diff --git a/spring-boot/request-response/client/src/test/java/io/reflectoring/client/rpc/ClientApplicationTests.java b/spring-boot/request-response/client/src/test/java/io/reflectoring/client/registration/rpc/ClientApplicationTests.java similarity index 79% rename from spring-boot/request-response/client/src/test/java/io/reflectoring/client/rpc/ClientApplicationTests.java rename to spring-boot/request-response/client/src/test/java/io/reflectoring/client/registration/rpc/ClientApplicationTests.java index 3d5a1f5..33574f2 100644 --- a/spring-boot/request-response/client/src/test/java/io/reflectoring/client/rpc/ClientApplicationTests.java +++ b/spring-boot/request-response/client/src/test/java/io/reflectoring/client/registration/rpc/ClientApplicationTests.java @@ -1,4 +1,4 @@ -package io.reflectoring.client.rpc; +package io.reflectoring.client.registration.rpc; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; diff --git a/spring-boot/request-response/client/src/test/java/io/reflectoring/client/rpc/EventPublisherTest.java b/spring-boot/request-response/client/src/test/java/io/reflectoring/client/registration/rpc/StatefulClientTest.java similarity index 58% rename from spring-boot/request-response/client/src/test/java/io/reflectoring/client/rpc/EventPublisherTest.java rename to spring-boot/request-response/client/src/test/java/io/reflectoring/client/registration/rpc/StatefulClientTest.java index c4c42cb..c3f3286 100644 --- a/spring-boot/request-response/client/src/test/java/io/reflectoring/client/rpc/EventPublisherTest.java +++ b/spring-boot/request-response/client/src/test/java/io/reflectoring/client/registration/rpc/StatefulClientTest.java @@ -1,35 +1,27 @@ -package io.reflectoring.client.rpc; +package io.reflectoring.client.registration.rpc; +import io.reflectoring.client.registration.AbstractIntegrationTest; import org.assertj.core.api.ThrowableAssert; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.TestPropertySource; -import org.testcontainers.containers.RabbitMQContainer; import static org.assertj.core.api.Assertions.assertThatCode; @SpringBootTest @TestPropertySource(properties = "scheduling.enable=false") -class EventPublisherTest { - - private final RabbitMQContainer rabbitMQContainer = new RabbitMQContainer(); +class StatefulClientTest extends AbstractIntegrationTest { @Autowired - private EventPublisher eventPublisher; - - @BeforeEach - void setUp() { - rabbitMQContainer.start(); - } + private StatefulClient statefulClient; @Test void sendMessageSynchronously() { // given // when - ThrowableAssert.ThrowingCallable send = () -> eventPublisher.send(); + ThrowableAssert.ThrowingCallable send = () -> statefulClient.send(); // then assertThatCode(send).doesNotThrowAnyException(); @@ -40,7 +32,7 @@ class EventPublisherTest { // given // when - ThrowableAssert.ThrowingCallable send = () -> eventPublisher.sendAsynchronously(); + ThrowableAssert.ThrowingCallable send = () -> statefulClient.sendAsynchronously(); // then assertThatCode(send).doesNotThrowAnyException(); diff --git a/spring-boot/request-response/server/build.gradle b/spring-boot/request-response/server/build.gradle index 40a0307..089e19f 100644 --- a/spring-boot/request-response/server/build.gradle +++ b/spring-boot/request-response/server/build.gradle @@ -24,6 +24,8 @@ dependencies { annotationProcessor 'org.projectlombok:lombok' implementation 'com.fasterxml.jackson.core:jackson-core:2.9.6' + implementation 'com.fasterxml.jackson.core:jackson-annotations:2.9.6' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.9.6' } test { diff --git a/spring-boot/request-response/server/src/main/java/io/reflectoring/server/rpc/EventConsumer.java b/spring-boot/request-response/server/src/main/java/io/reflectoring/server/rpc/EventConsumer.java index 16c97e5..8d2f8c6 100644 --- a/spring-boot/request-response/server/src/main/java/io/reflectoring/server/rpc/EventConsumer.java +++ b/spring-boot/request-response/server/src/main/java/io/reflectoring/server/rpc/EventConsumer.java @@ -8,6 +8,7 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; +import java.util.concurrent.TimeUnit; @Component public class EventConsumer { @@ -15,9 +16,11 @@ public class EventConsumer { public static final Logger LOGGER = LoggerFactory.getLogger(EventConsumer.class); - @RabbitListener(queues = "#{queue.name}") - public Registration receive(Car car) { + @RabbitListener(queues = "#{queue.name}", concurrency = "10") + public Registration receive(Car car) throws InterruptedException { LOGGER.info("Message received {} ", car); + TimeUnit.SECONDS.sleep(10); + LOGGER.info("Message proceeded {} ", car); return Registration.builder() .id(car.getId()) .date(new Date())