Skip to content

27 基础功能:Topic、分区、订阅等基本功能是如何实现的?

你好,我是文强。

在基础篇和进阶篇,我们构建了一个分布式的消息队列集群。接下来我们就开始往这个集群里面添加各种功能。我们会用八节课来分析 消息队列的基本功能顺序消息和幂等延时和定时消息事务消息死信队列和优先级队列消息查询SchemaWebSocket 等功能的技术选型和设计思路。当我们将这些功能加入到集群之后,一个完整的消息队列基本就打造出来了。

在本节课程中,我们会重点讲解以下4个消息队列基础功能的实现,这四部分的内容相互独立,你可以挑有需求的知识点来学习。

  1. 静态/动态配置的实现:配置是集群的基础模块,看一下如何在集群中实现静态和动态配置。
  2. 集群和节点元数据的设计:集群构建需要设计、存储集群和节点的元数据,看一下它们应该包含什么内容,如何存储。
  3. Topic和分区的支持:Topic和分区是消息队列生产消费的最小单位,看一下Topic和分区的元数据应该包含哪些内容,以及如何存储这些元数据。
  4. 消费分组(订阅)的管理:详细讲一下消费进度的存储格式和保存机制的实现。

为了便于理解,我们假设现在的集群是基于第三方存储引擎 ZooKeeper 来实现的。具体如下图所示:

如何实现静态和动态配置

静态配置

静态配置是我们在业务开发中经常用到的,配置信息一般以YAML、JSON、Properties等格式存储。在服务启动时加载配置文件到内存当中,进行业务逻辑处理。

静态配置的好处是简单易用,能满足大部分的需求。缺点是每次变更都需要修改文件内容,并重启服务。在一些业务中,重启应用是比较重的操作,可能会对业务产生一定的影响。所以,就需要引入动态配置。

动态配置

动态配置是指服务可以从某个地方动态加载配置信息。如下图所示,即 Broker 启动时会通过某个第三方服务,比如ZooKeeper、etcd、某个名字服务等来加载配置。当需要变更配置时,直接往第三方服务写入新的配置即可,Broker会自动监听到配置的变更。

由于动态配置在技术上的实现方式比较多,这里我们就介绍两种常用的。一种是 基于第三方组件,另一种是 基于本地文件

第三方组件,我们以ZooKeeper的实现为例。

基于ZooKeeper是指我们可以在ZooKeeper创建持久节点存储配置信息,然后Broker通过ZooKeeper 提供的 Watch 机制来监听节点。当节点内容变更时可以及时感知,并进行后续的处理。

从实现上来看,我们可以在ZooKeeper上创建 /config 节点,在 /config 下创建Cluster、Broker、Topic 3 个节点,分别来保存集群、节点、Topic 的 3 个维度的配置信息。然后Broker监听这 3 个节点的变更来执行后续的配置变更操作。ZooKeeper的目录结构可以如下设计:

从运行机制上来看,Broker 监听ZooKeeper有以下两种思路:

  1. Broker 会通过 ZooKeeper 的Watch机制监听 /config 下的每个节点,感知到节点内容的变化,再进行后续的操作,比如更新配置。
  2. Broker 会监听一个专门通知配置更新的节点,比如/config/notification,然后根据这个通知节点的内容,再去判断哪个配置发生了变更。

这两种思路的主要区别在于: 第一种方案变更了配置后,Broker会立刻监听到,立即生效。第二种方案允许先变更配置而不立即生效,在需要生效的时候再给/config/notification写入通知数据,触发配置生效。

从灵活性来看,第二种思路会更合理,能满足更多场景。至于代码实现,你可以参考这个文档 Example usage for ZooKeeper Watcher

基于本地文件的思路是指在代码实现中,通过代码技巧监听本地配置文件的变更,只要本地文件变更了,就进行配置变更的操作。这种方案不需要依赖第三方组件,实现也简单,所以在很多场景中都会这么实现。缺点是,如果是分布式集群,配置变更就得变更所有的Broker节点上的配置,比较繁琐,流程也很长。一般都需要额外的系统来辅助变更,比如运营发布系统或Broker内置一个配置变更的接口供远程调用。

