streaming102
awesomepaper
translate
字数20604 2021-03-02

原文:https://www.oreilly.com/radar/the-world-beyond-batch-streaming-102/

闲话少说,下面进入主题。先总结下,上次的内容主要分三块:

本文中,将继续关注上次说的数据处理模式,而且通过具体实例来介绍细节。下面将本文主要分为两个部分:

通过本文,您将掌握一个健壮的无序数据处理所需的核心原则和概念,同时拥有让您超越批处理的时间推理工具。

为了让您对于实现方式有个直观的认识,我将会使用 Dataflow SDK 的代码片段(即 Google Cloud Dataflow 的 API),并辅以动画让这些概念可视化。我使用的是 Dataflow SDK 而不是人们所熟悉的 Spark Streaming 或者 Storm 等,原因在于还没有一个别的系统更加适合用于表达我想介绍的例子。不过好消息是有些项目正在朝着这个方向努力。更好的消息是,我们(谷歌)如今已经(联合 data Artisans、Cloudera、Talend 等公司)向 Apache 软件基金会提交了一份创建 Apache Dataflow 孵化项目的议案。希望围绕 Dataflow 模型 提供的健壮的无序数据处理语义来构建一个开放的社区及生态。这让我很期待 2016 年。擦,跑题了。

不过这篇文章缺少了我上次承诺的比较部分。对不起,我低估了我所要将的内容究竟有多少,究竟要花多少时间。但是呢,我又不知道该把所拖延的和该拓展的内容放在哪里。如果能安慰您的话呢,其实我在2015年的新加坡的 Strata + Hadoop World 做了一个演讲,题目是“大规模数据处理的演进”(2016年会在伦敦的 Strata + Hadoop World 做一个更新版本的演讲),其中涵盖了一些我之前想做的承诺的部分;这里有当时的 PPT。虽然不是完全一样,但是值得一读。

好,现在我们开始看“流”。

回顾与路线图

在Streaming 101中,首先澄清了一些术语,介绍了有限数据 VS 无限数据。有限数据源具有有限的大小,通常被称为“批处理”数据。无限数据源具有无限大小,通常被称为“流”数据。在后边将会尽量避免使用批处理和流式来修饰数据源,因为这些名称有其局限性并容易让人误解。

然后,解释了批处理流式处理引擎之间的区别:批处理引擎是设计优先考虑有限数据(现在批处理引擎也提供了micro-batch的方式处理流式数据),而流式处理引擎设计用于处理无限数据。我只会在描述执行引擎时使用批处理和流式处理。

定义完术语之后,介绍了与处理无限数据有关的两个重要的基本概念。首先阐述事件时间(事件发生的时间)和处理时间(数据在系统中被处理的时间)之间的关键区别。这为Streaming 101中提出的主要论文提供了基础:如果关心事件实际发生时间,则必须基于事件的事件时间,而不是处理时间。

接下来介绍了窗口的概念(即沿时间边界切分数据集),这是一种常用的方法,用于应对无限数据源的数据处理,无限数据源理论上永远不会结束。窗口策略的最常见且简单的例子是固定和滑动的窗口,更复杂的窗口类型,例如会话窗口(其中窗口由数据本身的特征决定,捕获每个用户的活动会话窗口,会话窗口之后紧接着是用户的不活动期)也比较广泛的用法。

除了前文中介绍的概念,现在再介绍3个新的概念:

最后,在回答无限数据处理中的4个问题时,更容易搞清楚这些概念和它们之间的关联关系:

后边将一一讨论这些问题,试图让大量清楚哪些概念与What/Where/When/How中的哪个问题有关。

Streaming 101 回顾

首先,回顾一下Streaming 101中提出的一些概念,这一次还将提供一些具体的例子使这些概念更具体。

What: 变换

经典批处理中 Transform 解决了以下问题:“要计算什么结果?”许多人可能已经熟悉经典的批处理,所以我们将以它为基础,加入其他概念,以便于理解。

