diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java new file mode 100644 index 0000000000..c006684083 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/ErrorHandlingApp.java @@ -0,0 +1,25 @@ +package com.baeldung.springamqp.errorhandling; + +import com.baeldung.springamqp.errorhandling.producer.MessageProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.EnableScheduling; + +@SpringBootApplication +@EnableScheduling +public class ErrorHandlingApp { + @Autowired + MessageProducer messageProducer; + + public static void main(String[] args) { + SpringApplication.run(ErrorHandlingApp.class, args); + } + + @EventListener(ApplicationReadyEvent.class) + public void doSomethingAfterStartup() { + messageProducer.sendMessage(); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java new file mode 100644 index 0000000000..ba358cba52 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXCustomAmqpConfiguration.java @@ -0,0 +1,43 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*; + +//@Configuration +public class DLXCustomAmqpConfiguration { + public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); + } + + @Bean + FanoutExchange deadLetterExchange() { + return new FanoutExchange(DLX_EXCHANGE_MESSAGES); + } + + @Bean + Queue deadLetterQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); + } + + @Bean + Binding deadLetterBinding() { + return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java new file mode 100644 index 0000000000..6e576109f7 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXDefaultAmqpConfiguration.java @@ -0,0 +1,43 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*; + +//@Configuration +public class DLXDefaultAmqpConfiguration { + public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); + } + + @Bean + FanoutExchange deadLetterExchange() { + return new FanoutExchange(DLX_EXCHANGE_MESSAGES); + } + + @Bean + Queue deadLetterQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); + } + + @Bean + Binding deadLetterBinding() { + return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java new file mode 100644 index 0000000000..c11914605b --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/DLXParkingLotAmqpConfiguration.java @@ -0,0 +1,60 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.*; + +//@Configuration +public class DLXParkingLotAmqpConfiguration { + public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx"; + public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot"; + public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot"; + + @Bean + FanoutExchange parkingLotExchange() { + return new FanoutExchange(EXCHANGE_PARKING_LOT); + } + + @Bean + Queue parkingLotQueue() { + return QueueBuilder.durable(QUEUE_PARKING_LOT).build(); + } + + @Bean + Binding parkingLotBinding() { + return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange()); + } + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES) + .build(); + } + + @Bean + FanoutExchange deadLetterExchange() { + return new FanoutExchange(DLX_EXCHANGE_MESSAGES); + } + + @Bean + Queue deadLetterQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); + } + + @Bean + Binding deadLetterBinding() { + return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java new file mode 100644 index 0000000000..3447c70420 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/FatalExceptionStrategyAmqpConfiguration.java @@ -0,0 +1,54 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import com.baeldung.springamqp.errorhandling.errorhandler.CustomFatalExceptionStrategy; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; +import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy; +import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.ErrorHandler; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; + +@Configuration +public class FatalExceptionStrategyAmqpConfiguration { + + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, + SimpleRabbitListenerContainerFactoryConfigurer configurer) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + factory.setErrorHandler(errorHandler()); + return factory; + } + + @Bean + public ErrorHandler errorHandler() { + return new ConditionalRejectingErrorHandler(customExceptionStrategy()); + } + + @Bean + FatalExceptionStrategy customExceptionStrategy() { + return new CustomFatalExceptionStrategy(); + } + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .build(); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java new file mode 100644 index 0000000000..5c0c0afaaf --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/ListenerErrorHandlerAmqpConfiguration.java @@ -0,0 +1,46 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import com.baeldung.springamqp.errorhandling.errorhandler.CustomErrorHandler; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.util.ErrorHandler; + +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.QUEUE_MESSAGES; + +//@Configuration +public class ListenerErrorHandlerAmqpConfiguration { + + @Bean + public ErrorHandler errorHandler() { + return new CustomErrorHandler(); + } + + @Bean + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, + SimpleRabbitListenerContainerFactoryConfigurer configurer) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + factory.setErrorHandler(errorHandler()); + return factory; + } + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .build(); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java new file mode 100644 index 0000000000..26da2d59ba --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/configuration/SimpleDLQAmqpConfiguration.java @@ -0,0 +1,34 @@ +package com.baeldung.springamqp.errorhandling.configuration; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; + +//@Configuration +public class SimpleDLQAmqpConfiguration { + public static final String QUEUE_MESSAGES = "baeldung-messages-queue"; + public static final String QUEUE_MESSAGES_DLQ = QUEUE_MESSAGES + ".dlq"; + public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange"; + + @Bean + Queue messagesQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES) + .withArgument("x-dead-letter-exchange", "") + .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ) + .build(); + } + + @Bean + Queue deadLetterQueue() { + return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build(); + } + + @Bean + DirectExchange messagesExchange() { + return new DirectExchange(EXCHANGE_MESSAGES); + } + + @Bean + Binding bindingMessages() { + return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java new file mode 100644 index 0000000000..3af9a84678 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/consumer/MessagesConsumer.java @@ -0,0 +1,75 @@ +package com.baeldung.springamqp.errorhandling.consumer; + +import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration; +import com.baeldung.springamqp.errorhandling.errorhandler.BusinessException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Service; + +import static com.baeldung.springamqp.errorhandling.configuration.DLXParkingLotAmqpConfiguration.EXCHANGE_PARKING_LOT; +import static com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES; + +@Service +public class MessagesConsumer { + public static final String HEADER_X_RETRIES_COUNT = "x-retries-count"; + public static final int MAX_RETRIES_COUNT = 1; + + private static final Logger log = LoggerFactory.getLogger(MessagesConsumer.class); + private final RabbitTemplate rabbitTemplate; + + public MessagesConsumer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + @RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES) + public void receiveMessage(final Message message) throws BusinessException { + log.info("Received message: {}", message.toString()); + throw new BusinessException(); + } + + //@RabbitListener(queues = DLXCustomAmqpConfiguration.QUEUE_MESSAGES_DLQ) + public void processFailedMessages(final Message message) { + log.info("Received failed message: {}", message.toString()); + } + + //@RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRequeue(final Message failedMessage) { + log.info("Received failed message, requeueing: {}", failedMessage.toString()); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } + + //@RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRetryHeaders(final Message failedMessage) { + Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); + if (retriesCnt == null) retriesCnt = 0; + log.info("Retrying message for the {} time", retriesCnt); + if (retriesCnt > MAX_RETRIES_COUNT) { + log.info("Discarding message"); + return; + } + failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } + + // @RabbitListener(queues = QUEUE_MESSAGES_DLQ) + public void processFailedMessagesRetryWithParkingLot(final Message failedMessage) { + Integer retriesCnt = (Integer) failedMessage.getMessageProperties().getHeaders().get(HEADER_X_RETRIES_COUNT); + if (retriesCnt == null) retriesCnt = 0; + log.info("Retrying message for the {} time", retriesCnt); + if (retriesCnt > MAX_RETRIES_COUNT) { + log.info("Sending message to the parking lot queue"); + rabbitTemplate.send(EXCHANGE_PARKING_LOT, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + return; + } + failedMessage.getMessageProperties().getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt); + rabbitTemplate.send(EXCHANGE_MESSAGES, failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage); + } + + //@RabbitListener(queues = QUEUE_PARKING_LOT) + public void processParkingLotQueue(final Message failedMessage) { + log.info("Received message in parking lot queue"); + } +} \ No newline at end of file diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/BusinessException.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/BusinessException.java new file mode 100644 index 0000000000..faec96e2aa --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/BusinessException.java @@ -0,0 +1,4 @@ +package com.baeldung.springamqp.errorhandling.errorhandler; + +public class BusinessException extends Exception { +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomErrorHandler.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomErrorHandler.java new file mode 100644 index 0000000000..5c5e9cdf13 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomErrorHandler.java @@ -0,0 +1,14 @@ +package com.baeldung.springamqp.errorhandling.errorhandler; + +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.util.ErrorHandler; + +public class CustomErrorHandler implements ErrorHandler { + + @Override + public void handleError(Throwable t) { + if (!(t.getCause() instanceof BusinessException)) { + throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t); + } + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomFatalExceptionStrategy.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomFatalExceptionStrategy.java new file mode 100644 index 0000000000..e14be4e73c --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/errorhandler/CustomFatalExceptionStrategy.java @@ -0,0 +1,10 @@ +package com.baeldung.springamqp.errorhandling.errorhandler; + +import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; + +public class CustomFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy { + @Override + public boolean isFatal(Throwable t) { + return !(t.getCause() instanceof BusinessException); + } +} \ No newline at end of file diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/producer/MessageProducer.java b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/producer/MessageProducer.java new file mode 100644 index 0000000000..c14fd5bf3b --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/errorhandling/producer/MessageProducer.java @@ -0,0 +1,24 @@ +package com.baeldung.springamqp.errorhandling.producer; + +import com.baeldung.springamqp.errorhandling.configuration.SimpleDLQAmqpConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Service; + +@Service +public class MessageProducer { + + private static final Logger log = LoggerFactory.getLogger(MessageProducer.class); + private int messageNumber = 0; + private final RabbitTemplate rabbitTemplate; + + public MessageProducer(final RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + } + + public void sendMessage() { + log.info("Sending message..."); + rabbitTemplate.convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES, SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++); + } +} \ No newline at end of file diff --git a/spring-amqp/src/main/resources/application.properties b/spring-amqp/src/main/resources/application.properties new file mode 100644 index 0000000000..1353a0e447 --- /dev/null +++ b/spring-amqp/src/main/resources/application.properties @@ -0,0 +1 @@ +spring.rabbitmq.listener.simple.default-requeue-rejected=false \ No newline at end of file