Skip to content

40 连接器:如何以MQ为核心搭建数据集成架构?

你好,我是文强。

这节课我们来聊聊连接器。在消息队列中,连接器也称为 Connector,它的作用是把不同数据源中的数据导入到消息队列,或者把消息队列中的数据导出到下游的各种存储引擎。连接器对于用户的价值是,可以很方便地将数据导入、导出消息队列。

听起来,它和前面两节课中讲的基于 Serverless 实现流式计算和事件驱动架构的作用很类似,从功能上看,Flink 或 Spark 也能实现。那为什么还有连接器这个概念呢?它的价值是什么?

带着这两个问题,我们开始本节课程,先来详细看一下什么是连接器。

连接器是什么

从技术上来看,连接器中的“连接”指的是数据的连接,即把数据从某个地方搬到另外一个地方。所以连接器就是指 将数据从源端搬到目标端的组件。或者说只要具备数据连接功能的组件,就可以称为连接器。

同样的,消息队列连接器的功能也是把数据从源端搬到目标端。但和普通连接器不同的是, 消息队列连接器的其中一端一定是消息队列。

如上图所示,业界主流消息队列 Kafka、RocketMQ、Pulsar 都支持连接器的概念,组件名称分别是 Kafka Connector、RocketMQ Connector、Pulsar IO。从功能上来看,消息队列连接器分为 源连接器(Source)目标连接器(Sink),作用分别是将数据源的数据导入到消息队列和把消息队列中的数据导出到下游的存储。

那这些连接器是如何运行的呢?

如上图所示,连接器是运行在多个物理节点(Worker)上的。因为连接器需要并发运行多个任务且需要具备横向扩容的能力,所以连接器运行的平台本质上是一个分布式任务调度平台。这个平台跟 Flink、Spark、Mesos、Yarn 等集群的功能是类似的,主要负责分布式任务的调度和运行。

所以可以说,消息队列连接器是由 分布式任务调度平台源/目标连接器 两部分组成。平台负责这些连接器的运行,连接器负责对接各种数据源和数据目标。

如果你了解过数据集成的话,你还会发现连接器的功能和数据集成很像。所以接下来我们来看看什么是数据集成,以及数据集成和连接器的关系。

数据集成和连接器

从技术上看,数据集成是一个概念,不是具体的功能组件。它是将数据从数据源搬到数据目标这个功能的描述,数据从源到目标的过程就称为数据集成。

所以你会发现,消息队列连接器是数据集成概念下的一种技术。即消息队列的 Source 和 Sink 连接器都是一种数据集成。数据集成是一个非常成熟的领域,业界开源的数据集成就有Flink CDC、DataX、SeaTunnel等等。

接下来我们通过一个典型的数据集成场景,让你对数据集成和连接器有一个更深的认识。

场景描述:将 MySQL 中的数据实时同步到 Elasticsearch。

实现这个功能,我们有使用典型数据集成组件和消息队列连接器两种方案。

  1. 典型数据集成组件:该方案使用开源组件,如 Flink CDC、DataX、SeaTunnel等组件,订阅MySQL的数据,然后把订阅到的数据同步到下游。
  2. 消息队列连接器:该方案需要先使用源连接器 SourceConnector 从 MySQL 订阅数据,然后将数据写入到消息队列当中,再使用 SinkConnector 将消息队列中的数据写入到下游存储引擎。

你会发现,两者的最大差别是:消息队列连接器需要把数据先存储到消息队列中,然后再从消息队列中消费数据写入到下游。看起来是额外存储了一份数据,有点浪费。

但是你可以思考这样一个场景,如果源数据需要被分发到下游多个目的地呢?

此时使用典型数据集成组件的话,就要订阅多次源数据,可能会对源端造成较大压力。用消息队列连接器就很合适,只订阅一次源数据,借助消息队列的缓存和分发能力,就能实现多次分发。

所以总结来看, 第一种方案适合数据源和数据目标是一对一的场景,第二种方案适合数据源和数据目标是一对多的场景。

讲完了数据集成和连接器的关系,接下来我们来讲一下消息队列连接器的架构和底层运行原理。

