Add Listener for Reply Consumer.

This commit is contained in:
akuksin
2020-07-25 12:27:23 +02:00
parent e4e5c001ba
commit f2dd80ff03
25 changed files with 508 additions and 106 deletions

View File

@@ -20,18 +20,24 @@ repositories {
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
compile group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '2.3.1.RELEASE'
implementation 'org.flywaydb:flyway-core'
compile 'com.fasterxml.jackson.core:jackson-core'
compile 'com.fasterxml.jackson.core:jackson-annotations'
compile 'com.fasterxml.jackson.core:jackson-databind'
compileOnly 'org.projectlombok:lombok'
runtimeOnly 'com.h2database:h2'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
testImplementation 'org.springframework.amqp:spring-rabbit-test'
compileOnly 'org.projectlombok:lombok'
testImplementation "org.testcontainers:rabbitmq:1.14.3"
annotationProcessor 'org.projectlombok:lombok'
implementation 'com.fasterxml.jackson.core:jackson-core:2.9.6'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.9.6'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.9.6'
testCompile "org.testcontainers:rabbitmq:1.14.3"
}
test {

View File

@@ -1,6 +1,7 @@
package io.reflectoring.client.rpc;
package io.reflectoring.client.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
@@ -26,4 +27,9 @@ public class PublisherConfiguration {
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){
return new AsyncRabbitTemplate(rabbitTemplate);
}
@Bean
public Queue response(){
return new Queue("response");
}
}

View File

