05 存储:消息数据和元数据的存储是如何设计的?
你好,我是文强。今天我们讲消息队列的存储模块。
存储模块作为消息队列高吞吐、低延时、高可靠特性的基础保证,可以说是最核心的模块。从技术架构的角度来看,存储模块包含 功能实现和性能优化 两个方面,我们今天先聊存储模块的功能设计和实现。
上节课我们讲过,存储模块的主流程是数据的写入、存储、读取、过期。读写、持久化存储是基本功能,但因为消息队列独有的产品特性,主要被用来当缓冲分发,它的数据存储是临时的,数据持久化存储后,在一定的时间或操作后,需要能自动过期删除。
那对于消息队列这样有特殊需求的存储模块,我们在实现功能的时候要注意哪些事情呢?带着这个问题,我们开始今天的学习。
首先,一个前置信息你要清楚,消息队列中的数据一般分为 元数据和消息数据。元数据是指Topic、Group、User、ACL、Config等集群维度的资源数据信息,消息数据指客户端写入的用户的业务数据。下面我们先来看元数据信息的存储。
元数据信息的存储
元数据信息的特点是数据量比较小,不会经常读写,但是需要保证数据的强一致和高可靠,不允许出现数据的丢失。同时,元数据信息一般需要通知到所有的Broker节点,Broker会根据元数据信息执行具体的逻辑。比如创建Topic并生成元数据后,就需要通知对应的Broker执行创建分区、创建目录等操作。
所以元数据信息的存储,一般有两个思路。
- 基于第三方组件来实现元数据的存储。
- 在集群内部实现元数据的存储。
基于第三方组件来实现元数据的存储是目前业界的主流选择。 比如Kafka ZooKeeper版本、Pulsar、RocketMQ 用的就是这个思路,其中Kakfa和Pulsar的元数据存储在ZooKeeper中,RocketMQ存储在NameServer中(准确说是存储在Broker+NameServer中,后面会详细说明)。
这个方案最大的优点是集成方便,开发成本低,能满足消息队列功能层面的基本要求,因为我们可以直接复用第三方组件已经实现的一致性存储、高性能的读写和存储、Hook机制等能力,而且在后续集群构建中也可以复用这个组件,能极大降低开发难度和工作成本。
但也有缺点。引入第三方组件会增加系统部署和运维的复杂度,而且第三方组件自身的稳定性问题会增加系统风险,第三方组件和多台Broker之间可能会出现数据信息不一致的情况,导致读写异常。
另一种思路, 集群内部实现元数据的存储是指在集群内部完成元数据的存储和分发。 也就是在集群内部实现类似第三方组件一样的元数据服务,比如基于Raft协议实现内部的元数据存储模块或依赖一些内置的数据库。目前Kafka 去ZooKeeper的版本、RabbitMQ的Mnesia、Kafka的C++版本RedPanda用的就是这个思路。
这个方案的优缺点跟第一个正好相反。优点是部署和运维成本低,不会因为依赖第三方服务导致稳定性问题,也不会有数据不一致的问题。但缺点是开发成本高,前期要投入大量的开发人力。
总的来说,当前主流选择第一种方案,主要是出于开发成本考虑。
用第三方组件导致的稳定性问题,大部分可以通过后期的运维、运营、编码技巧来解决规避或降低发生频率,但如果前期开发成本太大、架构太复杂,会影响项目的成型和业务的使用。所以在项目前期,大部分都会选择这个方案。
如果消息队列核心架构已成熟或者前期允许有较大投入,我才会建议你选择第二种方案。因为第一种方案虽然开发成本较低,但其使用成本、机器资源成本、运维成本还是偏高,另外,一些稳定性问题,比如元数据不一致,因为第三方组件的存在是无法根治的,会有长久的隐患。
消息数据的存储
了解了元数据,接下来我们讲消息数据的存储。一般情况下,消息队列的存储主要是指消息数据的存储,分为存储结构、数据分段、数据存储格式、数据清理四个部分。
数据存储结构设计
我们先看数据存储目录结构设计。在消息队列中,跟存储有关的主要是Topic和分区两个维度。用户可以将数据写入Topic或直接写入到分区。
不过如果写入Topic,数据也是分发到多个分区去存储的。所以从实际数据存储的角度来看, Topic和Group不承担数据存储功能,承担的是逻辑组织的功能,实际的数据存储是在在分区维度完成的。
从技术架构的角度,数据的落盘存储也有两个思路。
- 每个分区单独一个存储“文件”。
- 每个节点上所有分区的数据都存储在同一个“文件”。
特别说明下,这里的“文件”是一个虚指,即表示所有分区的数据是存储在一起,还是每个分区的数据分开存储的意思。在实际的存储中,这个“文件”通常以目录的形式存在,目录中会有多个分段文件。接下来讲到的文件都是表示这个意思。
第一个思路,每个分区对应一个文件的形式去存储数据。具体实现时,每个分区上的数据顺序写到同一个磁盘文件中,数据的存储是连续的。因为消息队列在大部分情况下的读写是有序的,所以 这种机制在读写性能上的表现是最高的。
但如果分区太多,会占用太多的系统FD资源,极端情况下有可能把节点的FD资源耗完,并且硬盘层面会出现大量的随机写情况,导致写入的性能下降很多,另外管理起来也相对复杂。Kafka在存储数据的组织上用的就是这个思路。
具体的磁盘的组织结构一般有“目录+分区二级结构”和“目录+分区一级结构”两种形式。不过从技术上来看,没有太大的优劣区别。
目录+分区二级结构:
├── topic1
│ ├── partrt0
│ ├── 1
│ └── 2
└── topic2
├── 0
├── 1
目录+分区一级结构:
├── topic1-0
├── topic1-1
├── topic1-2
├── topic2-0
├── topic2-1
└── topic2-2
第二种思路,每个节点上所有分区的数据都存储在同一个文件中,这种方案需要为每个分区维护一个对应的索引文件,索引文件里会记录每条消息在File里面的位置信息,以便快速定位到具体的消息内容。
因为 所有文件都在一份文件上,管理简单,也不会占用过多的系统FD资源,单机上的数据写入都是顺序的,写入的性能会很高。缺点是同一个分区的数据一般会在文件中的不同位置,或者不同的文件段中,无法利用到顺序读的优势,读取的性能会受到影响,但是随着SSD技术的发展,随机读写的性能也越来越高。如果使用SSD或高性能SSD,一定程度上可以缓解随机读写的性能损耗,但SSD的成本比机械硬盘高很多。
目前RocketMQ、RabbitMQ和Pulsar的底层存储BookKeeper用的就是这个方案。
这种方案的数据组织形式一般是这样的。假设这个统一的文件叫commitlog,则commitlog就是用来存储数据的文件,.index是每个分区的索引信息。
那怎么选择呢? 核心考虑是你对读和写的性能要求。
- 第一种方案,单个文件读和写都是顺序的,性能最高。但是当文件很多且都有读写的场景下,硬盘层面就会退化为随机读写,性能会严重下降。
- 第二种方案,因为只有一个文件,不存在文件过多的情况,写入层面一直都会是顺序的,性能一直很高。但是在消费的时候,因为多个分区数据存储在同一个文件中,同一个分区的数据在底层存储上是不连续的,硬盘层面会出现随机读的情况,导致读取的性能降低。
不过随机读带来的性能问题,可以通过给底层配备高性能的硬件来缓解。所以当前比较多的消息队列选用的是第二种方案,但是 Kafka 为了保证更高的吞吐性能,选用的是第一种方案。
关于FD的占用问题。Linux上的FD数是可以配置的,比如配置几十万个FD没问题,所以我们一般不会用完系统的FD限制,这一点在实际的落地中不需要太担心。
但是不管是方案一还是方案二,在数据存储的过程中,如果单个文件过大,在文件加载、写入和检索的时候,性能就会有问题,并且消息队列有自动过期机制,如果单个文件过大,数据清理时会很麻烦,效率很低。所以,我们的消息数据都会分段存储。
消息数据的分段实现
数据分段的规则一般是根据大小来进行的,比如默认1G一个文件,同时会支持配置项调整分段数据的大小。看数据目录中的文件分段示意图。
从技术上来看,当数据段到达了规定的大小后,就会新创建一个新文件来保存数据。
如果进行了分段,消息数据可能分布在不同的文件中。所以我们在读取数据的时候,就需要先定位消息数据在哪个文件中。为了满足这个需求,技术上一般有 根据偏移量定位或根据索引定位 两种思路。
根据偏移量(Offset)来定位消息在哪个分段文件中,是指通过记录每个数据段文件的起始偏移量、中止偏移量、消息的偏移量信息,来快速定位消息在哪个文件。
当消息数据存储时,通常会用一个自增的数值型数据(比如Long)来表示这条数据在分区或commitlog中的位置,这个值就是消息的偏移量。
在实际的编码过程中,记录文件的起始偏移量一般有两种思路:单独记录每个数据段的起始和结束偏移量,在文件名称中携带起始偏移量信息。因为数据是顺序存储的,每个文件记录了本文件的起始偏移量,那么下一个文件的起始偏移量就是上一个文件的结束偏移量。
如果用索引定位,会直接存储消息对应的文件信息,而不是通过偏移量来定位到具体文件。
具体是通过维护一个单独的索引文件,记录消息在哪个文件和文件的哪个位置。读取消息的时候,先根据消息ID找到存储的信息,然后找到对应的文件和位置,读取数据。RabbitMQ和RocketMQ用的就是这个思路。
这两种方案所面临的场景不一样。 根据偏移量定位数据,通常用在每个分区各自存储一份文件的场景;根据索引定位数据,通常用在所有分区的数据存储在同一份文件的场景。因为在前一种场景,每一份数据都属于同一个分区,那么通过位点来二分查找数据的效率是最高的。第二种场景,这一份数据属于多个不同分区,则通过二分查找来查找数据效率很低,用哈希查找效率是最高的。
接下来,我们继续看消息数据的存储格式,看看每行记录长什么样子,都存储了哪些信息。
消息数据存储格式
消息数据存储格式一般包含消息写入文件的格式和消息内容的格式两个方面。
消息写入文件的格式指消息是以什么格式写入到文件中的,比如JSON字符串或二进制。从性能和空间冗余的角度来看,消息队列中的数据基本都是以二进制的格式写入到文件的。这部分二进制数据,我们不能直接用vim/cat等命令查看,需要用专门的工具读取,并解析对应的格式。
比如,我们想查看Kafka消息数据存储文件中的数据,如果用cat命令查看是乱码,用日志解析工具kafka.tools.DumpLogSegments查看,才是格式化的数据。
# cat 00000000000000000000.log
>f0z�sl{]�sl{���������������xlobo
# kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 LogAppendTime: 1681268702091 size: 74 magic: 2 compresscodec: NONE crc: 1714453217 isvalid: true
| offset: 0 LogAppendTime: 1681268702091 keysize: 2 valuesize: 4 sequence: -1 headerKeys: [] key: xu payload: lobo
消息内容的格式是指写入到文件中的数据都包含哪些信息。 对于一个成熟的消息队列来说,消息内容格式不仅关系功能维度的扩展,还牵涉性能维度的优化。
如果消息格式设计得不够精简,功能和性能都会大打折扣。比如冗余字段会增加分区的磁盘占用空间,使存储和网络开销变大,性能也会下降。如果缺少字段,则可能无法满足一些功能上的需要,导致无法实现某些功能,又或者是实现某些功能的成本较高。
所以,在数据的存储格式设计方面,内容的格式需要尽量完整且不要有太多冗余。
听起来有点抽象,我们分析一下Kafka和RocketMQ的消息内容格式设计,让你对具体的数据存储格式有更直观的感受。
以一个具体的消息内容截图为例,我们看Kakfa的V2版本存储格式的内容。
看每个字段的含义。可以看到,Kafka的消息内容包含了业务会感知到的消息的Header、Key、Value,还包含了时间戳、偏移量、协议版本、数据长度和大小、校验码等基础信息,最后还包含了压缩、事务、幂等Kafka业务相关的信息。
特别说明,因为Kafka支持Batch特性,所以消息格式中还包含base和last等Batch相关信息。
再看RocketMQ的存储格式的内容。
看每个字段的含义。RocketMQ的存储格式中也包含基础的Properties(相当于Kafka中的Header)、Value、时间戳、偏移量、协议版本、数据长度和大小、校验码等信息,还包含了系统标记、事务等RocketMQ特有的信息,另外还包含了数据来源和数据目标的节点信息。
对比看,消息数据的存储格式虽然没有统一的规范,但是一般包含通用信息和业务信息两部分。通用信息主要包括时间戳、CRC、消息头、消息体、偏移量、长度、大小等信息,业务信息主要跟业务相关,包含事务、幂等、系统标记、数据来源、数据目标等信息。前面讲过,消息队列的数据在持久化存储后,需要在一定策略后自动过期删除。那消息队列的数据过期机制如何实现呢?
消息数据清理机制
消息队列中的数据最终都会删除,时间周期短的话几小时、甚至几分钟,正常情况一天、三天、七天,长的话可能一个月,基本很少有场景需要在消息队列中存储一年的数据。
消息队列的数据过期机制一般有手动删除和自动删除两种形式,从实现上看主要有三种思路。
- 消费完成执行ACK删除数据
- 根据时间和保留大小删除
- ACK机制和过期机制相结合
消费完成执行ACK删除数据,技术上的实现思路一般是 : 当客户端成功消费数据后,回调服务端的ACK接口,告诉服务端数据已经消费成功,服务端就会标记删除该行数据,以确保消息不会被重复消费。ACK的请求一般会有单条消息ACK和批量消息ACK两种形式。
因为消息队列的ACK一般是顺序的,如果前一条消息无法被正确处理并ACK,就无法消费下一条数据,导致消费卡住。此时就需要死信队列的功能,把这条数据先写入到死信队列,等待后续的处理。然后ACK这条消息,确保消费正确进行。
这个方案,优点是不会出现重复消费,一条消息只会被消费一次。缺点是ACK成功后消息被删除,无法满足需要消息重放的场景。
根据时间和保留大小删除指消息在被消费后不会被删除,只会通过提交消费位点的形式标记消费进度。
实现思路一般是服务端提供偏移量提交的接口,当客户端消费成功数据后,客户端会回调偏移量提交接口,告诉服务端这个偏移量的数据已经消费成功了,让服务端把偏移量记录起来。然后服务端会根据消息保留的策略,比如保留时间或保留大小来清理数据。一般通过一个常驻的异步线程来清理数据。
这个方案,一条消息可以重复消费多次。不管有没有被成功消费,消息都会根据配置的时间规则或大小规则进行删除。优点是消息可以多次重放,适用于需要多次进行重放的场景。缺点是在某些情况下(比如客户端使用不当)会出现大量的重复消费。
我们结合前两个方案,就有了 ACK机制和过期机制相结合的方案。实现核心逻辑跟方案二很像,但保留了ACK的概念,不过ACK是相对于Group概念的。
当消息完成后,在Group维度ACK消息,此时消息不会被删除,只是这个Group也不会再重复消费到这个消息,而新的Group可以重新消费订阅这些数据。所以在Group维度避免了重复消费的情况,也可以允许重复订阅。
纵观业界主流消息队列,三种方案都有在使用,RabbitMQ选择的是第一个方案,Kafka和RocketMQ选择的是第二种方案,Pulsar选择的是第三种方案。不同消息队列的方案选择,主要都是考虑架构设计和组件开发时业务场景的影响。我个人觉得第三种比较合理。
前面我们虽然反复提到“删除”,但数据实际怎么删除也有讲究。
我们知道消息数据是顺序存储在文件中的,会有很多分段数据,一个文件可能会有很多行数据。那么在ACK或者数据删除的时候,一个文件中可能既存在可删除数据,也存在不可删除数据。如果我们每次都立即删除数据,需要不断执行“读取文件、找到记录、删除记录、写入文件”的过程,即使批量操作,降低频率,还是得不断地重复这个过程,会导致性能明显下降。
当前主流的思路都是 延时删除,以段数据为单位清理,降低频繁修改文件内容和频繁随机读写文件的操作。
只有该段里面的数据都允许删除后,才会把数据删除。而删除该段数据中的某条数据时,会先对数据进行标记删除,比如在内存或 Backlog 文件中记录待删除数据,然后在消费的时候感知这个标记,这样就不会重复消费这些数据。
总结
消息队列的存储分为元数据存储和消息数据存储两方面。
元数据的存储主要依赖第三方组件实现,比如ZooKeeper、etcd或者自研的简单元数据存储服务等等。在成熟的消息队列架构中,基于简化架构和提升稳定性的考虑,都会考虑在集群内部完成元数据的存储和管理。
消息数据的存储在功能层面包含数据存储结构设计、数据分段存储、数据存储格式、数据清理机制四个方面。
消息数据的存储主要包含Topic和分区两个维度。Topic起逻辑组织作用,实际的数据存储是在分区维度完成的。所以在数据存储目录结构上,我们都以分区为最小粒度去设计,至于选择每个分区单独一个存储文件,还是将每个节点上所有分区的数据都存储在同一个文件,方案各有优劣,你可以根据实际情况去选择。
因为大文件存在性能和资源占用、数据清理成本等问题,一般情况下,我们都需要对数据文件进行分段处理,分段的策略一般都是按照文件大小进行的。
数据存储格式可以分为基础信息和业务信息两个维度,数据格式需要遵循极简原则,以达到性能和成本的最优。
数据的过期策略一般有三种,ACK删除、根据时间和保留大小删除数据、两者结合。目前业界的实现比较多样,从选择上来看,两者结合的方案更合理。
思考题
如果让你从头实现一个消息队列的存储模块,你的思考路径是什么?
欢迎分享你的方案,如果觉得有收获,欢迎你把这节课分享给身边的朋友。我们下节课再见。
上节课思考闭环
假如你的团队需要开发一款新的消息队列,你需要完成网络模块的选型开发设计,你的思考路径是什么?
1. 你要了解这款消息队列需要满足什么场景,比如消息、流、IOT等。
2. 理解目标场景的业务形态,比如IOT就需要管理大量连接,消息就需要尽量保证低延时,流的话就需要考虑吞吐问题等等。
3. 根据业务特点分析出技术架构的瓶颈和难点。
4. 考虑技术语言的选型问题,用哪种语言合适,比如Java、Go、Rust、C++等。这点应该结合技术需要和团队本身的技术栈来思考选择哪种语言。
5. 理解这个语言当前网络编程的相关库,网络库、网络库框架,并且调研该语言主流网络编程技巧。
6. 基于理解的网络模块编程思想,结合网络库去实现网络模块。
7. 在最后需要设计压测场景,利用自研或开源的压测工具,最后完成性能和稳定性验证。