Skip to content

07 生产端:生产者客户端的SDK有哪些设计要点?

你好,我是文强。今天我们讲消息队列的客户端。

先问你一个问题:你在使用消息队列的SDK生产消费数据的时候,是否会有疑问,SDK底层是怎么工作的,由哪些功能模块组成呢?接下来我们会用三节课来详细分析一下这个问题。

消息队列的客户端主要包含生产、消费、集群管控三类功能。这节课我们聚焦在生产和集群管控。从客户端SDK实现的角度来看,生产模块包含 客户端基础功能和生产相关功能 两部分,其中基础功能是客户端中所有功能共用的。

我们看一张生产模块的功能结构图。

基础功能是蓝色部分,包括请求连接管理、心跳检测、内容构建、序列化、重试、容错处理等等。生产功能是黄色部分,包括客户端寻址、分区选择、批量发送,生产错误处理、SSL、压缩、事务、幂等等等。

那图中各个功能模块如何实现?我们从基础功能开始讲解。

客户端基础功能

连接管理

在网络模块,我们讲过客户端和服务端之间基本都是通过各自语言的网络库,创建TCP长连接进行通信的。在大部分实现中,为了避免连接数膨胀,每个客户端实例和每台Broker只会维护一条TCP连接。

建立一条TCP连接很简单,更关键的是,什么情况下建立连接?一般有初始化创建连接和使用时创建链接两种方式。

  • 初始化创建连接,指在实例初始化时就创建到各个Broker的TCP连接,等待数据发送。好处是提前创建好可以避免发送的时候冷启动。缺点是需要提前创建好所有的连接,可能导致连接空跑,会消耗一定的资源。
  • 使用时创建链接,指在实例初始化时不建立连接,当需要发送数据时再建立。好处是发送时才建立,连接的使用率会较高。缺点是可能出现连接冷启动,会增加一点本次请求的耗时。

因为客户端会有空闲连接回收机制,创建连接的耗时一般较短,所以在实际的架构实现中,两种方式都会用,优劣区别并不明显。不过,从资源利用率的角度考虑, 我建议你使用晚建立连接的方式。

因为连接并不是任何时候都有数据,可能出现长时间连接空闲。所以连接都会搭配连接回收机制,连接建立后如果连接出现长时间空闲,就回收连接。连接回收的策略一般是判断这段时间内是否有发送数据的行为,如果没有就判断是空闲,然后执行回收。

因为单个TCP连接发送性能存在上限,我们就需要在客户端启动多个生产者,提高并发读写的能力。一般情况下,每个生产者会有一个唯一的ID或唯一标识来标识客户端,比如ProduceID或客户端的IP+Port。

单个TCP的瓶颈和很多因素有关,比如网路带宽、网络延迟、客户端请求端的socketbuff的配置、TCP窗口大小、发送速率导致本地数据反压堆积、服务端请求队列的堆积情况、收包和回包的速度等等。

接下来我们来看看客户端和服务端之间的心跳检测。

心跳检测

心跳检测是客户端和服务端之间保活的一种机制,检测服务端或者客户端的一方不可用时,另一方可以及时回收资源,避免资源浪费。一般通过 ping-pong 的方式来发起探测。

前面 第03讲 讲过,消息队列一般都是基于TCP协议通信的。所以客户端和服务端之间的心跳检测机制的实现,一般有基于 TCP 的 KeepAlive 保活机制和应用层主动探测两种形式。

基于TCP的KeepAlive保活机制 是TCP/IP 协议层内置的功能,需要手动打开TCP的KeepAlive功能。通过这种方案实现心跳探测,优点是简单,缺点是KeepAlive实现是在服务器侧,需要Server主动发出检测包,此时如果客户端异常,可能出现很多不可用的TCP连接。这种连接会占用服务器内存资源,导致服务端的性能下降。

应用层主动探测 一般是Client向Server发起的,主要解决灵活性和TCP KeepAlive的缺陷。探测流程一般是客户端定时发送保活心跳,当服务端连续几次没收到请求,就断开连接。这样做的好处是,可以将压力分担到各个客户端,避免服务端的过载。

下面我们来看一下 Java NIO 框架 Netty 中心跳探测的实现,来加深一下印象。