消息队列连接器底层原理分析

先来看一张整体的架构图。

如上图所示,消息队列连接连接器由 数据源源数据连接器分布式任务调度平台目标数据连接器数据目标 五部分组成。

从技术上来看,连接器的内核主要包括以下三部分内容:

  1. 分布式任务调度平台的开发和设计
  2. 各种源、目标连接器的开发
  3. 简单的数据清洗能力

分布式任务调度平台

分布式任务调度平台,也称为连接器的 Runtime,它的作用是用来运行各种 Connector 任务。

从实现来看,Runtime 一般有 Connector 和 Task 两个概念。Connector 是一个逻辑概念,用来表示一个订阅任务。Task 是任务运行的实体,用来执行各个具体的订阅任务。就是说 Runtime 里面会运行多个源/目标 Connector,一个 Connector 会被拆分为多个 Task 运行,Task 是 Runtime 调度的最小单位。

从功能角度,Runtime 一般需要具备以下四个功能:

  1. HTTP API,提供增删改查 Connector 任务,启动、暂停、扩容、缩容等操作的 API 接口。
  2. 元数据存储,用来保存集群 / Connector 基本信息,Connector/Task 运行信息(比如运行在哪些节点上、当前运行状态等),执行节点的基本信息等等。
  3. 调度模块,用来完成 Connector 任务的拆分、调度、启停、扩容等等。
  4. 任务执行节点,用来执行各种 Connector 和 Task 任务。

接下来我们通过 Runtime 集群启动过程和 Connector 任务运行过程来认识一下这四个功能。

先来看一下 Runtime 集群的启动过程。

如上图所示,因为 Runtime 是集群部署,集群由多个任务执行节点组成。所以同样也需要有 节点发现元数据存储主节点节点探活 等流程。这部分的原理在 第15讲第16讲 中详细讲过,就不再赘述。这里需要注意的是,集群启动后需要启动内置的 HTTP Server,比如支持RESTful API,来提供集群管控类操作。

集群启动完成以后,我们以订阅 MySQL 的 Binlog 数据到消息队列的场景举例,来看看Runtime 运行 Connector 的整体过程。

  1. 通过 HTTP RESTful 接口创建订阅 MySQL Binlog 到消息队列的 Source Connector。
  2. Runtime 收到请求后,先保存 Connector 相关的元数据信息。这个元数据包括MySQL的地址、用户名、密码以及消息队列的地址、Topic、Connector 最大任务数等信息。在 第16讲 讲过,元数据可以保存在消息队列的内置的Topic、ZooKeeper、etcd等引擎中。
  3. 保存完元数据后,接下来 Runtime 会根据 Connector 配置的任务数,在不同节点创建Task。Task 从本质上看就是线程,Runtime 会控制多个 Task 分布在多个不同执行节点上。
  4. 每一个 Task 的核心逻辑是从 MySQL 订阅 Binlog ,然后经过本地处理后,将订阅到的数据写入到消息队列中。
  5. 在执行节点异常时,Runtime 会将异常节点上的 Task 调度到新节点运行。
  6. 当增加或减少 Task 数量的时候,Runtime 就会通知对应的执行节点执行新增或删除对应的Task。

那 Runtime 是如何通知执行节点执行对应的操作呢? 一般情况下,每个执行节点会暴露 HTTP或 TCP 接口来接收 Runtime 的指令,然后执行对应的逻辑。

那 Runtime 里是谁来下发指令或者调度任务的呢?我们在 第15讲 中讲过集群一般有主节点的概念,集群中的管控操作一般是由主节点来完成的。而在分布式任务调度平台里面,道理也是类似的,这里就不再重复。

讲完了分布式任务调度平台,我们再来看看源和目标连接器是怎么实现的。

源/目标连接器

