16 集群:如何构建分布式的消息队列集群?(下)
你好,我是文强。
我们接着上节课的内容,继续来看如何构建集群,我们先来看元数据存储服务的设计选型。在消息队列的集群架构中,元数据存储服务的选型和实现是整个架构设计的核心,其他模块的设计实现都是围绕着元数据存储服务来展开的。
元数据存储服务设计选型
在上节课我们讲到过,业界主要有基于第三方存储引擎和集群内部自实现元数据存储两种方案。我们先来分析一下这两种方案的具体实现。
基于第三方存储引擎
这个方案最重要的一件事就是 组件选型。
从技术上来看,一般只要具备可靠存储能力的组件都可以当作第三方引擎。简单的可以是单机维度的内存、文件,或者单机维度的数据库、KV存储,进一步可以是分布式的协调服务ZooKeeper、etcd等等。
正常来讲,大多数消息队列只会支持一种存储引擎,因为一种存储引擎基本够用了。但是也会有如 Pulsar 支持插件化的元数据存储服务,用来简化不同场景下的部署成本,比如单机运行、测试集成、现网部署等等。
从分布式的角度来看,单机维度的存储能满足的场景有限,也会有单机风险。所以业界主流的分布式消息队列都是选用分布式的协调服务,如 ZooKeeper、etcd 等来当集群的元数据存储服务。所以基于第三方存储引擎的集群架构图一般是下面这样子。
如图所示,这是一个由单独的元数据存储集群和多台Broker节点组成的消息队列集群。Broker 连接上Metadata Service 完成节点发现、探活、主节点(Controller)选举等功能。 其中Controller 的角色是由某一台 Broker 兼任的。
在图中你可以看到 Controller 和 Metadata Service是分开的,各自承担着不同的职能。Controller 是无状态的,因为它不负责保存数据,只负责计算逻辑。所以在这种情况下,一般就会让集群中的某台Broker来承担Controller的功能。当这台Broker挂了后,可以依赖元数据存储服务把 Controller 切换到新的 Broker。因为它是无状态的,所以切换是非常快的。
多说一句,在这个架构图中,如果你把元数据服务替换成ZooKeeper,这个架构就是Pulsar和Kafka了。如果你把元数据服务换成 NameServer,这个架构就是RocketMQ了。
不过你会发现,如果使用这个方案,集群最少得有6个节点,这会导致部署成本、运维复杂度变高。那有没有可能简化架构呢?我们继续来看集群内部自实现元数据存储的方案。
集群内部自实现元数据存储
从技术上看,可以通过在多台 Broker 的进程中实现分布式的元数据存储,从而解决依赖第三方组件的一些弊端。整体架构如下图所示:
从技术实现来看,主要有三个思路:
- 直接在 Broker 内部构建一个小型的元数据存储集群来提供服务。
- 通过某些可以内嵌到进程的小型的分布式存储服务来完成元数据的存储。
- 通过某些可以内置的单机持久化的服务,配合节点间的元数据同步机制来完成元数据的存储。
第一种方案需要在Broker中实现一个元数据集群。这个元数据集群和Broker集群最大的差别在于它只需要承担单个集群的元数据管理存储,数据量和规模很小,集群一般不需要扩容。所以这个集群适合使用“通过配置发现节点的方案”来构建集群。Kafka 的 KRaft 架构用的就是这种方案。
第二种方案是利用某种可以内嵌到进程的存储服务来存储元数据,比如Mnesia或RocksDB。如果是单机的存储引擎,比如RocksDB,那么主要适用于单机部署的场景。单机存储引擎的方案如果要实现元数据的集群化,那么就得在节点之间实现相互同步数据的机制,这个就相对复杂许多。而如果是分布式的存储引擎,如Mnesia,那么就简单许多,几乎没有工作量,直接调用存储引擎的接口存储元数据即可。
第三种方案是在节点上部署一个持久化的单机存储引擎,如RocksDB等。然后在Broker内维护节点间的元数据数据的一致性。这种方式也是一种实现比较简单的方案,开发难度低于第一种方案,高于第二种方案。
从业界实现来看,目前第一种和第二种方案都有在使用。第三种方案主要用在单机模式下,问题是要维护多个节点的存储服务之间的数据一致性,有一定的开发工作量,并且保持数据强一致比较难。
总结来看,在集群中实现元数据服务的优点是,后期架构会很简洁,不需要依赖第三方组件。缺点是需要自研实现,研发投入高。而如果使用独立的元数据服务,因为是现成的组件,产品成型就会很快,这也是当前主流消息队列都是依赖第三方组件来实现元数据存储的原因。所以当前主流消息队列的架构如下所示:
接下来我们就用实际案例结合前面这些基础知识点,来看一下 ZooKeeper、Kafka是如何构建集群的,先来看一下 ZooKeeper 。
ZooKeeper的集群构建
ZooKeeper 是一个分布式的数据协调服务,本质上是一个简单的、分布式、可靠的数据存储服务。核心操作就是数据的写入、分发、读取和 Hook。从客户端看,主要操作就是写入和读取。从服务端看,主要操作就是集群构建、数据接收、存储、分发和 Hook。
在集群构建上,它会事先在配置中定义好集群中所有节点的IP列表。然后集群启动时,会在这些节点之间进行选举,经过多数投票机制,选举出一个Leader节点,从而构建成为集群。在单节点上,集群构建相关的配置一般如下所示,配置中会包含所有节点信息。
在节点启动的时候,节点之间就会两两进行通信,触发投票。然后根据票数的多少,基于多数原则,选择出一个Leader出来。当 Leader 节点发生宕机或者增加节点时,就会重新触发选举。
多数投票是一个经常用到的投票机制,即某个节点获得票数超过可投票的节点的一半后,就可以当选为Leader。 从实现角度,一般是通过集群中节点之间的通信和间隔随机投票的机制来完成投票,以保证能够在短时间内完成选举。
当选举完成后,Leader 会主动给各个Follower节点发送ping-pong请求,以确定节点是否还活着。当Follower心跳异常时,就会剔除该节点,当集群中可用的节点数少于总节点数的一半,就会选举不出Leader,从而导致集群异常。
因为ZooKeeper只是一个数据存储服务,并没有很多管控操作,Leader节点就负责数据的写入和分发,Follower 不负责写入,只负责数据的读取。当Leader收到操作请求时,比如创建节点、删除节点、修改内容、修改权限等等,会保存数据并分发到多个Follower,当集群中有一半的Follower返回成功后,数据就保存成功了。当Follower收到写入请求时,就把写入请求转发给Leader节点进行处理。
因为功能和定位上的差异,ZooKeeper上是没有Controller和元数据存储的概念的。它是比较典型的基于固定配置构建集群的方式。
Kafka的集群构建
我们在前面课程中说过,目前主流消息队列集群主要是基于第三方组件来构建的。而 Kafka 正是这种方案的典型实现,接下来我们就来看一下 Kafka 基于 ZooKeeper 和基于 KRaft 构建集群的两种实现方式。先来看一下基于 ZooKeeper 的构建思路。
基于ZooKeeper的集群
在这种架构中,Kafka 将 ZooKeeper 作为节点发现和元数据存储的组件,通过在 ZooKeeper 上创建临时节点来完成节点发现,并在不同的节点上保存各种元数据信息。
如图所示,Broker 在启动或重连时,会根据配置中的 ZooKeeper 地址找到集群对应的ZooKeeper 集群。然后会在 ZooKeeper 的 /broker/ids 目录中创建名称为自身BrokerID的临时节点,同时在节点中保存自身的 Broker IP 和 ID 等信息。当 Broker 宕机或异常时,TCP连接就会断开或者超时,此时临时节点就会被删除。如下两张图,分别是 /broker/ids 节点下的结构和BrokerID中的内容。
注册完这些信息后,节点发现就算完成了。节点之间的探活依赖 ZooKeeper 内置的探活机制,前面讲过,这里不再赘述。接下来来看一下 Kafka 中的 Controller。
在Kafka中,Controller是一个虚拟概念,是运行在某台Broker上的一段代码逻辑。集群中需要确认一台Broker 承担 Controller的角色,那 Controller 是怎么选出来的呢?我们来看一看。
Kafka 的 Controller 选举机制非常简单,即在 ZooKeeper 上固定有一个节点 /controller。每台Broker启动的时候都会去 ZooKeeper 判断一下这个节点是否存在。如果存在就认为已经有Controller了,如果没有,就把自己的信息注册上去,自己来当Controller。集群每个Broker 都会监听/Controller 节点,当监听到节点不存在时,都会主动去尝试创建节点,注册自己的信息。 哪台节点注册成功,这个节点就是新的 Controller。
Controller 会监听 ZooKeeper 上多个不同目录,主要监听目录中子节点的增加、删除,节点内容变更等行为。比如会通过监听 /brokers/ids 中子节点的增删,来感知集群中 Broker 的变化。即当Broker 增加或删除时,ZooKeeper 目录中就会创建或删除对应的节点。此时Controller 通过 Hook 机制就会监听到节点发生了变化,就可以拿到变化节点的信息,根据这些信息,触发后续的业务逻辑流程。
Kafka 集群中每台 Broker 中都有集群全量的元数据信息,每台节点的元数据信息大部分是通过Controller 来维护的,比如Topic、分区、副本。当这些信息发生变化时,Controller就会监听到变化。然后根据不同的Hook(如创建、删除Topic等),将这些元数据通过TCP调用的形式通知给集群中其他的节点,以保持集群中所有节点元数据信息是最新的。
接下来我们用Topic的创建流程,来串联一下集群中的Controller的集群管理和元数据存储。
如图所示,Kafka 创建Topic有两种形式(图中1和2),即通过 Broker 来创建和通过 ZooKeeper来创建。当调用 Broker 创建Topic时,Broker 会根据本地的全量元数据信息,算出新Topic的分区、副本分布,然后将这些数据写入到 ZooKeeper。然后 Controller 就会Hook到创建Topic的行为,更新本地缓存元数据信息,通知对应的Broker节点创建分区和副本。 所以,也可以通过直接生成计划然后、写入到ZooKeeper的方式来创建Topic。
接下来来看一下基于 KRaft 构建的 Kafka 集群。
基于KRaft的集群
如下图所示,从架构的角度,基于KRaft 实现的Kafka集群做的事情就是将集群的元数据存储服务从 ZooKeeper 替换成为内部实现的 Metadata 模块。这个模块会同时完成 Controller 和元数据存储的工作。
我们前面讲过, 集群元数据需要分布式存储才能保证数据的高可靠。所以 Kafka KRaft 架构的 Metadata 模块是基于 Raft 协议实现的 KRaft,从而实现元数据可靠存储的。
因为 Kafka的 Metadata 模块只需要完成元数据存储,所以它的设计思路和 ZooKeeper 是一样的,是主从架构。即通过在配置文件中配置节点列表,然后通过投票来选举出Leader节点。这个节点会承担集群管控、元数据存储和分发等功能。
Metadata 模块的配置如下所示。即通过配置项 controoler.quorum.votes 配置允许成为 Controller 的节点列表,然后这些节点之间会通过投票选举出Leader节点,这个Leader会完成Controller和元数据存储的工作。这个Leader相当于基于 ZooKeeper 版本中的Controller和ZooKeeper的Leader。
所以在这个版本架构的实现中,就只有Controller了,然后 Controller 自带了元数据存储的功能。所以架构上就会变成了下面这个样子。
如上图所示,Broker 之间通过投票选举出来的Leader节点就是Controller。此时所有 Broker 都会和 Controller 保持通信,以维护节点的在线状态,从而完成节点发现。当 Controller 发现Broker增加或异常时,就会主动执行后续的操作。
所以,从链路来看,这个架构简化了通过监听 ZooKeeper 来发现节点变更的流程,链路更短,稳定性更高。和基于 ZooKeeper 的架构一样,每台 Broker 依旧有集群全量的元数据信息,这些元数据信息的维护也是通过Controller完成的。
接下来,我们来看一下 KRaft 架构下创建 Topic 的流程,来看下图:
这里因为没有 ZooKeeper,所以创建 Topic 只有通过Broker创建的方式。通过 Admin SDK 调用Broker创建Topic,如果 Broker 不是 Controller,这个请求就会转发到当前的Controller上。Controller 会根据本地的元数据信息生成新Topic的分区、副本的分布,然后调用对应的Broker节点完成分区和副本数据的创建,最后会保存元数据。
其他的操作,如删除Topic、修改配置、创建ACL等操作的流程是一样的。更多细节你可以去看一下官方的KIP, KIP-500。
讲到这里,你会发现基于KRaft的Kafka架构比基于ZooKeeper架构简单清晰非常多,操作链路也短很多。这样可以解决基于 ZooKeeper 架构中一些难以解决的问题,如集群可承载分区数量上限较低、缓存不一致等等。
总结
目前,消息队列的主流实现方式都是依赖第三方组件来完成数据存储,常见的有ZooKeeper、etcd等。为了简化架构,我们还可以通过在集群内自建元数据存储服务来替代第三方组件,虽然需要研发投入,但从架构长期演进的合理性来看,我是推荐这种方式的,毕竟后期架构会很简洁。
ZooKeeper 集群的组件,是基于配置文件中指定集群中其他节点的IP地址和端口来实现节点发现的,属于单播发现机制。这种方式的缺点就是扩容需要修改配置、重启集群。所以,还有一种通过配置多播地址和端口来实现集群发现的方式,其好处是可以动态发现节点,属于单播的一种升级,目前Elasticsearch和消息队列RabbitMQ都属于多播的实现。
从Kafka的集群构建来看,基于独立的元数据存储服务,会导致架构复杂和引入缓存不一致等问题。集群内部实现元数据存储,可以简化架构,避免不一致。从技术合理性来看,我更建议你使用内置元数据存储的方案。
思考题
请你简单描述一下 Kafka 集群中修改配置/权限操作的流程?
期待你的分享,如果觉得有收获,也欢迎你把这节课分享给身边的朋友。我们下节课再见!
上节课思考闭环
基于上面的集群方案,如果要实现分区缩容你会怎么做?你觉得需要注意哪些事情?
大体流程如下:
1. 客户端发送缩容分区的请求给Controller,请求参数中包含要缩容哪个Topic、缩容几个分区。
2. Controller 判断请求是否合理,计算出哪些分区可以缩容。
3. Controller 向可以缩容的分区所在的Broker节点发送删除分区请求。
4. 当删除分区成功后,Controller更新Topic分区相关的元数据。
这里需要注意的是,一旦删除分区后,数据就会被删除,此时数据就无法恢复了。所以在删除分区的时候,要考虑分区中的数据如何处理,比如迁移走或者先禁止写入、支持消费,然后等分区中的数据都过期后再删除。