对于这一部分,我们来看一个示例:计算由10个整数值组成的简单数据集的总和。这么说有点抽象,在实际中,可以想象一个游戏,10个人组成一个团队,每个人的最终得分相加,就是团队的成绩。也可以想象计费和使用情况的监控使用情况这样的场景。

对于每个示例,将包括一个简短的 Dataflow Java SDK 伪代码片段,以使 Pipeline 的定义更具体。因为是伪代码,所以有时会省略细节(如使用具体的I / O源)、使用简称(Java中的当前触发器名称太冗长)。除了这些(大部分我在 Postscript 中明确列举)的小事情之外,其它基本上是真实的 Dataflow SDK代码。稍后还将提供一个链接到实际的代码演练,可以编译和运行自己的类似例子感兴趣的人,可以实际尝试一下。

如果熟悉像 Spark Streaming 或 Flink 这样的计算引擎,那么在看 Dataflow 示例代码的时候就会容易一些。接下来开始让人崩溃的旅程,在Dataflow中有两个基本的原语:

img

图1. 转换类型

如果觉得不甚明白或者想刨根问底,可以看看 Google Dataflow Java SDK 文档

为了对例子说明,假设我们从一个 PCollection<KV<String,Integer>>命名为“input”(即由一个键/值对的字符串和整数组成的 PCollection,其中字符串是类似团队名称,整数是相应团队中个人的得分)。在现实世界的 Pipeline 中,通过从 I/O 源读取 PCollection 原始数据(例如日志记录)获得输入,将日志记录解析为适当的键/值对,然后将其转换为 PCollection<KV<String,Integer>> 。为了在第一个例子中清楚起见,将包括所有这些步骤的伪代码,但是在随后的例子中,删除了 I/O 和解析部分。

因此,对于简单地从 I/O 源读取数据的管道,解析出团队/分数键值对,并计算每队的得分数,代码如下:

PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input
  .apply(Sum.integersPerKey());

清单1. 计算总和的 Pipeline

从数据源读取键值对数据,键是String类型(团队名称),值是Integer类型(团队中各人得分)。然后对每个键(团队)计算总和(团队得分),并输出。

对于所有的示例,在每个示例的 Pipeline 的代码片段之后,有1个该Pipeline执行的动画示例。动画中看到一个 Key(即一个团队)的10条输入数据执行Pipeline的样子;在一个实际的Pipeline 中,类似的操作将有成千上万个,在很多台机器上并行执行。为了能清晰的说明,示例中演示了1个Key的执行过程。

动画中每个点代表一个输入或输出数据,输入和输出都有两个时间维度:事件时间(X轴)和处理时间(Y轴)。因此,Pipeline 按照处理时间维度执行,从下向上延伸,如加粗的上升白线所示。输入是圆圈,圆圈内的数字代表该特定记录的值。输入开始是灰色圆圈,随着Pipeline处理变换颜色。

当 Pipeline 处理到某一个值的时候,会将其累加并记录在 State 中,并最终将聚合结果作为输出。State和输出由矩形表示,矩形顶部不断变化的数字表示累加值,矩形覆盖的区域表示到当前处理时刻,所有矩形中的数据已经被处理。对于清单1中的 Pipeline,在经典批处理引擎上执行时,会看起来像这样(请注意,点击下面的图片启动动画,然后会一直循环播放):

image-20200924163454383

图2. 经典的批处理

上边是在批处理引擎上执行Pipeline,累加输入的数据,直到所有输入处理完毕(由顶部的虚线绿色线表示),此时输出为51。在此示例中,因为没有应用任何特定的窗口,所以在事件时间维度上计算了整个数据集的总和;因此,用于累加的State和输出的矩形覆盖X轴的整体。但是,如果要处理一个无限数据源,那么经典批处理将是不够的,不能等待输入结束,因为它实际上永远不会结束。所以需要的使用在Streaming 101中提到的一个概念是---窗口。因此,想要回答第二个问题“在事件时间的哪个位置计算结果?”,现在先简要回顾一下窗口。