我们已知,连接器的功能是从数据源消费数据写入到数据目标。所以,从技术上拆解连接器,它应该包含读取(Read)、本地处理(Local Transforms)、写入(Write)三部分。

  • 读取,负责从源端读取数据,读取方式一般是主动读取。主动读取需要源端支持远程读取。比如订阅 MySQL Binlog,可以理解为是通过TCP 协议去 MySQL 拉取数据。再比如订阅某个钉钉应用的消息,就需要通过钉钉某个开放的 HTTP API 去读取数据。
  • 本地处理,指当数据拉取到本地后,需要对数据进行格式转换、类型转换等处理。
  • 写入,负责将处理完成的数据写入到下游的数据目标,这块主要是使用下游引擎提供的 SDK 写入数据。

上面我们讲到 Connector 是一个逻辑概念,Task 才是执行的主体。创建一个 Connector 就是创建一个订阅操作,Connector的底层会拆分为多个Task。

来看下图,这是典型的订阅 Kakfa 数据写入到 Elasticsearch 的 SinkConnector 场景。此时可以设置 Connector 的任务数和分区数保持一致,以达到最佳的性能。

从代码的角度,Connector 是如何实现的呢?核心就是先定义 Interface 接口,然后各个 Source、Sink 插件继承实现接口的具体逻辑,从而实现具备不同功能的 Connector。

如下所示,我们简单定义了 Connector 和 Task 两个接口。

public abstract class Connector{
  public abstract void start(Map<String, String> var1);
  public abstract void stop();
}

public abstract class Task{
  public abstract void start(Map<String, String> var1);
  public abstract void execute(Collection<SinkRecord> var1);
  void stop();
}

可以看到,我们在Conenctor 里面定义了 start 和 stop 接口,分别用于实现 Connector 启动和停止时需要执行的代码逻辑。Task 同样提供了start、execute、stop接口,分别用于实现 Task 启动、业务逻辑处理、停止三部分的代码逻辑。

接下来我们来实现一个简单版本的 MySQLSource Connector,伪代码如下:

public class MySQLConnector{
  public void start(Map<String, String> var1){
      // 配置初始化
      config.init()
      // 多个任务初始化
      task.init()
  }
  public void stop(){
      task.stop()
  }
}

public class MySQLTask{
  public void start(Map<String, String> var1){
      // 初始化 MySQL
      mysql.init(var1)
      // 连接 MySQL 实例
      mysql.connect()
  }
  public void execute(Collection<SinkRecord> var1){
       // 从 MySQL 订阅数据
       dada = mysql.query()
       // 对源数据进行格式转换、处理
       result = transforms(data)
       // 写入到目标Topic
       writeTopic(result)
  }
  void stop(){
      // 停止 MySQL 连接
      mysql.stop()
  }
}

代码的注释写得很清楚,这里我就不再赘述了。你会发现 execute 方法是整个插件的核心,业务逻辑都在这个方法里面实现。

完成代码编写后,再将代码打包,提交到 Runtime 平台运行。Runtime 会先依次调用Connector.start() 和 Task.start() 来启动任务,然后不断地执行 Task.execute() 方法从源端订阅数据进行处理。

从技术上看,Java 运行指定类是通过反射机制来实现的,这里就不再展开细讲。讲完了Connector 编写和设计逻辑,接下来我们来看看如何实现简单的数据清洗能力。

简单的数据清洗能力

先来看一下为什么要数据清洗能力(Transforms)呢?我们通过一个例子来简单说明一下。

来看下图,图中上半部分是源数据,下半部分是下游需要的数据。

从图中可以看到,源数据是一个 API 接口的返回值,包含了9个字段。而我们只需要5个字段,而且部分字段的内容是从源数据的某个字段提取出来的。按正常思路,这部分逻辑在Task.execute() 里面自定义实现即可。

从实现的角度,这样做是没有问题的,但是数据清洗转换是一个很常见的需求,如果所有插件都自定义实现数据清洗逻辑,工作量就太大了。那有没有办法降低插件开发的工作量呢?

答案是可以的,我们可以把数据转换的逻辑放到 Runtime 里面去实现。连接器中的 Transforms 模块就是做这个功能的。我们给一个基础的数据处理功能清单,来看看 Transforms 都能实现哪些功能。

可以看到,表格中每一个操作都是很基础的数据格式转换操作。在数据处理时,将这些操作组合起来,就能完成复杂的数据处理操作。

