Skip to content

22 可观测性:如何设计实现消息轨迹功能?

你好,我是文强。

今天我们讲可观测性的第三部分:跟踪(Traces),在消息队列领域,Traces通常被称为消息轨迹,意思是消息在系统中的运行轨迹。消息轨迹在业务消息方向是刚需,因为业务消息如果出现丢失,大概率会导致流程异常。

比如订单的快递运送流程没有正确处理一条订单号的消息,后续就会出现无法跟踪这个订单派送的问题。如果你负责过消息队列基础服务的日常值班,一定会经常接到用户反馈说消息丢了,想让平台查清楚为什么丢了。这个问题,你会怎么排查呢?

如果想排查清楚,我们首先要搞清楚有没有丢,如果丢了,还要搞清楚是哪一个环节丢的。想弄清楚这两个问题,就必须有消息轨迹的功能。那我们今天就来讲一讲消息轨迹功能怎样设计。

丢消息是怎么回事?

到底什么是“丢消息”?你的第一印象是不是服务端把我的消息弄丢了。其实在消息队列里面,丢消息没这么简单。

从广义上来讲,消息队列的丢消息是指生产了一条消息,但是预期中的消费者却没有消费到这条消息。但是这个过程,不是只有一种服务端弄丢了消息这一个场景。我们结合消息的生命周期,来看看可能有哪些情况。

  • 生产消息失败,生产端把消息写入到服务端时失败了,这时有可能是服务端有异常,也有可能是消息的内容或格式不符合规范,比如太长了。
  • 消息设置了延时属性,消息进入了延时队列后,没有在定义的延时时间内出队列,从而没有在特定的时间执行,错过了消息的有效处理时间,被业务抛弃。
  • 消息在服务端存储完成后,因为服务端一致性算法(强一致、最终一致)的设定问题,或者异步刷盘、服务端重启、消息截断之类的行为,导致持久化的消息丢失了。
  • 一般消息队列的消费模型是根据Offset来定位消费位点或者重置消费位置的。如果消费端设置了错误的位点,比如下一条应该消费位点100的消息,但消费端因为异常忽略了100,直接从101开始消费。从服务端的角度,100的消息就没有被消费过,而消费者看来,100的消费就是没被消费到,此时从消费端的视角,消息就是丢失了。
  • 当消费端拉到消息后,如果业务处理有异常,经过了多次处理后依旧失败,消息进入了死信队列,除非业务配置了订阅消费死信队列的消息,否则这条消息的生命周期就算结束了。此时消费端还是没有消费到消息,消息可以算丢失了。

所以从生产消费的流程看,消息的生命周期是很长的,只要其中有一步出现问题,从业务角度看,就会出现类似消息丢失的感知。 而消息轨迹就是为了还原轨迹,跟踪单条消息的运行周期。

不过要定位到单条消息生命周期和运行轨迹,我们得先有消息的唯一标识。

消息的唯一标识

消息的唯一标识,一般是指消息的唯一ID,和OpenTelemetry Traces中的Trace ID是同一个东西。

业界主流消息队列对消息ID的实现有两种形式。一种是明确有消息ID概念的,一条消息一定会有一个对应的消息ID,比如RocketMQ。

另一种没有明确消息ID概念的,会用另外的形式来标识这是一条消息。比如RabbitMQ的消息ID是Header中的一个可选项,不是必传,如果不传就没有消息ID的概念。Kafka唯一标识一条消息,是通过Topic + Partition + Offset来实现的,严格意义上来讲,这种没有明确ID的消息队列,无法唯一标识一条消息队列。

一条消息的生命周期是从客户端发出来的时候开始算的,所以发送出来的时候,就应该赋予消息一个唯一ID,这才能真正表达消息的唯一。一旦在服务端赋予唯一ID,因为客户端可能会重复发送同一条数据,服务端就会认为是两条数据,生成两个ID。所以, 消息ID的生成一般是在客户端SDK自动生成或者业务手动生成指定的

唯一ID的生成方式,业界主要有四种。

  • 最简单的就是UUID,通过各个语言的UUID库生成一个唯一的字符串。
  • 集中式的ID生成方案,通过某个中央引擎,比如MySQL 自增列、Redis 自增值、ZooKeeper自增值等机制保证全局唯一。
  • SnowFlake算法,通过Twitter开源的SnowFlake算法生成唯一ID。
  • 自定义算法:通过自定义的算法生成唯一的ID。

