chapter11.Stream Processing

在第 10 章中,我们讨论了批处理--一种读入一堆文件作为输入,生成一堆新的输出文件的技术。输出是派生数据的形式,意味着,如果有必要可以通过批处理重新创建数据集。我们看到这是多么简单有力的思路在创建搜索索引,推荐分析系统的时候。

但是,存在一个假设:即输入是有界的(大小已知且有限),因此批处理可以知道何时读取完成它的输入。例如,MapReduce 的核心排序操作在生产输出前必须获得所有输入:可能会发生最后一个输入是最小键的情况,因此需要将它作为第一个输出,所以不能提前开始输出。

实际上,许多数据是无界的,因为它们会随着时间推移慢慢到达:你的用户昨天和今天都在生产数据,而且明天还会继续。除非你倒闭,这个过程不会停止,从这个意义上来说,数据不会完成。因此批处理程序必须人为将其划分为固定持续时间的块:例如,在每天的最后处理一天的数据,或者在每个小时结束处理一小时的数据。

每日批处理的问题在于输入的更改要在一天之后才反映在输出中,这对于很多用户来说太慢了。为了降低延迟,我们可以更加平滑的运行处理过程----例如,在每秒结束处理一秒的数据----或者甚至,连续地没有所谓的固定时间块,就当时间发生时就处理。这就是流处理背后的思想。

通常来说,"流"是指随着时间推移逐渐可用的数据。这个概念出现在许多地方:Unix 的stdin 和 stdout,编程语言(惰性列表),文件系统的 API,TCP 连接,互联网上的音视频传输等等。

在本章中,我们将事件流视为一种数据管理机制:对应上一章批处理的无界的,增量处理的对应物。首先讨论如何通过网络表示,存储和传输流。在 451 页的数据库和流中我们将研究流和数据库的关系,最后,在 464 页的流处理中,我们将探索用于连续处理这些流的方法和工具,以及可用于构建应用程序的方法。

传输事件流

在批处理系统,任务的输入和输出都是文件(可能来自分布式文件系统)。等效的流是什么样子?

当输入是文件(字节的序列)时,处理的第一步就是解析为记录序列。在流处理的上下文中,记录通常被成为 event,但是本质上是一个东西:小的,自包含的,不变的对象,其中包含了某个时间点发生的某件事的详细信息。event 通常包含时间戳标识发生于一天中的那个时间点。

举例来说,发生的事情可能是用户执行的操作,比如查看页面或者购买。它也可能源自机器,比如温度传感器的定期测量或者 CPU 利用率指标。在 391 页的使用 Unix 工具进行批处理的例子中,web 的日志每一行都是一个事件。

如同在第四章中讨论,事件可以被编码成为字符串或者 JSON,或者某种二进制。编码允许你存储事件,例如将事件附加到文件中,然后将其插入到关系表中,或者写到文档数据库中;还允许通过网络将事件发送到另一个节点以对其进行处理。

批处理中,文件被写入一次然后可能被多个任务读取。类似地,在流式术语中,事件由生产者(也被称为发布者或者发送者)生成一次,然后由多个消费者进行处理(订阅者或接收者)。文件系统中,文件表示一系列相关记录;在流系统中,相关的事件通常组合在一起被称为 topic 或者 stream.

原则上来说,一个文件或者数据库足以连接生产者和消费者:一个生产者将事件写入数据存储,然后每个消费者定期轮询数据检查自上次运行以来出现的事件。本质上,就是批处理在每天结束时处理一天数据的过程。

然而,当转向低延迟的连续处理时,如果数据存储不是为这种用途设计的轮询的开销变得相当巨大。越频繁地轮询,返回新事件的概率就越低,因此开销就越高。所以,最好是当新事件到来时通知消费者。

传统上的数据对于这种通知机制的支持不是很好:关系数据库通常具有触发器,它可以对类似表有插入动作作出反应,但是可以做的非常有限,而且在数据库设计中有些事后思考(文献 4,5)。代替的是,已经有了专门开发出来的工具来传递事件通知。

消息系统

一个通用的通知消费者新事件到来的方案是使用消息系统:生产者发送一个包含事件的消息,然后被推送给消费者。我们再 136 页的消息传递数据流中涉及到了,在这里详细阐述。

