27 基础功能:Topic、分区、订阅等基本功能是如何实现的?
你好,我是文强。
在基础篇和进阶篇,我们构建了一个分布式的消息队列集群。接下来我们就开始往这个集群里面添加各种功能。我们会用八节课来分析 消息队列的基本功能、 顺序消息和幂等、 延时和定时消息、 事务消息、 死信队列和优先级队列、 消息查询、 Schema、 WebSocket 等功能的技术选型和设计思路。当我们将这些功能加入到集群之后,一个完整的消息队列基本就打造出来了。
在本节课程中,我们会重点讲解以下4个消息队列基础功能的实现,这四部分的内容相互独立,你可以挑有需求的知识点来学习。
- 静态/动态配置的实现:配置是集群的基础模块,看一下如何在集群中实现静态和动态配置。
- 集群和节点元数据的设计:集群构建需要设计、存储集群和节点的元数据,看一下它们应该包含什么内容,如何存储。
- Topic和分区的支持:Topic和分区是消息队列生产消费的最小单位,看一下Topic和分区的元数据应该包含哪些内容,以及如何存储这些元数据。
- 消费分组(订阅)的管理:详细讲一下消费进度的存储格式和保存机制的实现。
为了便于理解,我们假设现在的集群是基于第三方存储引擎 ZooKeeper 来实现的。具体如下图所示:
如何实现静态和动态配置
静态配置
静态配置是我们在业务开发中经常用到的,配置信息一般以YAML、JSON、Properties等格式存储。在服务启动时加载配置文件到内存当中,进行业务逻辑处理。
静态配置的好处是简单易用,能满足大部分的需求。缺点是每次变更都需要修改文件内容,并重启服务。在一些业务中,重启应用是比较重的操作,可能会对业务产生一定的影响。所以,就需要引入动态配置。
动态配置
动态配置是指服务可以从某个地方动态加载配置信息。如下图所示,即 Broker 启动时会通过某个第三方服务,比如ZooKeeper、etcd、某个名字服务等来加载配置。当需要变更配置时,直接往第三方服务写入新的配置即可,Broker会自动监听到配置的变更。
由于动态配置在技术上的实现方式比较多,这里我们就介绍两种常用的。一种是 基于第三方组件,另一种是 基于本地文件。
第三方组件,我们以ZooKeeper的实现为例。
基于ZooKeeper是指我们可以在ZooKeeper创建持久节点存储配置信息,然后Broker通过ZooKeeper 提供的 Watch 机制来监听节点。当节点内容变更时可以及时感知,并进行后续的处理。
从实现上来看,我们可以在ZooKeeper上创建 /config 节点,在 /config 下创建Cluster、Broker、Topic 3 个节点,分别来保存集群、节点、Topic 的 3 个维度的配置信息。然后Broker监听这 3 个节点的变更来执行后续的配置变更操作。ZooKeeper的目录结构可以如下设计:
从运行机制上来看,Broker 监听ZooKeeper有以下两种思路:
- Broker 会通过 ZooKeeper 的Watch机制监听 /config 下的每个节点,感知到节点内容的变化,再进行后续的操作,比如更新配置。
- Broker 会监听一个专门通知配置更新的节点,比如/config/notification,然后根据这个通知节点的内容,再去判断哪个配置发生了变更。
这两种思路的主要区别在于: 第一种方案变更了配置后,Broker会立刻监听到,立即生效。第二种方案允许先变更配置而不立即生效,在需要生效的时候再给/config/notification写入通知数据,触发配置生效。
从灵活性来看,第二种思路会更合理,能满足更多场景。至于代码实现,你可以参考这个文档 Example usage for ZooKeeper Watcher。
基于本地文件的思路是指在代码实现中,通过代码技巧监听本地配置文件的变更,只要本地文件变更了,就进行配置变更的操作。这种方案不需要依赖第三方组件,实现也简单,所以在很多场景中都会这么实现。缺点是,如果是分布式集群,配置变更就得变更所有的Broker节点上的配置,比较繁琐,流程也很长。一般都需要额外的系统来辅助变更,比如运营发布系统或Broker内置一个配置变更的接口供远程调用。
以上两种方案,从技术上看,基于第三方组件会更合理。但是在消息队列里面,因为需要保持架构的简洁度,基于本地文件也是一种常用的方案。比如Kafka和Pulsar就是基于ZooKeeper来实现的动态配置,因为架构中已经集成了ZooKeeper。RocketMQ 的Nameserver是一个缓存组件,没有实际的存储和Watch机制,无法实现类似ZooKeeper的效果,所以用的是热加载本地文件的方案。
接下我们来看一下集群和节点的元数据一般会包含哪些信息,以及如何存储。
集群和节点元数据的格式和存储
集群元数据
集群元数据是用来保存集群维度的一些基本信息。最简单的集群元数据一般只需要包含集群ID和集群名字两个信息。集群ID用来唯一标识这个集群,集群名称让我们可以直观识别集群的用途,所以集群的元数据结构一般如下所示:
集群维度的元数据需要持久化存储,所以我们可以在 ZooKeeper 上创建持久化节点 /Cluster 来存储集群的的元数据。
因此集群初始化的流程是:Broker 启动时检查 ZooKeeper的 /Cluster 节点是否创建,以确保集群已经经过了初始化。一般是集群中的第一台Broker节点启动时,会触发集群初始化的逻辑。初始化的逻辑会自动生成一个集群ID,然后根据配置好的集群名称,一起写入到 /Cluster 节点里面。
节点元数据
节点元数据是用来保存 Broker 维度的一些基本信息。Broker 元数据一般至少要包含节点的唯一标识BrokerID、节点的IP、节点监听的端口 3 个字段。
值得一提的是,有的消息队列会用“ 节点IP + 节点监听端口”二元组来标识Broker,而不是通过BrokerID。但是在容器化和云上的CVM环境中,IP 分配是随机的。当节点销毁后,重新分配出来的节点的IP可能会是一样的。从而在一些极端场景下,可能发生客户端识别集群错误,从而出现异常的情况。
所以,这里还是建议你使用BrokerID来唯一标识集群。所以Broker节点的元数据结构一般如下所示:
从技术上看,节点的元数据存储一般有这样两个思路:
- 所有Broker元数据都存储在一个ZooKeeper Node,比如/node,节点内容如下:
[
{
"BrokerID": 1,
"BrokerIP": "192.2.1.1",
"BrokerPort": 8901
},
{
"BrokerID": 2,
"BrokerIP": "192.2.1.2",
"BrokerPort": 8901
},
{
"BrokerID": 3,
"BrokerIP": "192.2.1.3",
"BrokerPort": 8901
}
]
- 每个Broker元数据独立一个 ZooKeeper Node 存储,比如 /nodes/broker1、/nodes/broker2 等,节点内容如下:
因为Broker数量不会很多,一个集群大概是百或千的量级,所以如果从Broker数量来看,这两个方案的区别不大。目前业界主要使用的是第二个思路,主要的原因是: Broker元数据分开存储方便管理,避免节点间相互影响,也可以避免单个ZooKeeper Node的数据量过大存不下。
所以在持久化存储方面,我们可以在ZooKeeper上创建持久化的 /brokers 节点,然后在这个节点下为每台Broker创建名称为BrokerID的临时节点,用来存储Broker的元数据。所以 Broker 在集群中的元数据的存储结构就如下图所示:
Broker 初始化的流程是:Broker启动时会在 /Brokers 下面创建名称为自身BrokerID的临时节点,然后写入自己的元数据。Broker 异常时,会自动删除注册的节点。
接下来,我们来看一下如何在一个空的集群中支持Topic和分区。
在集群中支持Topic和分区
我们在前面的课程中讲到Topic和分区是消息队列集群的一个基本概念。不知道你有没有过这样的疑问:消息队列一定要有Topic和分区的概念呢?
一定要有Topic和分区吗?
从业界主流消息队列来看,RocketMQ、Pulsar、Kafka都有Topic的概念。而RabbitMQ 只有Queue,没有Topic概念,但是RabbitMQ中的Exchange + Route起到的就是Topic的作用。消息发送到Exchange 中后,会根据配置的路由关系,将数据发送到不同的Queue中。
从性能角度来看,如果只有一个分区,就会有性能瓶颈,无法提供水平扩容的能力。此时就需要在前面包一层概念,让它来组织多个分区,这就是Topic。
所以如下图所示,在消息队列中 Topic和分区是必须的概念,Topic是用来组织分区的逻辑概念, 分区用来存储实体的消息数据。
接下来看一下,Topic和分区的元数据都包含哪些信息呢?
Topic和分区的元数据
从功能上来看,Topic 至少需要TopicID、Topic名称、分区和副本在集群中的分布3个元素。我们用一个简单的JSON格式字符串来表示Topic的元数据信息。
{
"TopicID": "shlnjdlfsakw",
"TopicName": "test",
"Replica": [
{
"Partition": 0,
"Leader": 0,
"Rep": [
0,
1
]
}
]
}
可以看到,在上面的示例中,TopicID、TopicName、Replica分别表示Topic的唯一ID、Topic的名称以及分区副本的分布信息。其中 Replica 里面的 Partition 表示当前是第几个分区,Leader表示分区的Leader是哪台Broker,Rep表示副本是分布在哪些Broker上的。
我们知道Topic的信息是需要持久化保存的,那如何存储这些元数据呢?
元数据的持久化存储
在基于 ZooKeeper 的架构中,我们可以在ZooKeeper 集群中创建一个持久化的节点来存储Topic的相关信息。
如下图所示,我们可以在ZooKeeper的根目录创建持久节点 /topics 存储Topic信息,然后为Topic1、Topic2、Topic3这 3 个Topic分别创建/topics/topic1、/topics/topic2、/topics/topic3 节点来存储对应Topic的元数据。
在集群启动时,就会为每一个分区选举出来一个Leader,然后更新对应的Leader字段信息。当节点出现变动时,也会触发Leader节点切换流程和更新Leader节点信息的逻辑。
Leader切换、创建Topic和分区,以及如何生成分区和副本分布关系的流程,你可以参考 第15讲,这节课不再重复。
接下来我们来看一下消费分组的进度是如何管理和保存的。
消费分组(订阅)的进度管理
消费进度主要有两个关联操作:
- 消费者根据消费分组名称来获取分区的消费进度信息。
- 消费者在消费分组维度提交分区的消费进度信息。
从功能上来看,获取消费进度的频率比较低,一般初始化的时候拉取一次,而更新Offset是每次消费请求都要执行的,所以消费进度是一个低查询、高更新的操作。围绕着这些特性,我们先来看一下消费进度的存储格式应该是什么样子的。
消费进度的存储格式
前面的课程中我们讲过,消费进度的信息是在消费分组记录的,一个消费分组可以消费多个Topic。所以我们应该根据“ 消费分组 + Topic + 分区号”三元组来记录消费进度。
假设消费分组 group1 同时消费 Topic1、Topic2、Topic3 三个Topic,这三个Topic的分区数量分别为3个、1个、2个。那么消费位点的存储格式可以如下所示:
group1,Topic1,0,0
group1,Topic1,1,0
group1,Topic1,2,0
group1,Topic2,0,0
group1,Topic3,0,0
group1,Topic3,1,0
在上面的实例中,每一行分别表示消费分组、Topic、分区号、消费进度四个信息。
从存储格式上来看,一般存储的格式会用 JSON格式或者自定义的行格式(行格式也可以是二进制格式),用JSON格式和行格式存储的示例如下所示:
// 存储格式:
{
"GroupName":"group1",
"TopicName": "topic1",
"Partition":0,
"Offset":0
}
// 行格式
group1,topic1,0,0
两种格式的区别主要在空间占用和搜索效率两个方面。所以在设计存储格式的时候,就需要考虑这两个事情:
- 搜索效率高。 设计好合适的存储数据结构,比如哈希表、B树等。这里细节太多,就先不展开,有需要的话我们在课后进行讨论。
- 不要占用太多额外的存储空间。 这一点考虑的就是存储的格式定义,比如JSON或二进制格式。
从功能和开发成本上看,两者的区别不大。从可读性上,个人建议直接用JSON格式就可以了。 主要考虑是这个消费位点的数据量不会很大,JSON的可读性较好。即使用JSON格式,占用的额外空间也不会太大。
接下来,我们来看一下消费进度是如何保存的。
消费进度的保存机制
从技术上来看,最常用的保存消费进度的思路有存ZooKeeper、存本地文件、存内部的Topic、存其他存储引擎4种思路。下面我们逐一分析下。
存ZooKeeper是当前架构下一种最简单直观的方法。 通过在 ZooKeeper 中为每个消费分组创建一个持久化的节点,比如/groups/group1,来保存每个消费分组的消费进度信息。
因为 ZooKeeper 自带了分布式存储和一致性,所以从功能实现上是最简单直接的。但是当消费分组或者消费者数量很多时,就会占用较多的 ZooKeeper 存储空间。另外消费进度提交时都需要频繁对ZooKeeper节点进行更新,这样会给 ZooKeeper 造成较大的压力,从而容易使ZooKeeper集群高负载,导致集群异常。
存本地文件也是一种常用的思路。 这种方案需要选择合适的数据结构来存储数据,以便进行高效的写入和更新。一般情况下,如果实现优雅,性能则不用担心。另外存本地文件,数据在本地硬盘存储,存储容量一般是够的。
这个方案最大的问题是,消费进度的数据文件不是分布式存储的。当单机节点损坏或单机故障时,就会导致无法读取消费进度数据或者消费进度丢失。为了解决这个问题,我们就需要在不同节点上同步消费进度数据,并保持消费数据的一致性。
这里你可以想一下,要在不同节点同步文件数据,并且保持文件数据的一致性,是不是和分区副本的模型很像?从技术上来看,是一样的。所以,就有了将消费进度数据存储在某一个内部的Topic的思路。
存内部的Topic是指在集群中创建一个特殊名字的Topic,比如consumer_group_offset。此时这个Topic不允许普通用户读写,只允许用来保存消费进度。
如下图所示,我们知道分区的数据是顺序存储的,并且消息队列的分区是没有搜索和更新的能力的。此时就有一个问题,消费进度如何读取和更新呢?
从功能特性上来看,一个消费分组在一个分区维度的消费进度的值只有一个,即最新的那个值。
所以有一种思路是: 提供有压缩功能的Topic,即Topic支持根据消息的Key对消息进行压缩。比如消费分组 group 消费Topic1的0分区的进度的消息Key为group1-topic1-0,Value为消费位点,比如100。根据消息的Key进行压缩,只保留最后一条数据。此时提交Offset的时候就不需要更新,只需要将最新的消费进度的数据 “group1-topic1-0:101” 写入到Topic中。
依托Topic的压缩特性,只保留最后一条数据,就可以让数据量大幅度减少。再配合一些编码技巧,如缓存、索引、二分查找等,就能大大提升获取当前消费分组的位点信息的速度。
存其他存储引擎是指将消费进度存储在其他的存储引擎中,比如MySQL、Redis或其他第三方引擎等。这种方案本质上和存ZooKeeper的思路是一样的。遇到的问题也一样,主要是性能和稳定性方面的不足。和存ZooKeeper最大的区别在于,如果在消息队列Broker上采用这个方案,就得单独为存储消费进度信息而引入一个存储组件,这样会增加系统复杂度,还要考虑第三方组件的稳定性问题。
一般将消费进度存储到第三方引擎的场景是在消费者客户端。即消费者不使用消费分组来管理消费进度,而是自定义管理消费进度信息,此时可能会将消费进度存储到第三方存储引擎中,以保证消费进度不丢失。
从选择上来看,目前Kafka的消费位点保存方案最开始用的是存ZooKeeper,后来用的是存内部的Topic的方案。RocketMQ用的是存本地文件的方案。Pulsar用的是存其他存储引擎BookKeeper的方案,Pulsar能用这种方案的原因在于它是存算分离的架构,天然自带了存储引擎BookKeeper,因此不会因为存储消费进度而引入额外的复杂度和成本。
总结
这节课的元数据都是以必备的、最基础的角度来设计的。随着业务功能的增加,元数据就会不断丰富。所以主流消息队列的元数据,因为功能和架构的不同都会很复杂,但是基础的信息都是一样的。
集群和Broker元数据是构成消息队列集群的基础。最基础的集群元数据主要包含集群ID和集群名称两个信息。最基础的Broker主要包含 BrokerID、BrokerIP、BrokerPort 三个信息。只要有这几个信息,就能构建成一个消息队列集群。
Topic是消息队列最基本的概念,从理论上来说没它不行。它的主要功能是给分区提供一个横向扩容的能力,并用来组织多个分区的逻辑关系。最基础的Topic和分区的元数据主要包含TopicID、Topic名称、分区和副本在集群中的分布信息三部分。在分布信息中还会包含分区号、分区Leader、副本的分布3个子信息。
消费进度一般以“消费分组 + Topic + 分区号”三元组来记录消费进度。记录的格式一般有JSON格式和自定义行格式两种。从功能和开发成本来看,一般情况下JSON格式就够了。
消费进度的保存机制主要有存ZooKeeper、存本地文件、存内部的Topic、存其他存储引擎四种思路。它们在不同的架构中,优缺点不一样,根据当前的架构选择合适的方案即可。从业界主流消息队列来看,四种方案都有在用。
思考题
假设将元数据存储服务从 ZooKeeper 换成MySQL,元数据的增删改查应该是怎样的一个流程?
期待你的分享,如果觉得有收获,也欢迎你把这节课分享给身边的朋友。我们下节课再见!
上节课思考闭环
为什么在去ZooKeeper的路上选择了可插拔的元数据存储框架,而不是去掉第三方存储引擎?
在我看来,引入其他元数据存储引擎只能缓解,不能根治,在ZooKeeper上遇到的问题,在其他引擎上也会遇到。并且引入其他引擎后,架构的部署成本也没有降低,复杂度依旧很高。我认为,比较好的方式是去掉第三方元数据引擎,在Broker集群内部实现元数据的自我组织和管理。
那么为什么Pulsar还会走可插拔的路径呢?
在我看来和开发成本有关,因为去掉第三方元数据存储依赖是一件工作量非常非常大的事情,需要对架构做一个非常大的改动,会消耗社区大量的人力,不适合Pulsar当前快速发展的阶段。使用可插拔框架的思路,改动量较小,可以引入 etcd 这样的性能较高的引擎来缓解当前遇到的问题,引入单机的RocksDB来降低集群的部署复杂度,引入内存存储来降低单机版本的部署成本等等,总之会带来蛮多好处。有兴趣的话,你可以看一下这篇文章 《Apache Pulsar 轻装上阵:迈向轻 ZooKeeper 时代》,对照着 Kafka 500 的KIP来看,你还可以对这两个方案做进一步的思考,感悟会更深。