以上两种方案,从技术上看,基于第三方组件会更合理。但是在消息队列里面,因为需要保持架构的简洁度,基于本地文件也是一种常用的方案。比如Kafka和Pulsar就是基于ZooKeeper来实现的动态配置,因为架构中已经集成了ZooKeeper。RocketMQ 的Nameserver是一个缓存组件,没有实际的存储和Watch机制,无法实现类似ZooKeeper的效果,所以用的是热加载本地文件的方案。

接下我们来看一下集群和节点的元数据一般会包含哪些信息,以及如何存储。

集群和节点元数据的格式和存储

集群元数据

集群元数据是用来保存集群维度的一些基本信息。最简单的集群元数据一般只需要包含集群ID和集群名字两个信息。集群ID用来唯一标识这个集群,集群名称让我们可以直观识别集群的用途,所以集群的元数据结构一般如下所示:

{
  "ClusterId":"bmfursdfk",
  "ClusterName":"Trade"
}

集群维度的元数据需要持久化存储,所以我们可以在 ZooKeeper 上创建持久化节点 /Cluster 来存储集群的的元数据。

因此集群初始化的流程是:Broker 启动时检查 ZooKeeper的 /Cluster 节点是否创建,以确保集群已经经过了初始化。一般是集群中的第一台Broker节点启动时,会触发集群初始化的逻辑。初始化的逻辑会自动生成一个集群ID,然后根据配置好的集群名称,一起写入到 /Cluster 节点里面。

节点元数据

节点元数据是用来保存 Broker 维度的一些基本信息。Broker 元数据一般至少要包含节点的唯一标识BrokerID、节点的IP、节点监听的端口 3 个字段。

值得一提的是,有的消息队列会用“ 节点IP + 节点监听端口”二元组来标识Broker,而不是通过BrokerID。但是在容器化和云上的CVM环境中,IP 分配是随机的。当节点销毁后,重新分配出来的节点的IP可能会是一样的。从而在一些极端场景下,可能发生客户端识别集群错误,从而出现异常的情况。

所以,这里还是建议你使用BrokerID来唯一标识集群。所以Broker节点的元数据结构一般如下所示:

{
  "BrokerID":1, //BrokerID的类型可以是Int型,也可以是字符串型,区别不大。
  "BrokerIP":"192.2.1.1",
  "BrokerPort":8901
}

从技术上看,节点的元数据存储一般有这样两个思路:

  • 所有Broker元数据都存储在一个ZooKeeper Node,比如/node,节点内容如下:
[
    {
        "BrokerID": 1,
        "BrokerIP": "192.2.1.1",
        "BrokerPort": 8901
    },
    {
        "BrokerID": 2,
        "BrokerIP": "192.2.1.2",
        "BrokerPort": 8901
    },
    {
        "BrokerID": 3,
        "BrokerIP": "192.2.1.3",
        "BrokerPort": 8901
    }
]
  • 每个Broker元数据独立一个 ZooKeeper Node 存储,比如 /nodes/broker1、/nodes/broker2 等,节点内容如下:
{
    "BrokerID": 1,
    "BrokerIP": "192.2.1.1",
    "BrokerPort": 8901
}

因为Broker数量不会很多,一个集群大概是百或千的量级,所以如果从Broker数量来看,这两个方案的区别不大。目前业界主要使用的是第二个思路,主要的原因是: Broker元数据分开存储方便管理,避免节点间相互影响,也可以避免单个ZooKeeper Node的数据量过大存不下。

所以在持久化存储方面,我们可以在ZooKeeper上创建持久化的 /brokers 节点,然后在这个节点下为每台Broker创建名称为BrokerID的临时节点,用来存储Broker的元数据。所以 Broker 在集群中的元数据的存储结构就如下图所示:

Broker 初始化的流程是:Broker启动时会在 /Brokers 下面创建名称为自身BrokerID的临时节点,然后写入自己的元数据。Broker 异常时,会自动删除注册的节点。

接下来,我们来看一下如何在一个空的集群中支持Topic和分区。

在集群中支持Topic和分区

我们在前面的课程中讲到Topic和分区是消息队列集群的一个基本概念。不知道你有没有过这样的疑问:消息队列一定要有Topic和分区的概念呢?

一定要有Topic和分区吗?

从业界主流消息队列来看,RocketMQ、Pulsar、Kafka都有Topic的概念。而RabbitMQ 只有Queue,没有Topic概念,但是RabbitMQ中的Exchange + Route起到的就是Topic的作用。消息发送到Exchange 中后,会根据配置的路由关系,将数据发送到不同的Queue中。

从性能角度来看,如果只有一个分区,就会有性能瓶颈,无法提供水平扩容的能力。此时就需要在前面包一层概念,让它来组织多个分区,这就是Topic。

所以如下图所示,在消息队列中 Topic和分区是必须的概念,Topic是用来组织分区的逻辑概念分区用来存储实体的消息数据

接下来看一下,Topic和分区的元数据都包含哪些信息呢?

Topic和分区的元数据

从功能上来看,Topic 至少需要TopicID、Topic名称、分区和副本在集群中的分布3个元素。我们用一个简单的JSON格式字符串来表示Topic的元数据信息。

{
    "TopicID": "shlnjdlfsakw",
    "TopicName": "test",
    "Replica": [
        {
            "Partition": 0,
            "Leader": 0,
            "Rep": [
                0,
                1
            ]
        }
    ]
}

可以看到,在上面的示例中,TopicID、TopicName、Replica分别表示Topic的唯一ID、Topic的名称以及分区副本的分布信息。其中 Replica 里面的 Partition 表示当前是第几个分区,Leader表示分区的Leader是哪台Broker,Rep表示副本是分布在哪些Broker上的。

我们知道Topic的信息是需要持久化保存的,那如何存储这些元数据呢?

元数据的持久化存储

在基于 ZooKeeper 的架构中,我们可以在ZooKeeper 集群中创建一个持久化的节点来存储Topic的相关信息。

如下图所示,我们可以在ZooKeeper的根目录创建持久节点 /topics 存储Topic信息,然后为Topic1、Topic2、Topic3这 3 个Topic分别创建/topics/topic1、/topics/topic2、/topics/topic3 节点来存储对应Topic的元数据。

在集群启动时,就会为每一个分区选举出来一个Leader,然后更新对应的Leader字段信息。当节点出现变动时,也会触发Leader节点切换流程和更新Leader节点信息的逻辑。

Leader切换、创建Topic和分区,以及如何生成分区和副本分布关系的流程,你可以参考 第15讲,这节课不再重复。

接下来我们来看一下消费分组的进度是如何管理和保存的。

消费分组(订阅)的进度管理

消费进度主要有两个关联操作:

  1. 消费者根据消费分组名称来获取分区的消费进度信息。
  2. 消费者在消费分组维度提交分区的消费进度信息。

从功能上来看,获取消费进度的频率比较低,一般初始化的时候拉取一次,而更新Offset是每次消费请求都要执行的,所以消费进度是一个低查询、高更新的操作。围绕着这些特性,我们先来看一下消费进度的存储格式应该是什么样子的。

消费进度的存储格式

前面的课程中我们讲过,消费进度的信息是在消费分组记录的,一个消费分组可以消费多个Topic。所以我们应该根据“ 消费分组 + Topic + 分区号”三元组来记录消费进度。

假设消费分组 group1 同时消费 Topic1、Topic2、Topic3 三个Topic,这三个Topic的分区数量分别为3个、1个、2个。那么消费位点的存储格式可以如下所示:

group1,Topic1,0,0
group1,Topic1,1,0
group1,Topic1,2,0
group1,Topic2,0,0
group1,Topic3,0,0
group1,Topic3,1,0