Where: 窗口

窗口是把数据在时间上进行切分。常见的窗口策略有:固定窗口、滑动窗口、会话窗口。

img

图3. 窗口策略

为了更好地了解实际中的如何使用窗口,来看一下的上边提到的整数求和Pipeline,使用了长度为2分钟的时间窗口。使用 Dataflow SDK,只需要简单的添加 Window.into 转换即可(下面代码中的第二行):

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
  .apply(Sum.integersPerKey());

清单2. 使用窗口的求和代码

回想一下,Dataflow提供了一种统一的批处理和流式处理模型,因为语义上,批处理只是流式处理的一个子集。因此,首先在批处理引擎上执行此Pipeline;在批处理引擎上执行此Pipeline,原理简单明了,可以很好的跟在流式处理引擎上执行做对比。

image-20200924163551278

图4. 批处理上窗口的求和

如前所述,输入在不断累加存储在State中,直到输入被完全处理,然后产生输出。然而,在这种情况下,不是1个输出,而是4个:在时间上切分成了4个事件时间窗口,每个窗口产生一个输出。

到此为止,我们重新回顾了Streaming 101中引入的两个主要概念:事件时间和处理时间之间的关系以及窗口。再进一步,开始阐述本节开头提到的新概念:水位线,触发器和累积。下边开始Streaming 102。

Streaming 102

在上边我们看到了在批引擎上执行使用了窗口的Pipeline。理想情况下,我们希望具有较低的处理延迟,同时也希望能涵盖对无限数据集的处理。切换到流式引擎是朝着正确方向迈出的一步。批处理引擎可以明确的知道,每个窗口的数据是否完整了(即,有限数据集中所有的数据都被处理了),但目前缺乏确定无限数据集的完整性的实用方法。接下来介绍水位线。

When: 水位线

水位线是问题答案的前半部分:“在处理时间的什么时候计算窗口的结果?” 水位线是事件时间中输入完整性的时间概念。换句话说,它们是系统根据当前处理的数据的事件时间判断处理进度和完整性的方法(有限或无限数据集,在无限数据的情况下作用更为明显)。

回想一下 Streaming 101 中这个图,这里稍作修改,其中描述了事件时间和处理时间之间的偏差,现实中大多数的分布式数据处理系统中的时间偏差一直在变化。

img

图5. 事件时间进度、偏差、水位线

图中表示现实(reality)的红色曲线本质上是水位线。随着处理时间的推移,它跟踪事件时间完整性的进度。在概念上,可以将水位线视为函数 F(P) -> E,在处理时间中选取一个点,返回事件时间的一个点。(更准确地说,对函数的输入实际上是在 Pipeline 中观察到水位线的点上游的一切的当前状态:输入源,缓冲数据,正在处理的数据等;但在概念上,将其视为从处理时间到事件时间的映射更简单。)在事件时间点 E 上系统会认为事件时间小于 E 的所有数据都到齐了。换句话说,这是一个断言,不再有事件时间小于 E 的数据。根据水位线的类型:完美的或启发式的,这种断言可能是严格的保证或者经过训练的估计:

水位线是一个有趣和复杂的话题,未来会写一篇新的文章专门阐述。现在,为了更好地了解水位线的作用以及缺点,我们来看一下使用水位线的流式处理引擎的两个例子,以确定何时在清单2中执行使用窗口的Pipeline时实现输出。左边的例子使用理想的水位线;右边的一个使用启发式水位线。

image-20200924163653862

图6. 理想的与推测的水位线

在这两种情况下,当水位线通过窗口的末尾时,窗口被触发计算。两个执行的主要区别在于右侧的水位线计算中使用的启发式算法,值9因为迟到的问题而没有被计算在内,这大大改变了水位的形状[3]。这些例子突出了水位线的两个缺点:

在Streaming 101中,对关于无限数据流的强大的无序处理不足的完整性概念作了一些强调。水位线太慢或太快,是这些论据的基础。完全依赖于完整性概念的系统无法同时获得低延迟和正确性。触发器是解决这些缺点解决方案。

When**: 触发器的伟大之处就在于触发器本身很伟大**

触发器是问题的另一半答案:“在处理时间的什么时候该计算窗口的结果并发出?”触发器用来表明在处理时间的什么适合该计算窗口的结果(尽管触发器本身可能会根据其他时间触发,例如在事件时间维度上使用水位线)。窗口的每个特定输出都称为窗口的窗格(pane)。

用于触发的信号的示例包括:

除了基于具体信号触发的简单触发器之外,还有复合触发器,允许创建更复杂的触发逻辑。还有些复合触发器:

为了使触发器的概念更具体,继续介绍图6中使用的隐式默认触发器,将其添加到清单2中的代码中:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(AtWatermark()))
  .apply(Sum.integersPerKey());

清单3. 显式设定默认触发器

对触发器能够做什么有了基本的了解,可以开始考虑解决水位线太慢或太快的问题。在这两种情况下,我们希望在水位线超过窗口末尾之前或之后能够有机会计算窗口的结果,并能够提供持续更新的机制(除了水位线超过窗口末尾那一刻)。所以,需要一些多次触发触发器。那么问题就变成了:多次触发用来干什么?

太慢的情况下(即提供提前的推测结果),我们可能应该假设任何给定的窗口可能有稳定的输入数据,因为我们知道(通过窗口的早期阶段),我们观察到的这个窗口的输入是非常不完整的。因此,当处理时间提前(例如,每分钟一次)时周期性地触发可能是明智的,因为触发发射的数量将不依赖于实际观察到的窗口的数据量;在最坏的情况下,我们只会得到稳定的定期触发发射。

太快的情况下(即,由于启发式水位线可能存在错误的推测,所以需要一种机制去能够处理延时数据去修正计算结果),假设水位线基于相对准确的启发式(通常是相当安全的假设)。在这种情况下,预计不会经常看到延迟很久的数据,但是在实际中确实存在挺多延迟数据,不过结果很快会得到修正。每收到1个延时数据触发一次的策略,能够让我们更快的修正更新计算结果,但是由于预期的后期数据不频繁,应该不会给系统带来大的冲击。请注意,这些只是示例:如果有不同的应用场景,可以自由选择不同的触发器(或选择不适用触发器)。

最后,我们需要协调这些各种触发器的时间安排:提前,准时和延迟。我们可以使用Sequence 触发器和一个特殊的 OrFinally 触发器组合来实现,OrFinally触发器用来中止这个组合触发器。

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(Sequence(
                 Repeat(AtPeriod(Duration.standardMinutes(1)))
                   .OrFinally(AtWatermark()),
                 Repeat(AtCount(1))))
  .apply(Sum.integersPerKey());

清单4. 手动设定提前和推迟触发

如上所示,伪代码看起来不错,给出了提前、准时、延迟触发的常用模式,对应在 Dataflow 中,提供了一个定制(但语义上相当的)API,使得更容易使用这样的触发器,如下所示:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1))))
  .apply(Sum.integersPerKey());

清单5. 使用early/late api指定提前和延迟触发

在流式处理引擎上执行清单4 或清单5 中的示例(包含了理想的水位线和启发式的Watermak),如下所示:

image-20200924163747760

图7. 提前和延迟触发

这个版本对图6 有两个明显的改进:

这些新触发器的一个有趣的副作用是,它们有效地使理想和启发式水位线版本之间的输出模式标准化。而图6 中的两个版本是截然不同的,而现在这两个版本看起来很相似。

此时剩下的最大差异是窗口生命周期。在理想的水位线案例中,当水位线超过窗口末尾后,窗口过期,窗口中的数据再也不会被处理,可以被安全的回收。在启发式水位线案例中,我们仍然需要保留窗口一段时间来处理延迟数据。但是到目前为止,系统没有任何好的方式知道每个窗口需要保留多长时间。这是最大允许延迟的用武之地。

