原文

闲话少说,下面进入主题。先总结下,上次的内容主要分三块:

  • 术语,定义了我所谓的“流”(streaming)到底是什么意思。
  • 批与流,比较了这两种类型系统的理论功能,并提出了两个可以让流式系统超越批处理系统的概念:正确性和时间推理工具。
  • 数据处理模式,介绍了批处理和流式系统用于处理有限及无限数据的基本方法。

本文中,将继续关注上次说的数据处理模式,而且通过具体实例来介绍细节。下面将本文主要分为两个部分:

  • **Streaming 101 回顾:**简单回顾之前介绍的一些概念,并用一个实例来加以说明。
  • Streaming 102:作为上篇的配套教程,详细介绍了处理无限数据的其他概念,并继续用具体案例来解释。

通过本文,您将掌握一个健壮的无序数据处理所需的核心原则和概念,同时拥有让您超越批处理的时间推理工具。

为了让您对于实现方式有个直观的认识,我将会使用 Dataflow SDK 的代码片段(即 Google Cloud Dataflow 的 API),并辅以动画让这些概念可视化。我使用的是 Dataflow SDK 而不是人们所熟悉的 Spark Streaming 或者 Storm 等,原因在于还没有一个别的系统更加适合用于表达我想介绍的例子。不过好消息是有些项目正在朝着这个方向努力。更好的消息是,我们(谷歌)如今已经(联合 data Artisans、Cloudera、Talend 等公司)向 Apache 软件基金会提交了一份创建 Apache Dataflow 孵化项目的议案。希望围绕 Dataflow 模型 提供的健壮的无序数据处理语义来构建一个开放的社区及生态。这让我很期待 2016 年。擦,跑题了。

不过这篇文章缺少了我上次承诺的比较部分。对不起,我低估了我所要将的内容究竟有多少,究竟要花多少时间。但是呢,我又不知道该把所拖延的和该拓展的内容放在哪里。如果能安慰您的话呢,其实我在2015年的新加坡的 Strata + Hadoop World 做了一个演讲,题目是“大规模数据处理的演进”(2016年会在伦敦的 Strata + Hadoop World 做一个更新版本的演讲),其中涵盖了一些我之前想做的承诺的部分;这里有当时的 PPT。虽然不是完全一样,但是值得一读。

好,现在我们开始看“流”。

回顾与路线图

在Streaming 101中,首先澄清了一些术语,介绍了有限数据 VS 无限数据。有限数据源具有有限的大小,通常被称为“批处理”数据。无限数据源具有无限大小,通常被称为“流”数据。在后边将会尽量避免使用批处理和流式来修饰数据源,因为这些名称有其局限性并容易让人误解。

然后,解释了批处理流式处理引擎之间的区别:批处理引擎是设计优先考虑有限数据(现在批处理引擎也提供了micro-batch的方式处理流式数据),而流式处理引擎设计用于处理无限数据。我只会在描述执行引擎时使用批处理和流式处理。

定义完术语之后,介绍了与处理无限数据有关的两个重要的基本概念。首先阐述事件时间(事件发生的时间)和处理时间(数据在系统中被处理的时间)之间的关键区别。这为Streaming 101中提出的主要论文提供了基础:如果关心事件实际发生时间,则必须基于事件的事件时间,而不是处理时间。

接下来介绍了窗口的概念(即沿时间边界切分数据集),这是一种常用的方法,用于应对无限数据源的数据处理,无限数据源理论上永远不会结束。窗口策略的最常见且简单的例子是固定和滑动的窗口,更复杂的窗口类型,例如会话窗口(其中窗口由数据本身的特征决定,捕获每个用户的活动会话窗口,会话窗口之后紧接着是用户的不活动期)也比较广泛的用法。

除了前文中介绍的概念,现在再介绍3个新的概念:

  • **水位线:**水位线是相对于事件时间的输入完整性的概念。水位线表示一个时间 X,表示所有事件时间 <X 的所有数据都到齐了。因此,当处理无限数据源时,水位线可以作为进度的度量。
  • **触发器:**触发器是一种由外部条件触发、表明何时计算窗口结果的机制。触发器可以让我们灵活的选择何时计算结果并发送给下游,而且随着数据的不停的到来,窗口可以产生多次输出。所以,窗口结束前可以先提供近似结果,并且能够灵活应对上游数据的变化(可能是上游发送的数据修正)或者数据延迟到达(例如,移动场景在某人的离线时,某人的电话记录了各种动作及其事件时间,然后在重新获得连接时继续上传这些事件进行处理)。
  • **累积:**累积模式指定在同一窗口中观察到的多个结果之间的关系。这些结果可能完全相互之间完全独立,或者它们之间可能存在重叠。不同的累积模式具有不同的语义和与计算成本,适用于不同的场景。

最后,在回答无限数据处理中的4个问题时,更容易搞清楚这些概念和它们之间的关联关系:

  • **What****计算的结果是什么?**这个由 Pipeline 中的转换来决定。例如计算总和、构建直方图、训练机器学习模型等等。它也是经典批处理需要回答的问题。
  • **Where****在事件时间中的哪个位置计算结果?**这个一般由在 Pipeline 中使用事件时间窗口来决定。这包括从Streaming 101(固定,滑动和会话)窗口的常见示例,和一些貌似没有窗口概念的用例(例如Streaming 101中的时间无关的处理;经典批处理也通常属于此类)以及其他更复杂的窗口类型,如限时拍卖。此外,如果在记录到达系统时将入口时间指定为事件时间,还可以包括处理时间窗口。
  • When 在处理时间中的哪个时刻触发计算结果?我们通过使用水位线和触发器来回答的这个问题。这里的场景比较多样,但最常见的模式是在给定窗口的输入完成时使用水位线来描绘,触发器允许提前计算结果(对于在窗口完成之前发出的推测性的、部分的结果)和延迟计算结果(水位线只是预估窗口的数据全部到达,并不是100%确定,在水位线声明给定窗口的全部到达之后,也有可能会有隶属于该窗口的数据到达)。
  • How 如何修正结果?这个问题由所使用的累积类型回答:丢弃(其中结果是相互独立和不同的),累积(后来的结果建立在先前的结果上),累积和撤回(当前的累积值和上次触发的值撤回一起发送)。

