跳转至

09 消费端:消费者客户端的SDK有哪些设计要点?(下)

你好,我是文强。

这节课我们继续来讲消费端,上节课我们学习了消费模型的选择和分区消费模式设计,这节课我们将学习剩下的三部分,也就是消费分组(订阅)、消费确认、消费失败处理。

消费分组

消费分组是用来组织消费者、分区、消费进度关系的逻辑概念。为什么需要消费分组呢?

在没有消费分组直接消费Topic的场景下,如果希望不重复消费Topic中的数据,那么就需要有一个标识来标识当前的消费情况,比如记录进度。这个唯一标识就是消费分组。

在一个集群中可以有很多消费分组,消费分组间通过名称来区分。消费分组自身的数据是集群元数据的一部分,会存储在Broker的元数据存储服务中。消费分组主要有管理消费者和分区的对应关系、保存消费者的消费进度、实现消息可重复被消费三类功能。

消费分组和Topic是强相关的,它需要包含Topic才有意义,一个空的消费分组是没有意义的。如下图所示,消费分组内有很多个消费者,一个消费分组也可以订阅和消费多个Topic,一个Topic也可以被多个消费分组订阅和消费。

因为 Topic 不存储真实数据,分区才存储消息数据,所以就需要解决消费者和分区的分配关系,即哪个分区被哪个消费者消费,这个分配的过程就叫做消费重平衡(Rebalance)

从流程上来看,当新建一个消费分组的时候,就需要开始分配消费者和分区的消费关系了。分配完成后,就可以正常消费。如果消费者和分区出现变动,比如消费者挂掉、新增消费者、订阅的Topic的分区数发生变化等等,就会重新开始分配消费关系,否则就会存在某些分区不能被订阅和消费的情况。

协调者

从实现上来看,如果要对消费者和分区进行分配,肯定需要有一个模块拥有消费分组、所有的消费者、分区信息三部分信息,这个模块我们一般命名为协调者。协调者主要的工作就是执行消费重平衡,并记录消费分组的消费进度。

如下图所示,在消费分组创建、消费者变化、分区变化的时候就会触发重新分配。分区分配的操作可以在协调者内部或者消费者上完成。

  1. 在协调者完成,即协调者首先获取消费者和分区的信息,然后在协调者内部完成分区分配,最后再把分配关系同步给所有消费者。
  2. 在消费者完成,即负责分配的消费者获取所有消费者和分区的信息,然后该消费者完成分区分配操作,最后再把分配关系同步给其他消费者。

从技术上来看,这两种形式的优劣区别并不大,取决于代码的实现。一般在创建消费分组和消费者/ Topic分区发生变化的时候,会触发协调者执行消费重平衡。

从实现的角度来看,协调者一般是Broker内核的一个模块,就是一段代码或者一个类,专门用来完成上述的工作。当有多台Broker时,协调者的实现有多种方式,比如Kafka 集群每台Broker都有协调者存在。通过消费分组的名称计算出来一个hash值和__consumer_offset的分区数,取余计算得出一个分区号。最后这个分区号对应的Leader所在的Broker节点就是协调者所在的节点。客户端就和计算出来的这台Broker节点进行交互,来执行消费重平衡的相关操作。

当有了协调者后,就需要来确认哪个分区给哪个消费者了,此时就需要一个分配策略来执行,这就是消费分区分配策略。

消费分区分配策略

在具体实现上,一般内核会默认提供几种分配策略,也可以通过定义接口来支持用户自定义实现分区分配策略。

分区分配策略的制定一般遵循以下三个原则:

  1. 各个分区的数据能均匀地分配给每个消费者,保证所有消费者的负载最大概率是均衡的,该原则最为常用。
  2. 在每次重新分配的时候,尽量减少分区和消费者之间的关系变动,这样有助于加快重新分配的速度,并且保持数据处理的连续性,降低处理切换成本。
  3. 可以允许灵活地根据业务特性制定分配关系,比如根据机房就近访问最近的分区、某个Topic的奇数分区分配给第一个消费者等等。

