28 顺序消息和幂等:如何实现顺序消息和数据幂等?
你好,我是文强。
这节课我们来讲一下消息队列中的顺序消息和幂等机制实现。
在消息队列中,消息是否能有序是一个常常被问到的问题。因为在我们的业务中,比如在有序事件处理、数据实时增量同步等情况下,就需要消息队列支持顺序消息的机制。接下来我们就来看看消息队列中顺序消息的定义和实现。
顺序消息的定义和实现
在消息队列中,消息的顺序性一般指的是时间的顺序性,排序的依据就是时间的先后。从功能来看,即 生产端发送出来的消息的顺序和消费端接收到消息的顺序是一样的。牢记这个定义,对于我们后面理解顺序消息的实现很重要。
消息队列的存储结构特性
先来回顾一下我们在 第05讲 讲过的消息队列的底层存储结构。消息队列的底层消息是直接顺序写入到文件的,没有用到B树、B+树等任何数据结构。从技术上看主要有如下两个方面的原因:
- 复杂的数据结构会影响数据写入和读取的性能。
- 消息队列功能需求较为简单,不太需要复杂的数据结构来支持检索等操作。
所以,理想情况下顺序消息的实现是: 生产端按顺序发送消息,Broker端按接收到的顺序存储消息,消费端按照Broker端存储的顺序消费消息。那技术上的实现真的有这么简单吗?我们继续分析。
如上图所示,实现顺序消息的核心就是:Broker 接收到的消息的顺序要和生产端发出来的顺序是一致的。那在实际的场景中,会发生什么事呢?
- 如果有多个生产者,因为发送时间点不一样、网络延迟不一样,此时多个生产者发送到Broker的顺序就无法保证有序,此时落盘的顺序就无法保证。
- 如果是单个生产者且是异步发送,在 第07讲 我们知道了异步发送的底层实现是有多个异步线程在负责发送数据,此时可能会失败重试。然后就会出现 Broker 接收到消息的顺序和发送的顺序不一样的情况,此时落盘的顺序也无法保证。
- 如果Topic有多个分区,即使是一个生产者同步发送,因为生产者的生产分区分配策略的存在,消息可能会被分发到多个分区里面,此时也无法保证顺序性。
所以基于消息队列架构上的这些特性,我们接下来看看在消息队列中如何实现顺序消息。
基于顺序存储结构的设计
基于顺序存储结构的设计,是指保持消息队列底层顺序存储结构不变的前提下,实现顺序消息的技术方案。
如上图所示,这是一个三分区的Topic的底层存储结构。结合前面提到的3种实际的场景,为了实现顺序消息就需要满足单一生产者、同步发送、单一分区三个因素。就是说在当前的顺序存储结构中,消息队列实现顺序消息的前提是: 一个生产者同步发送消息到一个分区才能保证消息的有序。
其中, 单一生产者+同步发送 是为了解决生产端数据发送的有序性,只有前面一条数据写入成功后,后面的数据才能继续写入。从而规避重试发送、TCP网络延迟带来的不同的请求包到达Broker的时间不一样,导致Broker保存数据的顺序和客户端发送的顺序不一致的问题。
单一分区 是为了解决消息数据被发送到不同的分区中,导致的不同消费者消费不同分区中的消息的顺序不可控的情况。
那就有一个问题:如果只能是单一分区,消息队列就失去了水平扩容能力,而无法水平扩容对性能的影响会很大。那有解决办法吗?
有一个点我们需要知道, 在实际的业务场景中很少会要求所有的的数据是有序的。比如在数据库Binlog订阅的场景中,假设一个数据库里面有100张表,此时一般只需要保证每张表或者某一行记录的变更记录是有序的即可,因为下游的处理的主体一般是表或者行。所以在实际的场景中,我们只需要保证局部有序即可,而不需要全局有序。
那么从技术上看,我们就只要保证局部有序的数据写入同一个分区即可,即 根据某个标识将需要有序的数据发送到同一个分区中。我们举个例子来说明这个标识的作用。
如上图所示,给每一批需要排序的消息赋予一个标识,然后把标识为k1、k2、k3的消息分别固定发送到分区一、二、三,从而在满足单一生产者、同步发送、单一分区三个因素的同时,也可以具备水平扩容的能力。
这个方案有个缺点是:如果某一个标识的数据量特别大,就可能会出现写入数据倾斜。比如k1的数据量非常大,k2、k3的数据量很小,数据就会全部写入到k1中,从而导致集群Broker之间的负载出现倾斜,影响集群的稳定性。
目前来看,在当前消息队列的架构中,数据倾斜纯依靠服务端很难解决,需要客户端配合,将数据打散写入到多个分区中,不过这样做就无法保证消息的顺序了。
所以如果要同时解决数据倾斜和保证顺序消息,就需要引入复杂的数据结构。即类似MySQL的实现,在消费的时候对数据进行排序,然后再返回给客户端。这点就不展开了,我们放在思考题中解答。
接下来我们来看一下,目前主流消息队列对于顺序消息的支持方式。
主流消息队列的实现机制
四款主流消息队列对于顺序消息的实现机制如下:
Kafka 和 Pulsar是通过生产端按Key Hash的方案将数据写入到同一个分区。RocketMQ是通过消息组(功能上类似消息Key)将同一个消息组的数据写入到不同的MessageQueue。RabbitMQ是通过Exchange和Route Key的机制,将数据写入到不同的Queue里面。接下来我们来看一下实现的细节。
Kafka 和 Puslar 的生产端支持按 Key Hash 的生产分区分配策略。我们只需要给每条消息赋予一个消息Key,比如将属于AppID 1001 客户的消息的Key都设置为1001,此时 Key 为1001的消息会被固定发送到同一个分区。配合生产端的单个生产者和同步发送机制,就可以保证属于AppID 为1001的数据被有序存储。
RocektMQ 支持消息组(MessageGroup)的概念。在生产端指定消息组,则同一个消息组的消息就会被发送到同一个分区中。此时这个消息组起到的作用和Kakfa 的消息的Key是一样的。
RabbitMQ 在生产时没有生产分区分配的过程。它是通过Exchange和Route Key机制来实现顺序消息的。Exchange 会根据设置好的 Route Key 将数据路由到不同的 Queue 中存储。此时Route Key的作用和Kafka的消息的Key是一样的。
总结来看,Kafka 和 Pulsar 中的消息Key,RocketMQ的消息组,RabbitMQ的Route Key就是我们提到的 标识。只要标识一样数据,就会被路由到同一个分区进行存储,从而保持消息有序。如果你对生产端的Key Hash策略的实现还不太清楚,建议回顾一下 第07讲。关于这四款主流消息队列的生产端的实现,可以回顾一下第10~13讲。
接下来我们来看看幂等的定义和实现机制,先来看看什么是幂等。
幂等机制的定义和实现
我们在开发一些 App 服务时,经常会提到 接口幂等 这个词。接口幂等是指无论调用多少次,接口执行的结果都是一样的。以写入接口举例,接口幂等的语义就是不管写入多少次,这条消息只会被真正写入一次,不会被重复写入。查询、删除、变更类型的接口的幂等语义也是一样的。
那么消息队列中的幂等是指什么呢?
消息队列中幂等的定义
消息队列中的幂等主要指生产幂等和消费幂等,当然还有其他一些集群管控类的操作的幂等,比如创建Topic、切换Leader等。
生产幂等 通常指同一条消息不会被重复写入到Broker。即同一条消息客户端无论重复发送多少次,服务端也只会保存一份这条消息。
消费幂等 很少被单独提到。在 第08讲 和 第09讲 讲到的消息队列主要是基于消费位点的消费机制。只要客户端不提交消费位点信息,此时消费天生就是幂等的。即不管怎么消费,返回的都是同一条消息。而如果提交了Offset,就会自动消费下一条数据,也符合设计预期。在提交位点的操作中,即使重复提交了同一个位点,消费位点保存的都是同一个值,对消费也不会产生影响。
因此消费端谈的更多的是Exactly Once,即如何保证一条消息只会被消费一次。这个话题我们在后面讲消息队列事务时再展开细讲。所以接下来我们就重点来看看生产端的幂等是如何实现的。
生产幂等的设计实现
我们先来思考一个问题:如果一条消息客户端发送了多次,而Broker端只能保存一份,此时最核心要解决的是什么问题呢?
答案应该是: Broker 怎么识别接收到的多条消息是指同一条消息 ? 即如果 Broker 不知道收到的消息是否为同一条,那就无法拒绝重复的消息。
我们先来看下面这个示例:
你认为生产端的第二、三行是同一条消息吗?答案是:不是同一条消息。原因是Producer 主动调用了两次 send 方法,所以应该认为客户端发送了两条消息。此时这两条消息都需要保存。
再来看一下下面这个场景:
Producer 只调用了一次 send 方法,只发送了一条消息。但是因为底层网络或者其他故障,这条消息在底层被重传了2次,导致Broker端收到两条内容都是abc的消息。此时从Broker端看来,无法区分清楚收到的两条内容都是abc的消息,客户端是主动send了一次还是两次?
讲到这里,我们就知道了消息的唯一性应该是以Producer 的 send 调用为准。即 Producer send 一次就表示发送了一条消息,send 两次表示发送两条消息。所以就需要在客户端调用 send 的时候标识消息的唯一性,以标识消息的唯一。
从技术上来看,主要有两种方案。
1. 通过消息唯一 ID 实现幂等
通过消息唯一ID实现幂等是指在发送消息的时候,为每条消息分配唯一的消息ID(MsgID),来表示消息的唯一性。如下图所示,消息中携带 MsgID 发送到Broker,Broker 根据 MsgID 判断这条消息是否已经接收,如果没有就保存数据,否则就拒绝写入。
基于这种方案的前提是, 需要在生产端开启按Key Hash的机制,以保证同一个MsgID的消息可以发送到同一个分区中。否则如果同一条消息多次发送投递到不同分区,此时就无法判断之前是否接收过这条消息了。
这种方案主要有两个技术问题需要重点解决:一个是消息唯一ID的生成策略,另一个是Broker如何识别之前没有接收过这个消息ID。
消息唯一ID的生成策略,也可以理解为分布式唯一ID的生成,这样就比较好理解。技术上最简单的就是UUID,但是因为UUID的内容长度和有可能重复等问题,在消息队列中并不适用。
技术上看,我们在业务系统中常常基于MySQL/Redis/ZooKeeper等第三方系统或基于雪花(Snowflake)算法生成分布式唯一ID。但是在消息队列客户端直接集成 MySQL/Redis/ZooKeeper 等引擎的复杂度太高,所以这个方案基本不可用,因此雪花算法算是比较常用的方案。
雪花算法的目标是生成一个唯一的 int64(long) 的整形数字。它的核心思想是将long型的64位区分为5段进行组合,以生成规定的唯一数字。来看一下64位long型的结构:
如上图所示,64位long型的结构一般可以分为 符号位、 时间戳、 数据中心 ID、 机器 ID、 计数器 5个字段,每个字段的意义是不一样的。算法的核心是数据中心ID和机器ID这两个字段,只要这两个字段的内容是唯一的,那么就可以保证生成的long型的值是唯一的。
一般情况下从业务上看,只需要在Topic维度保持消息的顺序性就可以。我们在 第27讲 中提到消息队列架构中,会有集群ID和TopicID来唯一标识集群和Topic,因此可以将雪花算法中的数据中心ID和机器ID替换为集群ID和TopicID,这样就可以使生成的消息ID是唯一的。
Broker如何识别之前没有接收过这个消息ID,就是说需要在 Broker 端设计保存已经接收到的所有MsgID,用来在接收到消息后将消息ID和当前接收过的所有的消息ID做一个比较,以判断消息是否重复。
此时有一个问题是,理论上 Broker 需要保留已经接收过的所有消息ID的集合,而消息队列作为一个大吞吐的存储组件,Broker 历史接收的消息量会很大。消息ID的记录、匹配的性能肯定会有问题,并且也需要占用大量硬盘空间。
从实际实现的角度来看,有几个小技巧可以分享一下。
- 可以在 Topic 维度保存消息ID,不需要将集群所有的消息ID都存在一起,这样可以提高消息ID保存、加载和查询的性能。
- 因为消息队列的消息有过期的机制,消息ID的集合可以只保留当前还在生命周期内的消息ID。好处是消息ID的数量就会减少很多,从而提高性能、减少存储空间。缺点是客户端可能重新发送过期的消息(这点理论上可能性较低,几乎可以忽略),另外需要给消息ID集合设计过期机制,会增加一定的开发成本。
- 可以引入布隆过滤器来判断消息ID是否在已接收过的消息ID集合中,用来提高消息ID去重判断的性能。布隆过滤器的作用是用于检索一个元素是否在一个集合中。布隆过滤器就不展开讲了,它是大数据领域比较常用的算法,有兴趣的同学可以看一下这个文档 布隆过滤器。
整个方案的思路看下来,如果通过在 Broker 上识别重复的 MsgID 来实现幂等,需要在代码层面做很多细致的工作,代码工作量不小。那有没有简单一些的方案呢?接下来,我们就来看一下通过生产者ID和自增序号来实现幂等的方案。
2. 通过生产者 ID 和自增序号实现幂等
在前半部分中我们讲到,我们认为 Producer 调用一次 send 就是一条新的消息,所以幂等的逻辑主体应该是生产者。
所以该方案的核心思路是:为每个生产者赋予唯一的ID,生产者ID是全局唯一的。然后生产者启动时生成一个从0开始的自增序号,用来表示这个生产者发送出去的消息,每条消息分别有一个自增序号,比如 0、1、2……即通过Producer和seqnum二元组来唯一标识消息。
特别说明一下,生产者ID可以用上面提到的Snowflake算法来生成,因为生产者很少,甚至直接用UUID也可以,毕竟UUID重复的几率也非常低。
来看一下下面这张图:
在上图中,生产者有一个唯一ID p1,消息中会携带 ProducerID 和 seqnum 两个值来唯一标识这条消息。此时Broker会根据这个二元组判断是否收到过这条消息,是就保存,否就拒绝。
从实现的角度,服务端理论上依旧要保留这个生产者所有发送成功的seqnum的集合,这样才能判断消息是否有重复。此时如果生产者很多并且生产者一直没有重启的话,服务端就需要保留非常多的Producer和seqnum数据。 此时开发复杂度和上一个方案是差不多的,技术实现上只是把标识从MsgID 换为Producer + seqnum 而已,没有本质区别。
既然没有本质区别,那么这个方案跟第一种方案的区别哪里呢?从技术上看,有一个技巧可以不保存生产者所有发送成功的seqnum集合,但是又可以识别出所有已发送的seqnum。
这个技巧的思路是:我们不保留所有的seqnum,只保留最新收到的seqnum。此时如果收到的消息的seqnum是下一条msg,那么就正常保存数据。否则就放进队列中先等待,等待下一条msg收到后,再来判断是否保存该数据,甚至可以直接拒绝消息写入。
我们通过下面这张图来说明一下:
Broker 收到的 Producer ID 为p1的生产者的最新的seqnum 为4(current seqnum),那么下一条允许收到的seqnum是5。如果下一条是5,则保存数据,然后current seqnum更新为5,并等待seqnum为6的数据。而如果此时发送过来的是8,就可以有两种策略:
- 策略一:先把8缓存在Broker内存中,等待收到6和7后,再把8写入存储。这种方案的缺点是6和7可能永远接收不到,而Broker就需要一直保存8这条数据。因此可能会发生内存溢出或占用额外的存储空间。
- 策略二:给客户端返回可重试错误,触发客户端的重复发送机制。此时客户端重试写入时,如果Broker已经收到7的数据,在等待8了,此时这条消息就可以顺序写入了。
目前业界主流的消息队列只有Kafka支持幂等,其他三款消息队列RocketMQ、RabbitMQ、Pulsar都不支持。所以接下来我们简单看一下Kafka幂等机制的实现方式。
Kafka的幂等机制的实现方案
kafka的生产者在启动时会为每个生产者分配一个唯一ID。这个唯一ID是客户端从Broker申请的,不是自己生成的。来看下图:
从实现上看,Broker 通过在ZooKeper创建一个节点来生成自增ID,然后返回给客户端,从而保证生产者的ID是这个集群唯一的,属于基于第三方系统来生成分布式唯一ID。这里你要注意的是,唯一ID是 Broker 和 ZooKeeper 交互生成的,而不是客户端直接和 ZooKeeper 交互生成的,这是为了避免给客户端引入了过高的复杂度。
我们知道,Kafka 支持 Batch 语义,所以在发送消息的时候会为每批次消息赋予一个seqnum,用来标识这个生产者发送的消息的唯一性。和我们上面的思路有些不一样的是,Kafka 在每个Topic-Partition 维度都会有一个独立的seqnum,即通过PID、Topic、PartitionNum、seqnum 四元组来唯一标识一条消息。我们稍后再来说明为何这样实现。
我们先重点来看一下,Kafka 是怎样判断消息是否重复的?
从具体实现上看,Broker 端会缓存 PID 对应 Topic-Partition 的五个最近的 batch 信息。比如曾经接收过1、2、3、4、5、6六个消息,此时只会缓存 2~6 五个消息ID。如下图所示,Broker接收到数据后,会循环缓存中的数据,判断是否重复,重复就拒绝,不重复就直接保存。
此时有一个问题是,如果消息1过期后,客户端再把消息1发送过来,此时判断结果就是不重复,从而写入数据,那么数据就没办法达到真正意义上的幂等了。所以从具体实现上来看,Kafka的幂等是无法实现完全幂等的,只能支持部分的幂等。
既然这样,那为什么还用这个方案呢?
我们上面提到过,如果使用保留判断最后一个seqnum的方案,就会对性能有影响。如果实现强幂等则需要缓存全量seqnum,对内存和存储空间的压力较大,判断匹配性能也会较低,从而对写入性能产生影响。
Kafka 的实现方案可以说是一个取舍的方案。因为Kafka主打的是高性能,不能因为幂等的特性导致性能下降太多。通过缓存少量的数据来实现大部分情况下的幂等,也不会对内存和性能造成太大影响,只是付出的代价是不能支持完全的幂等。
现在回到上面的问题,为什么要用PID、Topic、PartitionNum、seqnum 四元组来唯一标识一条消息?
你可以想一下基于这个实现机制,如果每台节点只保留5条数据,那么几乎一下子缓存就被刷掉了。即使调大缓存大小,因为有些分区的数据量大,有些分区数据量小,此时一些小分区的数据缓存一下子就被挤出去了,从而完全无法实现幂等。因此基于这个四元组来缓存数据,就是一个可以理解的方案了。
至于为什么是5条,我这里也没答案,个人判断是拍的一个数字。 如果非得从技术方面解释,可以这么解释:
因为数据是存在内存中的,需要保证这个功能的缓存数据不会对内存造成压力,因此需要控制内存的使用总量。因此我们假设单台Broker可支持的分区数为P,单台生产者的数量为M,存的消息数量为N,此时消耗的内存总量为T,因此:
因为M是完全不可控的,P取决于用户的运营策略,某种意义也不可控,所以内核可控的就是N。因此N如果太大,则会对内存造成太多压力,所以N就不能太大。基于此,可能就拍了个5吧。值得一提的是,5 是hard code在代码里面的,不能改动。
总结
消息队列中的顺序消息是指时间先后的顺序,即生产消息的顺序和消费消息的顺序需要保持一致。主要实现思路是基于底层顺序存储的结构特点来设计的。这种实现方式成本较低,也比较贴合消息队列架构和功能的特点。
核心逻辑是需要满足单一生产者、同步发送、单一分区三个因素。单一生产者+同步分区是为了解决生产端数据发送的有序性,单一分区解决的是多分区消费的无序性。
在现实场景中,一般不需要数据全量有序,而是局部数据的有序。 因此可以为需要局部有序的消息赋予同一个唯一标识,然后将同一个标识的消息发送到一个分区,从而解决一个Topic只有一个分区,从而导致性能不足的问题。
目前主流消息队列的顺序消息的方案,都是基于上面的思路来实现的。单从技术合理性来看,数据的顺序性也可以通过查询时的重排序来完成,即在查询的时候,根据需要排序的信息完成排序,然后发送结果给客户端。这种方案的技术复杂度较高,并且性能较低,不太适合消息队列,所以消息队列用得比较少。
消息队列的幂等分为生产幂等、消费幂等、集群管控类操作幂等三种类型。从功能上来看,在消息队列中的幂等主要指生产幂等。
生产幂等的技术实现方案主要分为“通过消息唯一ID实现幂等”和“通过生产者ID和自增序号实现幂等”两种思路。主要区别是,解决 Broker 如何唯一识别一条消息的思路不一样。第一种是通过唯一的消息ID来唯一识别一条消息,第二种是通过生产者ID和消息序号seqnum来唯一识别一条消息。
从实现的角度来看,两种方案的实现思路差别不大。开发成本上各有各有优劣势,都可以使用。个人会比较倾向于第一种,因为第一种思路中的消息ID是消息队列的一个基本需求,这部分的开发工作量不会浪费。从而只需要在服务端实现消息ID的去重即可,整体来看实际收益较高。
目前业界主流消息队列Kafka的幂等机制,就是基于第二种思路来实现的。
思考题
前面我们讲过,排序的另外一个思路是查询时的重排序,这里我问一个MySQL排序的问题,来和消息队列顺序消息的实现方案做对比,让你有一个借鉴的效果。
从技术上看,MySQL如何实现数据的有序?
期待你的思考,如果觉得有收获,也欢迎你把这节课分享给身边的朋友。我们下节课再见!
上节课思考闭环
假设将元数据存储服务从 ZooKeeper 换成MySQL,元数据的增删改查应该是怎样的一个流程?
1. 写入操作:将ZooKeeper的create操作,换成MySQL的insert操作。
2. 更新操作:将ZooKeeper的update操作,换成MySQL的update操作。
3. 删除操作:将ZooKeeper的delete操作,换成MySQL的delete操作。
4. 心跳超时:原先只需要Zookeeper的客户端在ZooKeeper上创建临时节点就可以了,在ZooKeeper的底层集成了连接超时、删除临时节点的操作。如果是MySQL,大致的思路是在Broker中维护一个线程定时去更新MySQL中的行记录的时间,然后通过另外一个线程去检查是否超期,然后删除记录。
5. Watch机制:ZooKeeper自带了Watch机制,很多监听操作ZooKeeper底层自动完成了。如果是MySQL,同样的得不停地定时去表里面查询配置是否更新,然后执行后续的操作。
从上面几步看下来,有两个比较明显的结论:
1. 可以通过 Interface 机制去定义接口,然后用不同的实现方式去实现这些接口,从而支持多种底层引擎的元数据服务。
2. 使用ZooKeeper这种分布式协调服务,比使用MySQL这种持久化存储引擎来存储元数据更加简单好用。