后边将一一讨论这些问题,试图让大量清楚哪些概念与What/Where/When/How中的哪个问题有关。

Streaming 101 回顾

首先,回顾一下Streaming 101中提出的一些概念,这一次还将提供一些具体的例子使这些概念更具体。

What: 变换

经典批处理中 Transform 解决了以下问题:“要计算什么结果?”许多人可能已经熟悉经典的批处理,所以我们将以它为基础,加入其他概念,以便于理解。

对于这一部分,我们来看一个示例:计算由10个整数值组成的简单数据集的总和。这么说有点抽象,在实际中,可以想象一个游戏,10个人组成一个团队,每个人的最终得分相加,就是团队的成绩。也可以想象计费和使用情况的监控使用情况这样的场景。

对于每个示例,将包括一个简短的 Dataflow Java SDK 伪代码片段,以使 Pipeline 的定义更具体。因为是伪代码,所以有时会省略细节(如使用具体的I / O源)、使用简称(Java中的当前触发器名称太冗长)。除了这些(大部分我在 Postscript 中明确列举)的小事情之外,其它基本上是真实的 Dataflow SDK代码。稍后还将提供一个链接到实际的代码演练,可以编译和运行自己的类似例子感兴趣的人,可以实际尝试一下。

如果熟悉像 Spark Streaming 或 Flink 这样的计算引擎,那么在看 Dataflow 示例代码的时候就会容易一些。接下来开始让人崩溃的旅程,在Dataflow中有两个基本的原语:

  • PCollection,表示可能要执行并行转换(因此以“P”打头)的数据集。
  • PTransform,处理 PCollection 并创建新的 PCollection。PTransform 可以执行元素转换,它们可以将多个元素聚合在一起,或者它们可以是其他 PTransform 的组合。

img

图1. 转换类型

如果觉得不甚明白或者想刨根问底,可以看看 Google Dataflow Java SDK 文档

为了对例子说明,假设我们从一个 PCollection<KV<String,Integer>>命名为“input”(即由一个键/值对的字符串和整数组成的 PCollection,其中字符串是类似团队名称,整数是相应团队中个人的得分)。在现实世界的 Pipeline 中,通过从 I/O 源读取 PCollection 原始数据(例如日志记录)获得输入,将日志记录解析为适当的键/值对,然后将其转换为 PCollection<KV<String,Integer>> 。为了在第一个例子中清楚起见,将包括所有这些步骤的伪代码,但是在随后的例子中,删除了 I/O 和解析部分。

因此,对于简单地从 I/O 源读取数据的管道,解析出团队/分数键值对,并计算每队的得分数,代码如下:

PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input
  .apply(Sum.integersPerKey());

清单1. 计算总和的 Pipeline

从数据源读取键值对数据,键是String类型(团队名称),值是Integer类型(团队中各人得分)。然后对每个键(团队)计算总和(团队得分),并输出。

对于所有的示例,在每个示例的 Pipeline 的代码片段之后,有1个该Pipeline执行的动画示例。动画中看到一个 Key(即一个团队)的10条输入数据执行Pipeline的样子;在一个实际的Pipeline 中,类似的操作将有成千上万个,在很多台机器上并行执行。为了能清晰的说明,示例中演示了1个Key的执行过程。

动画中每个点代表一个输入或输出数据,输入和输出都有两个时间维度:事件时间(X轴)和处理时间(Y轴)。因此,Pipeline 按照处理时间维度执行,从下向上延伸,如加粗的上升白线所示。输入是圆圈,圆圈内的数字代表该特定记录的值。输入开始是灰色圆圈,随着Pipeline处理变换颜色。

当 Pipeline 处理到某一个值的时候,会将其累加并记录在 State 中,并最终将聚合结果作为输出。State和输出由矩形表示,矩形顶部不断变化的数字表示累加值,矩形覆盖的区域表示到当前处理时刻,所有矩形中的数据已经被处理。对于清单1中的 Pipeline,在经典批处理引擎上执行时,会看起来像这样(请注意,点击下面的图片启动动画,然后会一直循环播放):

image-20200924163454383

图2. 经典的批处理

上边是在批处理引擎上执行Pipeline,累加输入的数据,直到所有输入处理完毕(由顶部的虚线绿色线表示),此时输出为51。在此示例中,因为没有应用任何特定的窗口,所以在事件时间维度上计算了整个数据集的总和;因此,用于累加的State和输出的矩形覆盖X轴的整体。但是,如果要处理一个无限数据源,那么经典批处理将是不够的,不能等待输入结束,因为它实际上永远不会结束。所以需要的使用在Streaming 101中提到的一个概念是---窗口。因此,想要回答第二个问题“在事件时间的哪个位置计算结果?”,现在先简要回顾一下窗口。

Where: 窗口

窗口是把数据在时间上进行切分。常见的窗口策略有:固定窗口、滑动窗口、会话窗口。

