Skip to content

32 消息查询:如何实现消息查询功能?

你好,我是文强。

这节课我们来讲讲在消息队列中如何实现消息查询。

从功能上来看,消息队列的核心功能是生产和消费,查询并不是它的主要工作,但在一些场景中用户还是需要对消息进行查询。最常见的场景是: 用户觉得某条消息丢了,需要查询这条消息是否保存在Broker中 此时你会怎么做呢? 除此之外,还有哪些场景会用到消息查询的功能呢?这节课我们就重点解决这两个问题。

什么时候会用到消息查询

首先,我们来看下面两个行格式和JSON格式的消息数据示例。它们主要包含时间戳、消息位点、消息ID、消息Key、消息内容等 5 个部分。

  • Nginx日志
timestamp:1691711859099
messageId:kvhnfdskui
offset:1
key:空
value:
66.249.65.159 - - [06/Nov/2014:19:10:38 +0600] "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1" 404 177 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
  • JSON格式的日志
timestamp:1691711859099
messageId:vkjfikdsfd
offset:2
key:c816991f-adfe-4617-8cf3-9997aea90ded
value:
{
"@timestamp": 1.64880350063659E9,
"@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997aea90ded/c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",
"log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO com.qcloud.jgw.gateway.server.topic.TopicService",
"pod_name": "tke-es-687995d557-n29jr",
"namespace_name": "default",
"pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
"host": "10.0.96.47",
"container_name": "nginx",
"docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba"
}

所以基于上面的消息数据,从用户视角看,一般就有以下 4 种查询需求:

  1. 根据消息位点(Offset)信息查询消息。
  2. 根据某个时间点或时间范围查询消息。
  3. 根据消息ID查询消息。
  4. 根据消息Key或消息中的某个内容查询消息。

另外我们在 第22讲 讲过,消息轨迹数据需要导入到第三方引擎进行存储和查询。如果能在Topic 中实现查询功能,那么就能省掉轨迹数据导出和第三方引擎的成本。这也是消息队列内核内置查询功能的收益之一。

接下来我们来看一下关于消息队列支持查询的几个理论型知识点。

消息队列支持查询的理论基础

先来回顾一下我们在 第05讲 讲过的消息数据的存储结构。

消息数据存储结构

我们知道,消息队列的底层是分段存储,也知道底层存储结构有每个分区单独一个存储“文件”和每个节点上所有分区的数据都存储在同一个“文件”这两个方案。因为从消息查询的角度来看,两种方案的技术实现基本一致,所以接下来我们就基于方案一来分析一下消息查询的实现。

下面是基于方案一实现的分区维度的底层分段数据结构图。可以看到,一个分区由多个数据段组成,比如 Offset 从 1~10000 的数据段,Offset 从 10000~30000 的数据段。

每个数据段里面的数据由多个数据组成,每个数据又由消息位点和消息组成。

那怎么在这个数据结构上进行数据查询呢?从技术上来看,核心是 数据索引的构建。所以接下来我们就来看一下为什么需要索引?以及如何构建索引?

关于索引的一些知识点

问题1:为什么一定要构建索引呢?

从技术上看,查询的核心就是 速度,不能每次都去遍历所有的数据,也不能直接从硬盘中检索数据,因为基于硬盘搜索数据的性能很低。所以构建索引最主要的原因是,可以把数据加载进内存,提高数据的处理效率。

问题2:如何构建索引?

在我看来构建索引需要思考两个点,一是索引不能占用太多内存空间,二是索引元素的增加、删除、检索的时间复杂度要尽量低。

所以构建索引的核心就是选择合理的数据结构存储索引数据。从业界来看,MySQL用的是“B+树”来构建索引,Elasticsearch使用的是“倒排索引”,从而支持高性能、复杂的查询。消息队列查询的核心也是设计合理的数据结构来构建索引,以满足对应的查询需求。

不过索引是需要持久化存储的,我们应该怎么持久化存储索引呢?

索引的持久化存储肯定是存储在文件上的,而消息数据底层是分段存储的。所以如下图所示,从技术上看,在索引的构建上有 所有段文件构建一个索引每个数据段构建索引 两个思路。

技术实现上,这两种方案都会用到,根据不同的场景来选择合适的方案即可。至于如何选择,我们在后面会细讲。

接下来我们来看一下内核是如何支持查询的。

内核支持简单查询

先来看看内核一般会支持哪些类型的查询,分为以下 3 种:

  1. 根据Offset查询,即根据Topic、分区、Offset去查询到对应的消息。
  2. 根据时间戳查询,即根据时间戳去查询到这个时间戳对应的消息,或根据时间戳范围查询到这个范围内的消息。
  3. 根据消息ID查询,即根据消息ID去查询到这个消息ID对应的消息。