Netty 支持心跳探测的关键是 IdleStateHandler,我们来看它的构造器。构造器包含读超时(readerIdleTime)、写超时(writerIdleTime)、读或写超时(allIdleTime)、时间单位(unit)四个参数,这四个参数是心跳检测的主要配置。

  • 读超时,在指定的时间间隔内没有从 Channel 读取到数据时,会触发一个 READER_IDLE 的 IdleStateEvent 事件。
  • 写超时,在指定的时间间隔内没有数据写入到 Channel 时,会触发一个 WRITER_IDLE 的 IdleStateEvent 事件。
  • 读或写超时,在指定的时间间隔内没有读或写操作时,会触发一个 ALL_IDLE 的 IdleStateEvent 事件。
  • 时间单位,当超过时间间隔没有收到请求,就会触发后续的处理逻辑,一般是关闭连接。

此时,如果触发了超时事件,则会触发后续的比如连接重建、报错等行为。接下来我们来看看错误是如何处理的。

错误处理

从请求的角度,有些错误是重试可以恢复的,比如连接断开、Leader切换、发送偶尔超时、服务端某些异常等;有些错误是不可恢复的,比如Topic/分区不存在、服务端Broker不存在、集群和Broker长时间无响应等。

所以,在客户端的处理中也会将错误分为可重试错误和不可重试错误两类。

因为网络环境、架构部署的复杂性,集群可能出现短暂网络抖动、Leader切换等异常,可重试错误就是这类通过一次或多次重试可能恢复的异常;不可重试的错误就是不管如何重试都无法恢复的异常。

客户端收到可重试错误后,会通过一定的策略进行重试,尽量确保生产流程的顺利进行。

虽然实现思路很直接、很简单,但在客户端SDK的实现过程中,错误处理是一个包含很多细节的工作,一般需要考虑下面几个点。

  • 如何定义可恢复错误和不可恢复错误。
  • 完整的错误码的定义和枚举,好的错误码定义可以提高排查问题的效率。
  • 错误后重试的代码实现方式是否合理高效。
  • 判断哪些情况需要停止客户端,向上抛错,以免一些错误信息一直在SDK内空转,提高上层感知异常和排查异常的难度。
  • 日志信息打印debug、info、error日志时,是否包含了完整的内容。

发生错误后,客户端一般会提供重试策略,接下来我们来看看重试机制的实现。

重试机制

重试策略一般会支持重试次数和退避时间的概念。当消息失败,超过设置的退避时间后,会继续重试,当超过重试次数后,就会抛弃消息或者将消息投递到配置好的重试队列中。

退避时间是可以配置的,比如1s、10s、1分钟。当出现错误时,就会根据退避策略退避,再尝试写入。一般情况下,重试是有次数上限的,当然也支持配置无限重试。

退避策略影响的是重试的成功率,因为网络抖动正常是ms级,某些异常可能会抖动十几秒。此时,如果退避策略设置得太短,在退避策略和重试次数用完后,可能消息还没生产成功;如果退避时间设置太长,可能导致客户端发送堵塞消息堆积。

所以消息队列生产者的重试次数和退避策略的设置是比较讲究的,需要根据业务的场景仔细设计。

另外,客户端为了满足安全传输、性能、功能方面的需求,客户端都会支持传输加密、压缩、事务、幂等等功能。这块在后面课程会展开细讲。

那么在基础功能之上,接下来我们再看一下生产客户端所需要的一些相关功能是如何实现的。先来看一下客户端寻址机制。

生产相关功能

客户端寻址机制

我们知道,消息队列作为一个分布式系统,分区会分布在集群的不同节点上。所以从客户端的角度看,往服务端写入数据的时候,服务端有那么多台节点,请求要发给哪台节点呢?

你可能觉得这个问题太简单了,类似我们发送HTTP请求,手动指定目标Broker的IP就行了。就是说在生产者写数据到Broker的时候,在代码里面手动指定分区对应的对端的Broker地址,然后将数据写到目标Broker。

这个思路没问题,但是我们手动指定对端Broker地址的时候,怎么知道这个分区在这台Broker上的对应关系存在哪里呢?为了解决这个问题,业界提出了 Metadata(元数据)寻址机制和服务端内部转发两个思路。

1. Metadata(元数据)寻址机制

服务端会提供一个获取全量的 Metadata 的接口,客户端在启动时,首先通过接口拿到集群所有的元数据信息,本地缓存这部分数据信息。然后,客户端发送数据的时候,会根据元数据信息的内容,得到服务端的地址是什么,要发送的分区在哪台节点上。最后根据这两部分信息,将数据发送到服务端。

消息队列的元数据是指 Topic、分区、Group、节点、配置等集群维度的信息。比如Topic有几个分区,分区的 Leader 和 Follower 在哪些节点上,节点的 IP 和端口是什么,有哪些Group等等。

在 Metadata 寻址机制中,元数据信息主要包括 Topic 及其对应的分区信息和 Node 信息两部分。下面我们来看一下Kafka的元数据信息结构,让你有一个更加直观的认识。