img

图3. 窗口策略

为了更好地了解实际中的如何使用窗口,来看一下的上边提到的整数求和Pipeline,使用了长度为2分钟的时间窗口。使用 Dataflow SDK,只需要简单的添加 Window.into 转换即可(下面代码中的第二行):

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
  .apply(Sum.integersPerKey());

清单2. 使用窗口的求和代码

回想一下,Dataflow提供了一种统一的批处理和流式处理模型,因为语义上,批处理只是流式处理的一个子集。因此,首先在批处理引擎上执行此Pipeline;在批处理引擎上执行此Pipeline,原理简单明了,可以很好的跟在流式处理引擎上执行做对比。

image-20200924163551278

图4. 批处理上窗口的求和

如前所述,输入在不断累加存储在State中,直到输入被完全处理,然后产生输出。然而,在这种情况下,不是1个输出,而是4个:在时间上切分成了4个事件时间窗口,每个窗口产生一个输出。

到此为止,我们重新回顾了Streaming 101中引入的两个主要概念:事件时间和处理时间之间的关系以及窗口。再进一步,开始阐述本节开头提到的新概念:水位线,触发器和累积。下边开始Streaming 102。

Streaming 102

在上边我们看到了在批引擎上执行使用了窗口的Pipeline。理想情况下,我们希望具有较低的处理延迟,同时也希望能涵盖对无限数据集的处理。切换到流式引擎是朝着正确方向迈出的一步。批处理引擎可以明确的知道,每个窗口的数据是否完整了(即,有限数据集中所有的数据都被处理了),但目前缺乏确定无限数据集的完整性的实用方法。接下来介绍水位线。

When: 水位线

水位线是问题答案的前半部分:“在处理时间的什么时候计算窗口的结果?” 水位线是事件时间中输入完整性的时间概念。换句话说,它们是系统根据当前处理的数据的事件时间判断处理进度和完整性的方法(有限或无限数据集,在无限数据的情况下作用更为明显)。

回想一下 Streaming 101 中这个图,这里稍作修改,其中描述了事件时间和处理时间之间的偏差,现实中大多数的分布式数据处理系统中的时间偏差一直在变化。

img

图5. 事件时间进度、偏差、水位线

图中表示现实(reality)的红色曲线本质上是水位线。随着处理时间的推移,它跟踪事件时间完整性的进度。在概念上,可以将水位线视为函数 F(P) E,在处理时间中选取一个点,返回事件时间的一个点。(更准确地说,对函数的输入实际上是在 Pipeline 中观察到水位线的点上游的一切的当前状态:输入源,缓冲数据,正在处理的数据等;但在概念上,将其视为从处理时间到事件时间的映射更简单。)在事件时间点 E 上系统会认为事件时间小于 E 的所有数据都到齐了。换句话说,这是一个断言,不再有事件时间小于 E 的数据。根据水位线的类型:完美的或启发式的,这种断言可能是严格的保证或者经过训练的估计:

  • **理想的水位线:**在完全了解所有输入数据的情况下,可以构建理想的水位线;在这种情况下,没有延迟数据;所有数据都提前或准时到达。
  • **启发式水位线:**对于许多分布式输入源,完全了解输入数据是不切实际的,在这种情况下,最佳选择是提供。启发式的水位线使用任何有关输入的信息(分区,分区内排序,文件增长率等),以提供尽可能准确的进度估计。在许多情况下,启发式水位线可以预测的非常准确。即使如此,使用启发式水位线意味着它有时可能是不正确的的,这将导致有些数据被划分为延迟数据。我们将在下面的触发器部分中了解如何处理延迟数据。

水位线是一个有趣和复杂的话题,未来会写一篇新的文章专门阐述。现在,为了更好地了解水位线的作用以及缺点,我们来看一下使用水位线的流式处理引擎的两个例子,以确定何时在清单2中执行使用窗口的Pipeline时实现输出。左边的例子使用理想的水位线;右边的一个使用启发式水位线。

image-20200924163653862

图6. 理想的与推测的水位线

在这两种情况下,当水位线通过窗口的末尾时,窗口被触发计算。两个执行的主要区别在于右侧的水位线计算中使用的启发式算法,值9因为迟到的问题而没有被计算在内,这大大改变了水位的形状[3]。这些例子突出了水位线的两个缺点:

  • 太慢:当任何类型的水位线,由于已知的未处理数据(例如,由于网络带宽约束而缓慢增长的输入日志)被正确地延迟时,如果结果的计算只依赖水位线的触发,将直接导致输出结果的延迟。

    这在左图中是最明显的,即迟到的9阻止所有后续窗口的水位线,即使这些窗口的输入数据已经更早的达到并且是完整的了。第二个窗口[12:02,12:04]尤其明显,从窗口中的第一个值到达到窗口计算并输出结果的时间需要将近七分钟。这个例子中的启发式水位线要稍微好一点(五分钟直到输出),但这不意味着启发式水位线从来不会受到其他水位线滞后的影响;在本例子选择了特殊的数据突出了这种对比。

    这里的重点在于:水位线提供了一个非常有用的完整性概念,从延迟的角度来看,只考虑完整性是不够的。想象一下一个仪表板,显示重要的指标,按小时或天显示。我们不太可能想等待一整个小时或一天才能查看当前窗口的结果;这是使用经典批处理系统为这种系统提供数据的痛点之一。相反,随着输入的演变和最终的完成,这些窗口的结果会随着时间的推移而持续并不断的更新更好一些。

  • **太快:**当一个启发式水位线比实际的水位线更快的向前推进时,会导致原来没有延迟的数据变成了延迟数据。这是在右边的例子中发生的情况:在第1个窗口的输入数据全部到达之前,水位线进入第1个窗口的末尾,导致错误的输出值为5而不是14。这个缺点是严格的启发式水位线的问题;既然是启发式就意味着有时会是错误的。因此,如果关心正确性,单纯依靠水位线确定何时计算结果并发出是不够的。

