首页 Flink — 状态(State)&检查点(Checkpoint)&保存点(Savepoint)原理
文章
取消

Flink — 状态(State)&检查点(Checkpoint)&保存点(Savepoint)原理

Flink DataStream—— 状态(State)&检查点(Checkpoint)&保存点(Savepoint)原理

[TOC]

1. 背景

​ 最近一次项目当中需要将大量数据保存再Flink程序当中用作缓存数据一共后续数据使用,隧对最近使用到的状态、检查点、保存点等原理和使用进行一个总结

2. 状态

2.1 概述

​ 首先,状态和算子是息息相关的,所以算子可以分为有状态算子无状态算子,但是不同通俗的将某个算子定义为无状态算子或有状态算子。举个例子,以map为例,在默认使用map的情况下,map是属于一个无状态算子,因为他的结果输出是只观察当前输入的事件,并不依赖其他输入事件,所以此时他是一个无状态算子,但是在某些情况下,例如我们给他添加上状态,他就成了一个有状态算子。举个例子来说,现在存在输入的事件存在重复,我们需要对输入的事件进行去重,就需要对输入的事件进行一个保存,并过滤掉重复的事件,这个时候会需要给这个map算子添加一个状态(state)用来保存已经处理过的事件,用来确保后续事件不会重复进入。

如上图所示,状态的概念和逻辑并不复杂,但是流式处理框架仍需要解决一下问题:

  • 数据的产出要保证实时性,延迟不能太高。
  • 需要保证数据不丢不重,恰好计算一次,尤其是当状态数据非常大或者应用出现故障需要恢复时,要保证状态不出任何错误。
  • 一般流处理任务都是7*24小时运行的,程序的可靠性非常高。

基于上述要求,我们不能将状态直接交由内存管理,因为内存的容量是有限制的,当状态数据稍微大一些时,就会出现内存不够的问题。假如我们使用一个持久化的备份系统,不断将内存中的状态备份起来,当流处理作业出现故障时,需要考虑如何从备份中恢复。而且,大数据应用一般是横向分布在多个节点上,流处理框架需要保证横向的伸缩扩展性。可见,状态的管理并不那么容易。

作为一个计算框架,Flink提供了有状态的计算,封装了一些底层的实现,比如状态的高效存储、Checkpoint和Savepoint持久化备份机制、计算资源扩缩容等问题。因为Flink接管了这些问题,开发者只需调用Flink API,这样可以更加专注于业务逻辑。

2.2 状态的几种类型

Managed State和Raw State

Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)

 Managed StateRaw State
状态管理方式Flink Runtime托管,自动存储、自动恢复、自动伸缩用户自己管理
状态数据结构Flink提供的常用数据结构,如ListState、MapState等字节数组:byte[]
使用场景绝大多数Flink算子用户自定义算子

上表展示了两者的区别,主要包括:

  • 从状态管理的方式上来说,Managed State由Flink Runtime托管,状态是自动存储、自动恢复的,Flink在存储管理和持久化上做了一些优化。当我们横向伸缩,或者说我们修改Flink应用的并行度时,状态也能自动重新分布到多个并行实例上。Raw State是用户自定义的状态。
  • 从状态的数据结构上来说,Managed State支持了一系列常见的数据结构,如ValueState、ListState、MapState等。Raw State只支持字节,任何上层数据结构需要序列化为字节数组。使用时,需要用户自己序列化,以非常底层的字节数组形式存储,Flink并不知道存储的是什么样的数据结构。
  • 从具体使用场景来说,绝大多数的算子都可以通过继承RichFunction函数类或其他提供好的接口类,在里面使用Managed State。Raw State是在已有算子和Managed State不够用时,用户自定义算子时使用。

Keyed State和Operator State

对Managed State继续细分,它又有两种类型:Keyed State和Operator State

Keyed State是KeyedStream上的状态。假如输入流按照id为Key进行了keyBy分组,形成一个KeyedStream,数据流中所有id为1的数据共享一个状态,可以访问和更新这个状态,以此类推,每个Key对应一个自己的状态。下图展示了Keyed State,因为一个算子子任务可以处理一到多个Key,算子子任务1处理了两种Key,两种Key分别对应自己的状态。

Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的所有数据都可以访问和更新这个状态。下图展示了Operator State,算子子任务1上的所有数据可以共享第一个Operator State,以此类推,每个算子子任务上的数据共享自己的状态。

