From 4dd386c5ec27412aec24e3d499e66ea1b97c8392 Mon Sep 17 00:00:00 2001 From: Alexander Molochko Date: Thu, 17 Oct 2019 03:01:56 +0300 Subject: [PATCH 1/2] BAEL-3200 Error handling with Spring AMQP --- .../errorhandling/ErrorHandlingApp.java | 25 +++++++ .../DLXCustomAmqpConfiguration.java | 43 +++++++++++ .../DLXDefaultAmqpConfiguration.java | 43 +++++++++++ .../DLXParkingLotAmqpConfiguration.java | 60 +++++++++++++++ ...talExceptionStrategyAmqpConfiguration.java | 54 +++++++++++++ ...ListenerErrorHandlerAmqpConfiguration.java | 46 ++++++++++++ .../SimpleDLQAmqpConfiguration.java | 34 +++++++++ .../consumer/MessagesConsumer.java | 75 +++++++++++++++++++ .../errorhandler/BusinessException.java | 4 + .../errorhandler/CustomErrorHandler.java | 14 ++++ .../CustomFatalExceptionStrategy.java | 10 +++ .../producer/MessageProducer.java | 24 ++++++ .../src/main/resources/application.properties | 1 + 13 files changed, 433 insertions(+) create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/BusinessException.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomErrorHandler.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomFatalExceptionStrategy.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/producer/MessageProducer.java create mode 100644 spring-amqp/src/main/resources/application.properties diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java new file mode 100644 index 0000000000..c006684083 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java @@ -0,0 +1,25 @@ +package com.baeldung.springamqp.errorhandling; + +import com.baeldung.springamqp.errorhandling.producer.MessageProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.EnableScheduling; + +@SpringBootApplication +@EnableScheduling +public class ErrorHandlingApp { + @Autowired + MessageProducer messageProducer; + + public static void main(String[] args) { + SpringApplication.run(ErrorHandlingApp.class, args); + } + + @EventListener(ApplicationReadyEvent.class) + public void doSomethingAfterStartup() { + messageProducer.sendMessage(); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java new file mode 100644 index 0000000000..ba358cba52 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java @@ -0,0 +1,43 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*; + +//@Configuration +public class DLXCustomAmqpConfiguration { + public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); + } + + @Bean + FanoutExchange deadLetterExchange() { + return new FanoutExchange(DLX_EXCHANGE_MESSAGES); + } + + @Bean + Queue deadLetterQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); + } + + @Bean + Binding deadLetterBinding() { + return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java new file mode 100644 index 0000000000..6e576109f7 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java @@ -0,0 +1,43 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*; + +//@Configuration +public class DLXDefaultAmqpConfiguration { + public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); + } + + @Bean + FanoutExchange deadLetterExchange() { + return new FanoutExchange(DLX_EXCHANGE_MESSAGES); + } + + @Bean + Queue deadLetterQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); + } + + @Bean + Binding deadLetterBinding() { + return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java new file mode 100644 index 0000000000..c11914605b --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java @@ -0,0 +1,60 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*; + +//@Configuration +public class DLXParkingLotAmqpConfiguration { + public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; + public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot"; + public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot"; + + @Bean + FanoutExchange parkingLotExchange() { + return new FanoutExchange(EXCHANGE_PARKING_LOT); + } + + @Bean + Queue parkingLotQueue() { + return QueueBuilder.durable(QUEUE_PARKING_LOT).build(); + } + + @Bean + Binding parkingLotBinding() { + return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange()); + } + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); + } + + @Bean + FanoutExchange deadLetterExchange() { + return new FanoutExchange(DLX_EXCHANGE_MESSAGES); + } + + @Bean + Queue deadLetterQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); + } + + @Bean + Binding deadLetterBinding() { + return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java new file mode 100644 index 0000000000..3447c70420 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java @@ -0,0 +1,54 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import com.baeldung.springamqp.errorhandling.errorhandler.CustomFatalExceptionStrategy; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; +import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy; +import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.ErrorHandler; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; + +@Configuration +public class FatalExceptionStrategyAmqpConfiguration { + + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, + SimpleRabbitListenerContainerFactoryConfigurer configurer) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + factory.setErrorHandler(errorHandler()); + return factory; + } + + @Bean + public ErrorHandler errorHandler() { + return new ConditionalRejectingErrorHandler(customExceptionStrategy()); + } + + @Bean + FatalExceptionStrategy customExceptionStrategy() { + return new CustomFatalExceptionStrategy(); + } + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .build(); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java new file mode 100644 index 0000000000..5c0c0afaaf --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java @@ -0,0 +1,46 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import com.baeldung.springamqp.errorhandling.errorhandler.CustomErrorHandler; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.util.ErrorHandler; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; + +//@Configuration +public class ListenerErrorHandlerAmqpConfiguration { + + @Bean + public ErrorHandler errorHandler() { + return new CustomErrorHandler(); + } + + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, + SimpleRabbitListenerContainerFactoryConfigurer configurer) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + factory.setErrorHandler(errorHandler()); + return factory; + } + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .build(); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java new file mode 100644 index 0000000000..26da2d59ba --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java @@ -0,0 +1,34 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; + +//@Configuration +public class SimpleDLQAmqpConfiguration { + public static final String QUEUE_MESSAGES = "baeldung-messages-queue"; + public static final String QUEUE_MESSAGES_DLQ = QUEUE_MESSAGES + ".dlq"; + public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange"; + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ) + .build(); + } + + @Bean + Queue deadLetterQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java new file mode 100644 index 0000000000..3af9a84678 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java @@ -0,0 +1,75 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration; +import com.baeldung.springamqp.errorhandling.errorhandler.BusinessException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Service; + +import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; + +@Service +public class MessagesConsumer { + public static final String HEADER_X_RETRIES_COUNT = "x-retries-count"; + public static final int MAX_RETRIES_COUNT = 1; + + private static final Logger log = LoggerFactory.getLogger(MessagesConsumer.class); + private final RabbitTemplate rabbitTemplate; + + public MessagesConsumer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES) + public void receiveMessage(final Message message) throws BusinessException { + log.info("Received message: {}", message.toString()); + throw new BusinessException(); + } + + //@RabbitListener(queues = DLXCustomAmqpConfiguration.QUEUE_MESSAGES_DLQ) + public void processFailedMessages(final Message message) { + log.info("Received failed message: {}", message.toString()); + } + + //@RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRequeue(final Message failedMessage) { + log.info("Received failed message, requeueing: {}", failedMessage.toString()); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } + + //@RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRetryHeaders(final Message failedMessage) { + Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); + if (retriesCnt == null) retriesCnt = 0; + log.info("Retrying message for the {} time", retriesCnt); + if (retriesCnt > MAX_RETRIES_COUNT) { + log.info("Discarding message"); + return; + } + failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } + + // @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRetryWithParkingLot(final Message failedMessage) { + Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); + if (retriesCnt == null) retriesCnt = 0; + log.info("Retrying message for the {} time", retriesCnt); + if (retriesCnt > MAX_RETRIES_COUNT) { + log.info("Sending message to the parking lot queue"); + rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + return; + } + failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } + + //@RabbitListener(queues = QUEUE_PARKING_LOT) + public void processParkingLotQueue(final Message failedMessage) { + log.info("Received message in parking lot queue"); + } +} \ No newline at end of file diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/BusinessException.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/BusinessException.java new file mode 100644 index 0000000000..faec96e2aa --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/BusinessException.java @@ -0,0 +1,4 @@ +package com.baeldung.springamqp.errorhandling.errorhandler; + +public class BusinessException extends Exception { +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomErrorHandler.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomErrorHandler.java new file mode 100644 index 0000000000..5c5e9cdf13 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomErrorHandler.java @@ -0,0 +1,14 @@ +package com.baeldung.springamqp.errorhandling.errorhandler; + +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.util.ErrorHandler; + +public class CustomErrorHandler implements ErrorHandler { + + @Override + public void handleError(Throwable t) { + if (!(t.getCause() instanceof BusinessException)) { + throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t); + } + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomFatalExceptionStrategy.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomFatalExceptionStrategy.java new file mode 100644 index 0000000000..e14be4e73c --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomFatalExceptionStrategy.java @@ -0,0 +1,10 @@ +package com.baeldung.springamqp.errorhandling.errorhandler; + +import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; + +public class CustomFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy { + @Override + public boolean isFatal(Throwable t) { + return !(t.getCause() instanceof BusinessException); + } +} \ No newline at end of file diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/producer/MessageProducer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/producer/MessageProducer.java new file mode 100644 index 0000000000..c14fd5bf3b --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/producer/MessageProducer.java @@ -0,0 +1,24 @@ +package com.baeldung.springamqp.errorhandling.producer; + +import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Service; + +@Service +public class MessageProducer { + + private static final Logger log = LoggerFactory.getLogger(MessageProducer.class); + private int messageNumber = 0; + private final RabbitTemplate rabbitTemplate; + + public MessageProducer(final RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + public void sendMessage() { + log.info("Sending message..."); + rabbitTemplate.convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES, SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++); + } +} \ No newline at end of file diff --git a/spring-amqp/src/main/resources/application.properties b/spring-amqp/src/main/resources/application.properties new file mode 100644 index 0000000000..1353a0e447 --- /dev/null +++ b/spring-amqp/src/main/resources/application.properties @@ -0,0 +1 @@ +spring.rabbitmq.listener.simple.default-requeue-rejected=false \ No newline at end of file From 6436440cb68e227cf42fee14db7f38f7e89969ee Mon Sep 17 00:00:00 2001 From: Alexander Molochko Date: Sat, 2 Nov 2019 23:57:12 +0300 Subject: [PATCH 2/2] Fix issues, segregate configurations --- .../errorhandling/ErrorHandlingApp.java | 1 + .../DLXCustomAmqpConfiguration.java | 22 ++++-- .../DLXParkingLotAmqpConfiguration.java | 22 ++++-- ...talExceptionStrategyAmqpConfiguration.java | 17 +++-- ...ListenerErrorHandlerAmqpConfiguration.java | 15 +++- ...va => RoutingKeyDLQAmqpConfiguration.java} | 24 +++++-- .../SimpleDLQAmqpConfiguration.java | 19 +++-- .../consumer/DLQCustomAmqpContainer.java | 35 +++++++++ .../consumer/MessagesConsumer.java | 72 ++++++++----------- .../consumer/ParkingLotDLQAmqpContainer.java | 43 +++++++++++ .../consumer/RoutingDLQAmqpContainer.java | 33 +++++++++ .../consumer/SimpleDLQAmqpContainer.java | 31 ++++++++ .../src/main/resources/application.properties | 4 +- 13 files changed, 267 insertions(+), 71 deletions(-) rename spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/{DLXDefaultAmqpConfiguration.java => RoutingKeyDLQAmqpConfiguration.java} (50%) create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/DLQCustomAmqpContainer.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/ParkingLotDLQAmqpContainer.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/RoutingDLQAmqpContainer.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/SimpleDLQAmqpContainer.java diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java index c006684083..72a9ec8ad7 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java @@ -11,6 +11,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling public class ErrorHandlingApp { + @Autowired MessageProducer messageProducer; diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java index ba358cba52..708b08476d 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java @@ -1,19 +1,31 @@ package com.baeldung.springamqp.errorhandling.configuration; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; -//@Configuration +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "dlx-custom") public class DLXCustomAmqpConfiguration { public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) - .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) - .build(); + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); } @Bean diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java index c11914605b..bff325e657 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java @@ -1,11 +1,23 @@ package com.baeldung.springamqp.errorhandling.configuration; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; -//@Configuration +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "parking-lot-dlx") public class DLXParkingLotAmqpConfiguration { public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot"; @@ -29,8 +41,8 @@ public class DLXParkingLotAmqpConfiguration { @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) - .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) - .build(); + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); } @Bean diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java index 3447c70420..dcd76d7f72 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java @@ -1,12 +1,17 @@ package com.baeldung.springamqp.errorhandling.configuration; import com.baeldung.springamqp.errorhandling.errorhandler.CustomFatalExceptionStrategy; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.ErrorHandler; @@ -15,11 +20,15 @@ import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpC import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; @Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "fatal-error-strategy") public class FatalExceptionStrategyAmqpConfiguration { @Bean - public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, - SimpleRabbitListenerContainerFactoryConfigurer configurer) { + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( + ConnectionFactory connectionFactory, + SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); factory.setErrorHandler(errorHandler()); @@ -39,7 +48,7 @@ public class FatalExceptionStrategyAmqpConfiguration { @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) - .build(); + .build(); } @Bean diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java index 5c0c0afaaf..8990381da2 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java @@ -1,17 +1,26 @@ package com.baeldung.springamqp.errorhandling.configuration; import com.baeldung.springamqp.errorhandling.errorhandler.CustomErrorHandler; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.util.ErrorHandler; import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; -//@Configuration +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "listener-error") public class ListenerErrorHandlerAmqpConfiguration { @Bean @@ -31,7 +40,7 @@ public class ListenerErrorHandlerAmqpConfiguration { @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) - .build(); + .build(); } @Bean diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/RoutingKeyDLQAmqpConfiguration.java similarity index 50% rename from spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java rename to spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/RoutingKeyDLQAmqpConfiguration.java index 6e576109f7..defabed306 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/RoutingKeyDLQAmqpConfiguration.java @@ -1,19 +1,31 @@ package com.baeldung.springamqp.errorhandling.configuration; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; -//@Configuration -public class DLXDefaultAmqpConfiguration { +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "routing-dlq") +public class RoutingKeyDLQAmqpConfiguration { public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) - .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) - .build(); + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); } @Bean diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java index 26da2d59ba..ea129986ca 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java @@ -1,9 +1,18 @@ package com.baeldung.springamqp.errorhandling.configuration; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -//@Configuration +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "simple-dlq") public class SimpleDLQAmqpConfiguration { public static final String QUEUE_MESSAGES = "baeldung-messages-queue"; public static final String QUEUE_MESSAGES_DLQ = QUEUE_MESSAGES + ".dlq"; @@ -12,9 +21,9 @@ public class SimpleDLQAmqpConfiguration { @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ) - .build(); + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ) + .build(); } @Bean diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/DLQCustomAmqpContainer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/DLQCustomAmqpContainer.java new file mode 100644 index 0000000000..62907abee9 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/DLQCustomAmqpContainer.java @@ -0,0 +1,35 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; +import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT; + +public class DLQCustomAmqpContainer { + private static final Logger log = LoggerFactory.getLogger(DLQCustomAmqpContainer.class); + private final RabbitTemplate rabbitTemplate; + public static final int MAX_RETRIES_COUNT = 2; + + public DLQCustomAmqpContainer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRetryHeaders(Message failedMessage) { + Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); + if (retriesCnt == null) + retriesCnt = 1; + if (retriesCnt > MAX_RETRIES_COUNT) { + log.info("Discarding message"); + return; + } + log.info("Retrying message for the {} time", retriesCnt); + failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java index 3af9a84678..17b65c58da 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java @@ -7,15 +7,14 @@ import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.stereotype.Service; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT; -import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; - -@Service +@Configuration public class MessagesConsumer { public static final String HEADER_X_RETRIES_COUNT = "x-retries-count"; - public static final int MAX_RETRIES_COUNT = 1; + private static final Logger log = LoggerFactory.getLogger(MessagesConsumer.class); private final RabbitTemplate rabbitTemplate; @@ -25,51 +24,40 @@ public class MessagesConsumer { } @RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES) - public void receiveMessage(final Message message) throws BusinessException { + public void receiveMessage(Message message) throws BusinessException { log.info("Received message: {}", message.toString()); throw new BusinessException(); } - //@RabbitListener(queues = DLXCustomAmqpConfiguration.QUEUE_MESSAGES_DLQ) - public void processFailedMessages(final Message message) { - log.info("Received failed message: {}", message.toString()); + @Bean + @ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "simple-dlq") + public SimpleDLQAmqpContainer simpleAmqpContainer() { + return new SimpleDLQAmqpContainer(rabbitTemplate); } - //@RabbitListener(queues = QUEUE_MESSAGES_DLQ) - public void processFailedMessagesRequeue(final Message failedMessage) { - log.info("Received failed message, requeueing: {}", failedMessage.toString()); - rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + @Bean + @ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "routing-dlq") + public RoutingDLQAmqpContainer routingDLQAmqpContainer() { + return new RoutingDLQAmqpContainer(rabbitTemplate); } - //@RabbitListener(queues = QUEUE_MESSAGES_DLQ) - public void processFailedMessagesRetryHeaders(final Message failedMessage) { - Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); - if (retriesCnt == null) retriesCnt = 0; - log.info("Retrying message for the {} time", retriesCnt); - if (retriesCnt > MAX_RETRIES_COUNT) { - log.info("Discarding message"); - return; - } - failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); - rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + @Bean + @ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "dlx-custom") + public DLQCustomAmqpContainer dlqAmqpContainer() { + return new DLQCustomAmqpContainer(rabbitTemplate); } - // @RabbitListener(queues = QUEUE_MESSAGES_DLQ) - public void processFailedMessagesRetryWithParkingLot(final Message failedMessage) { - Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); - if (retriesCnt == null) retriesCnt = 0; - log.info("Retrying message for the {} time", retriesCnt); - if (retriesCnt > MAX_RETRIES_COUNT) { - log.info("Sending message to the parking lot queue"); - rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); - return; - } - failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); - rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); - } - - //@RabbitListener(queues = QUEUE_PARKING_LOT) - public void processParkingLotQueue(final Message failedMessage) { - log.info("Received message in parking lot queue"); + @Bean + @ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "parking-lot-dlx") + public ParkingLotDLQAmqpContainer parkingLotDLQAmqpContainer() { + return new ParkingLotDLQAmqpContainer(rabbitTemplate); } } \ No newline at end of file diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/ParkingLotDLQAmqpContainer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/ParkingLotDLQAmqpContainer.java new file mode 100644 index 0000000000..34dccd408a --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/ParkingLotDLQAmqpContainer.java @@ -0,0 +1,43 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT; +import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.QUEUE_PARKING_LOT; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; +import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT; + +public class ParkingLotDLQAmqpContainer { + private static final Logger log = LoggerFactory.getLogger(ParkingLotDLQAmqpContainer.class); + private final RabbitTemplate rabbitTemplate; + public static final int MAX_RETRIES_COUNT = 2; + + public ParkingLotDLQAmqpContainer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRetryWithParkingLot(Message failedMessage) { + Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); + if (retriesCnt == null) + retriesCnt = 1; + if (retriesCnt > MAX_RETRIES_COUNT) { + log.info("Sending message to the parking lot queue"); + rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + return; + } + log.info("Retrying message for the {} time", retriesCnt); + failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } + + @RabbitListener(queues = QUEUE_PARKING_LOT) + public void processParkingLotQueue(Message failedMessage) { + log.info("Received message in parking lot queue {}", failedMessage.toString()); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/RoutingDLQAmqpContainer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/RoutingDLQAmqpContainer.java new file mode 100644 index 0000000000..fedce76880 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/RoutingDLQAmqpContainer.java @@ -0,0 +1,33 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration; +import com.baeldung.springamqp.errorhandling.errorhandler.BusinessException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; + +public class RoutingDLQAmqpContainer { + private static final Logger log = LoggerFactory.getLogger(RoutingDLQAmqpContainer.class); + private final RabbitTemplate rabbitTemplate; + + public RoutingDLQAmqpContainer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES) + public void receiveMessage(Message message) throws BusinessException { + log.info("Received message: {}", message.toString()); + throw new BusinessException(); + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRequeue(Message failedMessage) { + log.info("Received failed message, requeueing: {}", failedMessage.toString()); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/SimpleDLQAmqpContainer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/SimpleDLQAmqpContainer.java new file mode 100644 index 0000000000..6f9da7b587 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/SimpleDLQAmqpContainer.java @@ -0,0 +1,31 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; + +public class SimpleDLQAmqpContainer { + private static final Logger log = LoggerFactory.getLogger(SimpleDLQAmqpContainer.class); + private final RabbitTemplate rabbitTemplate; + + public SimpleDLQAmqpContainer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessages(Message message) { + log.info("Received failed message: {}", message.toString()); + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRequeue(Message failedMessage) { + log.info("Received failed message, requeueing: {}", failedMessage.toString()); + log.info("Received failed message, requeueing: {}", failedMessage.getMessageProperties().getReceivedRoutingKey()); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } +} diff --git a/spring-amqp/src/main/resources/application.properties b/spring-amqp/src/main/resources/application.properties index 1353a0e447..c0c1cf1b47 100644 --- a/spring-amqp/src/main/resources/application.properties +++ b/spring-amqp/src/main/resources/application.properties @@ -1 +1,3 @@ -spring.rabbitmq.listener.simple.default-requeue-rejected=false \ No newline at end of file +spring.rabbitmq.listener.simple.default-requeue-rejected=false +spring.main.allow-bean-definition-overriding=true +amqp.configuration.current=parking-lot-dlx \ No newline at end of file