在Streaming 101中,对关于无限数据流的强大的无序处理不足的完整性概念作了一些强调。水位线太慢或太快,是这些论据的基础。完全依赖于完整性概念的系统无法同时获得低延迟和正确性。触发器是解决这些缺点解决方案。

When**: 触发器的伟大之处就在于触发器本身很伟大**

触发器是问题的另一半答案:“在处理时间的什么时候该计算窗口的结果并发出?”触发器用来表明在处理时间的什么适合该计算窗口的结果(尽管触发器本身可能会根据其他时间触发,例如在事件时间维度上使用水位线)。窗口的每个特定输出都称为窗口的窗格(pane)。

用于触发的信号的示例包括:

  • 水位线进度(即事件时间进度),是图6中我们已经看到的隐式版本,当水位线通过窗口的末尾时,计算结果并输出[4]。另一个用例是在窗口生命周期的末尾时触发垃圾回收,稍后我们将看到一个例子。
  • **处理时间进度,**可以用于提供固定的周期更新,因为处理时间(不像事件时间)总是均匀地,没有延迟地进行。
  • 元素计数,可以用于在窗口积累固定数量的元素后触发。
  • 带标记的,或其他数据依赖触发器,即记录或记录的特性(例如,EOF元素或刷新事件)意味着需要产生输出的地方。

除了基于具体信号触发的简单触发器之外,还有复合触发器,允许创建更复杂的触发逻辑。还有些复合触发器:

  • 重复触发器,重复触发器特别适用于处理时间触发器以提供定期更新。
  • AND 触发器(逻辑AND),所有子触发器都符合触发条件才触发(例如,在水位线通过窗口结束之后,我们观察到一个终止的标记记录),它才会触发。
  • Or 触发器(逻辑或),子触发器中的任何一个符合触发条件都会引起触发(例如,在水位线通过窗口结束之后或我们观察到终止的标记记录)时。
  • 序列触发器,以子触发器按照预定义的顺序触发子依次触发。

为了使触发器的概念更具体,继续介绍图6中使用的隐式默认触发器,将其添加到清单2中的代码中:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(AtWatermark()))
  .apply(Sum.integersPerKey());

清单3. 显式设定默认触发器

对触发器能够做什么有了基本的了解,可以开始考虑解决水位线太慢或太快的问题。在这两种情况下,我们希望在水位线超过窗口末尾之前或之后能够有机会计算窗口的结果,并能够提供持续更新的机制(除了水位线超过窗口末尾那一刻)。所以,需要一些多次触发触发器。那么问题就变成了:多次触发用来干什么?

太慢的情况下(即提供提前的推测结果),我们可能应该假设任何给定的窗口可能有稳定的输入数据,因为我们知道(通过窗口的早期阶段),我们观察到的这个窗口的输入是非常不完整的。因此,当处理时间提前(例如,每分钟一次)时周期性地触发可能是明智的,因为触发发射的数量将不依赖于实际观察到的窗口的数据量;在最坏的情况下,我们只会得到稳定的定期触发发射。

太快的情况下(即,由于启发式水位线可能存在错误的推测,所以需要一种机制去能够处理延时数据去修正计算结果),假设水位线基于相对准确的启发式(通常是相当安全的假设)。在这种情况下,预计不会经常看到延迟很久的数据,但是在实际中确实存在挺多延迟数据,不过结果很快会得到修正。每收到1个延时数据触发一次的策略,能够让我们更快的修正更新计算结果,但是由于预期的后期数据不频繁,应该不会给系统带来大的冲击。请注意,这些只是示例:如果有不同的应用场景,可以自由选择不同的触发器(或选择不适用触发器)。

最后,我们需要协调这些各种触发器的时间安排:提前,准时和延迟。我们可以使用Sequence 触发器和一个特殊的 OrFinally 触发器组合来实现,OrFinally触发器用来中止这个组合触发器。

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(Sequence(
                 Repeat(AtPeriod(Duration.standardMinutes(1)))
                   .OrFinally(AtWatermark()),
                 Repeat(AtCount(1))))
  .apply(Sum.integersPerKey());

清单4. 手动设定提前和推迟触发

如上所示,伪代码看起来不错,给出了提前、准时、延迟触发的常用模式,对应在 Dataflow 中,提供了一个定制(但语义上相当的)API,使得更容易使用这样的触发器,如下所示:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1))))
  .apply(Sum.integersPerKey());

清单5. 使用early/late api指定提前和延迟触发

在流式处理引擎上执行清单4 或清单5 中的示例(包含了理想的水位线和启发式的Watermak),如下所示:

image-20200924163747760

图7. 提前和延迟触发

这个版本对图6 有两个明显的改进:

  • 对于第二个窗口中的“水位线太慢”的情况,[12:02, 12:04]:我们现在每分钟提供一次定期的提前计算。在理想的水位线案例中,差异最为突出,其中时间到首次输出从近7分钟降至3分半;在启发式案例中也有明显改善。这两个版本现在都可以随着时间的推移而稳定地进行计算和修正计算结果(具有值7,14,然后是22的窗格),降低了从数据输入到得到计算结果之间的延迟。
  • 对于第一个窗口中的“启发式水位线太快”的情况,[12:00, 12:02]:当9的值延迟到达时,立即将其合并到一个值为14的新的修正的窗格中。