无论是Keyed State还是Operator State,Flink的状态都是基于本地的,即每个算子子任务维护着自身的状态,不能访问其他算子子任务的状态。

 Keyed StateOperator State
适用算子类型只适用于KeyedStream上的算子可以用于所有算子
状态分配每个Key对应一个状态一个算子子任务对应一个状态
创建和访问方式重写Rich Function,通过里面的RuntimeContext访问实现CheckpointedFunction等接口
横向扩展状态随着Key自动在多个算子子任务上迁移有多种状态重新分配的方式
支持的数据结构ValueState、ListState、MapState等ListState、BroadcastState等

上表总结了Keyed State和Operator State的区别

Keyed State的使用方法

对于Keyed State,Flink提供了几种现成的数据结构供我们使用,包括ValueStateListState等,他们的继承关系如下图所示。首先,State主要有三种实现,分别为ValueStateMapStateAppendingStateAppendingState又可以细分为ListStateReducingStateAggregatingState

这几个状态的具体区别在于:

  • ValueState<T>是单一变量的状态,T是某种具体的数据类型,比如DoubleString,或我们自己定义的复杂数据结构。我们可以使用T value()方法获取状态,使用void update(T value)更新状态。
  • MapState<UK, UV>存储一个Key-Value Map,其功能与Java的Map几乎相同。UV get(UK key)可以获取某个Key下的Value值,void put(UK key, UV value)可以对某个Key设置Value,boolean contains(UK key)判断某个Key是否存在,void remove(UK key)删除某个Key以及对应的Value,Iterable<Map.Entry<UK, UV>> entries()返回MapState中所有的元素,Iterator<Map.Entry<UK, UV>> iterator()返回状态的迭代器。需要注意的是,MapState中的Key和Keyed State的Key不是同一个Key。
  • ListState<T>存储了一个由T类型数据组成的列表。我们可以使用void add(T value)void addAll(List<T> values)向状态中添加元素,使用Iterable<T> get()获取整个列表,使用void update(List<T> values)来更新列表,新的列表将替换旧的列表。
  • ReducingState<T>AggregatingState<IN, OUT>ListState<T>同属于MergingState<IN, OUT>。与ListState<T>不同的是,ReducingState<T>只有一个元素,而不是一个列表。它的原理是:新元素通过void add(T value)加入后,与已有的状态元素使用ReduceFunction合并为一个元素,并更新到状态里。AggregatingState<IN, OUT>ReducingState<T>类似,也只有一个元素,只不过AggregatingState<IN, OUT>的输入和输出类型可以不一样。ReducingState<T>AggregatingState<IN, OUT>与窗口上进行ReduceFunctionAggregateFunction很像,都是将新元素与已有元素做聚合。

以上就是关于状态的基本信息了。在日常分布式场景中,主要使用的还是Keyed State较多。

3. 检查点

​ 在上面介绍了Flink的算子都是基于本地的,而Flink又是一个部署在多节点的分布式系统,分布式系统经常出现进程被杀、节点宕机或网络中断等问题,那么本地的状态在遇到故障时如何保证不丢呢?Flink定期保存状态数据到存储上,故障发生后从之前的备份中恢复,这个过程被称为Checkpoint机制。

3.1 Checkpoint大致流程

  1. 暂停处理新流入数据,将新数据缓存起来。
  2. 将算子子任务的本地状态数据拷贝到一个远程的持久化存储上。
  3. 继续处理新流入的数据,包括刚才缓存起来的数据。

Flink是在Chandy–Lamport算法的基础上实现了一种分布式快照算法。在介绍Flink的快照详细流程前,我们先要了解一下检查点分界线(Checkpoint Barrier)的概念。如下图所示,Checkpoint Barrier被插入到数据流中,它将数据流切分成段。Flink的Checkpoint逻辑是,一段新数据流入导致状态发生了变化,Flink的算子接收到Checpoint Barrier后,对状态进行快照。每个Checkpoint Barrier有一个ID,表示该段数据属于哪次Checkpoint。如下图所示,当ID为n的Checkpoint Barrier到达每个算子后,表示要对n-1和n之间状态更新做快照。Checkpoint Barrier有点像Event Time中的Watermark,它被插入到数据流中,但并不影响数据流原有的处理顺序。

接下来,我们构建一个并行数据流图,用这个并行数据流图来演示Flink的分布式快照机制。这个数据流图的并行度为2,数据流会在这些并行算子上从Source流动到Sink。