在消息队列领域,我们主要用的是后两种,因为消息队列的消息一般很大,几千亿几万亿级别,UUID有一定概率会重复,集中式的ID生成会增加SDK复杂度,基本不会用。SnowFlake算法是一个成熟的算法,拿来即用。自己设计算法,保证不重复也可以,和SnowFlake的区别只在于是否是一个成熟开源的算法。所以从技术选择来看,我比较倾向于第三种方案,因为经过工业界的验证,并且表现稳定。

现在有了唯一ID,我们就可以开始记录消息轨迹了,那在设计消息轨迹实现的时候,应该考虑哪些因素呢?

消息轨迹的设计应该注意什么?

从技术实现来看,消息轨迹的设计分为三步:记录、存储、查询。记录指在合适的地方记录轨迹信息,比如生产端消息发出的时间。存储指将这些轨迹信息存储到某个引擎中。查询指要能支持消息查询。

那在这三步在设计的时候,我们应该注意哪些点呢?

记录轨迹,核心诉求就是 能在合适的地方记录轨迹信息,达到排查问题的效果。关键节点,比如生产端发出消息、生产端接收到服务端的返回、服务端收到消息、进/出延时队列、服务端看到消息被消费、消费端记录到消费到消息、消费端记录处理成功/失败、消息进入到死信队列等等。

记录轨迹 不能影响主流程的稳定性和性能。首先不能影响性能,比如生产流程正常耗时10ms,记录轨迹就不能花费5ms,要尽量减少对流程性能的影响,最好是无感知。另外记录消息的方式不能影响主流程的稳定性,因为轨迹是旁路,偶尔丢失轨迹是允许的,但是不能影响到主流程。

记录之后的存储、查询是强相关的。存储主要是为查询服务的,因为消息轨迹是一个写多读少的模型,我们选择 存储引擎,写入性能一定要足够优秀,不然会影响消息轨迹的写入存储,影响轨迹查询的及时性。

其次从成本角度考虑,虽然现在硬盘和物理机越来越便宜,但是因为消息轨迹对比消息有几倍的放大,比如上面RabbitMQ的例子就有9倍的放大,在成本结构中存储的成本占比是最大的。所以首先 消息轨迹的内容要足够精简,最精简的可以只包含两个字段——消息ID、时间,就能满足需求(内容格式上节课我们已经讨论了)。

另外,轨迹存在的意义就是查询,所以 引擎要能支持多维度的查询。以RabbitMQ为例,因为RabbitMQ在正常情况下是没有消息ID的,一般需要根据时间、Vhost、Exchange、Channel、Connection、Queue等多个维度复合查询,所以引擎必须支持多维度的查询,不能是KV类型的存储引擎。

但是支持多维度并不意味着越多越好,引擎满足必要的查询需求就可以了,不要有太多额外的能力。因为我们一般会通过索引或某种数据结构的形式来满足查询需求。复杂的查询需求,存储成本就会相应升高。而消息轨迹的查询需求虽然可能多维度,但是场景是固定的,只要设计上能满足这些查询场景就够了。

消息轨迹的实现方案设计

掌握了设计要点,就要用在技术的选型和实现上了。不过光讲知识点可能不好记,我们从一个具体案例开始,假设要设计一个RabbitMQ消息轨迹的技术方案。

  1. 预计所有集群总节点规模在几百个左右。
  2. 每个集群主要承载业务消息,消息量不是特别大。
  3. 集群不需要记录死信队列、延时队列的进出信息,只需要记录生产者、服务端、消费者三者的生产、接收、消费的时间。
  4. 需要支持多维度的查询条件,比如Vhost、Exchange、Channel、Connection、Queue、RouteKey中的一个或多个字段的组合等值查询,同时需要支持Headers、Body的模糊查询。
  5. 成本要求较为宽松,基本不做限制。

对于这样的项目背景,你会怎么设计呢?非常欢迎留言把你的方案分享出来,接下来带着你的思考,我们进入具体的设计方案分析。

我们讲过消息队列是富客户端的应用,客户端SDK需要有较重的逻辑,比如重试、事务、批量发送等等。所以从消息队列的角度看,完整的消息轨迹包含客户端和服务端两个部分。

客户端轨迹数据记录

客户端的轨迹记录一般有两种实现形式。一种是客户端将轨迹数据写入到本地文件,通过第三方组件采集到轨迹系统;另一种是客户端将轨迹数据通过Broker提供的TCP/HTTP接口,或者通过内置主题的方式,写入到集群中的某个主题中。

我们使用消息队列一般会用标准SDK访问集群,加上客户端部署的方式和位置无法控制,如果用第一种方案,客户端的轨迹数据很难实现统一的采集上报,所以一般会选择第二种方案,在SDK中内置轨迹上报的逻辑,记录轨迹信息并发送到服务端。