生产者和消费者之间利用直接通信通道(Unix 管道或者 TCP 连接)是实现消息传递系统的简单方法。但是,大多数消息系统会在此基础上进行扩展。尤其是,Unix 管道和 TCP 连接只是将一个生产者和一个消费者连接起来,而消息系统允许多对多的消息模式

在此发布/订阅模型中,不同的系统采用不同的方法,并且没有可以支持所有目的的答案。为了区分系统,提出以下两个问题会很有帮助:

  1. 如果生产者发送消息比消费者处理消息快会发生什么?广义上来讲,会有三种情况:系统丢掉消息,在队列中缓存消息,进行负反馈(流控,比如告诉生产者不要发送了)。举个栗子,Unix 管道和 TCP 使用负反馈:他们有一个很小的固定 buffer,如果被占满,发送端会被锁住直到 buffer 里的内容被使用。 如果消息在队列中被缓存,重要的是理解如果队列不够长怎么办?如果队列内存不足会 crash 吗或者把消息写到磁盘?如果这样做了,怎么保证消息队列的性能?
  2. 如果节点 crash 或者暂时掉线了会发生什么,会丢消息吗?与数据库一样,持久性可能需要写入磁盘或复制,这回产生开销。如果可以容忍丢失消息的后果,可以在同样的硬件上获得更高的吞吐和更小的延迟

消息丢失是否能够接受更多的取决于应用。比如,对于定期的传感器读写,偶尔的丢数据是可以接受的,因为无论如何稍后都会有新的数据发送。然而,请注意,如果丢弃了大量消息,可能会导致度量不准确。如果要对事件进行计数,那么可靠的传递就变得更为重要,因为每个丢失的消息都会导致错误计数。

我们在 10 章中讨论的批处理系统有一个很好的特性,就是有很强的可靠性保证:失败的任务自动重试,并自动丢弃失败任务的部分输出。这意味着输出跟没有发生故障一样,这有助于简化编程模型。在本章的稍后,我们将研究如何在流系统中提供一个类似的保证。

从生产者直接传递到消费者

许多消息系统使用网络通信在生产者和消费者直连,不经过任何中间节点:

  • UDP 多播在禁用也中广泛应用于流,例如股市摘要,其中低延迟很重要。尽管 UDP 本身是不可靠的,但是应用级别的协议可以恢复丢失的数据包(生产者必须记住已经发送的数据包,以便丢失重传)
  • 无代理的消息库,比如 ZeroMQ,或者采用类似的方法,通过 TCP 或者 IP 多播实现发布/订阅消息
  • StatsD 和 Brubeck 利用不可靠的 UDP 消息从网络中的所有计算机手机指标并监控它们。(在 Stats 协议中,只有收到所有消息的指标才正确;使用 UDP 使得指标最好近似)
  • 如果消费者在网络中提供服务,生产者可以直接通过 HTTP 或者 RPC 请求将消息推送给消费者。这是 webhooks 背后的思路,这种模式是一种服务的回调 URL 向另一种服务注册,并且每当发生事件时,都会向该 URL 发出请求

尽管这些直接的消息系统在设计他们的场景中工作的很好,但是他们通常需要应用程序的代码意识到消息丢失的可能性。他们可以容忍的错误非常有限:即使协议检测到并重新传输网络中丢失的数据包,他们通常也假定生产者和消费者一直在线

如果消费者离线,当发送的消息不可达时就可能丢失消息。有些协议允许生产者重传失败消息,但是这种如果生产者 crash,就不起作用了,从而丢失本应该重试的消息。

消息代理

消息代理对比数据库

多个消费者

ACK 和重传

分区日志

消息存储使用日志

日志对比传统消息

消费者补偿

硬盘空间的利用

当消费者无法跟上生产者速度

重放老消息

数据库和流

我们在消息代理和数据库之间进行了一些比较。即使传统上它们都被视作工具的单独类别,但是我们看到基于日志的消息代理已经成功从数据库中获取了思路并将其应用于消息传递。我们也可以反过来从消息和流中获得启发,将其应用于数据库