@@ -11,7 +11,7 @@ import java.util.UUID;
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Car {
public class CarDto {
private UUID id;
private String name;

View File

@@ -12,7 +12,7 @@ import java.util.UUID;
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Registration {
public class RegistrationDto {
private UUID id;
private Date date;

View File

@@ -0,0 +1,34 @@
package io.reflectoring.client.registration.async;
import io.reflectoring.client.dto.RegistrationDto;
import io.reflectoring.client.registration.service.RegistrationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
@Component
@Transactional
public class ReplyConsumer {
private final RegistrationService registrationService;
private static final Logger LOGGER = LoggerFactory.getLogger(ReplyConsumer.class);
public ReplyConsumer(RegistrationService registrationService) {
this.registrationService = registrationService;
}
@RabbitListener(queues = "#{response.name}", concurrency = "10")
public void receive(RegistrationDto registrationDto, Message message){
String correlationId = message.getMessageProperties().getCorrelationId();
registrationService.saveRegistration(UUID.fromString(correlationId), registrationDto);
LOGGER.info("Registration {} with correlation id {} received", registrationDto, correlationId);
}
}

View File

@@ -0,0 +1,62 @@
package io.reflectoring.client.registration.async;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.reflectoring.client.dto.CarDto;
import io.reflectoring.client.registration.service.RegistrationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
@Component
@Transactional
public class StatelessClient {
public static final Logger LOGGER = LoggerFactory.getLogger(StatelessClient.class);
private final RabbitTemplate template;
private final DirectExchange directExchange;
private final Queue replyQueue;
private final RegistrationService registrationService;
public StatelessClient(RabbitTemplate template, DirectExchange directExchange, Queue replyQueue, RegistrationService registrationService) {
this.template = template;
this.directExchange = directExchange;
this.replyQueue = replyQueue;
this.registrationService = registrationService;
}
@Scheduled(fixedDelay = 3000)
public void sendAndForget() throws JsonProcessingException {
CarDto carDto = CarDto.builder()
.id(UUID.randomUUID())
.color("white")
.name("vw")
.build();
LOGGER.info("Sending message with routing key {} and id {}", "old.car", carDto.getId());
UUID correlationId = UUID.randomUUID();
registrationService.saveCar(carDto, correlationId);
byte[] body = new ObjectMapper().writeValueAsBytes(carDto);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setReplyTo(replyQueue.getName());
messageProperties.setCorrelationId(correlationId.toString());
Message message = MessageBuilder.withBody(body)
.andProperties(messageProperties).build();
template.send(directExchange.getName(), "old.car", message);
}
}

View File

@@ -0,0 +1,28 @@
package io.reflectoring.client.registration.persistance;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.util.UUID;
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Car {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private UUID id;
private String name;
private String color;
private UUID correlationId;
}

View File

@@ -0,0 +1,10 @@
package io.reflectoring.client.registration.persistance;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.UUID;
public interface CarRepository extends JpaRepository<Car, UUID> {
Car findByCorrelationId(UUID correlationId);
}

View File

@@ -0,0 +1,27 @@
package io.reflectoring.client.registration.persistance;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
import java.util.Date;
import java.util.UUID;
@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Registration {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private UUID id;
private Date date;
private String owner;
private String signature;
@ManyToOne
private Car car;
}

View File

@@ -0,0 +1,8 @@
package io.reflectoring.client.registration.persistance;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.UUID;
public interface RegistrationRepository extends JpaRepository<Registration, UUID> {
}

View File

@@ -1,4 +1,4 @@
package io.reflectoring.client.rpc;
package io.reflectoring.client.registration.rpc;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;

View File

@@ -0,0 +1,113 @@
package io.reflectoring.client.registration.rpc;
import io.reflectoring.client.dto.CarDto;
import io.reflectoring.client.dto.RegistrationDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitConverterFuture;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@Component
public class StatefulClient {
public static final Logger LOGGER = LoggerFactory.getLogger(StatefulClient.class);
private final RabbitTemplate template;
private final AsyncRabbitTemplate asyncRabbitTemplate;
private final DirectExchange directExchange;
public static final String ROUTING_KEY = "old.car";
public StatefulClient(DirectExchange directExchange, RabbitTemplate template,
AsyncRabbitTemplate asyncRabbitTemplate) {
this.directExchange = directExchange;
this.template = template;
this.asyncRabbitTemplate = asyncRabbitTemplate;
}
@Scheduled(fixedDelay = 3000)
public void send() {
CarDto carDto = CarDto.builder()
.id(UUID.randomUUID())
.color("white")
.name("vw")
.build();
LOGGER.info("Sending message with routing key {} and id {}", ROUTING_KEY, carDto.getId());
ParameterizedTypeReference<RegistrationDto> responseType
= new ParameterizedTypeReference<>() {
};
RegistrationDto registrationDto = template.convertSendAndReceiveAsType(
directExchange.getName(), ROUTING_KEY, carDto, responseType);
LOGGER.info("Message received: {}", registrationDto);
}
@Scheduled(fixedDelay = 3000, initialDelay = 1500)
public void sendAsynchronously() {
CarDto carDto = CarDto.builder()
.id(UUID.randomUUID())
.color("black")
.name("bmw")
.build();
LOGGER.info("Sending message with routing key {} and id {}", ROUTING_KEY, carDto.getId());
ListenableFuture<RegistrationDto> listenableFuture =
asyncRabbitTemplate.convertSendAndReceiveAsType(
directExchange.getName(),
ROUTING_KEY,
carDto,
new ParameterizedTypeReference<>() {
});
// non blocking part
try {
RegistrationDto registrationDto = listenableFuture.get();
LOGGER.info("Asynchronous message received: {}", registrationDto);
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Cannot get response.", e);
}
}
@Scheduled(fixedDelay = 3000, initialDelay = 1500)
public void sendAsynchronouslyWithCallback() {
CarDto carDto = CarDto.builder()
.id(UUID.randomUUID())
.color("black")
.name("bmw")
.build();
LOGGER.info("Sending message with routing key {} and id {}", ROUTING_KEY, carDto.getId());
ParameterizedTypeReference<RegistrationDto> responseType
= new ParameterizedTypeReference<>() {
};
RabbitConverterFuture<RegistrationDto> rabbitConverterFuture =
asyncRabbitTemplate.convertSendAndReceiveAsType(
directExchange.getName(), ROUTING_KEY, carDto, responseType);
rabbitConverterFuture.addCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable ex) {
LOGGER.error("Cannot get response for: {}", carDto.getId(), ex);
}
@Override
public void onSuccess(RegistrationDto registrationDto) {
LOGGER.info("Registration received {}", registrationDto);
}
});
LOGGER.info("Message {} sent", carDto);
}
}

View File

@@ -0,0 +1,25 @@
package io.reflectoring.client.registration.service;
import io.reflectoring.client.dto.CarDto;
import io.reflectoring.client.registration.persistance.Car;
import org.springframework.stereotype.Component;
@Component
public class CarMapper {
Car toCar(CarDto carDto) {
return Car.builder()
.id(carDto.getId())
.color(carDto.getColor())
.name(carDto.getName())
.build();
}
CarDto toCarDto(Car car) {
return CarDto.builder()
.id(car.getId())
.color(car.getColor())
.name(car.getName())
.build();
}
}

View File

@@ -0,0 +1,18 @@
package io.reflectoring.client.registration.service;
import io.reflectoring.client.dto.RegistrationDto;
import io.reflectoring.client.registration.persistance.Registration;
import org.springframework.stereotype.Component;
@Component
public class RegistrationMapper {
public Registration toRegistration(RegistrationDto registrationDto){
return Registration.builder()
.id(registrationDto.getId())
.date(registrationDto.getDate())
.owner(registrationDto.getOwner())
.signature(registrationDto.getSignature())
.build();
}
}

View File

@@ -0,0 +1,40 @@
package io.reflectoring.client.registration.service;
import io.reflectoring.client.dto.CarDto;
import io.reflectoring.client.dto.RegistrationDto;
import io.reflectoring.client.registration.persistance.Car;
import io.reflectoring.client.registration.persistance.CarRepository;
import io.reflectoring.client.registration.persistance.Registration;
import io.reflectoring.client.registration.persistance.RegistrationRepository;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class RegistrationService {
private final CarMapper carMapper;
private final RegistrationMapper registrationMapper;
private final CarRepository carRepository;
private final RegistrationRepository registrationRepository;
public RegistrationService(CarMapper carMapper, RegistrationMapper registrationMapper, CarRepository carRepository, RegistrationRepository registrationRepository) {
this.carMapper = carMapper;
this.registrationMapper = registrationMapper;
this.carRepository = carRepository;
this.registrationRepository = registrationRepository;
}
public void saveCar(CarDto carDto, UUID correlationId) {
Car car = carMapper.toCar(carDto);
car.setCorrelationId(correlationId);
carRepository.save(car);
}
public void saveRegistration(UUID correlationId, RegistrationDto registrationDto){
Registration registration = registrationMapper.toRegistration(registrationDto);
Car car = carRepository.findByCorrelationId(correlationId);
registration.setCar(car);
registrationRepository.save(registration);
}
}

View File

@@ -1,77 +0,0 @@
package io.reflectoring.client.rpc;
import io.reflectoring.client.dto.Car;
import io.reflectoring.client.dto.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@Component
public class EventPublisher {
public static final Logger LOGGER = LoggerFactory.getLogger(EventPublisher.class);
private final RabbitTemplate template;
private final AsyncRabbitTemplate asyncRabbitTemplate;
private final DirectExchange directExchange;
public EventPublisher(DirectExchange directExchange, RabbitTemplate template,
AsyncRabbitTemplate asyncRabbitTemplate) {
this.directExchange = directExchange;
this.template = template;
this.asyncRabbitTemplate = asyncRabbitTemplate;
}
@Scheduled(fixedDelay = 3000)
public void send() {
String key = "old.car";
Car car = Car.builder()
.id(UUID.randomUUID())
.color("white")
.name("vw")
.build();
LOGGER.info("Sending message with routing key {} and id {}", key, car.getId());
ParameterizedTypeReference<Registration> responseType
= new ParameterizedTypeReference<>() {
};
Registration registration = template.convertSendAndReceiveAsType(
directExchange.getName(), key, car, responseType);
LOGGER.info("Message received: {}", registration);
}
@Scheduled(fixedDelay = 3000, initialDelay = 1500)
public void sendAsynchronously() {
String key = "old.car";
Car car = Car.builder()
.id(UUID.randomUUID())
.color("black")
.name("bmw")
.build();
LOGGER.info("Sending message with routing key {} and id {}", key, car.getId());
ParameterizedTypeReference<Registration> responseType
= new ParameterizedTypeReference<>() {
};
AsyncRabbitTemplate.RabbitConverterFuture<Registration> future =
asyncRabbitTemplate.convertSendAndReceiveAsType(
directExchange.getName(), key, car, responseType);
try {
Registration registration = future.get();
LOGGER.info("Asynchronous message received: {}", registration);
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Cannot get response.", e);
}
}
}

View File

@@ -0,0 +1,13 @@
spring:
h2:
console:
enabled: true
path: /h2-console
datasource:
url: jdbc:h2:mem:testdb
driverClassName: org.h2.Driver
username: sa
password: password
jpa:
database-platform: org.hibernate.dialect.H2Dialect

View File

@@ -0,0 +1,16 @@
CREATE TABLE car
(
id UUID PRIMARY KEY,
name VARCHAR(512),
color VARCHAR(512),
correlation_id UUID
);
CREATE TABLE registration
(
id UUID PRIMARY KEY,
date DATE,
owner VARCHAR(512),
signature VARCHAR(1024),
car_id UUID
)

View File

@@ -0,0 +1,48 @@
package io.reflectoring.client.registration;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.test.context.ContextConfiguration;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.lifecycle.Startables;
import java.util.Map;
import java.util.stream.Stream;
@ContextConfiguration(initializers = AbstractIntegrationTest.Initializer.class)
public class AbstractIntegrationTest {
static class Initializer
implements ApplicationContextInitializer<ConfigurableApplicationContext> {
static RabbitMQContainer rabbitMQContainer = new RabbitMQContainer();
private static void startContainers() {
Startables.deepStart(Stream.of(rabbitMQContainer)).join();
// we can add further containers here like rabbitmq or other database
}
private static Map<String, String> createConnectionConfiguration() {
return Map.of(
"spring.rabbitmq.host", rabbitMQContainer.getHost(),
"spring.rabbitmq.port", rabbitMQContainer.getAmqpPort().toString(),
"spring.rabbitmq.username", rabbitMQContainer.getAdminUsername(),
"spring.rabbitmq.password", rabbitMQContainer.getAdminPassword()
);
}
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
startContainers();
ConfigurableEnvironment environment = applicationContext.getEnvironment();
MapPropertySource testcontainers = new MapPropertySource(
"testcontainers",
(Map) createConnectionConfiguration()
);
environment.getPropertySources().addFirst(testcontainers);
}
}
}