在上面的实例中,每一行分别表示消费分组、Topic、分区号、消费进度四个信息。

从存储格式上来看,一般存储的格式会用 JSON格式或者自定义的行格式(行格式也可以是二进制格式),用JSON格式和行格式存储的示例如下所示:

// 存储格式:
{
  "GroupName":"group1",
  "TopicName": "topic1",
  "Partition":0,
  "Offset":0
}

// 行格式
group1,topic1,0,0

两种格式的区别主要在空间占用和搜索效率两个方面。所以在设计存储格式的时候,就需要考虑这两个事情:

  1. 搜索效率高。 设计好合适的存储数据结构,比如哈希表、B树等。这里细节太多,就先不展开,有需要的话我们在课后进行讨论。
  2. 不要占用太多额外的存储空间。 这一点考虑的就是存储的格式定义,比如JSON或二进制格式。

从功能和开发成本上看,两者的区别不大。从可读性上,个人建议直接用JSON格式就可以了。 主要考虑是这个消费位点的数据量不会很大,JSON的可读性较好。即使用JSON格式,占用的额外空间也不会太大。

接下来,我们来看一下消费进度是如何保存的。

消费进度的保存机制

从技术上来看,最常用的保存消费进度的思路有存ZooKeeper、存本地文件、存内部的Topic、存其他存储引擎4种思路。下面我们逐一分析下。

存ZooKeeper是当前架构下一种最简单直观的方法。 通过在 ZooKeeper 中为每个消费分组创建一个持久化的节点,比如/groups/group1,来保存每个消费分组的消费进度信息。

因为 ZooKeeper 自带了分布式存储和一致性,所以从功能实现上是最简单直接的。但是当消费分组或者消费者数量很多时,就会占用较多的 ZooKeeper 存储空间。另外消费进度提交时都需要频繁对ZooKeeper节点进行更新,这样会给 ZooKeeper 造成较大的压力,从而容易使ZooKeeper集群高负载,导致集群异常。

存本地文件也是一种常用的思路。 这种方案需要选择合适的数据结构来存储数据,以便进行高效的写入和更新。一般情况下,如果实现优雅,性能则不用担心。另外存本地文件,数据在本地硬盘存储,存储容量一般是够的。

这个方案最大的问题是,消费进度的数据文件不是分布式存储的。当单机节点损坏或单机故障时,就会导致无法读取消费进度数据或者消费进度丢失。为了解决这个问题,我们就需要在不同节点上同步消费进度数据,并保持消费数据的一致性。

这里你可以想一下,要在不同节点同步文件数据,并且保持文件数据的一致性,是不是和分区副本的模型很像?从技术上来看,是一样的。所以,就有了将消费进度数据存储在某一个内部的Topic的思路。

存内部的Topic是指在集群中创建一个特殊名字的Topic,比如consumer_group_offset。此时这个Topic不允许普通用户读写,只允许用来保存消费进度。

如下图所示,我们知道分区的数据是顺序存储的,并且消息队列的分区是没有搜索和更新的能力的。此时就有一个问题,消费进度如何读取和更新呢?

从功能特性上来看,一个消费分组在一个分区维度的消费进度的值只有一个,即最新的那个值。

所以有一种思路是: 提供有压缩功能的Topic,即Topic支持根据消息的Key对消息进行压缩。比如消费分组 group 消费Topic1的0分区的进度的消息Key为group1-topic1-0,Value为消费位点,比如100。根据消息的Key进行压缩,只保留最后一条数据。此时提交Offset的时候就不需要更新,只需要将最新的消费进度的数据 “group1-topic1-0:101” 写入到Topic中。

依托Topic的压缩特性,只保留最后一条数据,就可以让数据量大幅度减少。再配合一些编码技巧,如缓存、索引、二分查找等,就能大大提升获取当前消费分组的位点信息的速度。