我们之前说过,事件就是某个时间点发生的事情的记录。发生的事情可以是用户操作(例如查询),或者传感器读数,也可能是写入数据库。事实是写入数据库的事件可以被捕获,存储和处理。这个事实表明,数据库和流之间的连接比磁盘上的日志的物理存储更深入---这是很基本的。

事实上,复制的日志(replication log)就是数据库写事件的流,由处理事务的领导者产生。跟随者将该写入流应用于自己的数据库副本,从而获得相同数据的准确副本。复制日志中的事件描述了数据发生了更改。

我们还从 348 页的 Total Order Broad-cast 中连接了复制状态机复制原理,该原理指出:如果每个事件表示一个数据库的写入,每个复制以相同的顺序处理相同事件,所有的复制最终会达到相同的状态。这只是事件流的另一种情况!

本节中我们首先研究异构数据系统中出现的问题,然后探讨如何通过事件流带入数据库的思想解决它。

保持系统同步

正如我们在本书中一直看到的,没有单一系统可以满足所有的数据存储,查询和处理需要。实际上,大多数不平凡的应用程序都需要组合几种不同的技术来满足其需求:比如,使用 OLTP 数据库服务用户请求,使用缓存来加速常见请求,全文索引来处理搜索查询,以及用于分析的数据仓库。其中每个都有自己的数据副本,并以自己的形式表示,针对自己的目的进行了优化

由于相同或者相关的数据出现在几个不同的位置,因此需要使它们彼此保持同步:如果在数据库中更新了一项,需要在缓存、索引和数据仓库中同步更新。对于数据仓库,通常这种同步由 ETL 来处理(查看 91 页的数据仓库),通常是获取数据库的完整副本,进行转换并将其批量加载到数据仓库中,换句话说,一个批处理。相似的,我们再 411 页的 The Output of Batch Workflows 中看到了如何使用批处理创建搜索索引,推荐系统和其他派生数据系统。

改变数据捕获

时间源

状态,流和不变性

处理流

到目前为止,在本章中,我们已经讨论了流的来源(用户活动事件,传感器和向数据库的写操作),并讨论了流的传输方式(通过直接消息,消息代理传递事件日志)

剩下的就是讨论有了流之后如何处理。大致来说,有三种选择:

  1. 你可以获取事件中的数据,并将其写入数据库,缓存,搜索索引和类似的存储系统,然后其他客户端从其查询。这是使数据库与系统其他部分中发生更改保持同步的一种好方法,尤其是流的消费者是唯一向数据库写入数据的客户端的情况下。写入存储系统相当于在 411 页的批处理工作流的输出中讨论的流式等效
  2. 你可以通过某种方式将事件推送给用户,比如发邮件,发通知,或者将事件流式传输到实时仪表版可视化。这种情况下,人是消费者
  3. 你可以处理一个或者多个输入流然后生产一个或者多个输出流。流会通过管道流经几个不通的处理阶段在最终输出之前(1 或 2)

在本章的剩余部分,我们将讨论选择 3:处理流生成其他流。像这样处理流的一段代码被称为* operator 或者 job*. 与我们再第 10 章讨论的 Unix 进程和 MapRedues 任务密切相关,数据流的模式是类似的:流处理包括只读的数据流输入,并在输出中将其附加到不同位置

流处理的分割和并行化模式也是跟 MapReduces 相似的,所以我们不再赘述。基于 map 的操作比如转换或者过滤记录也可以相同

与批处理任务的最大不同就是流是无界的,不会结束。这个不同有很多含义:如果章节开始讨论的,对于无界数据集,排序没有意义,因此无法使用排序-合并连接。错误容忍机制必须改变:对于已经运行了几分钟的批处理任务,可以简单地重启失败任务,但是对于已经运行了几年的流处理任务,在崩溃后从头开始不可行。

流处理的使用

Reasoning about time

Stream joins

错误容忍

总结

在本章中,我们讨论了事件流以及他们的目的,和如何处理它们。在某些程度上,流处理很像批处理(第 10 章讨论),但是无界流不是固定大小的输入。从这个角度,流式处理中的消息代理和事件日志就类似文件系统。

我们花了些时间来比较两种类型的消息代理:

  • AMQP/JMS-style message broker 代理将单个消息分配给使用者,并且当消息被成功处理后
  • Log-based message broker