diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java new file mode 100644 index 0000000000..72a9ec8ad7 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java @@ -0,0 +1,26 @@ +package com.baeldung.springamqp.errorhandling; + +import com.baeldung.springamqp.errorhandling.producer.MessageProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.EnableScheduling; + +@SpringBootApplication +@EnableScheduling +public class ErrorHandlingApp { + + @Autowired + MessageProducer messageProducer; + + public static void main(String[] args) { + SpringApplication.run(ErrorHandlingApp.class, args); + } + + @EventListener(ApplicationReadyEvent.class) + public void doSomethingAfterStartup() { + messageProducer.sendMessage(); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java new file mode 100644 index 0000000000..708b08476d --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java @@ -0,0 +1,55 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; + +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "dlx-custom") +public class DLXCustomAmqpConfiguration { + public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); + } + + @Bean + FanoutExchange deadLetterExchange() { + return new FanoutExchange(DLX_EXCHANGE_MESSAGES); + } + + @Bean + Queue deadLetterQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); + } + + @Bean + Binding deadLetterBinding() { + return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java new file mode 100644 index 0000000000..bff325e657 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java @@ -0,0 +1,72 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; + +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "parking-lot-dlx") +public class DLXParkingLotAmqpConfiguration { + public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; + public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot"; + public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot"; + + @Bean + FanoutExchange parkingLotExchange() { + return new FanoutExchange(EXCHANGE_PARKING_LOT); + } + + @Bean + Queue parkingLotQueue() { + return QueueBuilder.durable(QUEUE_PARKING_LOT).build(); + } + + @Bean + Binding parkingLotBinding() { + return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange()); + } + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); + } + + @Bean + FanoutExchange deadLetterExchange() { + return new FanoutExchange(DLX_EXCHANGE_MESSAGES); + } + + @Bean + Queue deadLetterQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); + } + + @Bean + Binding deadLetterBinding() { + return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java new file mode 100644 index 0000000000..dcd76d7f72 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java @@ -0,0 +1,63 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import com.baeldung.springamqp.errorhandling.errorhandler.CustomFatalExceptionStrategy; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; +import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy; +import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.ErrorHandler; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; + +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "fatal-error-strategy") +public class FatalExceptionStrategyAmqpConfiguration { + + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( + ConnectionFactory connectionFactory, + SimpleRabbitListenerContainerFactoryConfigurer configurer) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + factory.setErrorHandler(errorHandler()); + return factory; + } + + @Bean + public ErrorHandler errorHandler() { + return new ConditionalRejectingErrorHandler(customExceptionStrategy()); + } + + @Bean + FatalExceptionStrategy customExceptionStrategy() { + return new CustomFatalExceptionStrategy(); + } + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .build(); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java new file mode 100644 index 0000000000..8990381da2 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java @@ -0,0 +1,55 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import com.baeldung.springamqp.errorhandling.errorhandler.CustomErrorHandler; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.ErrorHandler; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; + +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "listener-error") +public class ListenerErrorHandlerAmqpConfiguration { + + @Bean + public ErrorHandler errorHandler() { + return new CustomErrorHandler(); + } + + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, + SimpleRabbitListenerContainerFactoryConfigurer configurer) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + factory.setErrorHandler(errorHandler()); + return factory; + } + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .build(); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/RoutingKeyDLQAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/RoutingKeyDLQAmqpConfiguration.java new file mode 100644 index 0000000000..defabed306 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/RoutingKeyDLQAmqpConfiguration.java @@ -0,0 +1,55 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; + +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "routing-dlq") +public class RoutingKeyDLQAmqpConfiguration { + public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); + } + + @Bean + FanoutExchange deadLetterExchange() { + return new FanoutExchange(DLX_EXCHANGE_MESSAGES); + } + + @Bean + Queue deadLetterQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); + } + + @Bean + Binding deadLetterBinding() { + return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java new file mode 100644 index 0000000000..ea129986ca --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java @@ -0,0 +1,43 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "simple-dlq") +public class SimpleDLQAmqpConfiguration { + public static final String QUEUE_MESSAGES = "baeldung-messages-queue"; + public static final String QUEUE_MESSAGES_DLQ = QUEUE_MESSAGES + ".dlq"; + public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange"; + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ) + .build(); + } + + @Bean + Queue deadLetterQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/DLQCustomAmqpContainer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/DLQCustomAmqpContainer.java new file mode 100644 index 0000000000..62907abee9 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/DLQCustomAmqpContainer.java @@ -0,0 +1,35 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; +import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT; + +public class DLQCustomAmqpContainer { + private static final Logger log = LoggerFactory.getLogger(DLQCustomAmqpContainer.class); + private final RabbitTemplate rabbitTemplate; + public static final int MAX_RETRIES_COUNT = 2; + + public DLQCustomAmqpContainer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRetryHeaders(Message failedMessage) { + Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); + if (retriesCnt == null) + retriesCnt = 1; + if (retriesCnt > MAX_RETRIES_COUNT) { + log.info("Discarding message"); + return; + } + log.info("Retrying message for the {} time", retriesCnt); + failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java new file mode 100644 index 0000000000..17b65c58da --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java @@ -0,0 +1,63 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration; +import com.baeldung.springamqp.errorhandling.errorhandler.BusinessException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MessagesConsumer { + public static final String HEADER_X_RETRIES_COUNT = "x-retries-count"; + + + private static final Logger log = LoggerFactory.getLogger(MessagesConsumer.class); + private final RabbitTemplate rabbitTemplate; + + public MessagesConsumer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES) + public void receiveMessage(Message message) throws BusinessException { + log.info("Received message: {}", message.toString()); + throw new BusinessException(); + } + + @Bean + @ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "simple-dlq") + public SimpleDLQAmqpContainer simpleAmqpContainer() { + return new SimpleDLQAmqpContainer(rabbitTemplate); + } + + @Bean + @ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "routing-dlq") + public RoutingDLQAmqpContainer routingDLQAmqpContainer() { + return new RoutingDLQAmqpContainer(rabbitTemplate); + } + + @Bean + @ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "dlx-custom") + public DLQCustomAmqpContainer dlqAmqpContainer() { + return new DLQCustomAmqpContainer(rabbitTemplate); + } + + @Bean + @ConditionalOnProperty( + value = "amqp.configuration.current", + havingValue = "parking-lot-dlx") + public ParkingLotDLQAmqpContainer parkingLotDLQAmqpContainer() { + return new ParkingLotDLQAmqpContainer(rabbitTemplate); + } +} \ No newline at end of file diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/ParkingLotDLQAmqpContainer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/ParkingLotDLQAmqpContainer.java new file mode 100644 index 0000000000..34dccd408a --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/ParkingLotDLQAmqpContainer.java @@ -0,0 +1,43 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT; +import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.QUEUE_PARKING_LOT; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; +import static com.baeldung.springamqp.errorhandling.consumer.MessagesConsumer.HEADER_X_RETRIES_COUNT; + +public class ParkingLotDLQAmqpContainer { + private static final Logger log = LoggerFactory.getLogger(ParkingLotDLQAmqpContainer.class); + private final RabbitTemplate rabbitTemplate; + public static final int MAX_RETRIES_COUNT = 2; + + public ParkingLotDLQAmqpContainer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRetryWithParkingLot(Message failedMessage) { + Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); + if (retriesCnt == null) + retriesCnt = 1; + if (retriesCnt > MAX_RETRIES_COUNT) { + log.info("Sending message to the parking lot queue"); + rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + return; + } + log.info("Retrying message for the {} time", retriesCnt); + failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } + + @RabbitListener(queues = QUEUE_PARKING_LOT) + public void processParkingLotQueue(Message failedMessage) { + log.info("Received message in parking lot queue {}", failedMessage.toString()); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/RoutingDLQAmqpContainer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/RoutingDLQAmqpContainer.java new file mode 100644 index 0000000000..fedce76880 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/RoutingDLQAmqpContainer.java @@ -0,0 +1,33 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration; +import com.baeldung.springamqp.errorhandling.errorhandler.BusinessException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; + +public class RoutingDLQAmqpContainer { + private static final Logger log = LoggerFactory.getLogger(RoutingDLQAmqpContainer.class); + private final RabbitTemplate rabbitTemplate; + + public RoutingDLQAmqpContainer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES) + public void receiveMessage(Message message) throws BusinessException { + log.info("Received message: {}", message.toString()); + throw new BusinessException(); + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRequeue(Message failedMessage) { + log.info("Received failed message, requeueing: {}", failedMessage.toString()); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/SimpleDLQAmqpContainer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/SimpleDLQAmqpContainer.java new file mode 100644 index 0000000000..6f9da7b587 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/SimpleDLQAmqpContainer.java @@ -0,0 +1,31 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES_DLQ; + +public class SimpleDLQAmqpContainer { + private static final Logger log = LoggerFactory.getLogger(SimpleDLQAmqpContainer.class); + private final RabbitTemplate rabbitTemplate; + + public SimpleDLQAmqpContainer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessages(Message message) { + log.info("Received failed message: {}", message.toString()); + } + + @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRequeue(Message failedMessage) { + log.info("Received failed message, requeueing: {}", failedMessage.toString()); + log.info("Received failed message, requeueing: {}", failedMessage.getMessageProperties().getReceivedRoutingKey()); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/BusinessException.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/BusinessException.java new file mode 100644 index 0000000000..faec96e2aa --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/BusinessException.java @@ -0,0 +1,4 @@ +package com.baeldung.springamqp.errorhandling.errorhandler; + +public class BusinessException extends Exception { +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomErrorHandler.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomErrorHandler.java new file mode 100644 index 0000000000..5c5e9cdf13 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomErrorHandler.java @@ -0,0 +1,14 @@ +package com.baeldung.springamqp.errorhandling.errorhandler; + +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.util.ErrorHandler; + +public class CustomErrorHandler implements ErrorHandler { + + @Override + public void handleError(Throwable t) { + if (!(t.getCause() instanceof BusinessException)) { + throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t); + } + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomFatalExceptionStrategy.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomFatalExceptionStrategy.java new file mode 100644 index 0000000000..e14be4e73c --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomFatalExceptionStrategy.java @@ -0,0 +1,10 @@ +package com.baeldung.springamqp.errorhandling.errorhandler; + +import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; + +public class CustomFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy { + @Override + public boolean isFatal(Throwable t) { + return !(t.getCause() instanceof BusinessException); + } +} \ No newline at end of file diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/producer/MessageProducer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/producer/MessageProducer.java new file mode 100644 index 0000000000..c14fd5bf3b --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/producer/MessageProducer.java @@ -0,0 +1,24 @@ +package com.baeldung.springamqp.errorhandling.producer; + +import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Service; + +@Service +public class MessageProducer { + + private static final Logger log = LoggerFactory.getLogger(MessageProducer.class); + private int messageNumber = 0; + private final RabbitTemplate rabbitTemplate; + + public MessageProducer(final RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + public void sendMessage() { + log.info("Sending message..."); + rabbitTemplate.convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES, SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++); + } +} \ No newline at end of file diff --git a/spring-amqp/src/main/resources/application.properties b/spring-amqp/src/main/resources/application.properties new file mode 100644 index 0000000000..c0c1cf1b47 --- /dev/null +++ b/spring-amqp/src/main/resources/application.properties @@ -0,0 +1,3 @@ +spring.rabbitmq.listener.simple.default-requeue-rejected=false +spring.main.allow-bean-definition-overriding=true +amqp.configuration.current=parking-lot-dlx \ No newline at end of file