所有消息队列的默认策略都是相对通用的,一般都会包含有轮询、粘性、自定义三种类型的策略。

轮询就是指用轮询的方式将分区分配给各个消费者,保证每个消费者的分区数量是尽量相同的,从而保证消费者的负载最大概率上是均衡的。思路是拿到所有主题的所有分区和所有消费者,根据拿到的顺序(实际实现中可能会先全部打乱,以确保随机性)将分区逐个分配给消费者。分配到最后的效果是,每个消费者所分到的分区数是一样的,最多相差1个分区。比如tp0有3分区,tp1有2分区,tp2有3分区,分配后效果如下。

消费者1:tp0-0、tp2-1、tp1-1
消费者2:tp2-2、tp0-1、tp2-0
消费者3:tp1-0、tp0-2

因为Topic一般会有多个分区,默认情况下写入大部分是均匀的。这个方案的优点是,从随机性的原理来看,打乱分区后再分配给每个消费者,消费者的负载大概率是均匀的。但是也有可能出现不均衡,比如当消费组同时订阅多个分区时,有可能会将同一个Topic的多个分区都分配给一个消费者,从而出现消费者的负载倾斜。

在轮询的基础上,为了解决随机轮询的情况,某些流量高的Topic可能会分配给同一个消费者。业界提出了一些轮询方案的升级版本,比如在随机的基础上,将Topic的不同分区尽量打散到不同的消费者,从而保证整体消费者之间的分区是均衡的,如下所示。

消费者1:tp0-0、tp2-1、tp1-1
消费者2:tp0-1、tp2-0、tp1-0
消费者3:tp0-2、tp2-2

各个变种版本的思路核心都是为了消费者更加均衡,避免消费倾斜。所以当你看到有些分配算法很像轮询又不太一样时,只要从这个目的去拆解,就会比较好理解了。

粘性是指尽量减少分区分配关系的变动,进而减少重平衡所耗费的时间和资源损耗。即当已经分配好消费者和分区的消费关系后,当消费者或者分区出现变动,就会触发重平衡。从底层来看,可能就是一个消费者掉了或者新增分区。此时需要重新进行分配的消费者和分区其实是有限的,大部分的分配关系可以不动。而此时如果使用轮询算法,则要全部打散重来,耗时就会很长,并且浪费资源,即把原先不需要重新分配的关系都重新分配一遍。

粘性的效果如下所示,比如当上面的消费者3挂了后,只需要将tp1-0、tp0-2平均分给消费者1和2即可,消费者1和2原先分配的分区不用动。

消费者1:tp0-0、tp2-1、tp1-1、tp1-0
消费者2:tp2-2、tp0-1、tp2-0、tp0-2

在实际的实现中,为了减少重新分配关系,有一个非常常用的算法是一致性哈希。一致性哈希的算法经常用在负载均衡中。用一致性哈希实现粘性策略的优点是,当节点或者分区变动时,只需要执行少量的分区再分配即可。

在一些消息队列中,也会提供一些与自己相关的特色的分区分配策略。比如RocketMQ内部就提供了根据机房就近分配、指定机房分配两种策略,这两种策略的协调者要感知到客户端和服务端的机房信息,然后根据策略进行分配,均主要用在跨可用区场景中。Kafka 也提供了轮询策略改进版RoundRobinAssignor分配策略。这些策略的核心出发点,都是为了解决消费者和分区之间的分配均衡、重平衡耗时、业务场景需要等诉求。

自定义分区分配算法,跟上节课生产端数据的分区分配策略是一样的,内核会提供接口,用户可以根据自身需求实现自定义算法,然后指定配置生效即可。比如Kafka提供了org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口来提供自定义分区分配策略。

消费确认

那么当数据被消费成功后,就必须进行消费确认操作了,告诉服务端已经成功消费了这个数据。消费确认就是我们在消息队列中常说的ACK。

一般情况下,消息确认分为确认后删除数据和确认后保存消费进度数据两种形式。

