RabbitMQ消息可靠性分析和应用

栏目: 后端 · 发布时间: 5年前

内容简介:RabbitMQ使用一些机制来保证可靠性,如持久化、消费确认及发布确认等。先看以下这个图:

RabbitMQ流程简介(带Exchange)

RabbitMQ使用一些机制来保证可靠性,如持久化、消费确认及发布确认等。

先看以下这个图:

RabbitMQ消息可靠性分析和应用

P为生产者,X为中转站(Exchange),红色部分为消息队列,C1、C2为消费者。

整个流程分成三部分:第一,生产者生产消息,发送到中转站;第二,中转站按定义的规则转发消息到消息队列;第三,消费者从消息队列获取消息进行消费(处理)。

RabbitMQ消息可靠性分析和应用

应用代码均使用C#客户端代码实现。

一、发布确认

生产者生产消息,发送到中转站的过程中,可能会因为网络丢包、网络故障等问题造成消息丢失。为了确保生产者发送的消息不会丢失,RabbitMQ提供了发布确认(Publisher Confirms)机制,从而提高消息的可靠性(注意:发布确认机制不能和事务机制一起使用)。

单条消息发布确认:

1

2

3

4

5

6

7

8

9

10

channel.ConfirmSelect(); //发布确认机制

string message =  "msg" ;

var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(

     exchange:  "MarkTopicChange" ,

     routingKey:  "MarkRouteKey.one" ,

     basicProperties:  null ,

     body: body

     );

bool isPublished = channel.WaitForConfirms(); //通道(channel)里消息发送成功返回true

使用channel.ConfirmSelect,一旦信道进入确认模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始)。消息被投递到所有匹配的队列之后,RabbitMQ就会发送(Basic.Ack)给生产者(包含消息的唯一ID),生产者从而知道消息发送成功。

多条消息发布确认:

1

2

3

4

5

6

7

8

9

10

11

12

13

channel.ConfirmSelect(); //发布确认机制

foreach ( var itemMsg  in lstMsg)

{

   byte [] sendBytes = Encoding.UTF8.GetBytes(itemMsg);

   //发布消息

   channel.BasicPublish(

     exchange:  "MarkTopicChange" ,

     routingKey:  "MarkRouteKey.one" ,

     basicProperties:  null ,

     body: sendBytes

     );

}

bool isAllPublished = channel.WaitForConfirms(); //通道(channel)里所有消息均发送才返回true

注意:多消息发布确认机制情况下,倘若要发送100条消息,发送90条后,突然网络故障,后面的消息发送失败了,那么isAllPublished返回的是false,而前面90条消息已经发送到消息队列了。我们还不知道哪些消息是发送失败的,所以很多条消息发布确认,建议分几次发送或多通道发送。

此外,需要确保在中转站(Exchange)的消息可以顺利到达消息队列。

(1)首先需要定义匹配的Exchange和Queue,根据Exchange的类型和routingKey确定转发的关系。

(2)设置BasicPublish方法中mandatory参数为true,然后监听Exchange中没有匹配的队列的消息,然后进行相操作。

(3)确保消息队列有足够内存存储消息。

RabbitMQ默认配置vm_memory_high_watermark为0.4。意思是控制消息占40%内存左右。vm_memory_high_watermark_paging_ratio为0.5,当消息占用内存超过50%,RabbitMQ会把消息转移到磁盘上以释放内存。当磁盘剩余空间小于阀值disk_free_limit(默认为50M),所有生产者阻塞,避免充满磁盘,导致所有的写操作失败。

RabbitMQ配置文件一般在%APPDATA%\RabbitMQ\rabbitmq.config.

%APPDATA% 一般为 C:\Users\%USERNAME%\AppData\Roaming(Windows环境)

二、持久化

消息存放到消息队列后,在不配置消息持久化的情况下,若服务器重启、关闭或宕机等,消息都会丢失。配置持久化可以有效提高消息的可靠性。持久化需要同时配置消息持久化和队列持久化。单配置消息持久化,队列消失了,消息没有地方存放;单配置队列持久化,队列还在,消息没了。