存其他存储引擎是指将消费进度存储在其他的存储引擎中,比如MySQL、Redis或其他第三方引擎等。这种方案本质上和存ZooKeeper的思路是一样的。遇到的问题也一样,主要是性能和稳定性方面的不足。和存ZooKeeper最大的区别在于,如果在消息队列Broker上采用这个方案,就得单独为存储消费进度信息而引入一个存储组件,这样会增加系统复杂度,还要考虑第三方组件的稳定性问题。

一般将消费进度存储到第三方引擎的场景是在消费者客户端。即消费者不使用消费分组来管理消费进度,而是自定义管理消费进度信息,此时可能会将消费进度存储到第三方存储引擎中,以保证消费进度不丢失。

从选择上来看,目前Kafka的消费位点保存方案最开始用的是存ZooKeeper,后来用的是存内部的Topic的方案。RocketMQ用的是存本地文件的方案。Pulsar用的是存其他存储引擎BookKeeper的方案,Pulsar能用这种方案的原因在于它是存算分离的架构,天然自带了存储引擎BookKeeper,因此不会因为存储消费进度而引入额外的复杂度和成本。

总结

这节课的元数据都是以必备的、最基础的角度来设计的。随着业务功能的增加,元数据就会不断丰富。所以主流消息队列的元数据,因为功能和架构的不同都会很复杂,但是基础的信息都是一样的。

集群和Broker元数据是构成消息队列集群的基础。最基础的集群元数据主要包含集群ID和集群名称两个信息。最基础的Broker主要包含 BrokerID、BrokerIP、BrokerPort 三个信息。只要有这几个信息,就能构建成一个消息队列集群。

Topic是消息队列最基本的概念,从理论上来说没它不行。它的主要功能是给分区提供一个横向扩容的能力,并用来组织多个分区的逻辑关系。最基础的Topic和分区的元数据主要包含TopicID、Topic名称、分区和副本在集群中的分布信息三部分。在分布信息中还会包含分区号、分区Leader、副本的分布3个子信息。

消费进度一般以“消费分组 + Topic + 分区号”三元组来记录消费进度。记录的格式一般有JSON格式和自定义行格式两种。从功能和开发成本来看,一般情况下JSON格式就够了。

消费进度的保存机制主要有存ZooKeeper、存本地文件、存内部的Topic、存其他存储引擎四种思路。它们在不同的架构中,优缺点不一样,根据当前的架构选择合适的方案即可。从业界主流消息队列来看,四种方案都有在用。

思考题

假设将元数据存储服务从 ZooKeeper 换成MySQL,元数据的增删改查应该是怎样的一个流程?

期待你的分享,如果觉得有收获,也欢迎你把这节课分享给身边的朋友。我们下节课再见!

上节课思考闭环

为什么在去ZooKeeper的路上选择了可插拔的元数据存储框架,而不是去掉第三方存储引擎?

在我看来,引入其他元数据存储引擎只能缓解,不能根治,在ZooKeeper上遇到的问题,在其他引擎上也会遇到。并且引入其他引擎后,架构的部署成本也没有降低,复杂度依旧很高。我认为,比较好的方式是去掉第三方元数据引擎,在Broker集群内部实现元数据的自我组织和管理。

那么为什么Pulsar还会走可插拔的路径呢?

在我看来和开发成本有关,因为去掉第三方元数据存储依赖是一件工作量非常非常大的事情,需要对架构做一个非常大的改动,会消耗社区大量的人力,不适合Pulsar当前快速发展的阶段。使用可插拔框架的思路,改动量较小,可以引入 etcd 这样的性能较高的引擎来缓解当前遇到的问题,引入单机的RocksDB来降低集群的部署复杂度,引入内存存储来降低单机版本的部署成本等等,总之会带来蛮多好处。有兴趣的话,你可以看一下这篇文章 《Apache Pulsar 轻装上阵:迈向轻 ZooKeeper 时代》,对照着 Kafka 500 的KIP来看,你还可以对这两个方案做进一步的思考,感悟会更深。