确认后删除数据是指集群的每条消息只能被消费一次,只要数据被消费成功,就会回调服务端的ACK接口,服务端就会执行数据删除操作。在实际开发的过程中,一般都会支持单条ACK和批量ACK两种操作。这种方式不利于回溯消费,所以用得比较少。

消费成功保存消费进度是指当消费数据成功后,调用服务端的消费进度接口来保存消费进度。这种方式一般都是配合消费分组一起用的,服务端从消费分组维度来保存进度数据。

为了保证消息的回溯消费和多次消费,消息队列大多数用的是第二种方案。数据的删除交由数据过期策略去执行。

保存消费进度一般分为服务端保存和客户端自定义保存两种实现机制。

服务端保存是指当消费端消费完成后,客户端需要调用一个接口提交信息,这个接口是由服务端提供的“提交消费进度”接口,然后服务端会持久保存进度。当客户端断开重新消费时,可以从服务端读取这个进度进行消费。服务端一般会通过内置的Topic或者文件来持久保存该数据。这种方式的优点就是客户端会封装好这些逻辑,使用简单,无需管理进度相关的信息,缺点就是不够灵活。服务端保存一般是默认的方案。

在提交位点信息的时候,底层一般支持自动提交和手动提交两种实现。

  • 自动提交一般是根据时间批次或数据消费到客户端后就自动提交,提交过程客户无感知。
  • 手动提交是指业务根据自己的处理情况,手动提交进度信息,以避免业务处理异常导致的数据丢失。

它们两者的优缺点如下表所示:

一般情况下,我建议你使用手动提交方式,可以避免数据丢失。

客户端自定义保存是指当消费完成后,客户端自己管理保存消费进度。此时就不需要向服务端接口提交进度信息了,自定义保存进度信息即可,比如保存在客户端的缓存、文件、自定义的服务中,当需要修改和回滚的时候就比较方便。这种方案的优点就是灵活,缺点就是会带来额外的工作量。

业界主流来看,这两种方案用得都比较多,不过默认情况下都是使用第一种方案,业务需要的话就选择第二种方案。

消费失败处理

我们知道,一个完整的消费流程包括消费数据、本地业务处理、消费进度提交三部分。

那么从消费失败的角度来看,就应该分为从服务端拉取数据失败、本地业务数据处理失败、提交位点信息失败三种情况。下面我们逐一来看一下。

从服务端拉取数据失败,和客户端的错误逻辑处理是一致的,根据可重试错误和不可重试错误的分类,进行重复消费或者向上抛错。

本地业务数据处理失败,处理起来就比较复杂了。如果是偶尔失败,那么在业务层做好重试处理逻辑,配合手动提交消费进度的操作即可解决。如果是一直失败,即使重试多次也无法被解决,比如这条数据内容有异常,导致无法被处理。此时如果一直重试,就会出现消费卡住的情况,这就需要配合死信队列的功能,将无法被处理的数据投递到死信队列中,从而保存异常数据并保证消费进度不阻塞。

提交位点信息失败,其处理方法通常是一直重试,重复提交,如果持续失败就向上抛错。因为如果提交进度失败,即使再从服务端拉取数据,还是会拉到同一批数据,出现重复消费的问题。

总结

从设计上看,消费端要解决的问题依次分为三步:

  1. 满足基本的消费需求,能消费到数据,确认数据。
  2. 满足稳定性和性能的需求,能快速稳定地消费到数据。
  3. 支持功能方面的需求,比如回溯消费、消费删除、广播消费等等。

为了能满足基本的消费需求,服务端会提供消费和确认接口,同时在客户端封装消费和确认操作中,底层通过网络层和服务端建立、维护TCP连接,然后通过协议完成基本的消费操作。

如果要回溯消费,则需要单独记录消费进度。这样就能抽象出消费分组的概念,用来管理消费者、分区、消费进度的关系。通过消费分组来记录消费进度,从而实现数据的多次分发。另外,消费分组机制也可以用在广播消费的场景。