下面我们通过一段伪代码,来看一下底层是如何基于这些基础操作完成数据清洗的。

 public String transform(String data){
      String data = add_field(data)
      String data = replace(data)
      String data = rename(data)
      String data = uppercase(data)
      String data = remove_field(data)
      return data;
  }

上面代码表示,每行数据都要经过添加字段、替换字段、重命名变量名、首字母大写、移除某个字段这五步逻辑。这些处理逻辑是我们在创建Conenctor的时候配置好的,根据定义的配置顺序执行即可。

每个基础操作的代码实现,本质上也是基于 Interface 和反射机制来实现的。跟上面Connector和 Task 的逻辑原理是一样的,这里就不再赘述 。

讲完了连接器架构和底层原理,接下来我们就来看看业界主流消息队列连接器的具体实现。我们已知 RocketMQ、Kafka、Pulsar 都支持连接器的功能。因为从实现原理来看,三者基本是一致的,所以下面我们就以 Kafka Connector 为例来拆解一下连接器具体的实现。

Apache Kafka Connector

Kafka Connector 是 Apache Kafka 官方支持的一个开源框架,用于 将数据从外部系统导入到 Kafka 集群,或者从 Kafka 集群导出到外部系统。先来看一下 Kafka Connector 的系统架构。

系统架构

如上图所示,Kafka Connector 的架构主要由几个部分组成。

  1. Cluster 表示 Connector 的集群,也就是我们前面提到的 Runtime 集群。
  2. Connector 是负责管理整个订阅任务的逻辑概念,它负责组织管理 Task 任务。
  3. Task 是执行实际数据传输工作的组件。一个 Connector 可以有一个或多个 Task,每个 Task 都是一个独立的数据处理单元,可以并行运行。
  4. Worker 是运行 Connector 和 Task 的节点。
  5. Transform 是用于数据转换的组件。
  6. Config & Status Storage 用来存储 Connector 和 Task 的配置信息以及运行状态,这些信息通常存储在 Kafka 的内部主题中。

从实现来看, Kakfa Connector 的 Runtime 就是 Connector 集群,集群的元数据是存储在 Kakfa 内置的 Topic 中的。集群启动后,它通过暴露 HTTP API 给用户管理 Connector 任务。

从功能上来看,Kafka Connector 也分为了 Source Connector 和 Sink Connector。

  1. Source Connector 的主要任务是从外部系统(如数据库、消息队列或文件系统)中提取数据,并将这些数据转换为 Kafka 可以理解的消息格式,然后发布到 Kafka 主题中。每个 Source Connector 可以有一个或多个 Source Task,每个 Task 负责从数据源读取数据并将其发送到 Kafka。这些 Task 可以并行运行,以提高数据处理的吞吐量。
  2. Sink Connector 的主要任务是从 Kafka 主题中消费消息,并将这些消息转换为目标系统可以理解的格式,然后将数据写入目标系统。与 Source Connector 类似,每个 Sink Connector 也可以有一个或多个 Sink Task,每个 Task 负责从 Kafka 读取数据并将其写入目标系统。

了解了 Kafka Connector 的系统架构,接下来我们简单来看一下 Connector 暴露出来的 RESTful API。

RESTful API

Kafka Connect 提供了一组 RESTful HTTP API,包含了增删改查 Connector、修改配置等等,用来管理和监控 Kafka Connector 集群。

下面我们通过创建 Connector 和获取 Connector 列表接口来简单讲解一下它们的使用。

  • 创建 Connector,你可以使用 POST 请求到 /connectors 端点创建一个新的 Connector。请求体应包含 Connector 的配置,请求格式如下:
POST /connectors
{
  "name": "my-connector",
  "config": {
      ...
  }
}
  • 获取所有 Connectors,你可以使用 GET 请求到 /connectors 端点来获取所有已经创建的 Connectors 的列表,请求格式如下:
GET /connectors

因此如果你想操作 Connector 集群,直接调用集群暴露的 API 即可。如果需要了解更多 API 使用方式,你可以去查看 Apache Kafka 的 官方文档