根据 Offset 查询数据

从功能上看,根据Offset查询数据一般是根据 Topic、分区、Offset 三元组 来查询数据。基于上述的存储模型可以知道,此时分区的存储结构已经是一个多级索引,具体说是一个三级索引。

  • 第一级:Topic。因为每个Topic的数据单独存储。
  • 第二级:分区。因为每个分区的数据单独存储。
  • 第三级:Offset索引。构建Offset和具体文件位置的索引。

不难看出,我们的主要工作是实现第三级索引。从上面讲到的理论基础可以知道,其实就是选择合适的数据结构来存储索引。那是不是上来就引入B+树、倒排索引、红黑树呢?

当然不是。我们需要先分析当前我们的底层存储和数据的特征,然后结合需求来实际分析如何实现。

我们知道消息队列底层是 顺序存储的模型,所以Offset是顺序递增的。 简单理解,数据已经是一个天然顺序了,基于一个天然顺序的数据来做检索就非常简单,直接引入二分查找就可以。

所以我们索引文件的数据结构实现就很简单。

如上图所示,我们可以用顺序链表来存储索引数据。每个链表节点存储消息位点(Offset)和消息所在的文件的位置(Position)两个元素。基于顺序链表,如果要搜索某个Offset的数据,直接使用二分查找(折半查找)即可,查找的时间复杂度为O(logn),顺序链表的插入的时间复杂度为O(1)。

因为要持久化存储,所以文件中索引数据的格式可以用下面的格式进行存储。

offset: 1 position: 10
offset: 10 position: 20
offset: 20 position: 55
offset: 30 position: 70
offset: 300 position: 90

在上面的示例中,不知道你是否注意到了Offset和Position是单向递增的,但都不是连续的,这是为什么呢?

你可以试想一下,假设我们有10亿条数据,按照上面的设计应该也有10亿个索引节点。以此类推,如果数据更大,索引数据会占用大量的存储空间,所以我们在顺序链表的基础上,可以引入 跳跃表 来节省空间。

如上图所示,引入跳跃表的主要思路是按照一定的间隔跳跃着保留中间元素。当检索数据时,通过二分算法找到离目标最近的前一个跳跃表元素。如果恰好是需要寻找的元素,就直接返回,否则就往后遍历数据找到数据。

举个例子说明,比如我们要找到上图中的元素22,流程就是:

  1. 通过二分算法找到离22最近的前一个跳跃表元素20,得到20对应的Offset=20和Position=55,查找的时间复杂度为O(logn)。
  2. 因为步骤1找到的节点不是我们需要的22,所以我们向后遍历两个元素就可以找到数据22,这一步理论的时间复杂度为O(n)。但是我们可以通过控制两个跳跃表索引元素之间的节点数量,来降低时间复杂度。比如我们固定为间隔10,此时时间复杂度就是一个常量10,所以从算法来看,这个时间复杂度是可以忽略的。

引入跳跃表结构后,通过 牺牲一定的时间复杂度换取了空间复杂度的大幅度降低,是一个蛮推荐的方案。

最后再来看一下内存占用率的使用控制,先来算个数据。

假设每个数据段都有1千万条数据,那么引入跳跃表前占用的内存空间是:

2个int型的容量 * 1000w =8byte * 1000w ~=76MB。

引入跳跃表后占用的内存空间是:

2个int型的容量 * 20w =8byte * 20w ~=1.52MB

因此可知,引入索引不会占用太多的内存空间,引入跳跃表后还可以大幅度降低对内存空间的占用。接下来我们来看看如何实现根据时间戳进行查询。

根据时间戳查询数据

从需求上看,根据时间戳查询主要有以下两种查询场景:

  1. 根据时间戳查询到指定位置的数据。
  2. 根据时间戳范围查询到范围内的数据。

消息队列内核一般只支持第一种场景。因为从技术上看,构建高性能的范围查询是一个复杂的话题,涉及到的知识点非常多。从消息队列内核的角度看,从技术上也能实现,但是代码复杂度会提高很多,要增加大量内核开发维护成本。所以目前消息队列内核支持第二种场景的比较少。

如下图所示,从技术上看,消息队列实现根据时间查询的思路是:先根据时间戳找到对应的Offet,然后再根据Offset查询到对应的数据。

所以技术上的核心思路是: 构建时间戳和Offset对应的索引。

因为消息数据是根据时间戳递增存储的,所以时间戳和Offset索引也是基于顺序链表构建的。链表节点由毫秒时间戳和消息位点组成。

同样的因为需要持久化存储,所以底层索引文件内容格式可以如下所示:

timestamp: 1691236897071 offset: 744267032
timestamp: 1691236898193 offset: 744267036
timestamp: 1691236899167 offset: 744267040
timestamp: 1691236899752 offset: 744267044
timestamp: 1691236900204 offset: 744267048

现在你应该会发现,时间戳和Offset也不是连续的。时间戳不连续是合理的,因为可能有的时间没有数据。但是Offset 理论上应该是连续的,不连续的原因和其实上面一样,主要是为了节省空间引入了跳跃表的实现。

基于上面的设计,我们来看一个示例。假设我们需要根据时间戳来找到某条数据实,流程就是:

  1. 通过二分算法查找到最近的前一个时间戳,获取到它对应的Offset,时间复杂度为O(logn)。
  2. 根据这个Offset去读取文件,遍历后续的数据找到大于等于这个时间戳的的数据,读取数据。

在这个模型下,如果想根据时间范围进行查询,有一个思路是:当实现第一种场景后,可以查询两次时间戳。然后根据查询返回开始Offset和结束Offset,去读取需要的数据。

最后再来看看如何实现根据消息ID查询数据。

根据消息 ID 查询数据

消息ID和前面讲到的时间戳、Offset,最大的不同是 消息 ID 是无序的。如果能保证所有消息ID是有序的,那查询的实现思路和前面两种就是一样的。但要构建一个全局有序的消息ID生成器,复杂度太高了,一般不会这么做。

按照上面的思路,我们应该通过构建消息ID和Offset组成的二元索引来完成查询需求,索引的格式可以是下面这样子。即根据消息ID找到对应的Offset,然后再根据Offset找出消息内容。

msgID:dfangjfjhs offset:10
msgID:mbvnjdlfjd offset:21
msgID:otidfkjifd offset:33
msgID:ddnbklfdid offset:40

从技术上来看,根据消息ID查询数据有简单实现和复杂实现两种方案。

简单实现的本质是, 结合消息队列本身底层顺序存储的特征而设计的一种取巧的方法。思路如下:

  1. 客户端通过SnowFlake算法生成唯一的消息ID。
  2. 查询的时候根据消息ID反解析出消息ID对应的时间戳,因为SnowFlake算法中有一部分数据是时间戳。
  3. 结合这个时间戳去搜索消息。

这个方案的好处是,几乎不需要开发工作量,流程简单通用。缺点是可能会误判,即消息明明存在,但是却搜索不出来。因为消息ID在客户端生成,SnowFlake算法的时间戳也是客户端的时间,所以在一些异常或延时消息的场景中,数据写入Topic或分区的时间和客户端发送出来的时间相差很大,从而导致根据SnowFlake解析出的客户端时间无法查询到消息。

复杂方案则是是 基于哈希表、B+树、红黑树等数据结构来构建消息ID和Offset的索引,同时保证在索引元素添加和获取的时候的时间复杂度较低,从而满足查询需求。因为根据消息ID查询消息的需求是很固定的,所以我会建议你使用哈希表来构建索引,因为哈希索引结构能够实现高效的消息查询。

业界主流消息队列RocketMQ就是基于哈希表来构建消息ID和Offset的索引的。我们简单来看一下底层的实现原理,大概分为以下4点:

  1. RocketMQ 的索引存储在 IndexFile(索引文件)中,通过使用哈希索引结构来构建索引。
  2. 在 IndexFile 中,消息 ID 被哈希成一个固定长度的 Key。这个 Key 通过哈希函数映射到一个哈希槽(Slot)上。哈希槽里存储的是该 Key 对应消息在 CommitLog 中的物理偏移量。
  3. RocketMQ 使用 开放寻址法(Open Addressing)来解决哈希冲突问题。当不同的 Key 映射到相同的哈希槽时,会根据预设的步长(step)逐个检查其他槽位,直到找到一个空闲槽位。
  4. 同时哈希索引结构为每个 Key 维护一个链表,用于将 Key 映射到多个物理偏移量(例如当一个消息发送到多个队列时)。 这种哈希索引结构使得 RocketMQ 在查询消息时能够通过 Key 快速定位到对应的哈希槽,再根据物理偏移量找到实际消息,从而提高查询效率。

从技术上来看,原理是比较清晰易懂的,但更重要的是代码如何实现。鉴于这节课主要是关注实现思路,这块我们就不展开了,有兴趣的话你可以去网上搜索相关资料或者留言与我讨论。

你应该发现了上面三种都是简单固定的查询场景,而如果要实现复杂查询怎么办呢?

借助第三方工具实现复杂查询

从实际使用的角度来看,消息队列支持复杂查询主要有第三方引擎支持查询和工具化简单查询两种思路。

第三方引擎支持查询