这些新触发器的一个有趣的副作用是,它们有效地使理想和启发式水位线版本之间的输出模式标准化。而图6 中的两个版本是截然不同的,而现在这两个版本看起来很相似。

此时剩下的最大差异是窗口生命周期。在理想的水位线案例中,当水位线超过窗口末尾后,窗口过期,窗口中的数据再也不会被处理,可以被安全的回收。在启发式水位线案例中,我们仍然需要保留窗口一段时间来处理延迟数据。但是到目前为止,系统没有任何好的方式知道每个窗口需要保留多长时间。这是最大允许延迟的用武之地。

When: 最大允许延迟(超过即可回收)

在谈到最后一个问题(“如何修正结果?”)之前,先来聊一下持续运行、乱序数据流式处理系统中的实际面对的问题:垃圾收集。在图7 中的启发式水位线示例中,每个窗口的持续状态在该示例的整个生命周期内都会持续;这是必要的,以便在到达时适当地处理延迟数据。但是,尽管能够保留所有持续状态,直到数据全部处理完毕。实际上,当处理无限数据源时,一直保留给定窗口的状态(包括元数据)通常是不切实际;最终会耗尽内存、磁盘等的空间。

因此,任何现实世界的无序处理系统都需要提供一些限制其正在处理的窗口的生命周期的方法。最简洁的实现方在系统内定义一个最大允许延迟的边界,即限制任何给定记录最晚到达时间(相对于水位线时间)不能超过这个时间;任何超过这个时间的数据不再处理,直接丢弃掉。定义了最大允许延迟之后,还需要准确地确定窗口需要保留的时间:直到水位线超过了窗口的末尾时间+最大允许延迟时间[5]。允许系统丢弃超过最大延迟的数据,还能够节省系统的计算资源。

由于最大允许延迟和水位线之间的相互作用有点晦涩,举个例子说明一下。我们来看一下清单5 /图7中的启发式水位线 Pipeline,并添加1分钟的最大允许延迟时间(请注意,之所以选择1分钟是为了更好的在图中说明概念,在现实世界中需要根据场景来确定合理的最大允许延迟时间):

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .withAllowedLateness(Duration.standardMinutes(1)))
  .apply(Sum.integersPerKey());

清单6. 带有最大允许延迟的提前和延迟触发

这个Pipeline的执行看起来像下面的图8所示,添加了以下功能来突出显示允许的延迟效应:

  • 加粗的水平方向的白线表示当前的处理时间,注释为Lateness Horizon的小刻度表示窗口的最大延时线(事件时间)。
  • 一旦水位线超过窗口的最大延迟线,该窗口将被回收,这意味着窗口的所有信息都将被销毁。
  • 虚线的矩形,表示窗口关闭时窗口覆盖的时间范围(在处理时间和事件时间两个维度上),一个小尾巴向右延伸,以表示窗口的延迟水平(与水位线对照)。

在这个图中,为第一个窗口添加了一个额外的延迟数据6。6相对窗口是迟到的,但仍然在允许的最大延迟时间范围内,所以6被累加修正计算结果为11。然而,9 超过了最大允许延迟,直接被丢弃掉。

image-20200924163828993

图8. 提前和推迟触发及允许延迟

关于最大迟延线的两个最后的说明:

  • 显然,如果从能够提供理想的水位线的资源中获取数据,则无需处理延迟数据,而0秒的最大延迟时间将是最佳的。这是我们在图7 的理想水位线部分中看到的。
  • 有一些例外情况不需要指定最大延迟,即使在使用启发式的水位线时,比如在数据覆盖的时间的范围内,对有限的key进行统计,(例如,数据覆盖的时间的范围内内,按照Web浏览器分组,统计网站的总访问次数)。在这种情况下,系统中活动窗口的数量受限于使用的Key的数量。只要Key的数量仍然很低,就无须通过设置最大允许的延迟时间来限制窗口的生命周期。

接下来继续讨论第4个也是最后一个问题。

How: 累积

随着时间推移,触发器被用于为一个窗口生成多个窗格,我们发现自己面临最后一个问题:“如何随着时间修正结果?”在我们已经看到的例子中,每个窗格建立在其前一个窗格基础之上。然而,实际上有三种不同的累积方式[6]:

  • 丢弃:每当窗格计算完毕时,任何存储的窗口状态都将被丢弃。这意味着每个窗格与之前的窗格都是相互独立的。当下游消费者本身正在执行某种累计时,例如当将整数发送到希望自己计算总和的系统时,丢弃模式是有用的,下游系统将数据累积在一起形成最后的结果。

  • 累积:如图7所示,每当窗格计算完毕时,保留该窗格所有的状态,未来输入的数据会累积到并更新现有状态。这意味着窗格是建立在前面窗格的基础之上的。以后的结果可以简单地覆盖以前的结果,例如在诸如BigTable或HBase的键/值存储中存储输出结果时,累积模式很有用。

  • 累积和撤回:和累积模式一样,但是当生成新窗格时,同时会为前一个窗格生成1个独立的撤回。撤回(与新的累积结果一起)本质上是在表达,“我以前告诉过你的结果是X,但我错了。撤回我上次告诉你的X,并将其替换为Y.“有两种情况,撤回是特别有用的:

    • 当下游消费者将不同维度的数据重新分组时,新值可能会与先前的值不同,因此最终会在不同的组中。在这种情况下,新值不能简单的覆盖旧值;需要从旧组中删除旧值,然后将新值添加到新组中。
    • 当使用动态窗口(例如,后边会有更详细的介绍)时,由于窗口合并,新值可能会替换多个旧窗口。在这种情况下,只从新窗口的信息中难以确定哪些旧窗口中需要撤回。对于旧的窗口进行明确的撤回使得任务变得简单明了。