接下来,我们来看一下 Kafka Connector 的具体使用案例,让你对 Kakfa Connector 有一个更深的理解。

示例:File to Topic Source

这是一个 Kafka SourceConnector 示例,它的功能是订阅文件数据写入到 Topic。

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=
org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=
org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

这个 demo 的意思是:将 test.txt 文件中的数据导入到名为 connect-test 的 Topic 中。并在导入数据的过程中,对数据进行清洗转换。清洗转换的逻辑为:把文本中每行的值加上 key:line,然后每行数据添加一个固定值 "data_source":"test-file-source",最后将数据转换为JSON格式。

输入:
"foo"
"bar"
"hello world"

输出:
{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}

从运行的角度看,我们通过 RESTful API 调用 POST 的 /connectors 接口提交任务。连接器 Runtime 会对应创建一个Connector,然后根据设置的Task 数量,将多个 Task 分配给不同的Worker节点。Worker 节点会以线程的形式去运行每个Task,从源端读取数据,处理转换,然后写入到下游。

总结

连接器也称为 Connector,它是将数据从源端搬到目标端的组件。消息队列连接器和普通连接器的区别是,消息队列的连接器的其中一端一定是消息队列。

数据集成是一个概念,不是一个具体的功能组件。它描述的是将数据从数据源搬到数据目标的过程。消息队列连接器只是数据集成概念下的一种具体实现方案。

从是否缓存数据的角度看,数据集成有典型数据集成组件和消息队列连接器两种方案。它们最大的差别是,消息队列连接器需要把数据先存储到消息队列中缓存,然后根据需要再从消息队列中消费数据,导入到下游。而典型数据集成组件是将数据直接导入到下游组件的。

从功能上来看,典型的数据集成适合数据源和数据目标是一对一的场景,消息队列连接器适合数据源和数据目标是一对多的场景。

从技术上看,消息队列连接器由 Runtime、Connector、Transforms 三部分组成。Runtime 本质上是一个分布式任务调度集群,用来创建、分配、调度 Connector 和 Task 任务的运行、停止、更新等。Connector 一般会分为源连接器和目标连接器两种,功能上分别表示将数据导入、导出消息队列。Transforms 的作用是通过配置化的参数,完成对数据的清洗和转换等操作。

业界主流消息队列 Kafka、RocketMQ、Pulsar 都支持连接器的概念,组件名称分别是Kafka Connector、RocketMQ Connector、Pulsar IO。从技术原理上看,设计思路都是一致的,核心都是一个分布式的任务调度平台,并在平台上执行分布式的 Source 和 Sink 任务。

思考题

课程中我们提到,分布式任务调度平台的作用就是负责任务的运行、调度、启停。从功能上来看,像 Spark/Flink、Mesos 等分布式任务调度平台都具备这个能力,为什么主流的消息队列还要自己独立开发 Runtime 呢?

欢迎分享你的想法,如果觉得有收获,也欢迎你把这节课分享给感兴趣的朋友。我们下节课再见!

上节课思考闭环

事件源底层的存储模型,是所有事件源存储在一起,还是每个事件源进行独立的分区或Topic存储,使用哪种方案更合理呢?它们各自存在什么问题?又如何解决?

如果所有的事件数据都存储在同一个Topic或者分区里面,此时一旦某个事件源的数据很大导致消费堆积时,必然会影响其他事件源数据的处理。而这个问题是无法根治的,所以合理的方案是,每个事件源的底层都是单层存储。

那么使用独立的存储模型,应该选择每个事件源一个Topic,还是每个事件源一个分区呢?

从技术上来看,我建议你选择每个事件源一个独立的Topic存储。当数据量小时,每个Topic只有一个分区。当数据量大时,可以扩容 Topic 的分区数。

此时如果还出现消费堆积的话,可以通过扩容分区、调整消费者的消费模型来提高消费速度。

从技术上来看,需要拆分集群缓存数据。比如可以根据固定的容量拆分,根据Topic 数、分区数等,或者根据大客户、中长尾客户进行拆分。