View File

@@ -0,0 +1,29 @@
package io.reflectoring.client.registration.async;
import io.reflectoring.client.registration.AbstractIntegrationTest;
import org.assertj.core.api.ThrowableAssert;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.TestPropertySource;
import static org.assertj.core.api.Assertions.assertThatCode;
@SpringBootTest
@TestPropertySource(properties = "scheduling.enable=false")
class StatelessClientTest extends AbstractIntegrationTest {
@Autowired
private StatelessClient statelessClient;
@Test
void sendAndForget() {
// given
// when
ThrowableAssert.ThrowingCallable send = () -> statelessClient.sendAndForget();
// then
assertThatCode(send).doesNotThrowAnyException();
}
}

View File

@@ -1,4 +1,4 @@
package io.reflectoring.client.rpc;
package io.reflectoring.client.registration.rpc;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

View File

@@ -1,35 +1,27 @@
package io.reflectoring.client.rpc;
package io.reflectoring.client.registration.rpc;
import io.reflectoring.client.registration.AbstractIntegrationTest;
import org.assertj.core.api.ThrowableAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.TestPropertySource;
import org.testcontainers.containers.RabbitMQContainer;
import static org.assertj.core.api.Assertions.assertThatCode;
@SpringBootTest
@TestPropertySource(properties = "scheduling.enable=false")
class EventPublisherTest {
private final RabbitMQContainer rabbitMQContainer = new RabbitMQContainer();
class StatefulClientTest extends AbstractIntegrationTest {
@Autowired
private EventPublisher eventPublisher;
@BeforeEach
void setUp() {
rabbitMQContainer.start();
}
private StatefulClient statefulClient;
@Test
void sendMessageSynchronously() {
// given
// when
ThrowableAssert.ThrowingCallable send = () -> eventPublisher.send();
ThrowableAssert.ThrowingCallable send = () -> statefulClient.send();
// then
assertThatCode(send).doesNotThrowAnyException();
@@ -40,7 +32,7 @@ class EventPublisherTest {
// given
// when
ThrowableAssert.ThrowingCallable send = () -> eventPublisher.sendAsynchronously();
ThrowableAssert.ThrowingCallable send = () -> statefulClient.sendAsynchronously();
// then
assertThatCode(send).doesNotThrowAnyException();

View File

@@ -24,6 +24,8 @@ dependencies {
annotationProcessor 'org.projectlombok:lombok'
implementation 'com.fasterxml.jackson.core:jackson-core:2.9.6'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.9.6'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.9.6'
}
test {

View File

@@ -8,6 +8,7 @@ import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@Component
public class EventConsumer {
@@ -15,9 +16,11 @@ public class EventConsumer {
public static final Logger LOGGER = LoggerFactory.getLogger(EventConsumer.class);
@RabbitListener(queues = "#{queue.name}")
public Registration receive(Car car) {
@RabbitListener(queues = "#{queue.name}", concurrency = "10")
public Registration receive(Car car) throws InterruptedException {
LOGGER.info("Message received {} ", car);
TimeUnit.SECONDS.sleep(10);
LOGGER.info("Message proceeded {} ", car);
return Registration.builder()
.id(car.getId())
.date(new Date())