TCP/HTTP服务收到轨迹数据后,需要保存轨迹数据,保存逻辑和服务端轨迹数据的保存是一样的,我们一起看下。

服务端轨迹数据记录

正常情况下服务端保存轨迹数据,有四种形式:本地文件、内置 Topic、第三方服务、单机维度的存储引擎。

  • 本地文件

在链路跟踪或正常的系统架构中,选择本地文件存储,然后采集上报到存储引擎的方式,基本所有的系统都这样用过。

优点就是使用方便快捷、体系成熟,因为业界的ELK体系很成熟,所以数据采集、上报、存储、分析一条流很常用。缺点是链路较长,成本较高,并且每台节点都需要部署 Agent 采集数据,运维相对复杂。

  • 内置Topic

内置Topic把轨迹数据发送到消息队列的某个内置主题中,第三方系统消费内置主题中的轨迹数据,写入到持久存储引擎中。

优点是复用了消息队列内置的缓存能力,不需要依赖第三方存储,存储成本较低,同时也可以通过部署远程的消费者集群(跟分布式指标采集系统的原理一样)来消费轨迹数据,然后存储,避免了单机运维的复杂度。缺点是需要单独开发一个单独服务,去消费轨迹数据并存储,开发成本较高。

  • 第三方服务

直接将轨迹数据存入到第三方存储中,是指记录轨迹数据的时候,直接把数据发送到ES等第三方系统。

优点是简单直接,链路较短,运维复杂度较低。缺点是记录轨迹链路的稳定性依赖于第三方系统,很容易因为第三方系统的抖动,导致主链路不稳定。所以这种方式业界用得比较少,但是在一些数据量较小、业务不是特别重要的场景下,还是有人用。

  • 单机维度的存储引擎

把轨迹数据直接存储在各个节点上的文件系统,或部署在该节点上独立的存储服务中。

优点是链路很短,成本很低,不需要额外的比如Kafka、Elasticsearch的成本。缺点是查询麻烦,如果单机挂掉,轨迹数据就会丢失。

以上是从成本角度考虑的。前面我们讲过消息轨迹虽然很重要,但漏记是允许的,只要保证大部分情况下能用就行,允许短时间不可用。

从使用者的角度,如果查询的频率较低,投入很多成本,从ROI来看收益不高。所以一些系统在设计时,就会考虑复用单机的存储和检索能力。比如每台Broker部署一个RocksDB服务,数据存储到每台Broker的RocksDB上,需要检索的时候,依次访问每台Broker上的服务去查询消息。

总的来说, 四种方案都有人用,选择的依据在于对成本和可靠性的考量。第一、二种用得比较多,比如RocketMQ和RabbitMQ就是用的第二种。

我个人也推荐内置主题的形式,因为复用了消息队列的缓冲和存储的能力,性能较高,也适合远程的分布式采集。最主要的是不依赖第三方系统,在私有化部署或者独立部署的过程中,可以完成组件内的闭环。

持久存储引擎的选择

持久存储引擎存在的意义就是高可靠、长期地保存轨迹数据,并提供所需要的数据检索能力。所以对于引擎,我们有三个要求: 数据不能丢能长期保留数据具备检索能力

业界主流的存储引擎有不少。从数据可靠性来看,这些引擎都具备高可靠存储的能力,也具备长期保留数据的能力。因为消息轨迹在检索方面的需求相对比较固定,主要是根据消息ID或时间+消息ID进行精准查询,基本所有的存储引擎都可能满足条件。

那一般怎么选呢? 你可能会想,很简单啊,选ElasticSearch,ES本身就是很强大且通用的搜索引擎,访问方便、性能也很高、检索能力强大。

回答是没错的,大部分情况下,我们的选择都是用ES。但是ES的缺点也很明显,我们前面说了它的存储放大严重,很占磁盘空间,会导致成本较高。

在我看来,除了技术上的功能、性能层面的选择,成本也是一个非常重要的因素。因为在大规模集群的消息轨迹的架构设计中,成本是一个非常重要的选项。如果业务流量较低、轨迹数据较少,用ES很合适,因为功能和性能都满足要求,成本也不会有太大压力。

但是,在大规模的集群(比如几万台节点)运营中,我们更多在权衡的不是功能和性能,而是如何在功能、性能、稳定性、成本四个点之间取得平衡。思考的路径一般是 可以牺牲一部分的功能、性能或稳定性,在满足基本需求的基础上,尽量降低成本

而且因为数据规模不同,公司常用存储引擎不一样,也有的会用ClickHouse、HBase等引擎存储轨迹数据。从消息轨迹的业务上来看,功能都是满足的,从成本来看,主要看数据规模。所以,在引擎选择上,并没有定性的结论,但是ES还是最普遍的。

