消息队列(四)如何处理消息丢失的问题?

  • 作者: 凯哥Java(公众号:凯哥Java)
  • RocketMQ
  • 时间:2021-08-09 20:11
  • 4430人已阅读
简介 一、为什么消息会丢失?跟消息重复问题类似,消息丢失也可能出现在生产者、MQ、消费者三者中。这三者导致消息丢失的原因是什么呢?生产者:生产者推送消息到MQ中,由于网络抖动等原因消息没有推送到MQ中,或者消息推送到MQ中了但是MQ内部出错了,导致消息丢失。MQ:MQ接收到消息后先把消息暂存在OSCache中,消费者还没消费的时候MQ自己挂了,导致消息丢失。消费者:消费者消费到了这条消息,但是还没来得及

🔔🔔好消息!好消息!🔔🔔

 如果您需要注册ChatGPT,想要升级ChatGPT4。凯哥可以代注册ChatGPT账号代升级ChatGPT4

有需要的朋友👉:微信号 kaigejava2022

一、为什么消息会丢失?

跟消息重复问题类似,消息丢失也可能出现在生产者、MQ、消费者三者中。这三者导致消息丢失的原因是什么呢?

  • 生产者:生产者推送消息到 MQ 中,由于网络抖动等原因消息没有推送到 MQ 中,或者消息推送到 MQ 中了但是 MQ 内部出错了,导致消息丢失。

  • MQ:MQ 接收到消息后先把消息暂存在 OS Cache 中,消费者还没消费的时候 MQ 自己挂了,导致消息丢失。

  • 消费者:消费者消费到了这条消息,但是还没来得及处理,消费者自己挂了,但是消费者已经告诉了 MQ 自己已经消费完了,导致消息丢失。

二、如何解决消息丢失的问题

不同的消息队列解决消息丢失的方法是不同的,下面分别介绍不同 MQ 是如何解决消息丢失问题的。

1、RabbitMQ

1.1 生产者导致消息丢失

RabbitMQ 有两种方案可以避免消息丢失,一种是 RabbitMQ 的事务机制,一种是 Confirm 模式。这里先看一下事务机制。


RabbitMQ 客户端中 Channel 接口有这么几个函数,channel.txSelect 用来开启一个事务,channel.txCommit 用来提交事务,channel.txRollback 用来回滚事务。为了避免消息丢失,我们可以在发送消息前,先开启执行 txSelect 方法开启一个事务,接着发送消息,如果消息投递失败,执行 txRollback 回滚事务,再执行重试操作重新发送,如果消息投递成功,执行 txCommit 方法提交事务。


这个方案可以保证我们的消息一定是投递成功的,但是几乎没有人使用这种方案。因为这个方案是同步阻塞的,也就是一条消息发送后,一定要等到 MQ 回应之后执行了提交或回滚事务操作,才能继续往下执行。大致过程如下图所示:


RabbitMQ 还提供了另一种方法避免生产者消息丢失问题,那就是 Confirm 模式。Confirm 模式是这样子的,生产者发送消息后,不需要等待 MQ 的回应,MQ 接收成功后,会回调生产者的 ack 接口通知生产者消息投递成功了,如果 MQ 接收失败,会回调 nack 接口通知生产者消息投递失败了,生产者可以重新对这条消息进行投递。大致过程如下图所示:

1.2 RabbitMQ 导致消息丢失

RabbitMQ 自己弄丢了数据是由于持久化导致的。通常 RabbitMQ 接收到消息之后写入 OS Cache 中,就会给生产者返回接收成功的回应,这时如果 RabbitMQ 挂了,消息也就丢失了。解决方法可以结合生产者的 Confirm 模式,配置 RabbitMQ 持久化到磁盘之后,才给生产者返回 ack 信号。

1.3 消费者导致消息丢失

