内容简介:TTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒。不同队列的过期时间互相之间没有影响,即使是对于同一条消息。队列中的消息存在队列中的时间超过过期时间则成为死信。队列中的消息在以下三种情况下会变成死信 (1)消息被拒绝(basic.reject
基于队列和基于消息的TTL
TTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒。不同队列的过期时间互相之间没有影响,即使是对于同一条消息。队列中的消息存在队列中的时间超过过期时间则成为死信。
死信交换机DLX
队列中的消息在以下三种情况下会变成死信 (1)消息被拒绝(basic.reject 或者 basic.nack),并且requeue=false; (2)消息的过期时间到期了; (3)队列长度限制超过了。 当队列中的消息成为死信以后,如果队列设置了DLX那么消息会被发送到DLX。通过x-dead-letter-exchange设置DLX,通过这个x-dead-letter-routing-key设置消息发送到DLX所用的routing-key,如果不设置默认使用消息本身的routing-key.
@Bean public Queue lindQueue() { return QueueBuilder.durable(LIND_QUEUE) .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//设置死信交换机 .withArgument("x-message-ttl", makeCallExpire) .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//设置死信routingKey .build(); }
实现的过程
graph TD publisher-->DLX DLX-->dead.queue dead.queue-->正常queue 正常queue-->subscriber
完整的代码
@Component public class AmqpConfig { /** * 主要测试一个死信队列,功能主要实现延时消费,原理是先把消息发到正常队列, * 正常队列有超时时间,当达到时间后自动发到死信队列,然后由消费者去消费死信队列里的消息. */ public static final String LIND_EXCHANGE = "lind.exchange"; public static final String LIND_DL_EXCHANGE = "lind.dl.exchange"; public static final String LIND_QUEUE = "lind.queue"; public static final String LIND_DEAD_QUEUE = "lind.queue.dead"; public static final String LIND_FANOUT_EXCHANGE = "lindFanoutExchange"; /** * 单位为微秒. */ @Value("${tq.makecall.expire:60000}") private long makeCallExpire; /** * 创建普通交换机. */ @Bean public TopicExchange lindExchange() { return (TopicExchange) ExchangeBuilder.topicExchange(LIND_EXCHANGE).durable(true) .build(); } /** * 创建死信交换机. */ @Bean public TopicExchange lindExchangeDl() { return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true) .build(); } /** * 创建普通队列. */ @Bean public Queue lindQueue() { return QueueBuilder.durable(LIND_QUEUE) .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//设置死信交换机 .withArgument("x-message-ttl", makeCallExpire) .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//设置死信routingKey .build(); } /** * 创建死信队列. */ @Bean public Queue lindDelayQueue() { return QueueBuilder.durable(LIND_DEAD_QUEUE).build(); } /** * 绑定死信队列. */ @Bean public Binding bindDeadBuilders() { return BindingBuilder.bind(lindDelayQueue()) .to(lindExchangeDl()) .with(LIND_DEAD_QUEUE); } /** * 绑定普通队列. * * @return */ @Bean public Binding bindBuilders() { return BindingBuilder.bind(lindQueue()) .to(lindExchange()) .with(LIND_QUEUE); } /** * 广播交换机. * * @return */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(LIND_FANOUT_EXCHANGE); } } //----------------- @Component public class Publisher { @Autowired private RabbitTemplate rabbitTemplate; public void publish(String message) { try { rabbitTemplate .convertAndSend(AmqpConfig.LIND_EXCHANGE, AmqpConfig.LIND_DELAY_QUEUE, message); } catch (Exception e) { e.printStackTrace(); } } } //----------------- @Component @Slf4j public class Subscriber { @RabbitListener(queues = AmqpConfig.LIND_QUEUE) public void customerSign(String data) { try { log.info("从队列拿到数据 :{}", data); } catch (Exception ex) { e.printStackTrace(); } } }
以上所述就是小编给大家介绍的《rabbitmq实现延时队列(死信队列)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- SpringBoot集成RabbitMQ(死信队列)
- rabbitmq死信队列详解与使用
- 这些 MQ 概念你都懂吗:死信队列、重试队列、消息回溯等
- 利用 RabbitMQ 死信队列和 TTL 实现定时任务
- 消息队列(三)常见消息队列介绍
- 消息队列探秘 – RabbitMQ 消息队列介绍
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Domain-Driven Design
Eric Evans / Addison-Wesley Professional / 2003-8-30 / USD 74.99
"Eric Evans has written a fantastic book on how you can make the design of your software match your mental model of the problem domain you are addressing. "His book is very compatible with XP. It is n......一起来看看 《Domain-Driven Design》 这本书的介绍吧!