When: 最大允许延迟(超过即可回收)

在谈到最后一个问题(“如何修正结果?”)之前,先来聊一下持续运行、乱序数据流式处理系统中的实际面对的问题:垃圾收集。在图7 中的启发式水位线示例中,每个窗口的持续状态在该示例的整个生命周期内都会持续;这是必要的,以便在到达时适当地处理延迟数据。但是,尽管能够保留所有持续状态,直到数据全部处理完毕。实际上,当处理无限数据源时,一直保留给定窗口的状态(包括元数据)通常是不切实际;最终会耗尽内存、磁盘等的空间。

因此,任何现实世界的无序处理系统都需要提供一些限制其正在处理的窗口的生命周期的方法。最简洁的实现方在系统内定义一个最大允许延迟的边界,即限制任何给定记录最晚到达时间(相对于水位线时间)不能超过这个时间;任何超过这个时间的数据不再处理,直接丢弃掉。定义了最大允许延迟之后,还需要准确地确定窗口需要保留的时间:直到水位线超过了窗口的末尾时间+最大允许延迟时间[5]。允许系统丢弃超过最大延迟的数据,还能够节省系统的计算资源。

由于最大允许延迟和水位线之间的相互作用有点晦涩,举个例子说明一下。我们来看一下清单5 /图7中的启发式水位线 Pipeline,并添加1分钟的最大允许延迟时间(请注意,之所以选择1分钟是为了更好的在图中说明概念,在现实世界中需要根据场景来确定合理的最大允许延迟时间):

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .withAllowedLateness(Duration.standardMinutes(1)))
  .apply(Sum.integersPerKey());

清单6. 带有最大允许延迟的提前和延迟触发

这个Pipeline的执行看起来像下面的图8所示,添加了以下功能来突出显示允许的延迟效应:

在这个图中,为第一个窗口添加了一个额外的延迟数据6。6相对窗口是迟到的,但仍然在允许的最大延迟时间范围内,所以6被累加修正计算结果为11。然而,9 超过了最大允许延迟,直接被丢弃掉。

image-20200924163828993

图8. 提前和推迟触发及允许延迟

关于最大迟延线的两个最后的说明:

接下来继续讨论第4个也是最后一个问题。

How: 累积

随着时间推移,触发器被用于为一个窗口生成多个窗格,我们发现自己面临最后一个问题:“如何随着时间修正结果?”在我们已经看到的例子中,每个窗格建立在其前一个窗格基础之上。然而,实际上有三种不同的累积方式[6]:

放在一起看时,每个类型的累积语义会更清晰。考虑图7中第二个窗口的三个窗格(事件时间范围[12:02,12:04))。下表显示了三个支持的累积模式(每个窗格的值)将在三种支持的累积模式中显示(累积模式是图7中使用的特定模式):

img

表1. 使用图7的第二个窗口比较累积模式

下边的代码示例了丢弃模式:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .discardingFiredPanes())
  .apply(Sum.integersPerKey());

清单7. 提前/推迟触发的丢弃模式

在流式处理引擎上使用启发式水位线的效果如下所示:

image-20200924164229012

图9. 提前/推迟触发的丢弃模式

虽然输出的整体形状类似于图7的累积模式版本,但请注意,此丢弃版本中的任何一个窗格都不重叠。因此,每个输出与其他输出是独立的。

如果我们想看撤回的效果,那么这个变化将会是类似的(但是,请注意,Google Cloud Dataflow 的这个功能特性仍在开发中,所以这个API中的命名是临时的,可能与最终发布版本不同):

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .accumulatingAndRetractingFiredPanes())
  .apply(Sum.integersPerKey());

清单8. 使用提前/延迟触发的累积撤回模式

在流式处理引擎的执行如下所示:

image-20200924164301217

图10. 提前/推迟触发的累积撤回