主题分区元数据:
{
    "test123": {
        "Topic": "test123",
        "Partitions": [
            {
                "ID": 0,
                "Error": {},
                "Leader": 101194,
                "Replicas": [
                    101194,
                    101193
                ],
                "Isrs": [
                    101194,
                    101193
                ]
            }
        ],
        "Error": {}
    }
}

节点元数据:
[
    {
        "ID": 101195,
        "Host": "9.130.62.0",
        "Port": 6097
    },
    {
        "ID": 101194,
        "Host": "9.130.62.1",
        "Port": 6096
    },
    {
        "ID": 101193,
        "Host": "9.130.62.2",
        "Port": 6095
    }
]

客户端一般通过 定期全量更新Metadata信息和请求报错时更新元数据信息 两种方式,来保证客户端的元数据信息是最新的。目前Kafka、RocketMQ、Pulsar用的都是这个方案。

2. 服务端内部转发机制

另外一种服务端内部转发机制,客户端不需要经过寻址的过程,写入的时候是随机把数据写入到服务端任意一台Broker。

具体思路是服务端的每一台Broker会缓存所有节点的元数据信息,生产者将数据发送给Broker后,Broker如果判断分区不在当前节点上,会找到这个分区在哪个节点上,然后把数据转发到目标节点。

这个方案的好处是分区寻址在服务端完成,客户端的实现成本比较低。但是生产流程多了一跳,耗时增加了。另外服务端因为转发多了一跳,会导致服务端的资源损耗多一倍,比如CPU、内存、网卡,在大流量的场景下,这种损耗会导致集群负载变高,从而导致集群性能降低。

所以这种方案不适合大流量、高吞吐的消息队列。目前业界只有 RabbitMQ 使用这个方案。

解决了请求要发送给哪个节点,接下来我们来看看消息数据要写入到哪个分区。

生产分区分配策略

我们知道,数据可以直接写入分区或者写入Topic。写入Topic时,最终数据还是要写入到某个分区。这个数据选择写入到哪个分区的过程,就是生产数据的分区分配过程。过程中的分配策略就是生产分区分配策略。

一般情况下,消息队列默认支持轮询、按Key Hash、手动指定、自定义分区分配策略四种分区分配策略。

轮询 是所有消息队列的默认选项。消息通过轮询的方式依次写入到各个分区中,这样可以保证每个分区的数据量是一样的,不会出现分区数据倾斜。

分区数据倾斜是指一个Topic的每个分区的数据量不一样,有的分区数据量大,有的小,从而导致硬件的负载不均,集群性能出现问题。

但是如果我们需要保证数据的写入是有序的,轮询就满足不了。因为在消费模型中,每个分区的消费是独立的,如果数据顺序依次写入多个分区,在消费的时候就无法保持顺序。所以为了保证数据有序,就需要保证Topic只有一个分区。这是另外两种分配策略的思路。

按Key Hash 是指根据消息的Key 算出一个Hash值,然后跟Topic的分区数取余数,算出一个分区号,将数据写入到这个分区中。公式参考:

partitionSeq = hash(key) % partitionNum

这种方案的好处是可以根据Key来保证数据的分区有序。比如某个用户的访问轨迹,以客户的AppID为Key,按Key Hash存储,就可以确保客户维度的数据分区有序。缺点是分区数量不能变化,变化后Hash值就会变,导致消息乱序。并且因为每个Key的数据量不一样,容易导致数据倾斜。

手动指定 很简单,就是在生产数据的时候,手动指定数据写入哪个分区。这种方案的好处就是灵活,用户可以在代码逻辑中根据自己的需要,选择合适的分区,缺点就是业务需要感知分区的数量和变化,代码实现相对复杂。

除了这3种默认策略,消息队列也支持 自定义分区分配策略,让用户灵活使用。内核提供 Interface(接口)机制,用户如果需要指定自定义的分区分配策略,可以实现对应的接口,然后配置分区分配策略。比如Kafka可以通过实现 org.apache.kafka.clients.producer.Partitioner 接口实现自定义分区策略。

为了提高写入性能,有的生产者客户端会提供批量(Batch)写入的语义。

批量语义

客户端支持批量写入数据的前提是,需要在协议层支持批量的语义。否则就只能在业务中自定义将多条消息组成一条消息。

批量发送的实现思路一般是在客户端内存中维护一个队列,数据写入的时候,先将其写到这个内存队列,然后通过某个策略从内存队列读取数据,发送到服务端。