首先,Flink的检查点协调器(Checkpoint Coordinator)触发一次Checkpoint(Trigger Checkpoint),这个请求会发送给Source的各个子任务。

各Source算子子任务接收到这个Checkpoint请求之后,会将自己的状态写入到状态后端,生成一次快照,并且会向下游广播Checkpoint Barrier。

Source算子做完快照后,还会给Checkpoint Coodinator发送一个确认,告知自己已经做完了相应的工作。这个确认中包括了一些元数据,其中就包括刚才备份到State Backend的状态句柄,或者说是指向状态的指针。至此,Source完成了一次Checkpoint。跟Watermark的传播一样,一个算子子任务要把Checkpoint Barrier发送给所连接的所有下游子任务。

对于下游算子来说,可能有多个与之相连的上游输入,我们将算子之间的边称为通道。Source要将一个ID为n的Checkpoint Barrier向所有下游算子广播,这也意味着下游算子的多个输入通道里都会收到ID为n的Checkpoint Barrier,而且不同输入通道里Checkpoint Barrier的流入速度不同,ID为n的Checkpoint Barrier到达的时间不同。Checkpoint Barrier传播的过程需要进行对齐(Barrier Alignment),我们从数据流图中截取一小部分,以下图为例,来分析Checkpoint Barrier是如何在算子间传播和对齐的。

如上图所示,对齐分为四步:

  1. 算子子任务在某个输入通道中收到第一个ID为n的Checkpoint Barrier,但是其他输入通道中ID为n的Checkpoint Barrier还未到达,该算子子任务开始准备进行对齐。
  2. 算子子任务将第一个输入通道的数据缓存下来,同时继续处理其他输入通道的数据,这个过程被称为对齐。
  3. 第二个输入通道ID为n的Checkpoint Barrier抵达该算子子任务,所有通道ID为n的Checkpoint Barrier都到达该算子子任务,该算子子任务执行快照,将状态写入State Backend,然后将ID为n的Checkpoint Barrier向下游所有输出通道广播。
  4. 对于这个算子子任务,快照执行结束,继续处理各个通道中新流入数据,包括刚才缓存起来的数据。

数据流图中的每个算子子任务都要完成一遍上述的对齐、快照、确认的工作,当最后所有Sink算子确认完成快照之后,说明ID为n的Checkpoint执行结束,Checkpoint Coordinator向State Backend写入一些本次Checkpoint的元数据。

之所以要进行对齐,主要是为了保证一个Flink作业所有算子的状态是一致的,也就是说,一个Flink作业前前后后所有算子写入State Backend的状态都是基于同样的数据。

3.2 状态存储(State Backend)

上面为Checkpint的大致流程,其中State Backend起到了持久化存储数据的重要功能。Flink将State Backend抽象成了一种插件,并提供了三种State Backend,每种State Backend对数据的保存和恢复方式略有不同。接下来我们开始详细了解一下Flink的State Backend。

MemoryStateBackend

从名字中可以看出,这种State Backend主要基于内存,它将数据存储在Java的堆区。当进行分布式快照时,所有算子子任务将自己内存上的状态同步到JobManager的堆上。因此,一个作业的所有状态要小于JobManager的内存大小。这种方式显然不能存储过大的状态数据,否则将抛出OutOfMemoryError异常。这种方式只适合调试或者实验,不建议在生产环境下使用。下面的代码告知一个Flink作业使用内存作为State Backend,并在参数中指定了状态的最大值,默认情况下,这个最大值是5MB。

1
env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));

如果不做任何配置,默认情况是使用内存作为State Backend。

FStateBackend

这种方式下,数据持久化到文件系统上,文件系统包括本地磁盘、HDFS以及包括Amazon、阿里云在内的云存储服务。使用时,我们要提供文件系统的地址,尤其要写明前缀,比如:file://hdfs://s3://。此外,这种方式支持Asynchronous Snapshot,默认情况下这个功能是开启的,可加快数据同步速度。

1
2
3
4
5
6
7
8
9
10
11
// 使用HDFS作为State Backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"));

// 使用阿里云OSS作为State Backend
env.setStateBackend(new FsStateBackend("oss://<your-bucket>/<object-name>"));

// 使用Amazon作为State Backend
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));

// 关闭Asynchronous Snapshot
env.setStateBackend(new FsStateBackend(checkpointPath, false));

Flink的本地状态仍然在TaskManager的内存堆区上,直到执行快照时状态数据会写到所配置的文件系统上。因此,这种方式能够享受本地内存的快速读写访问,也能保证大容量状态作业的故障恢复能力。