由于每个窗口的窗格都是重叠的,所以看起来有点乱。撤回用红色表示,它与蓝色窗格结合重叠,看起来略带紫色。在一个给定的窗格中略微移动了两个输出的值(并用逗号分隔),使它们更容易区分。

将图9,7(启发式)和10的三张图动画效果的最终图放在一起,提供了三种模式的很好的视觉对比:

image-20200925160508695

图11. 丢弃模式、累积模式、累积和撤回模式的对比

从左到右是丢弃模式、累积模式、累积 & 撤回模式,三种模式需要的存储和计算成本依次递增。可以想像,在存储和计算成本方面,呈现的顺序(丢弃,累积,累积 & 撤回)的模式都相继更昂贵。为此,累积提供了一种新的维度,在正确性,延迟和成本的之间进行权衡。

小结

到此为止,我们接触了4个问题:

我们目前只了解了一种窗口类型:基于事件时间的固定窗口。 从Streaming 101中我们提到了多种窗口,其中有两个是今天要详细阐述的:基于处理时间的固定窗口基于事件时间的会话窗口

以下链接:https://zhuanlan.zhihu.com/p/60236077

When/Where: 基于处理时间的窗口

处理时间窗口重要的原因有两个:

因此,值得深入了解处理时间窗口和事件时间窗口之间的差异,特别是考虑到当今大多数流处理系统中广泛使用了处理时间窗口。

当使用类似于本文中提到的模型时,作为一等公民的窗口是严格基于事件时间的,有两种方法可以用来实现处理时间窗口:

请注意,这两种方法或多或少等同,但在在多处理步骤Pipeline的情况下略有不同:在触发器版本中,每个处理步骤都使用处理时间切分窗口,步骤之间相互独立,因此例如窗口X中的数据为 一个阶段可能会在下一阶段的窗口 X-1 或 X+1 中; 在进入时间版本中,一旦将数据归于窗口X中,由于不同的处理步骤时间使用水位线同步处理进度(Dataflow的做法),在整个处理过程中都会一直属于窗口X。对微批来说( Spark Streaming的做法),微批的边界或其他因素,是在引擎级别协调处理。

正如一直强调的,处理时间窗口的最大缺点是,当输入的顺序发生变化时,窗口的内容会发生变化。 为了更具体地说明这一点,我们来看这三种用例:

我们将每个窗口应用到两个不同的输入数据集(总共有6个变体)。 两个输入数据包含完全相同的事件(即相同的值,发生在相同的事件时间),但顺序不同。 第1个数据集跟我们之前例子中的顺序一致,颜色为白色;第二个数据集调整了事件的处理顺序,如下图12所示,为紫色。

image-20200924164831304

图12. 改变了处理时间,其他不变

基于事件时间的窗口

为了建立一个基线,我们首先将基于事件时间的使用启发式Watermark的固定窗口处理两个顺序不同的数据集。 我们将重用清单5 / 图7 中的提前/延迟处理的代码,从而得到如下结果。 左边实际上是我们以前看到的; 右边是第二个数据集的结果。 这里要注意的一点是:尽管输出的整体形状不同(由于处理时间不同),四个窗口的最终结果保持不变:14,22,3和12:

image-20200924164906636

图13. 处理时间顺序不同的事件时间窗口

使用触发器的处理时间窗口

现在我们来比较上述两种处理时间方法。 首先,将尝试触发器方法。使用处理时间窗口达到效果,需要考虑以下三个方面:

相应的代码看起来像清单9; 请注意,全局窗口是默认的,因此没有具体的覆盖策略:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.triggering(
                  Repeatedly(AtPeriod(Duration.standardMinutes(2))))
               .discardingFiredPanes())
  .apply(Sum.integersPerKey());

清单9. 在全局事件窗口上使用重发触发器、丢弃模式,模拟处理时间窗口

当流处理引擎上输入两个不同顺序的数据集的时候,结果如下图14所示。关于此图有两点有点意思:

image-20200924164934945

图14. 处理时间顺序不同的处理时间窗口

