Dead letter queue configuration with RabbitMQ

September 6, 2016

In this blog post I look at dead letter queue configuration with RabbitMQ. A message from a queue can be ‘dead-lettered’ when one of the following things occurs:

  • the message is rejected and requeuing is set to false
  • TTL for the message expires
  • the queue length limit is exceeded
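Two of these triggers are controlled by arguments supplied when the queue is declared. As a rough sketch, the helper below assembles such an arguments map using only the JDK. The argument names (x-dead-letter-exchange, x-message-ttl, x-max-length) are RabbitMQ's own; the class name, the "my.dlx" exchange name and the numeric values are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds the optional arguments passed when declaring a queue.
class DeadLetterQueueArguments {

    static Map<String, Object> build(String deadLetterExchange, int ttlMillis, int maxLength) {
        Map<String, Object> arguments = new LinkedHashMap<>();
        // Exchange to which dead-lettered messages are republished.
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        // TTL case: messages that stay in the queue longer than this (ms) expire.
        arguments.put("x-message-ttl", ttlMillis);
        // Length limit case: once exceeded, the oldest messages leave the queue (default overflow behaviour).
        arguments.put("x-max-length", maxLength);
        return arguments;
    }

    public static void main(String[] args) {
        System.out.println(build("my.dlx", 60_000, 1_000));
    }
}
```

A map like this is what a queue declaration accepts as its arguments, so the same keys apply whichever client library declares the queue.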

To demonstrate this with an example, I chose the first case, where a message is rejected. The producer sends PaymentOrders as messages, which are processed by a consumer. A PaymentOrder message is rejected when there are insufficient funds on the payer’s account.

The producer

The producer is a Spring Boot application which uses the Spring AMQP library to send PaymentOrder messages to RabbitMQ.

The producer’s API

The first part of the producer’s API defines the names of the exchange, the routing key, and the incoming and dead letter queues.

public class Constants {
    public static final String EXCHANGE_NAME = "payment-orders.exchange";
    public static final String ROUTING_KEY_NAME = "payment-orders";
    public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
    public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
}

The second part is to define the message format. We are using JSON in this example. The following JSON document shows how we model a PaymentOrder:

{
  "from":"SA54 22PS JCLV 7LWT 7LHY EBLO",
  "to":"IT23 K545 5414 339G WLPI 2YF6 VBP",
  "amount":54.75
}

Note that it is good practice to avoid platform-specific serialization formats such as Java serialization for the payload, since they force the consumer to be Java-based as well. Formatting the payload as JSON is a better choice, because practically every platform and language can parse it.

The producer configuration

We need to configure the AMQP infrastructure. The dead letter queue configuration is encapsulated in the incoming queue declaration.

RabbitMQ has the concept of a dead letter exchange (DLX), which is a normal exchange of any of the usual types (direct, topic, fanout or headers). When a message fetched from a queue fails processing and is rejected, RabbitMQ checks whether a dead letter exchange is configured for that queue. If one is configured via the x-dead-letter-exchange argument, RabbitMQ routes the failed message to it with the original routing key. This routing key can be overridden via the x-dead-letter-routing-key argument.

In this example we are using the default exchange (no-name) as the dead letter exchange and using the dead letter queue name as the new routing key. This will work since any queue is bound to the default exchange with the binding key equal to the queue name.

@Configuration
public class AmqpConfig {

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(Constants.EXCHANGE_NAME);
    }

    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
                .build();
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(incomingQueue()).to(exchange()).with(Constants.ROUTING_KEY_NAME);
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(Constants.DEAD_LETTER_QUEUE_NAME).build();
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

}

The builder API for queues and exchanges is pretty convenient and is available starting with version 1.6 of the Spring AMQP library.

In the RabbitMQ management console the DLX and DLK labels indicate that the dead letter exchange and dead letter routing key arguments are set on the incoming queue.

RabbitMQ dead letter queue
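To make the routing through the default exchange more concrete, here is a toy in-memory model written against the JDK only. It is not the RabbitMQ client API, and every class and method name in it is hypothetical. It only illustrates that a routing key equal to a queue name reaches that queue, and that a dead-lettered message is lost when no queue matches the dead letter routing key.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model of the default (no-name) exchange; purely illustrative.
class DefaultExchangeModel {

    // The default exchange implicitly binds every queue under its own name.
    private final Map<String, Queue<String>> queuesByName = new HashMap<>();

    void declareQueue(String name) {
        queuesByName.putIfAbsent(name, new ArrayDeque<>());
    }

    // Publishes via the default exchange: the routing key selects the queue by name.
    // Returns false when no queue matches, in which case the message is dropped.
    boolean publish(String routingKey, String message) {
        Queue<String> target = queuesByName.get(routingKey);
        if (target == null) {
            return false;
        }
        target.add(message);
        return true;
    }

    // Models a reject with requeue=false on a queue configured with
    // x-dead-letter-exchange="" and the given x-dead-letter-routing-key.
    boolean rejectWithoutRequeue(String sourceQueue, String deadLetterRoutingKey) {
        Queue<String> source = queuesByName.get(sourceQueue);
        String message = (source == null) ? null : source.poll();
        return message != null && publish(deadLetterRoutingKey, message);
    }

    int depth(String queueName) {
        Queue<String> queue = queuesByName.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        DefaultExchangeModel broker = new DefaultExchangeModel();
        broker.declareQueue("payment-orders.incoming.queue");
        broker.declareQueue("payment-orders.dead-letter.queue");
        broker.publish("payment-orders.incoming.queue", "{\"amount\":54.75}");
        broker.rejectWithoutRequeue("payment-orders.incoming.queue", "payment-orders.dead-letter.queue");
        System.out.println(broker.depth("payment-orders.dead-letter.queue")); // prints 1
    }
}
```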

The producer logic

The producer generates a random PaymentOrder message every second (the fixedDelay below is 1000 ms) and sends it to RabbitMQ for further processing. Spring’s AmqpTemplate is auto-configured and can be wired into our component. Since the message format is JSON, a Jackson2JsonMessageConverter bean is defined, which is automatically associated with the auto-configured AmqpTemplate.

@Component
public class Producer {

    private AmqpTemplate amqpTemplate;

    public Producer(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }

    @Scheduled(fixedDelay = 1000L)
    public void send() {

        PaymentOrder paymentOrder = new PaymentOrder(
                Iban.random().toFormattedString(),
                Iban.random().toFormattedString(),
                new BigDecimal(1D + new Random().nextDouble() * 100D).setScale(2, RoundingMode.FLOOR));

        amqpTemplate.convertAndSend(Constants.EXCHANGE_NAME, Constants.ROUTING_KEY_NAME, paymentOrder);
    }
}

The consumer

For this simple example the consumer is also a Spring Boot application, but in real applications the consumer and producer are not necessarily on the same platform or written in the same language.

The consumer API

The first part of the consumer’s API specifies which queue it is connected to.

public class Constants {
    public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
    public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
}

The second part is to adapt to the message format defined by the producer. Note that in this case both applications are Java-based, so I could have created a jar file containing the PaymentOrder class and shared it between consumer and producer. However, this is bad practice since it introduces tight coupling through a shared library. A better approach is to accept a bit of code duplication (the PaymentOrder class in this case) and keep the coupling loose by agreeing only on the message format.

public class PaymentOrder {

    String from;
    String to;
    BigDecimal amount;

    @JsonCreator
    public PaymentOrder(@JsonProperty("from") String from,
                        @JsonProperty("to") String to,
                        @JsonProperty("amount") BigDecimal amount) {
        this.from = from;
        this.to = to;
        this.amount = amount;
    }

    // getters and toString()
}

The consumer configuration

The consumer only cares about the queue from which the messages are fetched. The incoming queue must exist, otherwise the consumer will not start. Note that the dead letter queue does not have to exist in order for the consumer to start, but it should exist by the time messages need to be ‘dead-lettered’. If it is still missing at that point, the messages are silently dropped.

@Configuration
public class AmqpConfig {

    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
                .build();
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

}

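By default, requeuing of rejected messages is enabled. To have rejected messages ‘dead-lettered’ instead, set the following property to false (this is the Spring Boot 1.x property name; from Spring Boot 2.0 on it lives under spring.rabbitmq.listener.simple).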

spring:
  rabbitmq:
    listener:
      default-requeue-rejected: false

However, if you would like to keep requeuing for some error scenarios, it is better to leave requeuing enabled and use the AmqpRejectAndDontRequeueException, which causes a basic.reject with requeue=false to be sent.
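The interplay between the default requeue setting and the exception type can be sketched roughly as follows. This is a simplified model that compiles with the JDK alone, not Spring AMQP's actual code: RejectAndDontRequeue is a hypothetical stand-in for AmqpRejectAndDontRequeueException, and RequeueDecision is an invented name.

```java
// Hypothetical stand-in for Spring AMQP's AmqpRejectAndDontRequeueException,
// declared here only so that the sketch has no external dependencies.
class RejectAndDontRequeue extends RuntimeException {
    RejectAndDontRequeue(String message) {
        super(message);
    }
}

class RequeueDecision {

    // Returns the requeue flag to use when rejecting a failed delivery.
    static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable failure) {
        // Walk the cause chain, since listener exceptions are usually wrapped.
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof RejectAndDontRequeue) {
                return false; // explicit "do not requeue" signal wins over the default
            }
        }
        return defaultRequeueRejected;
    }

    public static void main(String[] args) {
        Throwable wrapped = new IllegalStateException("listener failed",
                new RejectAndDontRequeue("insufficient funds"));
        System.out.println(shouldRequeue(true, wrapped));                              // false
        System.out.println(shouldRequeue(true, new IllegalStateException("timeout"))); // true
    }
}
```

With requeuing left enabled, an ordinary exception puts the message back on the incoming queue, while the dedicated exception sends it to the dead letter exchange.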

The consumer logic

Whenever a message is available on the incoming queue the process method will be invoked with the deserialized PaymentOrder instance. Here we simulate the message rejection by throwing an InsufficientFundsException which extends the AmqpRejectAndDontRequeueException exception.

@Component
public class Consumer {

    @RabbitListener(queues = Constants.INCOMING_QUEUE_NAME)
    public void process(@Payload PaymentOrder paymentOrder) throws InsufficientFundsException {
        if (new Random().nextBoolean()) {
            throw new InsufficientFundsException("insufficient funds on account " + paymentOrder.getFrom());
        }
    }

}

The following image shows an example of a PaymentOrder message which was rejected and, as a result, ended up in the dead letter queue.

RabbitMQ dead letter queue message

Sometimes it helps to automatically retry a failed operation in case it might succeed on a subsequent attempt. The Spring AMQP library supports this with the help of RetryTemplate, which is part of the Spring Retry project (originally extracted from Spring Batch). Spring Boot makes it easy to configure the RetryTemplate, as shown in the following example.

spring:
  rabbitmq:
    listener:
      retry:
        enabled: true
        initial-interval: 2000
        max-attempts: 2
        multiplier: 1.5
        max-interval: 5000

With the above configuration the retry functionality is enabled (it is disabled by default). There are at most 2 attempts to deliver the message, with 2 seconds between the first and the second attempt. If more attempts were allowed, each subsequent interval would be 1.5 times the previous one, capped at 5 seconds. Running the consumer, you will see the following in the logs:

2016-09-07 21:56:53.396  INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer            : Processing at 'Wed Sep 07 21:56:53 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'
2016-09-07 21:56:55.399  INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer            : Processing at 'Wed Sep 07 21:56:55 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'
2016-09-07 21:56:55.401  WARN 11995 --- [cTaskExecutor-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'{"from":"RS32 5346 0536 6006 4886 88","to":"FI61 8364 3364 9834 16","amount":45.57}' MessageProperties [headers={__TypeId__=com.example.producer.api.PaymentOrder}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=payment-orders.exchange, receivedRoutingKey=payment-orders, receivedDelay=null, deliveryTag=31, messageCount=0, consumerTag=amq.ctag-vd18OXS9PSOeJmBQLY4o-w, consumerQueue=payment-orders.incoming.queue])
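The pauses implied by these retry settings can be worked out with a few lines of plain Java. The sketch below is my own approximation of an exponential back-off with a cap, not the Spring Retry implementation. RetryBackoff and waits are hypothetical names, and the parameters mirror the initial-interval, multiplier, max-interval and max-attempts properties.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the exponential back-off settings shown above.
class RetryBackoff {

    // Returns the pauses (in milliseconds) between consecutive delivery attempts.
    static List<Long> waits(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double interval = initialInterval;
        // maxAttempts includes the first delivery, so there are maxAttempts - 1 pauses.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min((long) interval, maxInterval));
            interval *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        System.out.println(waits(2000, 1.5, 5000, 2)); // [2000]
        System.out.println(waits(2000, 1.5, 5000, 5)); // [2000, 3000, 4500, 5000]
    }
}
```

The first line corresponds to the configuration used here: a single 2 second pause, which matches the two 'Processing' log entries that are 2 seconds apart.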

Conclusion

As you can see, the dead letter queue configuration is pretty simple using RabbitMQ. The example used in this blog post is available on my GitHub account.