RocksDBStateBackend

这种方式下,本地状态存储在本地的RocksDB上。RocksDB是一种嵌入式Key-Value数据库,数据实际保存在本地磁盘上。比起FsStateBackend的本地状态存储在内存中,RocksDB利用了磁盘空间,所以可存储的本地状态更大。然而,每次从RocksDB中读写数据都需要进行序列化和反序列化,因此读写本地状态的成本更高。快照执行时,Flink将存储于本地RocksDB的状态同步到远程的存储上,因此使用这种State Backend时,也要配置分布式存储的地址。Asynchronous Snapshot在默认情况也是开启的。

此外,这种State Backend允许增量快照(Incremental Checkpoint),Incremental Checkpoint的核心思想是每次快照时只对发生变化的数据增量写到分布式存储上,而不是将所有的本地状态都拷贝过去。Incremental Checkpoint非常适合超大规模的状态,快照的耗时将明显降低,同时,它的代价是重启恢复的时间更长。默认情况下,Incremental Checkpoint没有开启,需要我们手动开启。

1
2
3
// 开启Incremental Checkpoint
boolean enableIncrementalCheckpointing = true;
env.setStateBackend(new RocksDBStateBackend(checkpointPath, enableIncrementalCheckpointing));

相比FsStateBackendRocksDBStateBackend能够支持的本地和远程状态都更大,Flink社区已经有TB级的案例。

除了上述三种之外,开发者也可以自行开发State Backend的具体实现。

4. 存储点

目前,Checkpoint和Savepoint在代码层面使用的分布式快照逻辑基本相同,生成的数据也近乎一样,那这两个相似的名字到底有哪些功能性的区别呢?Checkpoint的目的是为了故障重启,使得作业中的状态数据与故障重启之前保持一致,是一种应对意外情况的有力保障。Savepoint的目的是手动备份数据,以便进行调试、迁移、迭代等,是一种协助开发者的支持功能。一方面,一个流处理作业不可能一次性就写好了,我们要在一个初版代码的基础上不断修复问题、增加功能、优化算法、甚至做一些机房迁移,一个程序是在迭代中更新的;另外一方面,流处理作业一般都是长时间运行的,作业内部的状态数据从零开始重新生成的成本很高,状态数据迁移成本高。综合这两方面的因素,Flink提供了Savepoint的机制,允许开发者调试开发有状态的作业。

Flink的Checkpoint机制设计初衷为:第一,Checkpoint过程是轻量级的,尽量不影响正常数据处理;第二,故障恢复越快越好。开发者需要进行的操作并不多,少量的操作包括:设置多大的间隔来定期进行Checkpoint,使用何种State Backend。绝大多数工作是由Flink来处理的,比如Flink会定期执行快照,发生故障后,Flink自动从最近一次Checkpoint数据中恢复。随着作业的关停,Checkpoint数据一般会被Flink删除,除非开发者设置了保留Checkpoint数据。原则上,一个作业从Checkpoint数据中恢复,作业的代码和业务逻辑不能发生变化。

相比而下,Savepoint机制主要考虑的是:第一,刻意备份,第二,支持修改状态数据或业务逻辑。Savepoint相关操作是有计划的、人为的。开发者要手动触发、管理和删除Savepoint。比如,将当前状态保存下来之后,我们可以更新并行度,修改业务逻辑代码,甚至在某份代码基础上生成一个对照组来验证一些实验猜想。可见,Savepoint的数据备份和恢复都有更高的时间和人力成本,Savepoint数据也必须有一定的可移植性,能够适应数据或逻辑上的改动。具体而言,Savepoint的潜在应用场景有:

  • 我们可以给同一份作业设置不同的并行度,来找到最佳的并行度设置,每次可以从Savepoint中加载原来的状态数据。
  • 我们想测试一个新功能或修复一个已知的bug,并用新的程序逻辑处理原来的数据。
  • 进行一些A/B实验,使用相同的数据源测试程序的不同版本。
  • 因为状态可以被持久化存储到分布式文件系统上,我们甚至可以将同样一份应用程序从一个集群迁移到另一个集群,只需保证不同的集群都可以访问这个文件系统。

可见,Checkpoint和Savepoint是Flink提供的两个相似的功能,它们满足了不同的需求,以确保一致性、容错性,满足了作业升级、BUG 修复、迁移、A/B测试等不同场景。

参考

本文由作者按照 CC BY 4.0 进行授权