RabbitMQ 在消费者端弄丢数据,是由于 RabbitMQ 的默认自动提交 ack 导致的。解决方法就是关闭 RabbitMQ 的自动响应 ack 即可。这样消息没有处理完成,消费者挂了,RabbitMQ 会认为消息没有处理成功,会再次推送消息给消费者处理。

2、Kafka

2.1 生产者导致消息丢失

对于 Kafka 来说,生产者基本不会弄丢消息,因为生产者发送消息会等待 Kafka 响应成功,如果响应失败,生产者会自动不断地重试。

2.2 Kafka 弄丢了数据

Kafka 通常会一台 leader + 两台 follower,当生产者消息刚写入 leader 成功,但是还没同步到 follower 时,leader 宕机了,此时会重新选举 leader,新的 leader 由于还未同步到这条数据,导致该条消息丢失。


解决办法是做一些配置,当有其他 follower 同步到了消息后才通知生产者消息接收成功了。配置如下:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。

  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower。

  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了


按上面的配置配置后,就可以保证在 Kafka Broker 端就算 leader 故障了,进行新 leader 选举切换时,也不会丢失数据。

2.3 消费者导致消息丢失

Kafka 消费端弄丢数据原因跟 RabbitMQ 类似,Kafka 消费者会在接收到消息的时候,会自动提交一个 offset 给 Kafka,告诉 Kafka 消息已经处理了。处理方法也跟 RabbitMQ 类似,关闭 offset 的自动提交即可。

3、RocketMQ

RocketMQ 导致数据丢失的原因与前面的 RabbitMQ 和 Kafka 都很类似。生产者就是因为网络抖动等原因消息投递失败,或者 RocketMQ 自身的 Master 节点故障,主备切换故障之类的,消费者则有可能是异步处理导致还未处理成功就给 RocketMQ 提交了 offset 标识消息已处理了。


在 RocketMQ 中,事务消息可以保证消息零丢失。RocketMQ 的事务消息流程大致如下图所示:


在上面的事务消息流程中,基于这三个业务流程:发送 half 消息 -> 处理其他业务 -> commit/rollback。我们来讨论下面的几种情况:

  • 万一生产者发送 half 消息失败,怎么办?

可以做重试或记录消息到如文件、数据库等地方,直接给用户返回失败,本次请求失败。

  • 万一生产者发送 half 消息成功,但是处理其他业务失败,又该怎么办呢?

生产者发送 rollback 请求回滚 RocketMQ 中该条消息,本次请求失败。

  • 万一生产者发送 half 消息成功,但是 RocketMQ 由于某些原因如网络超时等导致没有响应,怎么处理?

由于 half 消息已发送成功,此时 RocketMQ 中已经有该条消息了,RocketMQ 会有一个补偿机制,补偿机制会回调你开发好的一个接口,询问你这条消息是要 commit 还是 rollback。

  • 万一生产者发送 half 消息成功,但是请求 commit 或 rollback 的时候失败了呢?

这个问题与上面的问题一样,都是通过 RocketMQ 的补偿机制来处理。

三、总结

本文分别从生产者、MQ 自身、消费者介绍了导致消息丢失的原因,消息丢失问题是一个比较常见但有必须解决的问题,通常使用消息队列的业务都是比较重要的业务,不能接受数据的丢失。


接着介绍了不同的 MQ 是如何解决消息丢失问题的。消费端导致的消息丢失都是由于数据还未处理成功确提前通知 MQ 消息已经处理成功了,禁止自动提交或异步操作即可,处理起来比较简单;生产者和 MQ 自身导致的消息丢失则比较难处理,RabbitMQ 使用了 Confirm 模式避免消息丢失;Kafka 则配置所有 follower 同步成功才给生产者响应推送消息成功;RocketMQ 则使用事务消息来保证消息的零丢失,针对不同的异常情况还提供了补偿机制进行处理。

来源:https://xie.infoq.cn/article/b2548617f42117436afca7f4d


TopTop