使用进入时间的处理时间窗口

最后,我们来看看通过将输入数据的事件时间映射为入口时间来实现的处理时间窗口。在代码方面,这里有四个方面值得一提:

实际的代码可能是这样:

PCollection<String> raw = IO.read().withIngressTimeAsTimestamp();
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
  .apply(Sum.integersPerKey());

清单10. 显式设置默认触发器

在流式引擎上的执行将如下面的图15所示。当数据到达时,它们的事件时间被覆盖为它们的进入时间(即到达时的处理时间),导致在理想Watermark线上的向右水平移位。该图中有趣的注释:

image-20200924165003099

图15. 使用入口时间的处理时间窗口,处理两个内容一样但顺序不同的数据集

虽然看到不同的方法可以实现处理时间窗口很有趣,但是这里的大部分内容是自从第一篇文章以来一直提到的:事件时间窗口与顺序无关,至少在极限情况下如此(实际 在处理过程中的窗格可能会不同,直到输入完成),而处理时间窗口不是。 如果关心事件实际发生的时间,必须使用事件时间窗口,否则计算结果是无意义的。

Where**: 会话窗口**

现在来看一下我最喜欢的特性之一:动态的、数据驱动的窗口,称为会话窗口。

会话是一种特殊类型的窗口,它捕获数据中的一段活动,在不活动一段时间后窗口中止。 它们在数据分析中特别有用,因为可以让我们看到某一个特定用户在一段时间内的行为。 这可以让我们分析会话内的活动的相关性,基于会话的长度来推断用户的参与水平等。

从窗口的角度来看,会话窗口在两个方面很有趣:

对于一些用例,可以提前在一个会话中的数据中标记一个共同标识符(例如,在线的的视频播放器,定时发出心跳包,心跳包内容是服务质量信息,对于任何给定的一次观看,分配一个会话ID,所有的心跳信息中都添加这个会话ID)。在这种情况下,会话更容易构建(按照会话ID区分会话),本质上是按键分组的一种形式。

然而,在更一般的情况下(即,实际会话提前并不知道),会话只能从从数据中构建出来。当处理无序数据时,这变得特别棘手。

提供一般会话支持的关键是,根据定义,完整的会话窗口是一组较小的重叠窗口的组合,每个窗口包含单个记录,每个记录中的每个记录与下一个记录的间隔不超过预先定义的间隔。因此,即使会话中的数据乱序了,也可以简单地通过将各个数据的重叠窗口合并在一起来构建最终会话。

img

图16. 未合并的原始会话窗口和合并之后的会话窗口

下面来看一个代码示例,以清单8中的代码为基础,修改为使用会话窗口:

PCollection<KV<String, Integer>> scores = input
  .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
               .triggering(
                 AtWatermark()
                   .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                   .withLateFirings(AtCount(1)))
               .accumulatingAndRetractingFiredPanes())
  .apply(Sum.integersPerKey());

清单11. 基于会话窗口,提前和延迟触发,使用累加和撤销模式

在流处理引擎上执行如下所示:

image-20200924165052916

图17. 基于会话窗口,提前和延迟触发,使用累加和撤销模式

上图中的具体过程如下:

就这么简单地将流处理模型分解为不同的、可组合的部分,这还真是了不起啊。至此,你可以将注意力放在业务逻辑上了,而非那些数据形式的细节。

终于到结尾了,好开心啊

我讲完了所有的例子。此处应有掌声!您现在应该已经步入了健壮的流处理的世界中了,准备好高飞吧。在您离开之前,我想快速回顾一遍,防止您忘了。首先,我提到了几个重要的概念:

其次,我们用来构建我们探索的四个问题:

第三,最后一点,这种流处理模式所带来的灵活性(最终,需要做的是在处理数据的各种要素之间取得平衡,如正确性,延迟和成本),回顾一下,通过少量的代码修改,对相同的数据集处理而得到的输出的变化如下:

image-20200925162921517

感谢您的耐心与兴趣,下次再会!