From 6436440cb68e227cf42fee14db7f38f7e89969ee Mon Sep 17 00:00:00 2001 From: Alexander Molochko Date: Sat, 2 Nov 2019 23:57:12 +0300 Subject: [PATCH] Fix issues, segregate configurations --- .../errorhandling/ErrorHandlingApp.java | 1 + .../DLXCustomAmqpConfiguration.java | 22 ++++-- .../DLXParkingLotAmqpConfiguration.java | 22 ++++-- ...talExceptionStrategyAmqpConfiguration.java | 17 +++-- ...ListenerErrorHandlerAmqpConfiguration.java | 15 +++- ...va => RoutingKeyDLQAmqpConfiguration.java} | 24 +++++-- .../SimpleDLQAmqpConfiguration.java | 19 +++-- .../consumer/DLQCustomAmqpContainer.java | 35 +++++++++ .../consumer/MessagesConsumer.java | 72 ++++++++----------- .../consumer/ParkingLotDLQAmqpContainer.java | 43 +++++++++++ .../consumer/RoutingDLQAmqpContainer.java | 33 +++++++++ .../consumer/SimpleDLQAmqpContainer.java | 31 ++++++++ .../src/main/resources/application.properties | 4 +- 13 files changed, 267 insertions(+), 71 deletions(-) rename spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/{DLXDefaultAmqpConfiguration.java => RoutingKeyDLQAmqpConfiguration.java} (50%) create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/DLQCustomAmqpContainer.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/ParkingLotDLQAmqpContainer.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/RoutingDLQAmqpContainer.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/SimpleDLQAmqpContainer.java diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java index c006684083..72a9ec8ad7 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java @@ -11,6 +11,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling public class ErrorHandlingApp { + @Autowired MessageProducer messageProducer; diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java index ba358cba52..708b08476d 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java @@ -1,19 +1,31 @@ package com.baeldung.springamqp.errorhandling.configuration; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; -//@Configuration +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "dlx-custom") public class DLXCustomAmqpConfiguration { public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) - .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) - .build(); + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); } @Bean diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java index c11914605b..bff325e657 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java @@ -1,11 +1,23 @@ package com.baeldung.springamqp.errorhandling.configuration; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; -//@Configuration +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "parking-lot-dlx") public class DLXParkingLotAmqpConfiguration { public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot"; @@ -29,8 +41,8 @@ public class DLXParkingLotAmqpConfiguration { @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) - .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) - .build(); + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); } @Bean diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java index 3447c70420..dcd76d7f72 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java @@ -1,12 +1,17 @@ package com.baeldung.springamqp.errorhandling.configuration; import com.baeldung.springamqp.errorhandling.errorhandler.CustomFatalExceptionStrategy; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.ErrorHandler; @@ -15,11 +20,15 @@ import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpC import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; @Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "fatal-error-strategy") public class FatalExceptionStrategyAmqpConfiguration { @Bean - public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, - SimpleRabbitListenerContainerFactoryConfigurer configurer) { + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( + ConnectionFactory connectionFactory, + SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); factory.setErrorHandler(errorHandler()); @@ -39,7 +48,7 @@ public class FatalExceptionStrategyAmqpConfiguration { @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) - .build(); + .build(); } @Bean diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java index 5c0c0afaaf..8990381da2 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java @@ -1,17 +1,26 @@ package com.baeldung.springamqp.errorhandling.configuration; import com.baeldung.springamqp.errorhandling.errorhandler.CustomErrorHandler; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.util.ErrorHandler; import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; -//@Configuration +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "listener-error") public class ListenerErrorHandlerAmqpConfiguration { @Bean @@ -31,7 +40,7 @@ public class ListenerErrorHandlerAmqpConfiguration { @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) - .build(); + .build(); } @Bean diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/RoutingKeyDLQAmqpConfiguration.java similarity index 50% rename from spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java rename to spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/RoutingKeyDLQAmqpConfiguration.java index 6e576109f7..defabed306 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/RoutingKeyDLQAmqpConfiguration.java @@ -1,19 +1,31 @@ package com.baeldung.springamqp.errorhandling.configuration; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; -//@Configuration -public class DLXDefaultAmqpConfiguration { +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "routing-dlq") +public class RoutingKeyDLQAmqpConfiguration { public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) - .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) - .build(); + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); } @Bean diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java index 26da2d59ba..ea129986ca 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java @@ -1,9 +1,18 @@ package com.baeldung.springamqp.errorhandling.configuration; -import org.springframework.amqp.core.*; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -//@Configuration +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "simple-dlq") public class SimpleDLQAmqpConfiguration { public static final String QUEUE_MESSAGES = "baeldung-messages-queue"; public static final String QUEUE_MESSAGES_DLQ = QUEUE_MESSAGES + ".dlq"; @@ -12,9 +21,9 @@ public class SimpleDLQAmqpConfiguration { @Bean Queue messagesQueue() { return QueueBuilder.durable(QUEUE_MESSAGES) - .withArgument("x-dead-letter-exchange", "") - .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ) - .build(); + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ) + .build(); } @Bean diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/DLQCustomAmqpContainer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/DLQCustomAmqpContainer.java new file mode 100644 index 0000000000..62907abee9 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/DLQCustomAmqpContainer.java @@ -0,0 +1,35 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; +import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT; + +public class DLQCustomAmqpContainer { + private static final Logger log = LoggerFactory.getLogger(DLQCustomAmqpContainer.class); + private final RabbitTemplate rabbitTemplate; + public static final int MAX_RETRIES_COUNT = 2; + + public DLQCustomAmqpContainer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRetryHeaders(Message failedMessage) { + Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); + if (retriesCnt == null) + retriesCnt = 1; + if (retriesCnt > MAX_RETRIES_COUNT) { + log.info("Discarding message"); + return; + } + log.info("Retrying message for the {} time", retriesCnt); + failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java index 3af9a84678..17b65c58da 100644 --- a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java @@ -7,15 +7,14 @@ import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.stereotype.Service; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT; -import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; - -@Service +@Configuration public class MessagesConsumer { public static final String HEADER_X_RETRIES_COUNT = "x-retries-count"; - public static final int MAX_RETRIES_COUNT = 1; + private static final Logger log = LoggerFactory.getLogger(MessagesConsumer.class); private final RabbitTemplate rabbitTemplate; @@ -25,51 +24,40 @@ public class MessagesConsumer { } @RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES) - public void receiveMessage(final Message message) throws BusinessException { + public void receiveMessage(Message message) throws BusinessException { log.info("Received message: {}", message.toString()); throw new BusinessException(); } - //@RabbitListener(queues = DLXCustomAmqpConfiguration.QUEUE_MESSAGES_DLQ) - public void processFailedMessages(final Message message) { - log.info("Received failed message: {}", message.toString()); + @Bean + @ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "simple-dlq") + public SimpleDLQAmqpContainer simpleAmqpContainer() { + return new SimpleDLQAmqpContainer(rabbitTemplate); } - //@RabbitListener(queues = QUEUE_MESSAGES_DLQ) - public void processFailedMessagesRequeue(final Message failedMessage) { - log.info("Received failed message, requeueing: {}", failedMessage.toString()); - rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + @Bean + @ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "routing-dlq") + public RoutingDLQAmqpContainer routingDLQAmqpContainer() { + return new RoutingDLQAmqpContainer(rabbitTemplate); } - //@RabbitListener(queues = QUEUE_MESSAGES_DLQ) - public void processFailedMessagesRetryHeaders(final Message failedMessage) { - Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); - if (retriesCnt == null) retriesCnt = 0; - log.info("Retrying message for the {} time", retriesCnt); - if (retriesCnt > MAX_RETRIES_COUNT) { - log.info("Discarding message"); - return; - } - failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); - rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + @Bean + @ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "dlx-custom") + public DLQCustomAmqpContainer dlqAmqpContainer() { + return new DLQCustomAmqpContainer(rabbitTemplate); } - // @RabbitListener(queues = QUEUE_MESSAGES_DLQ) - public void processFailedMessagesRetryWithParkingLot(final Message failedMessage) { - Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); - if (retriesCnt == null) retriesCnt = 0; - log.info("Retrying message for the {} time", retriesCnt); - if (retriesCnt > MAX_RETRIES_COUNT) { - log.info("Sending message to the parking lot queue"); - rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); - return; - } - failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); - rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); - } - - //@RabbitListener(queues = QUEUE_PARKING_LOT) - public void processParkingLotQueue(final Message failedMessage) { - log.info("Received message in parking lot queue"); + @Bean + @ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "parking-lot-dlx") + public ParkingLotDLQAmqpContainer parkingLotDLQAmqpContainer() { + return new ParkingLotDLQAmqpContainer(rabbitTemplate); } } \ No newline at end of file diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/ParkingLotDLQAmqpContainer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/ParkingLotDLQAmqpContainer.java new file mode 100644 index 0000000000..34dccd408a --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/ParkingLotDLQAmqpContainer.java @@ -0,0 +1,43 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT; +import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.QUEUE_PARKING_LOT; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; +import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT; + +public class ParkingLotDLQAmqpContainer { + private static final Logger log = LoggerFactory.getLogger(ParkingLotDLQAmqpContainer.class); + private final RabbitTemplate rabbitTemplate; + public static final int MAX_RETRIES_COUNT = 2; + + public ParkingLotDLQAmqpContainer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRetryWithParkingLot(Message failedMessage) { + Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); + if (retriesCnt == null) + retriesCnt = 1; + if (retriesCnt > MAX_RETRIES_COUNT) { + log.info("Sending message to the parking lot queue"); + rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + return; + } + log.info("Retrying message for the {} time", retriesCnt); + failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } + + @RabbitListener(queues = QUEUE_PARKING_LOT) + public void processParkingLotQueue(Message failedMessage) { + log.info("Received message in parking lot queue {}", failedMessage.toString()); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/RoutingDLQAmqpContainer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/RoutingDLQAmqpContainer.java new file mode 100644 index 0000000000..fedce76880 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/RoutingDLQAmqpContainer.java @@ -0,0 +1,33 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration; +import com.baeldung.springamqp.errorhandling.errorhandler.BusinessException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; + +public class RoutingDLQAmqpContainer { + private static final Logger log = LoggerFactory.getLogger(RoutingDLQAmqpContainer.class); + private final RabbitTemplate rabbitTemplate; + + public RoutingDLQAmqpContainer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES) + public void receiveMessage(Message message) throws BusinessException { + log.info("Received message: {}", message.toString()); + throw new BusinessException(); + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRequeue(Message failedMessage) { + log.info("Received failed message, requeueing: {}", failedMessage.toString()); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/SimpleDLQAmqpContainer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/SimpleDLQAmqpContainer.java new file mode 100644 index 0000000000..6f9da7b587 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/SimpleDLQAmqpContainer.java @@ -0,0 +1,31 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; + +public class SimpleDLQAmqpContainer { + private static final Logger log = LoggerFactory.getLogger(SimpleDLQAmqpContainer.class); + private final RabbitTemplate rabbitTemplate; + + public SimpleDLQAmqpContainer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessages(Message message) { + log.info("Received failed message: {}", message.toString()); + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRequeue(Message failedMessage) { + log.info("Received failed message, requeueing: {}", failedMessage.toString()); + log.info("Received failed message, requeueing: {}", failedMessage.getMessageProperties().getReceivedRoutingKey()); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } +} diff --git a/spring-amqp/src/main/resources/application.properties b/spring-amqp/src/main/resources/application.properties index 1353a0e447..c0c1cf1b47 100644 --- a/spring-amqp/src/main/resources/application.properties +++ b/spring-amqp/src/main/resources/application.properties @@ -1 +1,3 @@ -spring.rabbitmq.listener.simple.default-requeue-rejected=false \ No newline at end of file +spring.rabbitmq.listener.simple.default-requeue-rejected=false +spring.main.allow-bean-definition-overriding=true +amqp.configuration.current=parking-lot-dlx \ No newline at end of file