队列持久化在定义队列时候配置

1

2

3

4

5

6

7

8

//定义队列

channel.QueueDeclare(

   queue:  "Mark_Queue" //队列名称

   durable:  true //队列磁盘持久化                  

   exclusive:  false , //是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除

   autoDelete:  false , //是否自动删除,一般设成false

   arguments:  null

   );

消息持久化在发布消息时候配置

1

2

3

4

5

6

7

8

9

10

//消息持久化,把DeliveryMode设成2

IBasicProperties properties = channel.CreateBasicProperties();

properties.DeliveryMode = 2;

   //发布消息

   channel.BasicPublish(

     exchange:  "MarkTopicChange" ,

     routingKey:  "MarkRouteKey.one" ,

     basicProperties: properties,

     body: sendBytes

     );

如何配置了事务机制或发布确认(publisher confirm)机制,服务端的返回Basic.Ack是在消息落盘之后执行的,进一步的提高了消息的可靠性。

为了防止磁盘损坏带来的消息丢失,可以配置镜像队列,这里不作介绍。

三、消费确认

为了确保消息被消费者消费,RabbitMQ提供消费确认模式(consumer Acknowledgements)。自动确认模式,当消费者 成功接收到 消息后,自动通知RabbitMQ,把消息队列中相应消息删除。这很大程度上满足不了我们,假如消费者接收到消息后,服务器宕机,消息还没处理完成,这样就会造成消息丢失。手动确认模式,当消费者 成功处理完 消息后,手动发消息通知RabbitMQ,把消息队列中相应消息删除。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

consumer.Received += (model, ea) =>

{

   var body = ea.Body;

   var message = Encoding.UTF8.GetString(body);

   var routingKey = ea.RoutingKey;

   Console.WriteLine( " [x] Received '{0}':'{1}'" ,

            routingKey,

            message);

//确认该消息已被消费,发删除消息给RabbitMQ,把消息队列中的消息删除

channel.BasicAck(ea.DeliveryTag,  false );

//消费消息失败,拒绝此消息,重回队列,让它可以继续发送到其他消费者

//channel.BasicReject(ea.DeliveryTag, true);

//消费消息失败,拒绝多条消息,重回队列,让它们可以继续发送到其他消费者

//channel.BasicNack(ea.DeliveryTag, true, true);

};

//手动确认消息,把autoAck设成false

channel.BasicConsume(queue:  "Mark_Queue" ,

            autoAck:  false ,

            consumer: consumer);

这里值得注意的是,消息处理完成后,一定要把处理完成的消息发送到RabbitMQ(channel.BasicAck(ea.DeliveryTag, false)),不然RabbitMQ会一直等待,从而造成内存泄露。若处理消息过程中发生异常,可以使用channel.BasicReject(ea.DeliveryTag, true)来拒绝此消息,让它重回队列。若RabbitMQ收不到消费者任何确认消息的信号(包括确认信号,拒绝信号灯),直到此消费者断开连接,消息才能重回队列,继续发送到其他消费者。

提醒一下,假如消费者消费消息的方法不支持并发(取决于需求),可以限制消费者每次只接收一条消息。

1

channel.BasicQos(0, 1,  false );


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

响应式Web设计实践

响应式Web设计实践

[美] Tim Kadlec / 侯鸿儒 / 人民邮电出版社 / 2013-3-1 / 55.00元

随着各种各样的移动设备不断地涌现到使用者面前,Web设计的适应性已经成为设计师们所面临的最为艰巨的挑战。你设计出的网站不仅要在桌面计算机的大尺寸屏幕上可以为用户提供友好的UI和用户体验,同时在小尺寸屏幕上也应该可以提供一致的用户体验,并可以让用户能够在桌面大屏幕上和移动小屏幕上平滑切换,同时没有任何的不适应感觉。 本书作者是一位出色的开发者,在本书中,他将诸多技术和设计理念杂糅在一起,再辅以......一起来看看 《响应式Web设计实践》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

随机密码生成器
随机密码生成器

多种字符组合密码