在消费确认的过程中,一般需要客户端回调服务端提供的确认接口。确认接口分为确认删除和确认记录消费进度两种模式。主流方式是在确认的时候记录消费进度。

异常处理主要是为了保证数据能被正常消费,重点关注不丢数据、不重复消费、不阻塞住消费三个问题,我们需要针对不同的问题做不一样的处理。

思考题

从代码的实现角度,如果让你负责开发某个消息队列客户端的消费模块,你会怎么思考?依次做哪些事情?

欢迎分享你的看法,如果觉得有收获,也欢迎你把这节课分享给身边的朋友。我们下节课再见!

上节课思考闭环

当Topic的消息写入存在倾斜,某些分区消息堆积很多,此时选择哪种分区消费模式可以解决问题?

可以分三种情况来解答。

1.如果数据可以丢弃,那么可以通过重置消费位点到最新来解决历史堆积,让消费者可以消费新的数据。不过,这个方案有缺点,重置位点之前的数据会丢失,如果消费性能还是跟不上的话,那么后续还是会堆积。

2. 如果数据不能丢弃,不用保证消费顺序,那么可以将消费模式切换到共享消费模式,则有多个消费者同时消费一个分区,可以极大地提升消费速度,还可以通过横向增加消费者,从根本上解决堆积问题。

3. 如果数据不能丢弃,且需要保证消费顺序,那么就只能从发送端入手,分析为何发送端写入倾斜,然后解决写入倾斜的问题。

精选留言(2)
  • shan 👍(1) 💬(0)

    消费者客户端SDK设计总结 消费分组 通过消费者分组可以管理消费者消息的消费,一个集群中可以有很多消费组,消费者相关的元数据信息会保存在Broker的元数据存储服务中。 一个消费组可以订阅多个Topic,一个Topic也可以被多个消费组订阅和消费。 消费重平衡(Rebalance) 一般会为每个消费者分配对应的消息队列,这个分配的过程叫做消费重平衡(Rebalance),协调者的工作就是执行消费重平衡。以下两种方案: (1)协调者进行分配:协调者内部完成消费重平衡之后,将分配结果同步给所有消费者; (2)在消费者端完成:不需要协调者,每个消费者在自己本地进行消费重平衡,RocketMQ在5.0之前采用的就是这种方式; 一般消费者发生变化(上线或者下线)/ Topic分区/队列发生变化的时候,会触发执行消费重平衡。 分配策略 分区/消费队列的分配策略一般有以下几大类: (1)轮询,将分区/队列排序,按顺序依次分配给各个消费者,如果分区/队列数量与消费者数量不均衡,有可能出现某个消费者分配过多或者有些消费者没有队列可以分配的情况,导致负载倾斜; (2)粘性:尽量减少分区分配关系的变动,从而减少重平衡所耗费的时间和资源损耗; (3)自定义分配策略; RocketMQ在5.0以前按队列粒度分配给消费者,主要有以下几种策略: (1)平均分配策略; (2)平均轮询分配策略; (3)一致性hash分配; (4)根据配置进行分配; (5)根据机房分配; (6)优先分配同机房策略; Rocket5.0以后可以按照消息粒度,将消息直接分配到消费者消费,具体可以参考官方文档: https://rocketmq.apache.org/zh/docs/featureBehavior/08consumerloadbalance 消费确认 消息消费完毕之后,需要向服务端发送ACK,服务端一般有两种方式处理消费后的消息: (1)删除数据:每条消只能被消费一次,这种方案一般很少采用; (2)保存消费进度:服务端保存每个队列的消费进度(一般会按消费者划分),数据的删除交由过期策略执行,消息队列大多数采用的就是这种方案;

    2023-09-23

  • Geek_368613 👍(0) 💬(0)

    “通过消费分组的名称计算出来一个 hash 值和 __consumer_offset 的分区数,取余计算得出一个分区号”,请问下文强老师,这个计算的操作是由谁来完成的呢?

    2023-07-19