30 事务消息:如何实现一个完整的事务消息模块?
你好,我是文强。
上节课我们讲完了延时消息,这节课我们来看看消息队列中的事务消息。作为一个研发人员,我们对于事务的概念可以说是如雷贯耳了,最熟悉的应该就是MySQL或Redis的事务。
事务有一个特点,它的概念很明确,也很常见,但是它在不同的存储引擎的作用以及实现都是不一样的。所以如果我们想使用某个引擎中的事务功能,就必须先理解一下引擎中实现的事务的功能是什么,能达到什么效果,再去理解和使用它,不能想当然地把经典的 MySQL 的事务的功能套入到新的引擎去使用。
下面我们就从最基础的开始,先来看一下消息队列中的事务消息是什么,以及用户在什么情况下会用到事务消息。
消息队列中的事务是什么
从原始概念来看,事务是并发控制的单位,是用户定义的一个操作序列。简单解释,事务是一批操作的集合。它有ACID四个特性,分别是: 原子性、 一致性、 隔离性、 持久性。
- 原子性指这批操作要么全部成功,要么全部失败。
- 一致性指事务中的所有操作的结果符合预期,都能达到想要的结果。
- 隔离性指不同事务间是完全隔离的,不相互干扰。
- 持久性指事务一旦被提交,那么它的执行结果则是永久的。
总结一句话就是,一批操作执行后,具备ACID四个特性,那么这批操作就是事务操作。
又因为一致性、隔离性、持久性并不具有唯一性,在其他场景也会有这些概念。所以,事务可以进一步理解为: 一批操作必须具备原子性,必须同时成功或同时失败。
举个例子,在MySQL,操作集合中的操作指的是SQL语句,即MySQL事务指的是一批SQL语句执行后满足ACID的特性。比如下面这批语句指的就是MySQL的事务。
#set autocommit = false;
#开始事务
start transaction;
#查看当前表的数据
select * from t_test;
#删除整张表的数据
delete from t_test;
#查询该表数据,发现显示删除后的结果
select * from t_test;
#回滚
rollback
#查看当前表的数据,发现又回来了
select * from t_test;
#删除整张表的数据
delete from t_test;
#提交事务
commit;
所以事务在消息队列中也应该也是一系列操作的集合。那么问题就来了,是什么操作的集合呢?
在前面的课程中,我们知道消息队列中的核心操作是生产、消费、集群管控三种类型的操作。其中值得引入事务的主要是生产和消费操作。从客户端的视角来看,就有 生产事务、 消费事务、 生产+消费事务、消费+处理+生产 等等多种组合场景。从业界来看,不同消息队列实现的是满足不同场景的事务,不同事务的具体技术实现也是不一样的。
所以接下来我们先从功能层面来看一下业界四款主流消息队列 RabbitMQ、RocketMQ、Kafka、Pulsar 都支持什么形态的事务消息。
主流消息队列的事务功能
RabbitMQ的事务消息
RabbitMQ 支持的事务满足的是 生产消息的事务。即一批生产操作要么全部成功,要么全部失败。
如上图所示,RabbitMQ 的事务是在 Channel 维度实现的。将通道(Channel)设置为事务模式后,所有发送到该通道的消息都将被缓存下来。事务提交后,这些消息才会被投递到具体的Exchange中。如果事务提交失败,可以进行事务回滚,使消息不会被发送到目标Exchange。
RocketMQ的事务消息
RocketMQ 支持的事务满足的是 生产消息 和 本地事务 相结合的一种事务形态。这句话不太好理解,我们通过购物下单的例子来说明一下。
在下单流程中,我们一般需要将订单数据插入DB,并往消息队列发送订单消息。此时可能有两种情况:出现消息生产成功,DB插入失败;DB插入成功,生产写入失败。
RocketMQ 为了满足这种场景而设计实现当前的事务形态。即事务的操作集合有 发送消息 和 本地操作(比如插入DB)两种类型。只有两种操作都成功,事务才算成功。
如上图所示,生产者发送事务消息到Broker,Broker 会在Commitlog持久化存储这条消息并标记为不可见。当客户端本地操作执行完成后,再提交二次确认结果,将消息标记为可见,让消费端可以消费到消息。但特殊的是RocketMQ提供了客户端回查机制,也就是说当生产消息成功、本地事务失败时,Broker会根据一定的策略对客户端的本地事务发起回查,以尽量保证事务的成功率。
RocketMQ事务消息仅支持在 MessageType 为 Transaction 的 Topic 内使用。也就是说事务是在 Topic 维度生效的,事务消息也只能发送到类型为事务消息的 Topic 中。
Kafka的事务消息
Kafka 支持的事务跟 RabbitMQ 一样,满足的也是 生产消息 的事务。即保证一批生产操作要么全部成功,要么全部失败。和 RabbitMQ 事务不同的是,Kafka 的事务是在事务ID维度生效的。
如上图所示,客户端会先设置一个事务ID,多个生产者中设置的事务ID可以是一样的。用这个事务ID开启事务后,可以实现对多个 Topic 、多个 Partition 的原子性的写入。Broker 收到消息后,会按正常流程保存事务消息,只是将这些消息标记为不可见。当提交事务后,才将这些消息标记为可见,让消费端可以消费到。
另外在底层实现中,事务ID以及事务相关的状态保存在一个叫做__transaction_state 的内部Topic中,用来持久化保存事务ID、状态等信息。
Pulsar的事务消息
Pulsar 主要满足的是 消费+处理+生产 的事务。简单理解就是用来满足流场景将消费、处理、生产消息整个过程定义为一个原子操作,以保证整个操作的原子性,所以Pulsar 事务包含的操作有消费、处理、生产三种操作。
如上图所示,从底层实现来看,Pulsar 的事务处理流程与 Kafka 的事务处理思路大致保持一致。都是通过事务ID来标记事务,开启事务投递消息,都会将消息标记为不可见,同时往一个内部的Topic记录事务的状态数据。等全流程处理都成功后,才会提交事务。此时在生产端标记消息可见,在消费端提交消费位点,从而完成整个流程。 Pulsar 的事务可以看作是 Kafka 事务的升级版,它保证的是流处理操作的原子性。
讲到这里,我们可以总结出三点关键信息:
- 不同消息队列对事务消息的功能定义都不一样。
- 都是基于两阶段事务来设计的,分为生产消息(准备阶段)和提交事务(确认阶段)。
- 生产事务都是先将消息标记为不可见,等提交事务后再将消息标记为可见。
基于这三点,接下来我们从技术角度来拆解一下消息队列的事务消息的实现。
分布式事务理论基础
我们从事务的理论基础开始讲起。
从技术上来看,事务可以分为 单机事务 和 分布式事务。单机事务是指在单机层面完成一系列原子操作,比如MySQL的事务就属于单机事务。消息队列是分布式架构,所以消息队列实现的事务都属于分布式事务范畴。
从技术上看,分布式事务的解决方案一般有XA(2PC/3PC)和TCC两种。XA 指的是XA分布式事务协议,通常包含两阶段事务(2PC)和三阶段事务(3PC)两种实现方式。
两阶段事务(2PC)
两阶段事务将事务的提交分为两个阶段:请求阶段(Commit-request)和提交阶段(Commit)。简单来讲就是所有参与者将各自的执行结果告知协调者,协调者根据收到的结果决定所有参与者是提交还是回滚操作。
- 协调者询问所有参与者是否可以进行提交,并等待所有参与者响应。
- 所有参与者开始执行事务(但是不提交事务),并告知协调者自己的执行结果是成功(本地事务执行成功)还是失败(本地事务执行失败),然后等待协调者通知最终是提交事务还是回退事务。
- 如果协调者收到所有节点都执行成功了,那么通知所有节点全部进行提交事务操作,否则只要存在一个参与者执行失败,或者协调者超时了还没有收到全部参与者的执行结果,那么就通知所有参与者回退事务。
- 所有参与者根据协调者的通知,统一进行提交或者回退事务,并反馈信息。
二阶段事务本身存在一些缺陷,比如同步阻塞问题、单点故障、数据不一致等,所以业界提出了三阶段事务,对两阶段事务做了一些优化。
三阶段事务(3PC)
三阶段事务是二阶段事务的改进版,将2PC中的准备阶段一分为二,用于保证在最后提交阶段之前,所有的节点状态都是一致的。并且在协调者和参与者中都引入了超时机制,一旦参与者长时间没有收到协调者的通知,那么参与者将执行提交事务操作。
- CanCommit阶段:
- 协调者询问所有参与者是否可以进行提交,并等待所有参与者响应;
- 所有参与者预估判断是否可以提交(这里不执行事务),将结果(YES/NO)反馈给协调者;
- 如果上一阶段存在参与者返回NO,或者协调者等待超时,那么中断事务,不继续后面的操作;
- 如果所有参与者都返回YES,则进入PreCommit阶段。
- PreCommit阶段:
- 协调者通知参与者进入准备阶段,并等待参与者响应;
- 参与者执行事务(但不提交),并将执行结果反馈给协调者,然后等待协调者通知最终是提交事务还是回退事务。
- DoCommit阶段:
- 如果所有参与者都反馈了YES,那么协调者向参与者发送提交事务的通知;
- 参与者返回NO,或者协调者等待超时,那么协调者向参与者发送回退事务的通知。
3PC存在的问题是,协调者在向所有参与者发送回退事务指令的情况下,如果因为网络原因导致参与者没有收到通知,当参与者等待超时后会自动执行提交事务,这样就造成了数据不一致的现象。
TCC
TCC 主要在应用层面上,需要我们自己编写业务逻辑,TCC将业务分为Try、Confirm、Cancle三部分逻辑。Try为尝试执行业务,如果Try阶段执行成功则进入Confirm阶段,确认执行业务,否则进入Cancle阶段,取消执行业务。
- Try阶段:尝试执行业务,完成业务的检查,预留业务需要的资源。
- Confirm阶段:直接使用Try阶段的预留资源执行业务,这里不需要进行业务校验,因为在Try阶段已经校验过了。
- Cancle阶段:取消执行业务。
结合这些理论基础和前面提到的知识点,我们就可以知道, 消息队列事务其实就是 XA 两阶段提交的实现。而且 2PC 是目前消息队列事务的主要实现方式,所以接下来我们就来看一下消息队列基于2PC理论实现事务的技术要点。
消息队列的事务方案设计
从技术上来看,我们先以 生产事务 举例说明来拆解一下技术核心点。先来看下图,这是消息队列实现事务消息的大致流程。
总共可以分为4步:
- 初始化事务,假设我们以事务ID来标识一个事务,初始化的时候我们就需要把事务ID信息存储到事务协调者上。
- 第一阶段客户端会把消息都发送到不同的Topic和不同的分区中,因此数据是发送到不同Broker的,此时在事务没有提交的时候,数据应该是不可见的。
- 第二阶段客户端会提交事务,告诉协调者所有的操作都成功了,此时可以把这次事务相关的信息都提交给事务协调者,比如本次事务所包含的Topic和分区等。
- 当客户端提交事务后,协调者会通知把所有Broker上的这些数据都变为可见。
从中我们可以拆解出以下3个技术点:
- 因为事务的状态需要存储、查询,所以 需要将事务状态信息进行持久化存储。
- 因为多台数据是发送到多台Broker,所以在提交事务时客户端需要通知事务协调者,让事务协调者去通知所有Broker的数据变为可见,所以 需要一个事务协调器。
- 因为第一阶段提交的事务消息是不可见,第二阶段事务提交后可见或回滚,所以 需要设计一个将数据从不可见变为可见的机制。
下面我们来逐一分析下这几个技术要点。
如何存储事务状态信息
从前面的流程我们就知道,事务状态是用来记录、查询的,在提交事务的阶段去通知各个Broker 去执行提交事务的操作,所以需要为事务相关的元数据找一个地方存储。
我们知道,消息队列本身就是一个存储引擎,所以很多消息队列就会选择创建一个内部的Topic 用来存储事务相关的数据,这样实现起来就没有额外的开发成本。比如Kakfa和Pulsar都是存在一个内部的Topic里面的;RocketMQ的事务状态信息是存储在Commitlog中的;RabbitMQ的事务状态信息没有一个集中式的存储,而是依赖于 TCP 连接和信道级别的处理,都依赖于该信道内的上下文来获取事务信息。
所以在设计的时候,可以根据当前架构的特点去选择合适的存储。如果是在消息队列中,我比较建议是通过内部Topic来存储,因为可以复用Topic的分布式存储的能力。
事务协调者如何设计
事务的协调者从具体实现来讲就是一段代码逻辑。它主要负责 保存事务的状态信息 和 通知事务的提交、回滚。
参考图示,从技术上看,事务协调者的设计整体分为以下3步:
- 如何选出事务协调者?
- 协调者如何保存状态?
- 协调者如何通知其他Broker提交、回滚事务?
一般情况下,事务协调者都是由某台Broker担任的。所以直观上看,可以固定某一台Broker当作事务的协调者,这是最简单的,但是这会涉及到这台Broker故障时候的事务协调者的可用性切换问题。如果是单独实现,肯定技术上卡点不大,只是开发成本较高。
所以在消息队列中,比如Kafka或Pulsar,它们的实现是:
- 先创建一个内部的Topic来存储事务状态数据。
- 通过一定的算法将事务ID 哈希计算后,算出这个事务ID的数据存储在哪个分区。
- 这个分区Leader所在的Broker 就是这个事务ID的事务协调者。
选出事务协调者后,客户端就和这台Broker进行交互,直接将事务状态数据写入到算出来的这个分区中。然后当接收到客户端提交事务的请求时,再通过内部接口通知各个Broker提交事务。
如何实现消息从不可见到可见
其实这个问题和我们上节课讲到的延迟消息的消息从不可见到可见的思路基本是一致的。
从技术上来看主要有两种思路:
- 先将消息写入到其他的地方,然后等事务提交的时候再将数据写入到实际的Topic当中,目前主流消息队列RabbitMQ用的是这个方案。
- 数据按照原先的流程直接写入原先的Topic中,只是这条消息会加上事务ID相关信息,同时标记为事务消息,在事务没有提交的时候标记这条消息为不可见。
其中第一种方案我们在上一节课讲了很多了,也比较好理解,这里就不展开了。我们着重讲一下第二种方案的实现思路。
先来看一下下面这张图:
我们在基础篇讲过,消息队列底层是顺序存储的结构。所以如上图所示,如果以事务ID为标识来实现事务,此时在某个Topic中就有可能既有事务消息,又有非事务消息。其中事务消息可能有已提交的事务(如上图中黄色的m3),也有未提交的事务(如上图中粉色的m2)。
此时在顺序存储结构中,数据是顺序消费的。不过这里有一个问题,如果遇到未提交的事务,消费端能不能继续往下消费呢?
有两种情况:
- 不能向下消费。这种相当于消费者顺序消费,如果遇到有未确认的事务消息,此时依旧不往下消费。因为事务消息有过期时间,等到这条事务消息过期了或者被提交了再继续消费。这种方案是最简单的,没有太多工作量,也不需要对主流程进行修改。
- 可以向下消费。此时相当于会跳过这些未确认的消息,但是在后续的消费过程中都需要来判断一下这些消息是否已经被确认,是的话就需要投递给消费者。这种方式一般需要单独维护未确认的事务消息列表,以提高消费时检查的性能,实现起来成本较高。
在目前的主流消息队列中,这两种方案都有人在使用。第一种方案可能会出现消费卡顿、消费较慢的问题,但是可以保持消费顺序,且实现简单。第二种方案,消费速度较高,但是会有消费乱序的情况,且实现相对复杂。所以,从实现上来看,我们得考虑是否有顺序消息、消费速度的需求,然后选择其中一种合适的方案。
另外还有一个问题,我们知道在消息队列的顺序存储结构中,消息的内容是不能改的。那在这种模式下如何将一个消息从未确认变为已确认、从不可见变为可见呢?
如上图所示,为了解决这个问题,因为消息内容是不能更改的,所以一般需要引入另外一个数据结构来存储事务的状态,标记这个事务消息是否提交,然后在消费的时候进行判断。
总结
事务是一批操作的集合,这批操作要么全部成功,要么全部失败。在消息队列中,操作主要分为生产和消费,所以消息队列的事务也可以说是一批生产、消费的操作的集合。
不同消息队列实现的事务是不一样的。简单说集合中支持的操作是不一样的,从客户端的视角来看,就有生产事务、消费事务、生产+消费事务、消费+处理+生产等等多种组合场景。
目前主流消息队列RabbitMQ和Kakfa支持的是生产的事务,RocketMQ支持的是生产的事务+本地的事务,Pulsar支持的是消费 + 处理 + 生产的流场景的事务。
事务分为单机事务和分布式事务,消息队列的事务属于分布式事务的范畴。分布式事务的实现主要有两阶段事务、三阶段事务、TCC 三种。目前消息队列的事务主要都是基于两阶段事务的理论基础来实现的。
消息队列的事务方案设计,主要关注如何存储事务状态信息、如何实现事务协调者、如何实现消息从不可见到可见三个问题。
事务状态信息的存储一般建议通过内置的Topic来实现,成本较低。事务协调者可以参考Kafka和Pulsar的事务协调者的实现,它们的方案比较优雅。消息从不可见变为可见主要有延迟投递和标记两个思路,目前用的比较多的是标记。在消息生产时标记为不可见,事务提交时标记为可见。
思考题
为什么消息队列的事务不选择三阶段事务或者TCC呢?
欢迎分享你的想法,如果觉得有收获,也欢迎你把这节课分享给感兴趣的朋友。我们下节课再见!
上节课思考闭环
我们知道消息队列是Topic和分区模型,此时有个问题是:在存储引擎选择层面,是每个Topic或分区独享存储结构,还是Broker上所有的 Topic 共享存储结构?
这个问题要分为两个方面回答。
1. 基于轮询检测机制的实现。从个人的角度,因为延时消息天然根据时间排序,所以不需要在分区维度去单独存储。而每台Broker 所有Topic 共享一个存储结构,当有的Topic 延时消息太多,可能导致其他Topic的延时消息无法被及时处理。所以建议开启延时消息属性的Topic,独立一个存储结构。
2. 基于时间轮机制的实现。基于时间轮的方案,就可以一台Broker 共用一个时间轮,一份存储结构。因为时间轮每个刻度的数据是全部需要处理的,不需要查找,拿出来定位处理即可。