RabbitMQ消息轨迹方案设计实操

了解了具体的技术选型,我们回过头看RabbitMQ消息轨迹的案例,我分享一下我的方案和思考点,整体流程分为五步。

  1. 存储引擎选择: 消息查询需要支持多维度查询和模糊查询,只有几百个节点且主要承载业务消息,所以整体的轨迹数据量预期不会太大,加上成本限制不是特别严重,我会选择ES当存储引擎。
  2. 客户端轨迹记录: RabbitMQ无法记录客户端(Produce和Consumer)的轨迹,我会单独提供一套SDK实现集成轨迹的发送逻辑,把数据发送到RabbitMQ集群中的amq.rabbitmq.trace中。
  3. 服务端轨迹记录: RabbitMQ内核已经实现了轨迹的功能。只要打开Trace开关,就能把生产接收的时间和消费拉取的时间等数据写入到名为amq.rabbitmq.trace的Exchange中,无需重复开发。
  4. 实现轨迹数据存储: 当记录完轨迹后,我们可以开发一个客户端去消费集群中的Trace Queue,消费完数据往ES存储。
  5. 轨迹查询: 轨迹查询平台查询轨迹的时候,直接通过HTTP Rest 访问ES的接口查询轨迹数据即可。

这个方案比较简单务实。不过 RabbitMQ 的 Queue 堆积太多数据,可能会造成集群压力较大,导致不稳定。

所以如果想在“实现轨迹数据存储”这一步保证消费的性能,你也可以直接把数据写入到本地文件,然后通过Filebeat、Fluend等采集器把数据采集到ES。这种方案相比直接发送ES,消费速度更快,Trace Queue基本不会有堆积。

分享一个大集群的实际案例

RabbitMQ的方案设计,集群比较小,我们实际工作时集群规模都比较大,所以这里我也想分享一个对应的实际案例,蛮实用的,也很有意思。

我们在现网大规模集群的轨迹架构中,因为节点数非常多,轨迹数据非常大,全部集群数据加起来接近PB级,如果存储到ES进行查询,成本会爆炸。

为了极限降低成本,我们曾经实现一个内部的消息轨迹查询功能。用的是服务端保存轨迹数据的第四种方案,把轨迹数据写入到本地文件,按时间分段存储,配置了本地文件的清理策略,比如只保留最近十天,然后在节点上起一个HTTP Server,接收轨迹关键字比如轨迹ID,最后通过执行Linux 原生的grep命令检索本地文件,查询起来性能还是可以的。

这个方案因为利用了Linux原生的grep字符串匹配能力,加上一般情况下,消息队列Broker的本地磁盘容量都会有一定的冗余,所以几乎零成本实现了消息轨迹。

总结

消息轨迹记录了消息从生产到消费完成或消费失败的整个生命周期。从轨迹中,可以看到生产时间、服务端收到数据的时间,进入延时队列、消费成功或者进入死信队列时间等等。有了消息轨迹,我们就可以完美定位消息丢失的问题。

在小规模集群的运营过程中,将ES当做存储引擎是比较常用的方案。在数据非常少的场景中,有时候MySQL也可以当作存储引擎。业界的ClickHouse、HBase也可以作为存储引擎的备选方案。

在大规模的集群运营中,从方案选择上来看,不能只关注技术层面的功能和性能,更重要的是需要关注成本。如果成本非常严苛,可以选择适当牺牲功能和性能,以降低成本。有时候适当牺牲,可以降低相当大比例的成本。

思考题

在RabbitMQ的消息轨迹方案设计中,如果调整1和5的条件,数据量变为非常大,而成本需要尽量控制,你会如何设计方案呢?

欢迎分享你的方案和思考,如果觉得有收获,也欢迎你把这节课分享给感兴趣的朋友。我们下节课再见!

上节课思考闭环

如果让你在当前主要维护的一个服务中实现透明监控,你会怎么做?

首先,我会调研一下公司当前有没有团队实现过现类似的透明监控,然后调研一下当前团队和公司有的监控系统是基于什么实现的。考虑这个点是因为,希望能够和公司已有的监控体够尽量贴合,以减少后续独立开发维护的成本。

如果是独立的一个模块,并且公司已经有了Prometheus服务,我会建你议用Prometheus的全套方案,利用Peometheus数据类型注册指、内置HTTP Server 暴露 /Metrics 的方式提供数据。这种方式的好处就是简单方便,实现成本低。

如果是一个全新的大项目,包含多个组件,我建议你用OpenTelemetry的方案。从长期来看,OpenTelemetry在完整度和可维护性上会更优雅。