Kafka 消息队列消息丢失了,怎么办?

alejandro · 2021-05-18 09:39:42 · 热度: 29

消息队列会丢失消息吗?

答案是肯定的,所以对于业务严谨的数据,我们要确保其在消息队列中的安全,不能丢。

要想解决不丢的问题,首先要弄清楚 消息是怎么丢的呢?

丢消息的关键点有3个:

  • Producer 发送消息的过程
  • 消息队列的消息存储
  • Consumer 消费消息的过程

下面挨个看看都是怎么丢的,以及解决方案。

会以 RabbitMQ 和 Kafka 这两个常用的消息系统来说明。

1. Producer 弄丢消息

Producer 向 MQ 发消息,很简单,发过去就完事儿了。

但是,在发送图中是存在危险的,例如网络问题等等,导致 MQ 没有正常收到。

怎么解决呢? 思路很简单,让 MQ 发一个 接受确认声明(ack) 就行了,就像快递需要签收一样。

例如 RabbitMQ,有两种方式可以确保发送消息的安全。

1)事务消息

Producer 发送消息之前,先开启事务,然后再发送。

如果 RabbitMQ 没有正常收到消息,Producer 会收到异常信息,回滚事务。

如果正常接收了,Producer 就提交事务。

很可靠,但效率低,因为这个事务模式是同步的,会产生阻塞。

2)confirm 确认模式

Producer 开启 confirm 模式,发送消息的时候,RabbitMQ 会给这个消息分配一个唯一的 ID。

成功写入队列之后,RabbitMQ 会向 Producer 发送一个 ack 消息,说明此 ID 的消息已经成功发送。

confirm 模式还有一个回调机制,Producer 可以准备一个失败的接口,供 RabbitMQ 在接收失败时调用。

Producer 收到失败通知,或者超时了,可以执行相应的处理逻辑,例如重发。

confirm 模式是异步的,比事务消息更高效,使用更为广泛。

Kafka 也是使用的 ack 方式,使用方式很简单,只要配置:

ack=all

确保 Kafka 在完全接收成功后才发送确认通知,这样就一定不会发丢了。

2. MQ 在存储期间弄丢消息

MQ 成功接收消息之后,需要保存起来,等着 Consumer 消费。

在这个保存期间,也可以能丢失消息。

这通常是由 MQ 故障引起的。

RabbitMQ 想要保障消息不丢,需要开启持久化,消息就会写入磁盘。

即使 RabbitMQ 宕机了,只要磁盘没事儿,重启之后还可以重新把消息加载进来。

如果想进一步的保障消息安全,就需要配置 RabbitMQ 的镜像集群了,来确保高可用。

Kafka 是天然的分布式系统,Topic 分为多个 Partition,每个 Partition 又有多个副本。

Partition 的多个副本,分为 Leader 和 Follower。

Leader 负责处理消息的读写,Follower 负责备份。

前面说的 Kafka 配置 ack=all,就是告诉Kafka,Leader 和所有 Follower 全都接收到了,才算发送 ack 确认,只有 Leader 自己接收成功是不算的。

否则的话,如果 Leader 接收完成就告诉 Producer OK 了,在 Leader 同步给 Follower 之前,Leader 宕机了,Kafka 会从 Follower 中选举出新的 Leader。那么,老 Leader 在临终前没有同步的消息就丢失了。

为了保障消息的安全,这 4 个参数要设置好:

replication.factor

用于指定 Partition 副本的数量,必须大于 1,就是至少要有 2 个副本,一个 Leader 一个 Follower。

min.insync.replicas

用于指定几个副本成功写入才提交消息,只有提交之后的消息才能被 Consumer 消费。

此值至少大于 1,这样就保障 Leader 之外至少有一个副本同步到了这条消息,不怕 Leader 宕掉了。

acks=all

用于指定几个副本接收到消息之后向 Producer 发送 ack。例如值为 1,表示 Leader 收到就可以了,“all” 表示 “所有副本”,也可以写 “-1”,等同于 “all”。

retries=999

用于指定 Producer 发送失败后的重试次数,可以设为一个很大的数,表示失败了就重试,提升发送成功几率。

3. Consumer 弄丢消息

例如 Consumer 成功接收到了消息 “123”,MQ 就会移除这条消息。

但在 Consumer 处理完这条消息之前,宕机了。

Consumer 重启之后继续从 MQ 拿消息,这次拿到的就是下一条消息 “124”,那么 “123” 就丢了。

所以,Consumer 只是接收到消息是不够的,成功处理完成才行。

这就和 MQ 的消费确认机制有关了。

RabbitMQ 默认是 Consumer 成功接收消息之后就发送 ack 确认,RabbitMQ 就认为消费成功了。

关闭自动的 Consumer ack 就行,改为手动发送确认通知。

Kafka 的 Consumer 发送的不是 ack 确认,而是 offset,告诉 Kafka 已经消费到哪个位置了。

默认是 Consumer 接收后自动提交 offset,所以也需要关闭,改为手动提交。

小结一下,要想消息不丢,需要发消息的时候确认发送成功了,MQ 存储的时候要是高可靠的,Consumer 消费的时候,不能接收之后就确认,真正处理完成才行。

猜你喜欢:
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册