java分布式事务——最终一致性,最大努力通知总结!

  • 作者: 凯哥Java(公众号:凯哥Java)
  • 分布式相关
  • 时间:2022-11-02 20:51
  • 3894人已阅读
简介 关于CAP,BASE理论,以及TCC,seata解决方案,可以参考我上一篇博客.《Java分布式事务-seata,tcc解决方案总结》本文是接着一篇继续的。4.分布式事务解决方案之可靠消息最终一致性 4.1.什么是可靠消息最终一致性事务     可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者

🔔🔔好消息!好消息!🔔🔔

 如果您需要注册ChatGPT,想要升级ChatGPT4。凯哥可以代注册ChatGPT账号代升级ChatGPT4

有需要的朋友👉:微信号 kaigejava2022

关于CAP,BASE理论,以及TCC,seata解决方案,可以参考我上一篇博客.《Java分布式事务-seata,tcc解决方案总结》 本文是接着一篇继续的。

4.分布式事务解决方案之可靠消息最终一致性

 4.1.什么是可靠消息最终一致性事务    

    可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。

    此方案是利用消息中间件完成,如下图:

    事务发起方(消息生产方)将消息发给消息中间件,事务参与方从消息中间件接收消息,事务发起方和消息中间件之间,事务参与方(消息消费方)和消息中间件之间都是通过网络通信,由于网络通信的不确定性会导致分布式事务问题。

a9ff8aa8b763db2204d8ea0b8723b17b.png

 因此可靠消息最终一致性方案要解决以下几个问题:

  1.本地事务与消息发送的原子性问题

本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最

    终一致性方案的关键问题。

    先来尝试下这种操作,先发送消息,再操作数据库:

begin transaction;
  //1.发送MQ
  //2.数据库操作
commit transation;

这种情况下无法保证数据库操作与发送消息的一致性,因为可能发送消息成功,数据库操作失败。你立马想到第二种方案,先进行数据库操作,再发送消息:

begin transaction;
  //1.数据库操作
  //2.发送MQ
commit transation;

  这种情况下貌似没有问题,如果发送MQ消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但MQ其实已经正常发送了,同样会导致不一致。


  2、事务参与方接收消息的可靠性

    事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。

  3、消息重复消费的问题

    由于网络2的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重复消费。

    要解决消息重复消费的问题就要实现事务参与方的方法幂等性。


4.2.解决方案

  4.2.1.RocketMQ事务消息方案

     RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在 RocketMQ 之上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便利性支持。

RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ本身提供的存储机制为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。

    在RocketMQ 4.3后实现了完整的事务消息,实际上其实是对本地消息表的一个封装,将本地消息表移动到了MQ内部,解决 Producer 端的消息发送与本地事务执行的原子性问题。

3d3465f38edfa73869345a25d57f590f.png

 执行流程如下:

    为方便理解我们还以注册送积分的例子来描述 整个流程。

    Producer 即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责新增积分。

    1、Producer 发送事务消息

    Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。

    本例中,Producer 发送 ”增加积分消息“ 到MQ Server。

    2、MQ Server回应消息发送成功

    MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。

    3、Producer 执行本地事务

    Producer 端执行业务代码逻辑,通过本地数据库事务控制。本例中,Producer 执行添加用户操作。

    4、消息投递

    若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”增加积分消息“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删除”增加积分消息“ 。

    MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。

   5、事务回查

    如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。

    以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可。

    RoacketMQ提供RocketMQLocalTransactionListener接口:

  public interface RocketMQLocalTransactionListener {
   /**
   ‐ 发送prepare消息成功此方法被回调,该方法用于执行本地事务
   ‐ @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
   ‐ @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
   ‐ @return 返回事务状态,COMMIT:提交  ROLLBACK:回滚  UNKNOW:回调
     */
       RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
   /**
   ‐ @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
   ‐ @return 返回事务状态,COMMIT:提交  ROLLBACK:回滚  UNKNOW:回调
     */
       RocketMQLocalTransactionState checkLocalTransaction(Message msg);
   }


5.分布式事务解决方案之最大努力通知

  5.1.什么是最大努力通知

    最大努力通知也是一种解决分布式事务的方案,下边是一个是充值的例子:

15c29f2a4b7bf3cd0925d2e0eba64d11.png

交互流程:

    1、账户系统调用充值系统接口

    2、充值系统完成支付处理向账户系统发起充值结果通知。若通知失败,则充值系统按策略进行重复通知

    3、账户系统接收到充值结果通知修改充值状态。

    4、账户系统未接收到通知会主动调用充值系统的接口查询充值结果。

  通过上边的例子我们总结最大努力通知方案的目标:

    目标:发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。

  具体包括:

    1、有一定的消息重复通知机制。因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。

    2、消息校对机制。

    如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。

    最大努力通知与可靠消息一致性有什么不同?

    1、解决方案思想不同

    可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。

最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。

    2、两者的业务应用场景不同

    可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。

    3、技术解决方向不同

    可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)。 


  5.2.解决方案                                                                                                             

    通过对最大努力通知的理解,采用MQ的ack机制就可以实现最大努力通知。

    方案1:

92c47d28e67d797d85b8f2b1f03db405.png


本方案是利用MQ的ack机制由MQ向接收通知方发送通知,流程如下:

    1、发起通知方将通知发给MQ。使用普通消息机制将通知发给MQ。

    注意:如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果。(后边会讲)

    2、接收通知方监听 MQ。

    3、接收通知方接收消息,业务处理完成回应ack。

    4、接收通知方若没有回应ack则MQ会重复通知。

    MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔 (如果MQ采用rocketMq,在broker中可进行配置),直到达到通知要求的时间窗口上限。

    5、接收通知方可通过消息校对接口来校对消息的一致性。

方案2:

  本方案也是利用MQ的ack机制,与方案1不同的是应用程序向接收通知方发送通知,如下图:

084b7bdddc6832aab5bf34011f78c82d.png

交互流程如下:

    1、发起通知方将通知发给MQ。

    使用可靠消息一致方案中的事务消息保证本地事务与消息的原子性,最终将通知先发给MQ。

    2、通知程序监听 MQ,接收MQ的消息。

    方案1中接收通知方直接监听MQ,方案2中由通知程序监听MQ。通知程序若没有回应ack则MQ会重复通知。

    3、通知程序通过互联网接口协议(如http、webservice)调用接收通知方案接口,完成通知。通知程序调用接收通知方案接口成功就表示通知成功,即消费MQ消息成功,MQ将不再向通知程序投递通知消息。

    4、接收通知方可通过消息校对接口来校对消息的一致性。

  方案1和方案2的不同点:

    1、方案1中接收通知方与MQ接口,即接收通知方案监听 MQ,此方案主要应用与内部应用之间的通知。

    2、方案2中由通知程序与MQ接口,通知程序监听MQ,收到MQ的消息后由通知程序通过互联网接口协议调用接收通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。


源码地址:https://github.com/kaixuanzhang123/dtx.git


TopTop