放在一起看时,每个类型的累积语义会更清晰。考虑图7中第二个窗口的三个窗格(事件时间范围[12:02,12:04))。下表显示了三个支持的累积模式(每个窗格的值)将在三种支持的累积模式中显示(累积模式是图7中使用的特定模式):

img

表1. 使用图7的第二个窗口比较累积模式

  • **丢弃:**每个窗格仅包含在特定窗格中到达的值。因此,该窗口的最终值是最后一个窗格的值,而不是总和。但是,如果要自己计算所有独立窗格,将得到正确的答案22.这就是为什么丢弃模式下,下游消费者自己在窗格上执行聚合时很有用。
  • **累积:**如图7所示,每个窗格包含在该特定窗格中到达的值以及前一个窗格中的所有值。因此,最后一个窗格的值是该窗口所有值的总和22。但是,如果要自己累积该窗口的所有窗格,则会对来自窗格2和窗格1的输入进行双重和三重计数,给出的总和是不正确的51.这就是为什么累积模式是最有用的,可以简单地用新的值覆盖以前的值,新值已经包含了迄今为止收到的所有数据。
  • **累积和撤回:**每个窗格都包含一个新的累积模式值以及前一个窗格值的撤回。因此,观察到的最后(非撤回)值以及所有窗格(包括撤回)的总和是正确的答案22.这就是为什么撤回是如此强大。

下边的代码示例了丢弃模式:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .discardingFiredPanes())
  .apply(Sum.integersPerKey());

清单7. 提前/推迟触发的丢弃模式

在流式处理引擎上使用启发式水位线的效果如下所示:

image-20200924164229012

图9. 提前/推迟触发的丢弃模式

虽然输出的整体形状类似于图7的累积模式版本,但请注意,此丢弃版本中的任何一个窗格都不重叠。因此,每个输出与其他输出是独立的。

如果我们想看撤回的效果,那么这个变化将会是类似的(但是,请注意,Google Cloud Dataflow 的这个功能特性仍在开发中,所以这个API中的命名是临时的,可能与最终发布版本不同):

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .accumulatingAndRetractingFiredPanes())
  .apply(Sum.integersPerKey());

清单8. 使用提前/延迟触发的累积撤回模式

在流式处理引擎的执行如下所示:

image-20200924164301217

图10. 提前/推迟触发的累积撤回

由于每个窗口的窗格都是重叠的,所以看起来有点乱。撤回用红色表示,它与蓝色窗格结合重叠,看起来略带紫色。在一个给定的窗格中略微移动了两个输出的值(并用逗号分隔),使它们更容易区分。

将图9,7(启发式)和10的三张图动画效果的最终图放在一起,提供了三种模式的很好的视觉对比:

image-20200925160508695

图11. 丢弃模式、累积模式、累积和撤回模式的对比

从左到右是丢弃模式、累积模式、累积 & 撤回模式,三种模式需要的存储和计算成本依次递增。可以想像,在存储和计算成本方面,呈现的顺序(丢弃,累积,累积 & 撤回)的模式都相继更昂贵。为此,累积提供了一种新的维度,在正确性,延迟和成本的之间进行权衡。

小结

到此为止,我们接触了4个问题:

  • **What:要计算什么结果?**转换操作。
  • **Where:在事件时间中的哪个位置计算结果?**由窗口决定。
  • **When:在处理时间中的哪个时刻触发计算结果?**由水位线和触发器决定。
  • **How:如何修正结果?**通过各种累积模式。

我们目前只了解了一种窗口类型:基于事件时间的固定窗口。 从Streaming 101中我们提到了多种窗口,其中有两个是今天要详细阐述的:基于处理时间的固定窗口基于事件时间的会话窗口

以下链接:https://zhuanlan.zhihu.com/p/60236077

When/Where: 基于处理时间的窗口

处理时间窗口重要的原因有两个:

  • 对于某些使用情况,例如使用情况监控(例如,Web服务流量的 QPS),希望在收到数据流时分析数据,处理时间窗口绝对是适当的方法。
  • 对于事件发生时间很重要的(例如,分析用户行为趋势,计费,评分等)的场景,处理时间窗口绝对是错误的选择,要能够清晰的区分哪些场景合适。

因此,值得深入了解处理时间窗口和事件时间窗口之间的差异,特别是考虑到当今大多数流处理系统中广泛使用了处理时间窗口。

当使用类似于本文中提到的模型时,作为一等公民的窗口是严格基于事件时间的,有两种方法可以用来实现处理时间窗口:

  • **触发器:**忽略事件时间(即,使用跨越所有事件时间的全局窗口),并使用触发器在处理时间轴中提供该窗口的快照。
  • **进入时间:**将进入时间作为数据到达系统的事件时间,并从此开始使用正常的事件时间窗口。 目前 Spark Streaming 就是这么做的。

请注意,这两种方法或多或少等同,但在在多处理步骤Pipeline的情况下略有不同:在触发器版本中,每个处理步骤都使用处理时间切分窗口,步骤之间相互独立,因此例如窗口X中的数据为 一个阶段可能会在下一阶段的窗口 X-1 或 X+1 中; 在进入时间版本中,一旦将数据归于窗口X中,由于不同的处理步骤时间使用水位线同步处理进度(Dataflow的做法),在整个处理过程中都会一直属于窗口X。对微批来说( Spark Streaming的做法),微批的边界或其他因素,是在引擎级别协调处理。

