31 Dataflow(三):一个统一的编程模型
你好,我是徐文浩。
在过去的几讲里,我们看到了大数据的流式处理系统是如何一步一步进化的。从最早出现的S4,到能够做到“至少一次”处理的Storm,最后是能够做到“正好一次”数据处理的MillWheel。你应该能发现,这些流式处理框架,每一个都很相似,它们都采用了有向无环图一样的设计。但是在实现和具体接口上又很不一样,每一个框架都定义了一个属于自己的逻辑。
S4是无中心的架构,一切都是PE;Storm是中心化的架构,定义了发送数据的Spout和处理数据的Bolt;而MillWheel则更加复杂,不仅有Computation、Stream、Key这些有向无环图里的逻辑概念,还引入了Timer、State这些为了持久化状态和处理时钟差异的概念。
和我们在大数据的批处理看到的不同,S4、Storm以及MillWheel其实是某一个数据处理系统,而不是MapReduce这样高度抽象的编程模型。每一个流式数据处理系统各自有各自对于问题的抽象和理解,很多概念不是从模型角度的“该怎么样”抽象出来,而是从实际框架里具体实现的“是怎么样”的角度,抽象出来的。
不过,我们也看到了这些系统有很多相似之处,它们都采用了有向无环图模型,也都把同一个Key的数据在逻辑上作为一个单元进行抽象。随着工业界对于流式数据处理系统的不断研发和运用,到了2015年,仍然是Google,发表了今天我们要解读的这一篇《The Dataflow Model》的论文。
那么,在学完这一讲之后,我希望你能够对过去几讲的论文进一步融会贯通,能够做到:
- 从抽象模型的角度来理解流式数据处理,而不仅仅是从系统框架如何实现的角度,思考流式数据处理。
- 掌握Dataflow里ParDo、GroupByKey,以及窗口、触发器和增量数据处理这些概念。
- 能够把这些概念和之前学习过的S4、Storm,以及MillWheel系统联系起来,思考如何在这些系统上扩展,以实现Dataflow的编程模型。
Dataflow的基础模型
Dataflow的核心计算模型非常简单,它只有两个概念,一个叫做ParDo,顾名思义,也就是并行处理的意思。另一个叫做GroupByKey,也就是按照Key进行分组数据处理的问题。
ParDo,地位相当于MapReduce里的Map阶段。所有的输入数据,都会被一个DoFn,也就是处理函数处理。但是这些数据,不是在一台服务器上处理的,而是和MapReduce一样,会在很多台机器上被并行处理。只不过MapReduce里的数据处理,只有一个Map阶段和一个Reduce阶段。而在Dataflow里,Pardo会和下面的GroupByKey组合起来,可以有很多层,就好像是很多个MapReduce串在一起一样。
而GroupByKey,地位则是MapReduce里的Shuffle操作。在Dataflow里,所有的数据都被抽象成了key-value对。前面的ParDo的输入和Map函数一样,是一个key-value对,输出也是一系列的key-value对。而GroupByKey,则是把相同的Key汇总到一起,然后再通过一个ParDo下的DoFn进行处理。
比如,我们有一个不断输入的日志流,想要统计所有广告展示次数超过100万次的广告。那么,我们可以先通过一个Pardo解析日志,然后输出(广告ID,1)这样的key-value对,通过GroupByKey,把相同的广告ID的数据分组到一起。然后再通过一个ParDo,并行统计每一个广告ID下的展示次数。最后再通过一个ParDo,过滤掉所有展示次数少于100万次的广告就好了。
理解流批一体
那么这样看起来,Dataflow不就是个MapReduce吗?它无非是可以把多个MapReduce的过程串接在一起就是了。当然,答案并没有那么简单,因为在Dataflow里,我们还有一个很重要的维度没有加入进来,这个维度就是时间。
Dataflow里的GroupByKey,会把相同Key的数据Shuffle到一起供后续处理,但是它并没有定义在什么时间,这些数据会被Shuffle到一起。
在MapReduce的计算模型下,会有哪些输入数据,是在MapReduce的任务开始之前就确定的。这意味着数据从Map端被Shuffle到Reduce端,只依赖于我们的CPU、网络这些硬件处理能力。而在Dataflow里,输入的数据集是无边界的,随着时间的推移,不断会有新的输入数据加入进来。
如果从这个角度来思考,那么我们之前把大数据处理分成批处理和流式处理,其实并没有找到两种数据处理的核心差异。因为,对于一份预先确定、边界明确的数据,我们一样可以使用流式处理。比如,我们可以把一份固定大小日志,放到Kakfa里,重放一遍给一个Storm的Topology来处理,那也是流式处理,但这是处理的有边界的数据。
而对于不断增长的实时数据,我们一样可以不断定时执行MapReduce这样的批处理任务,或者通过Spark Streaming这样看起来是流式处理,其实是微批(Mini-Batch)的处理方式。
事实上,即使是所谓的“流式”数据处理系统,往往也会为了性能考虑,通过微批的方式来提升性能。一个典型的例子,就是上一讲我们看过的MillWheel里的Checkpoint,就会在等待多条记录处理完之后批量进行。
一旦从这个视角来观察,那么批和流本身是一回事儿。当我们把“批(Batch)”的记录数限制到了每批一条,那么它就是所谓的流了。进一步地,MapReduce的“有边界(Bounded)”的数据集,也只是Dataflow的“无边界(Unbounded)”的数据集的一种特殊情况。所以,Jay Kreps才会在2014年提出流批一体的Kappa架构,而到了2015年的Dataflow,我们就看到了批处理本来就是流处理的一种特殊情况。
时间窗口的分配与合并
在MillWheel的论文里,我们已经看到了一个非常完善的流式数据处理系统了。不过,在这个流式处理系统里,对于“时间”的处理还非常粗糙。MillWheel的确已经开始区分事件的处理时间(Processing Time)和事件的发生时间(Event Time)了,也引入了时间窗口的概念。但是,对于计算结果何时输出,它仍然采用的是一个简单的定时器(Timer)的方案。而到了Dataflow论文里,对这些概念的梳理和抽象就变成了重中之重。
我们先来看一看时间窗口的概念,在流式数据处理里,我们需要的往往不是“统计所有的广告展示数量”,而往往是“每5分钟统计一次广告展示数量”,或者“统计过去5分钟的广告展示数量”。我们常用的时间窗口,也会分成好几种:
- 首先是固定窗口(Fixed Window)。比如,我们统计“每小时的广告展示数量”,那么我们的数据,就会被划分成0点到1点、1点到2点,这样一个个固定区间的窗口。
- 然后是滑动窗口(Sliding Window),也就是窗口随着时间的变动在“滑动”。比如,我们要统计“过去2分钟的广告展示”,那么我们的窗口并不是划分成12:0012:02,12:0212:04这样一段段。而是12:0012:02,然后一分钟之后变成12:0112:03,在这个例子里,2分钟被称之为窗口大小,而窗口每1分钟“滑动”一次,这个1分钟被称之为滑动周期。
- 最后是会话窗口(Session Window)。这个常常用在统计用户的会话上,对于会话的划分,往往是通过我们设置的两次事件之间的一个“超时时间”来定义的。比如,我们有一个客服聊天系统,如果用户和客服之间超过30分钟没有互动,我们就认为上一次会话结束了。在这之后无论是用户主动发言,还是客服主动回复,我们都会认为是进入了一个新的会话。
既然引入了时间窗口这个概念,相信你很容易理解,我们在Dataflow模型里,需要的不只是GroupByKey,实际在统计数据的时候,往往需要的是GroupByKeyAndWindow。统计一个不考虑任何时间窗口的数据,往往是没有意义的,1分钟内广告展示了100万次,和1个月内展示了100万次代表着完全不同的广告投放力度。我们需要根据特定的时间窗口,来进行数据统计。
而在实际的逻辑实现层面,Dataflow最重要的两个函数,也就是AssignWindows函数和MergeWindows函数。每一个原始的事件,在我们的业务处理函数之前,其实都是(key, value, event_time)这样一个三元组。而AssignWindows要做的,就是把这个三元组,根据我们的处理逻辑,变成(key, value, event_time, window)这样的四元组。
需要注意,一个事件不只可以分配给一个时间窗口,而是可以分配给多个时间窗口。比如,我们有一个广告在12:01展示给了用户,但是我们统计的是“过去2分钟的广告展示”,那么这个事件,就会被分配给[12:00, 12:02)和[12:01, 12:03)两个时间窗口,我们原先一条的事件就可以变成多条记录。
而在有了Window的信息之后,如果我们想要按照固定窗口或者滑动窗口统计数据,我们可以很容易地根据Key+Window进行聚合,完成相应的计算。
但是,有些窗口函数的计算并不容易,比如我们前面讲过的第三种会话窗口,每个事件的发生时间都是不一样的。那么这个时间窗口就很难定义。
而Dataflow里的做法,是通过AssignWindows+MergeWindows的组合,来进行相应的数据统计。我们还是以前面说的,客服30分钟没有互动就算作超时的例子来看看。
- 因为我们要根据同一个用户的行为进行分析,所以Key自然是用户ID了。那么对应的Value里,我们可以记录是用户发送的消息还是客服回复的消息,以及对应的消息内容。而event_time,则是实际消息发送的时间。
- 对于每一个事件,我们进行AssignWindows的时候,都是把对应的时间窗口,设置成$[event_time, event_time+30)$。也就是事件发生之后的30分钟超时时间之内,都是这个事件对应会话的时间窗口。
- 而在同一个Key的多个事件,我们可以把这些窗口合并。对于会话窗口,如果两个事件的窗口之间有重合部分,我们就可以把它们合并成一个更大的时间窗口。而如果不同事件之间的窗口没有重合,那么这两个事件就还是两个各自独立的时间窗口。在所有的事件合并完成之后,我们只需要去数有几个时间窗口,就能知道有几个会话了。
比如同一个用户下,有三个事件,发生的时间分别是13:02、13:14、13:57。那么分配窗口的时候,三个窗口会是$[13:02,13:32)$,$[13:14,13:44)$以及$[13:57,14:27)$。前两个时间窗口是有重叠部分的,但是第三个时间窗口并没有重叠,对应的窗口会合并成$[13:02,13:44)$以及$[13:57,14:27)$这样两个时间窗口。
窗口的分配和合并功能,就使得Dataflow可以处理乱序数据。相同的数据以不同的顺序到达我们的计算节点,计算的结果仍然是相同的。并且在这个过程里,我们可以把上一次计算完的结果作为状态持久化下来,然后每一个新进入的事件,都按照AssignWindows和MergeWindows的方式不断对数据进行化简。
你可以来看下论文里的图5,这个图有助于你去理解Dataflow是如何通过它的一些基础操作,来完成对应的数据化简和统计的。
触发器和增量数据处理
这样一来,有了对应的窗口函数逻辑,如果我们的输入数据是确定的,能够一次性都给出来,我们就很容易统计会话数这样的数据了,即使数据是乱序的也没有关系。但是,在实际情况里,我们的输入数据是以流的形式传输到每个计算节点的。并且,我们会遇到延时、容错等情况,所以我们还需要有一个机制告诉我们,在什么时候数据都已经到了,我们可以把计算结果向下游输出了。
在MillWheel的论文里,我们是通过计算一个低水位(Low Watermark)来解决这个问题的。我们会根据获取到的低水位信息,判断是否该处理的事件都已经处理完了,可以把计算结果向下游发送。
但是,这个基于水位的方法在实践中,必然会遇到这样两个问题:
- 第一个,在实际的水位标记之后,仍然有新的日志到达。比如,水位信息告诉我们最早的还没有处理的日志是12:01的,那么我们自然可以把12:00的统计数据发出去。但是,很有可能一分钟后,我们收到了一条12:00的日志数据,这是因为之前某一个节点挂掉了,恢复传输花了一些时间。那么在这种情况下,我们已经往下游发送的数据就是不准确的。而这种情况,对于数据准确性要求高的需求来说,比如广告计费,就让人难以接受。
- 第二个,我们的水位标记,因为需要考虑所有节点。只要有一条日志来晚了,我们的水位就会特别“低”,导致我们迟迟无法输出计算结果。比如,虽然已经到了12:00了,但是我们就是偶尔会出现一条11:05的日志,那么我们的水位一直会卡在11:05,计算结果就会迟迟不能向下游发送。
那么,Dataflow里,是怎么解决这个问题的呢?答案是Lamdba架构。
这里的Lambda架构,并不是需要去搭建一个数据的批处理层,而是利用Nathan Marz的Lambda架构的核心思想,就是我们可以尽快给出一个计算结果,但是在后续根据获得的新的数据,不断去修正这个计算结果。而这个思路,在Dataflow里,就体现为触发器(Trigger)机制。
在MillWheel里,我们向下游输出数据,只能通过定时器(Timer)来触发,本质上也就是通过“时间”这一个维度而已。这个定时器,在Millwheel里其实就被改造成了完成度触发器,我们可以根据当前的水位和时间,来判断日志处理的进度进而决定是否触发向下游输出的动作。而在Dataflow里,除了内置的基于水位信息的完成度触发器,它还能够支持基于处理时间、记录数等多个参数组合触发。而且用户可以实现自定义触发器,完全根据自己的需要来实现触发器逻辑。
PCollection<String> pc = ...;
pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES))
.triggering(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1)))
.discardingFiredPanes());
来自Apache Beam的文档教程。
我们可以看一下Apache Beam项目里的一段示例代码。可以看到,在这段代码里,先是设立了一个1分钟的固定窗口。然后在触发器层面,则是设置了在对应的窗口的第一条数据被处理之后,延迟一分钟触发。在Apache Beam的文档里,你还能看到更多不同的触发器策略,你也可以根据自己的需要,来撰写专属于你自己的触发器代码。
而除了确定对应的数据计算什么时候触发,你还可以定义触发之后的输出策略是什么样的。
首先是抛弃(Discarding)策略,也就是触发之后,对应窗口内的数据就被抛弃掉了。这意味着后续如果有窗口内的数据到达,也没法和上一次触发时候的结果进行合并计算。但这样做的好处是,每个计算节点的存储空间占用不会太大。一旦触发向下游输出计算结果了,现有的数据我们也就不需要了。比如,一个监控系统,根据本地时间去统计错误日志的数量并告警,使用这种策略就会比较合适。
然后是累积(Accumulating)策略,也就是触发之后,对应窗口内的数据,仍然会持久化作为状态保存下来。当有新的日志过来,我们仍然会计算新的计算结果,并且我们可以再次触发,向下游发送新的计算结果,而下游也会用新的计算结果来覆盖掉老的计算结果。
这个是一个典型的Lambda架构的思路。我们一般的统计数据,都可以采用这个策略。一方面,我们会尽快根据水位信息,把计算结果发送给下游,使得计算结果的延时尽可能得小。另一方面,在有新的数据过来的时候,我们也会重新修正计算结果。
最后是累积并撤回(Accumulating & Retracting)策略,也就是我们除了“修正”计算结果之外,可能还要“撤回”计算结果。还是以前面的客服会话为例:
- 原本我们先收到了三个事件,13:02、13:14、13:57,根据30分钟的会话窗口,我们的逻辑计算完,窗口就变成了$[13:02,13:44)$以及$[13:57,14:27)$这样两个时间窗口。并且,这两个会话分别作为两条记录,向下游的不同计算节点下发了。
- 这个时候,我们又接收到了一条姗姗来迟的新日志,日志的时间是13:40。那么,根据我们的业务逻辑,这个用户其实只有一个会话$[13:02,14:27)$。所以,我们不仅要向下游发送一个新会话出去,还需要能够“撤回”之前已经发送的两个错误的会话。
当然,这只是我们最理想的状况,抛弃和累积这两种策略并不难实现,但是累积并撤回并不容易实现,即使在2021年的今天,Apache Beam也还没有支持撤回(Retraction)功能。不过,即使你从没有使用过对应的功能,你也需要理解为什么我们需要这样的功能。因为没有这个功能的话,我们的计算结果的正确性,在有些情况下是保障不了的。从这个角度来看,Lambda架构仍未彻底过时。
小结
好了,到这里,我们也算是为整个课程里,大数据的流式处理画上一个句号了。随着时代洪流滚滚向前,Google也针对自己发表的Dataflow这个编程模型,孵化出了Apache Beam这个项目。而在这个时间节点之后,像Apache Flink这样的开源流式处理项目,也都向Dataflow的编程模型靠拢,并实现了Apache Beam的接口。
在Dataflow的论文里,Google把整个大数据的流式处理,抽象成了三个概念。第一个,是对于乱序数据,能够按照事件发生时间计算时间窗口的模型。第二个,是根据数据处理的多维度特征,来决定计算结果什么时候输出的触发器模型。第三个,则是能够把数据的更新和撤回,与前面的窗口模型和触发器模型集成的增量处理策略。
Dataflow不是一篇介绍具体系统实现的论文,而是一篇更加高屋建瓴,从模型角度思考无边界的大数据处理应该如何抽象的论文。
像MapReduce一样,Dataflow是一个抽象的计算模型而不是一个具体的系统实现。用MapReduce的时候,你并不需要Google的C++的原版实现,而完全可以用Java写的Hadoop。而在Dataflow这里,Google更进一步,不仅给出了整个的计算模型,后续还推动了Apache Beam这个项目,希望能让流式数据处理的接口统一。无论你的底层实现是什么,只要能够按照Dataflow里的语义实现对应的接口,那么就算是别人来撰写代码,也都可以实现相同的计算结果。
推荐阅读
想要对大数据的流式处理有深入的了解,我们必须要读的一本书就是《Streaming Systems》。这本书的作者泰勒·阿克道(Tyler Akidau)也是这篇Dataflow论文的作者。我在之前的几讲里,已经推荐过这本书了,目前国内也已经出版了影印本。如果你想深入大数据领域的研发,特别是流式数据处理这个领域,这本书你一定要买回来好好研读一下。
思考题
我们说,Dataflow已经是一个流批一体的计算模型了,有边界的数据也只是无边界的流式数据的一种特殊情况。对于有边界的固定数据,我们当然可以通过重放日志把数据给到Dataflow系统。那么在窗口和触发器层面,我们应该用什么窗口和触发器,来得到我们想要的计算结果呢?
欢迎在留言区分享你的答案和思考,也欢迎你把今天的内容分享给更多的朋友。
- Gavin 👍(3) 💬(0)
window merging 中结果有误(文中位置为:论文中的图5,如何通过AssignWindows和MergeWindows来进行数据计算,数据乱序也不影响计算结果) ,取的是window end, 而非window start,对应Google 论文应该是Figure 4 https://storage.googleapis.com/pub-tools-public-publication-data/pdf/43864.pdf
2022-01-19 - 那时刻 👍(2) 💬(1)
对于有边界的固定数据,我们当然可以通过重放日志把数据给到 Dataflow 系统, 我之前采用的是global window以及default trigger来处理的
2021-12-22 - piboye 👍(1) 💬(0)
sql支持这些功能了吗?
2021-12-26 - 在路上 👍(0) 💬(0)
徐老师好,DataFlow论文第3.2节Design Principles,提到Support robust analysis of data in the context in which they occurred,数据的健壮分析是指什么?
2021-12-24