RabbitMQ RabbitMQ 学习笔记 -- 11 RabbitMQ 死信队列

tammie · 2020-10-24 17:16:30 · 热度: 3

死信队列

DLX,Dead Letter Exchange 的缩写,又死信邮箱、死信交换机。其实 DLX 就是一个普通的交换机,和一般的交换机没有任何区别。 当消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中(指定好相关参数,RabbitMQ 会自动发送)。

什么是死信呢?什么样的消息会变成死信呢?

被设置了TTL的消息在过期后会成为 Dead Letter。其实在 RabbitMQ 中,一共有三种消息的“死亡”形式:

  • 消息被拒绝(basic.reject或basic.nack)并且requeue = false.
  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

死信处理过程

  • DLX 也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange上去,进而被路由到另一个队列。
  • 可以监听这个队列中的消息做相应的处理。

应用场景分析:

在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了

定义业务(普通)队列的时候指定参数:

  • x-dead-letter-exchange: 用来设置死信后发送的交换机
  • x-dead-letter-routing-key:用来设置死信的 routingKey

死信队列设置

大致流程


web 控制台操作

1.声明一个死信交换机,

其实和普通的交换机都一样,同样作用于将消息推送至相应的队列,只不过它充当的角色是用来转发那些死信消息的交换机。

这里声明的为 Topic 交换机,名字为 — rabbit_dlx_exchange

2.声明死信交换机的处理队列,并绑定。

我们这里声明两个队列 ( queue_dlx_A,queue_dlx_B),并用两个不同的路由键 ( routing.a.#,routing.b.# ),绑定在死信交换机上,方便演示。

绑定状态

3.声明普通的工作交换机,一个正常工作的交换机,

这里我们声明 Topic 类型交换机,名字为 — rabbit_work_exchange

4.声明工作交换机对应的工作队列,并绑定在工作交换机上,设置路由键为 #

声明队列 work_queue

这里需要重点说明一下,设置 队列 work_queue 的队列参数

  • x-dead-letter-exchange: 用来设置死信后发送的交换机,这里指明的为 rabbit_dlx_exchange 交换机
  • x-dead-letter-routing-key:用来设置死信的 routingKey,如果没有设置 routingKey,则使用原来携带的路由键。这里使用 死信路由键 为 routing.a.key,是为了方便测试路由到死信交换机后,具体推送到哪个队列上。

5.在工作交换机 rabbit_work_exchange 中发送一条消息,消息传递到队列 work_queue 中

6,接下来在队列 work_queue 中拒绝该条消息

因为 web 控制台界面拒绝消息的方式只有两种,一个是 Nack message requeue true ,但是 requeue = true,消息会直接返回队列中,所以不行,这里选择的是 Reject requeue false。

7.结果

可以发现 work_queue 中的消息经过死信交换机被转发到了 queue_dlx_A 中,

代码方式

声明交换机,队列

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class Config {

    // 声明死信交换机 rabbit_dlx_exchange
    @Bean
    public TopicExchange dlxExchange() {
        return new TopicExchange("rabbit_dlx_exchange");
    }
    @Bean
    public Queue dlxQueueA() {
        return new Queue("queue_dlx_A");
    }
    @Bean
    public Queue dlxQueueB() {
        return new Queue("queue_dlx_B");
    }
    @Bean
    public Binding bindDlxAQueue() {
        String routingKey = "routing.a.#";
        return BindingBuilder.bind(dlxQueueA()).to(dlxExchange()).with(routingKey);
    }
    @Bean
    public Binding bindDlxBQueue() {
        String routingKey = "routing.b.#";
        return BindingBuilder.bind(dlxQueueB()).to(dlxExchange()).with(routingKey);
    }

    // 声明正常工作的交换机 rabbit_work_exchange
    @Bean
    public TopicExchange workExchange() {
        return new TopicExchange("rabbit_work_exchange");
    }

    @Bean
    public Queue workQueue() {
        String queueName = "work_queue";
        // 要指定的死信交换机
        String deadExchangeName = "rabbit_dlx_exchange";
        // 路由键  这里模拟交给死信交换机下的 A 队列中
        String deadRoutingKey = "routing.a.key";
        Map<String, Object> args = new HashMap<>(2);
        args.put("x-dead-letter-exchange", deadExchangeName);
        args.put("x-dead-letter-routing-key", deadRoutingKey);
        return new Queue(queueName, true, false, false, args);
    }
    @Bean
    public Binding bindWorkQueue() {
        String routingKey = "#";
        return BindingBuilder.bind(workQueue()).to(workExchange()).with(routingKey);
    }

}

在正常工作队列 work_queue 的配置中注入了 Map 参数,用来配置

x-dead-letter-exchange 标识一个交换机

x-dead-letter-routing-key 来标识一个绑定键。

消费者

@Component
public class Consumer {

    @RabbitListener(queues = "work_queue")
    public void receiver(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        System.out.println("工作队列 work_queue 消费信息:" + msg);
        System.out.println("将消息拒绝放回队列中");
        channel.basicReject(deliveryTag,  false);
        // 两种方式都可以,但 requeue 必须为 false
//        channel.basicNack(deliveryTag, false, false);
    }

    @RabbitListener(queues = "queue_dlx_A")
    public void receiver2(String msg, Channel channel) throws IOException {
        System.out.println("死信交换机下 queue_dlx_A 队列接收到消息:" + msg);
        channel.basicAck(1, false);
    }
    @RabbitListener(queues = "queue_dlx_B")
    public void receiver3(String msg, Channel channel) throws IOException {
        System.out.println("死信交换机下 queue_dlx_B 队列接收到消息:" + msg);
        channel.basicAck(1, false);
    }
}

生产者

@Test
public void producer() {
    String exchange = "rabbit_work_exchange";
    String routingKey = "a";
    String msg = "模拟一条死信消息";
    rabbitTemplate.convertAndSend(exchange, routingKey, msg);
}

输出

从队列 work_queue 接收到消息:模拟一条死信消息
消费者拒绝消息
死信交换机下 queue_dlx_A 队列接收到消息:模拟一条死信消息

死信消息是 RabbitMQ 为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,当明白了这些之后,这些 Exchange 和 Queue 想怎样配合就能怎么配合。

比如从死信队列拉取消息,然后发送邮件、短信等做异步处理逻辑。

或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。

为您推荐与 rabbitmq 相关的帖子:

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册