第三方引擎支持查询是指引入第三方查询引擎,将Kafka的数据导入到下游引擎,依赖引擎的能力来实现复杂的数据查询,比如Kafka + Hive、Kafka + Elasticsearch、Kafka + Trino(以前的Presto SQL)。

如下图所示,上面这几个方案的思路都是一样的。核心思路都是先将数据清洗处理变成结构化的数据,然后导入到下游的查询引擎中,然后依赖搜索引擎的复杂检索能力来提供检索服务。

这几个方案的主要区别在于使用方式、支持的查询能力、资源部署的成本不一样。比如搜索引擎是否能够自主从Kafka拉取数据,还是需要依赖一个独立的组件导入数据,但是从结果上来看都是基本可以满足复杂查询需求的。

从具体落地使用上来看,上面这三种方案业界用得都比较多。从选择上来看,一般业务都是根据自己当前所拥有的资源来决定使用哪一个。比如我当前已经有一套Elasticsearch了,我就选择Kafka + Elasticsearch,举一反三。这么选择也是因为这几个方案之间的差别不是特别大。

不过这个方案有一个前提是,需要Kakfa里面的数据都是规范的格式化的数据,或者需要经过数据清洗格式化后才能导入到下游的引擎中。

从技术上看,消息队列数据规范格式化需要依赖Schema来实现,我们在下一节课会细讲Schema。而数据清洗和导入下游引擎,还需要依赖连接器来实现,我们也会在后面的课程中展开细讲。

接下来看看什么是工具化简单查询。

工具化简单查询

工具化简单查询是指我们自定义编码去消费数据,然后在代码里面加过滤条件,从而实现查询。

这个方案直观看上去,存在性能低、时间复杂度高、不灵活等严重缺点,但在实际场景中它是非常实用的。

例如,我们在出问题的时候,偶尔需要根据消息内容或者消息Key模糊查询数据是否在集群中。这个需求非常常见,但是使用频率低。

从技术上看,标准方案是把数据清洗后导入到下游引擎,然后查询。这个方案的缺点就是成本太高、太复杂,因为使用频率太低,就有一种杀鸡用牛刀的感觉,综合讲 ROI 太低了。所以面对这种场景,我们就可以使用这种思路。

它的核心是:我们在运营平台支持根据时间范围、消息Key,对内容进行模糊查询的功能。底层的实现就是上面说的消费+过滤的方案。从使用上来看,虽然慢一点,但是也能满足需求,在运营和 ROI 层面来看是非常实用的。

总结

查询是消息队列的辅助功能,使用的场景和频率不高。

从功能上看,一般会支持按消息位点查询、按时间戳查询、按消息ID查询、按消息Key或消息内容模糊查询四种场景。

查询的核心是索引的构建。因为消息队列底层顺序存储的特性,在按消息位点查询和按时间戳查询的场景中,基于顺序存储和二分查找就可以快速实现数据的检索。

而根据消息ID查询有简单方案和复杂方案两种。简单方案是根据消息ID反解析出时间戳,然后根据时间戳去查询消息,缺点是精准度不够。标准方案是使用比如哈希表等数据结构来构建索引,实现精准查询,缺点是实现成本较高,但是能实现高效的精准查询。

复杂的查询不应该在内核中支持,因为这会导致内核逻辑复杂,增加开发和维护的成本。复杂的查询要交由专业的查询引擎来支持,即将数据导出到下游的Elasticsearch、Hive中,基于这些引擎的查询能力实现复杂查询。

从实际业务上来看,工具化简单查询是一个非常实用的技巧。只需要在运营端开发功能,不需要内核的任何改动。在使用频率较低的复杂查询场景中,使用的ROI会特别高。

思考题

在消息轨迹的场景中,我们是把轨迹信息存在Topic中的。此时结合我们这节课说到的查询能力,请你思考一下,在内核中我们是如何实现消息轨迹的存储和查询的?大致流程是怎样的?

欢迎分享你的答案,如果觉得有收获,也欢迎你把这节课分享给身边的朋友。我们下节课再见!

上节课思考闭环

在你当前的业务中,有哪些场景需要用到死信队列和优先级队列?

以电商场景来举例。

1. 在订单处理的场景,如果订单的某个字段丢失,导致这个订单无法被处理,此时就需要先忽略这个订单,继续处理后续的订单。面对这个情况,就可以在消费的时候启用死信队列,将无效的消息写入到死信队列中。

2. 同样是在订单场景,在快递派送流程希望VIP客户优先派送,此时就需要根据客户的城市、属性、VIP级别等信息对客户进行分级。然后在往下游管道生成快递信息的时候,附上优先级信息。此时下游会根据优先级拿到需要处理的快递列表,从而保证优先级高的订单被优先处理。