正如一直强调的,处理时间窗口的最大缺点是,当输入的顺序发生变化时,窗口的内容会发生变化。 为了更具体地说明这一点,我们来看这三种用例:

  • 事件时间窗口
  • 使用触发器的处理时间窗口
  • 使用进入时间的处理时间窗口

我们将每个窗口应用到两个不同的输入数据集(总共有6个变体)。 两个输入数据包含完全相同的事件(即相同的值,发生在相同的事件时间),但顺序不同。 第1个数据集跟我们之前例子中的顺序一致,颜色为白色;第二个数据集调整了事件的处理顺序,如下图12所示,为紫色。

image-20200924164831304

图12. 改变了处理时间,其他不变

基于事件时间的窗口

为了建立一个基线,我们首先将基于事件时间的使用启发式Watermark的固定窗口处理两个顺序不同的数据集。 我们将重用清单5 / 图7 中的提前/延迟处理的代码,从而得到如下结果。 左边实际上是我们以前看到的; 右边是第二个数据集的结果。 这里要注意的一点是:尽管输出的整体形状不同(由于处理时间不同),四个窗口的最终结果保持不变:14,22,3和12:

image-20200924164906636

图13. 处理时间顺序不同的事件时间窗口

使用触发器的处理时间窗口

现在我们来比较上述两种处理时间方法。 首先,将尝试触发器方法。使用处理时间窗口达到效果,需要考虑以下三个方面:

  • 窗口: 使用全局事件时间窗口,本质上是以事件窗格模拟处理时间窗口。.
  • 触发: 根据处理时间窗口的期望大小,在处理时间维度上周期性触发。
  • 累积: 使用丢弃模式来保持窗格彼此独立,从而让每个窗格都像一个独立的处理时间窗口。

相应的代码看起来像清单9; 请注意,全局窗口是默认的,因此没有具体的覆盖策略:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.triggering(
                  Repeatedly(AtPeriod(Duration.standardMinutes(2))))
               .discardingFiredPanes())
  .apply(Sum.integersPerKey());

清单9. 在全局事件窗口上使用重发触发器、丢弃模式,模拟处理时间窗口

当流处理引擎上输入两个不同顺序的数据集的时候,结果如下图14所示。关于此图有两点有点意思:

  • 由于我们基于事件时间的窗格模拟处理时间窗口,所以在处理时间轴中勾画了“窗口”,这意味着窗口宽度是在Y轴上度量而不是X轴。
  • 由于处理时间窗口对输入数据的顺序敏感,在两个数据集中,每个窗口包含的数据都是不同的,即时事件发生的时间相同。 在左边我们得到12,21,18,而在右边我们得到7,36,4。

image-20200924164934945

图14. 处理时间顺序不同的处理时间窗口

使用进入时间的处理时间窗口

最后,我们来看看通过将输入数据的事件时间映射为入口时间来实现的处理时间窗口。在代码方面,这里有四个方面值得一提:

  • **时移:**当数据到达时,数据的事件时间被入口时间(数据到达时的处理时间)覆盖。请注意,我们目前在Dataflow中没有标准API来处理时间,尽管我们接下来会可能会使用伪代码I / O源中的虚构方法来代表下面的代码。对于Google Cloud Pub / Sub,只需在发布消息时将消息的timestampLabel字段留空;对于其他来源,需要查阅源代码文档。
  • 窗口: 返回使用标准的固定事件时间窗口。
  • 触发: 由于入口时间提供了计算理想Watermark的能力,所以可以使用默认触发器,在这种情况下,当Watermark通过窗口的末尾时,触发器会隐式触发一次。
  • **累积模式:**由于我们每个窗口只有一个输出,所以累积模式是无关紧要的。

实际的代码可能是这样:

PCollection<String> raw = IO.read().withIngressTimeAsTimestamp();
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
  .apply(Sum.integersPerKey());

清单10. 显式设置默认触发器

在流式引擎上的执行将如下面的图15所示。当数据到达时,它们的事件时间被覆盖为它们的进入时间(即到达时的处理时间),导致在理想Watermark线上的向右水平移位。该图中有趣的注释:

  • 与其他处理时间窗口示例一样,当输入的顺序变化时,即使输入的值和事件时间保持不变,我们也会得到不同的结果。
  • 与其他示例不同,窗口在事件时间维度上(因此沿X轴)重新划分了。尽管如此,这些窗口并不是原生的事件时间窗口;而是我们将处理时间简单地映射到事件时间上,擦除每个输入的原始记录,并用新的记录代替它,而事件的时间是表示Pipeline首次收到到数据的时间。
  • 尽管如此,由于使用了Watermark,触发器仍然在与之前的处理时间示例完全相同的时间触发。此外,所产生的输出值与该示例相同,如左侧的12,21,18,右侧的7,36,4。
  • 由于使用入口时间,所以理想的Watermark是可能的,所以实际的Watermark与理想的Watermark相匹配,斜率为1,向右上方延伸。

image-20200924165003099

图15. 使用入口时间的处理时间窗口,处理两个内容一样但顺序不同的数据集

虽然看到不同的方法可以实现处理时间窗口很有趣,但是这里的大部分内容是自从第一篇文章以来一直提到的:事件时间窗口与顺序无关,至少在极限情况下如此(实际 在处理过程中的窗格可能会不同,直到输入完成),而处理时间窗口不是。 如果关心事件实际发生的时间,必须使用事件时间窗口,否则计算结果是无意义的。

