AY C# RabbitMQ 2019 微笔记6

栏目: ASP.NET · 发布时间: 5年前

内容简介:消息超时处理,比如订单 超过24小时不处理,系统取消订单。这个需求如果用 数据库去轮 就不好了。这种场景: 延迟任务知识:消息的TTL和死信Exchange

消息超时处理,比如订单 超过24小时不处理,系统取消订单。这个需求如果用 数据库去轮 就不好了。

这种场景: 延迟任务

知识:消息的TTL和死信Exchange

RabbitMQ本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。

rabbitmq-delayed-message-exchange这个插件也可以实现,看C#实现。

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。

超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。

所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。

这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

Dead Letter Exchanges

定义消息死亡

1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

2. 上面的消息的TTL到了,消息过期了。

3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

就是个普通的Exchange,存放死掉的消息

两个控制台:

P端(以后生产者只说P了,消费者是 C端):

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQ.Product2
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };
            using (var connection = factory.CreateConnection())
            {
                while (Console.ReadLine() != null)
                {
                    using (var channel = connection.CreateModel())
                    {
                        Dictionary<string, object> dic = new Dictionary<string, object>();
                        dic.Add("x-expires", 10000);
                        dic.Add("x-message-ttl", 8000);//队列上消息过期时间,应小于队列过期时间  
                        dic.Add("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由  
                        dic.Add("x-dead-letter-routing-key", "routing-delay");//过期消息转向路由相匹配routingkey  
                        //创建一个名叫"deadQueue"的消息队列
                        channel.QueueDeclare(queue: "deadQueue",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: dic);

                        var message = "Hello World!";
                        var body = Encoding.UTF8.GetBytes(message);

                        //向该消息队列发送消息message
                        channel.BasicPublish(exchange: "",
                                                    routingKey: "deadQueue",
                                                    basicProperties: null,
                                                    body: body);
                        Console.WriteLine(" [x] Sent {0}", message);
                    }
                }
            }

            Console.ReadKey();
        }
    }
}

运行项目,按任意键,创建1个队列,刷新web管理

AY C# RabbitMQ 2019 微笔记6

然后过了8秒,队列就删除了。

这里队列有个TTL Exp DLX DLK属性,正好上面3个属性的设置 AY C# RabbitMQ 2019 微笔记6

我们也可以在管理端看到这些消息参数:

AY C# RabbitMQ 2019 微笔记6

单击的时候,就可以设置了,

在网上找了一些资料,下面代码可能 java 的设置,.net简单如上键值就行了

Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于 redis 中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);

channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));

Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp

Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于 mongodb 中的固定集合,例如保存最新的100条消息, Feature=Lim

Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B

Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX

Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK

Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高( 数值更大的 )的消息先被消费,

Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中

Master locator(x-queue-master-locator) 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。

(设置“x-queue-master-locator”参数。)

整理下:  1个 Exchange 可以多个 Queue   1个Queue可以多个Message  Message可以设置优先级,Queue可以设置最多多少个Message,多大字节的存储,也可以设置优先级。

上次持久化的问题理解:

设置消息持久化必须先设置队列持久化,要不然 队列不持久化消息持久化队列都不存在了,消息存在还有什么意义 。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到持久化的目的。

上面代码,我们看到了,消息过期了,就会被扔到一个叫exchange-direct的 邮箱,真好,那么等他过期了,那么消费者 监听这个邮箱,是不是8秒过后就收到了消息,达到了 延迟的效果。跟RPC的设计异曲同工之妙。如果没有指定邮箱,8秒后消息就会被删除了。 10秒后 队列就会被删除。

AY C# RabbitMQ 2019 微笔记6

接下来,

消费者,就是创建个 被转发的 exchange的监听,

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AyTestMQ2
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.Title = "AY 2019 2018-12-5";

            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "exchange-direct", type: "direct");
                    string name = channel.QueueDeclare().QueueName;
                    //string name = "deadQueue";
                    channel.QueueBind(queue: name, exchange: "exchange-direct", routingKey: "routing-delay");

                    //回调,当consumer收到消息后会执行该函数
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                                        {
                                            var body = ea.Body;
                                            var message = Encoding.UTF8.GetString(body);
                                            Console.WriteLine(ea.RoutingKey);
                                            Console.WriteLine(" [x] Received {0}", message);
                                        };

                    //Console.WriteLine("name:" + name);
                    //消费队列"hello"中的消息
                    channel.BasicConsume(queue: name,
                                         autoAck: true,
                                         consumer: consumer);

                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }

            Console.ReadKey();

        }
    }
}

打开生产端,按下回车,进入循环,然后打开消费端,8秒后消费者就收到了消息。

如果8秒后,消息都被转发了,消费者打开,是收不到消息的。

====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========

推荐您阅读更多有关于“RabbitMQ,”的文章


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

AJAX企业级开发

AJAX企业级开发

Davec Johnson、Alexeic White、Andrec Charland / 张祖良、荣浩、高冰 / 人民邮电出版社 / 2008 / 49.00元

本书首先解释了AJAX 为什么在大规模的开发中能有如此广阔的应用前景,接着系统地介绍了当前重要的AJAX 技术和组件。你将看到把数据表、Web 窗体、图表、搜索和过滤连接在一起用于构建AJAX应用程序的框架开发的整个过程;在此基础上,本书给出了已经过证实的AJAX 架构模式,以及来源于实际的.NET 和Java AJAX 应用程序的案例研究。一起来看看 《AJAX企业级开发》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换