批量发送数据的策略和存储模块的刷盘策略很像,都是根据数据条数或时间聚合后,汇总发送到服务端,一般是满足时间或者条数的条件后触发发送操作,也会有立即发送的配置项。

Kafka是按照时间的策略批量发送的,提供了linger.ms、max.request.size、batch.size三个参数,来控制数据的批量发送。

linger.ms:设置消息延迟发送的时间,这样可以等待更多的消息组成 Batch 发送。默认为0表示立即发送。

max.request.size:生产者能够发送的请求包大小上限,默认为1MB。

batch.size:生产者会尝试将业务发送到相同的 Partition 的消息合包后再进行发送,它设置了合包的大小上限。

Pulsar也提供了batchingEnabled、batchingMaxMessages、batchingMaxPublishDelayMicros 三个参数,来控制数据批量发送(参数语义参考 官方文档)。

为了支持对于性能和可靠性有不同需求的业务场景,客户端一般会支持多种数据发送方式。

数据发送方式

消息队列一般也会提供同步发送、异步发送、发送即忘三种形式。

同步和异步更多是语言语法的实现,同步发送主要解决数据发送的即时性和顺序性,异步发送主要考虑性能。发送即忘可能不好理解,我们重点讲下。

发送即忘指消息发送后不关心请求返回的结果,立即发送下一条。这种方式因为不用关心发送结果,发送性能会提升很多。缺点是当数据发送失败时无法感知,可能有数据丢失的情况,所以适合用在发送不重要的日志等场景。Kafka提供了ack=0、RocketMQ提供了sendOneway来支持这种模式。

讲完了发送相关的功能设计,接下来我们看一下管控操作在客户端中的实现方式。

集群管控操作

集群管控操作一般是用来完成资源的创建、查询、修改、删除等集群管理动作。资源包括主题、分区、配置、消费分组等等。

从功能上来看,消息队列一般会提供多种集群管理方式,比如命令行、客户端、HTTP接口等等。

命令行工具是最基本的支持方式。如下图所示,它的底层主要通过包装客户端SDK和服务端的相关功能接口进行交互。程序编码上一般由 命令行 参数包装底层SDK调用 三部分组成。主要流程是接收参数、处理参数、调用SDK等相关操作。

有的消息队列也会支持HTTP接口形式的管控操作。好处是因为HTTP协议的通用性,业务可以从各个环境发起管控的调用,不用非得使用admin SDK。另外客户端封装HTTP接口实现命令行工具的成本也比较低。

总结

消息队列生产者客户端的设计,主要关注下面三个部分。

  1. 网络模块的开发和管理。这部分是为了完成和服务端的通信,比如请求和返回的构建、心跳检测、错误处理,重试机制等。
  2. 根据服务端提供的各个接口的协议结构,构建请求,完成序列化和反序列化后,通过网络模块发起请求并获得返回。
  3. 在前面两步的基础上,添加各个业务层面的功能,比如生产、消费、事务、幂等、SSL等等。

客户端和服务端交互的过程中,一般要经过元数据寻址,以正确找到分区所在的Broker。如果我们想避免客户端寻址,只能在服务端内进行转发,但有性能和资源的损耗。所以在主打吞吐的消息队列组件中,转发的方案用得很少。

从生产者的角度来看,需要重点关注分区分配策略、批量语义、发送方式三个方面。请求内容构建和序列化属于协议设计的内容,主要取决于协议的具体设计和序列化/反序列化框架的选择。

思考题

假设让你从头开始写一个消息队列的某个语言的SDK,思考步骤是怎样的?

期待看到你的思路,如果觉得今天的内容对你有帮助,也欢迎分享给身边的朋友一起学习。我们下节课再见!

上节课思考闭环

数据的批量写入,如果不用PageCache的缓存刷新机制,我们可以在应用程序中管理数据完成批量写入吗?如果可以怎么实现?优缺点是什么?

可以的。

好处是可以跳过操作系统的刷盘策略,根据自己业务的读写特点自定义刷盘策略。实现得好的话有助于提升缓存的命中率。

应用程序可以通过使用 Direct IO 来模拟实现PageCahce的功能。大致思路是通过Direct IO管理硬盘,然后在应用中缓存数据,等待数据达到一定量的时候再统一批量写入硬盘,然后调用force刷新数据到硬盘。

这种方式的效果和写 PageCache 是一样的,遇到的问题也是一样的。区别在于数据缓存在哪里,通过什么策略刷新到硬盘。写PageCache的好处是不需要程序自己管理缓存,不需要自定义策略写入,操作系统都可以帮忙做,缺点是可能无法百分百满足我们自己的业务场景。