Where**: 会话窗口**

现在来看一下我最喜欢的特性之一:动态的、数据驱动的窗口,称为会话窗口。

会话是一种特殊类型的窗口,它捕获数据中的一段活动,在不活动一段时间后窗口中止。 它们在数据分析中特别有用,因为可以让我们看到某一个特定用户在一段时间内的行为。 这可以让我们分析会话内的活动的相关性,基于会话的长度来推断用户的参与水平等。

从窗口的角度来看,会话窗口在两个方面很有趣:

  • 它们是数据驱动窗口的示例:窗口的位置和大小是输入数据本身来决定,而不是在时间内基于某些预定义模式,如固定和滑动窗口。
  • 它们也是不对齐窗口的示例,即窗口并不将数据一视同仁,而是将数据的特定子集(例如,每个用户)进行切分。 这与对齐的窗口(如固定和滑动窗口)形成对比,这些窗口通常对数据一视同仁,进行切分。

对于一些用例,可以提前在一个会话中的数据中标记一个共同标识符(例如,在线的的视频播放器,定时发出心跳包,心跳包内容是服务质量信息,对于任何给定的一次观看,分配一个会话ID,所有的心跳信息中都添加这个会话ID)。在这种情况下,会话更容易构建(按照会话ID区分会话),本质上是按键分组的一种形式。

然而,在更一般的情况下(即,实际会话提前并不知道),会话只能从从数据中构建出来。当处理无序数据时,这变得特别棘手。

提供一般会话支持的关键是,根据定义,完整的会话窗口是一组较小的重叠窗口的组合,每个窗口包含单个记录,每个记录中的每个记录与下一个记录的间隔不超过预先定义的间隔。因此,即使会话中的数据乱序了,也可以简单地通过将各个数据的重叠窗口合并在一起来构建最终会话。

img

图16. 未合并的原始会话窗口和合并之后的会话窗口

下面来看一个代码示例,以清单8中的代码为基础,修改为使用会话窗口:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .accumulatingAndRetractingFiredPanes())
  .apply(Sum.integersPerKey());

清单11. 基于会话窗口,提前和延迟触发,使用累加和撤销模式

在流处理引擎上执行如下所示:

image-20200924165052916

图17. 基于会话窗口,提前和延迟触发,使用累加和撤销模式

上图中的具体过程如下:

  • 当遇到具有值为5的第一个记录时,它被放置在从该记录的事件时间开始的单个原始会话窗口中,窗口宽度为会话窗口的超时时长,例如超时时长为1分钟,会话窗口宽度为1分钟。在后边遇到的任何窗口与该窗口重叠的都应该隶属于同一个会话,并且合并到此窗口中。
  • 第二个到达记录是7,它类似地放在自己的原始会话窗口中,因为它不与5的窗口重叠。
  • 同时,Watermark已经过第一个窗口的末尾,所以在12:06之前,包含值5的窗口被物化为准时的窗口。此后不久,当处理时间正好为12:06的时候,第二个窗口也被物化为具有值7的推测结果。
  • 我们接下来观察一系列的记录,3,4和3,这3个会话窗口相互重叠。因此,它们都被合并在一起,并且在12:07的时候提前触发,发出一个值为10的单个窗口。
  • 当8到达不久之后,它与具有值7的会话和与值10的会话重叠。所有这三个因此被合并在一起,形成具有值25的新的组合会话。当Watermark然后通过这个会话的末尾时,它物化了一个包含值25的新会话以及之前发布的两个窗口的撤消,但后来被并入它:7和10。
  • 当9到达延迟到达时,类似的舞蹈发生在9号晚上,与值为5的原始会话,和值为25的会话变成了一个更大的值为39的一个较大的会话。值39和窗口25、5的撤销被立即延迟触发。

就这么简单地将流处理模型分解为不同的、可组合的部分,这还真是了不起啊。至此,你可以将注意力放在业务逻辑上了,而非那些数据形式的细节。

终于到结尾了,好开心啊

我讲完了所有的例子。此处应有掌声!您现在应该已经步入了健壮的流处理的世界中了,准备好高飞吧。在您离开之前,我想快速回顾一遍,防止您忘了。首先,我提到了几个重要的概念:

  • 事件时间与处理时间:事件发生时间和被数据处理系统处理的时间之间的重要区别。
  • 窗口:通常使用的方法是通过在时间边界(通过处理时间或事件时间)对其进行切分来管理无限数据,尽管我们将数据流模型中的窗口定义缩小仅表示事件时间内)。
  • 水位线:事件时间进度的概念,为在无限数据上运行的乱序处理系统提供了估计窗口数据完整性的手段。
  • 触发器:用于精确指定在合适计算窗口结果的机制,对于特定用例是有意义的。
  • 累积:在单个窗口被多次触发计算的情况下,随着触发持续的修正窗口结果。

其次,我们用来构建我们探索的四个问题:

  • What 要计算出什么结果?= 转换
  • Where 事件在哪里结果计算? = 窗口
  • When 在处理时间维度上什么时候计算窗口结果? = 水位线 + 触发器
  • How 如何不断的修正计算结果?= 累积

第三,最后一点,这种流处理模式所带来的灵活性(最终,需要做的是在处理数据的各种要素之间取得平衡,如正确性,延迟和成本),回顾一下,通过少量的代码修改,对相同的数据集处理而得到的输出的变化如下:

image-20200925162921517

感谢您的耐心与兴趣,下次再会!