本文是作者学习Flink的一些文档整理、记录和心得体会,希望与大家共同学习探讨。
Apache Flink是一个开源的分布式流式处理框架,高性能、高可用,他有强大的流式和批处理能力,通过语义保证数据处理精确性。流式处理方面,Flink能对有界、无界数据流做有状态的计算(stateful computations)。
他有如下特点:
能同时支持高吞吐和低事件延迟(亚秒级)
真实时流处理
基于数据流模型,支持DataStream API中的event time
和无序处理
优雅的Java和Scala API,集成了较丰富的streaming operator,自定义operator也较为方便,并且可以直接调用API完成stream的split和join,可以完整的表达DAG图。
跨不同时间语义(event time
,processing time
)的弹性window(时间,计数,会话,自定义触发器)
支持Flink托管的State
具有Exactly Once
处理保证的容错能力
支持多种window语义,如Session, Tumbling, Sliding window,方便实时统计
流式程序中自然背压
支持流、批处理,且将批当作流的特例,最终实现批流统一。
Flink自主实现多层次内存管理而不完全依赖于JVM,可以在一定程度上避免大量Full-GC问题。
在此基础上,批处理的lib包可支持图运算和机器学习(FlinkML);流式处理的lib支持复杂事件处理;流批统一的关系型SQL&Table API
支持迭代计算
程序可自动优化
内置支持DataSet(批处理)API中的迭代程序(BSP)
自定义内存管理,可在内存和核外数据处理算法之间实现高效,可靠的切换。在JVM层面实现了内存管理和优化
可兼容Apache Hadoop MapReduce和Apache Storm
可集成YARN,HDFS,HBase和Apache Hadoop生态系统的其他组件
下面这个例子展示了用scala语言写的一段对5秒时间窗口内的数据进行流式word count的流处理程序:
case class WordWithCount(word: String, count: Long)
val text = env.socketTextStream(host, port, '\n')
val windowCounts = text.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
windowCounts.print()
下面这个例子展示了用scala语言写的对数据进行word count批处理程序:
case class WordWithCount(word: String, count: Long)
val text = env.readTextFile(path)
val counts = text.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.groupBy("word")
.sum("count")
counts.writeAsCsv(outputPath)
开发API较复杂,不支持状态托管, 用户必须自己处理状态持久化和一致性保证,如果某个挂了状态很难恢复
可参考
阿里巴巴fork,现在开始采用 flink的思想
微批,非真正的实时流式处理
同样是纯实时思想. 可参考
比较:
都支持丰富时间语义,不过SS不支持IngestionTime
处理模式
SS除了定时触发、连续批触发以外,还支持了跟flink类似的连续处理模式,不再使用批处理引擎,而是类似Flink的持续处理模式,端到端延迟最低可达1ms。
Structured Streaming将实时数据当做被连续追加的表,流上的每一条数据都类似于将一行新数据添加到表中。
Structured Streaming定义了无界表的概念,即每个流的数据源从逻辑上来说看做一个不断增长的动态表(无界表),从数据源不断流入的每个数据项可以看作为新的一行数据追加到动态表中。用户可以通过静态结构化数据的批处理查询方式(SQL查询),对数据进行实时查询。
都可以注册流表,进行SQL分析
运行架构
类似,SS主节点是Driver,Flink是JobManager
异步IO和维表Join
FlinkSql支持异步维表Join;
Structured Streaming不直接支持与维表的join操作,但是可以使用map、flatmap及udf等来实现该功能,所有的这些都是同步算子,不支持异步IO操作。但是Structured Streaming可直接与静态数据集join,也可以帮助实现维表的join功能,当然维表要不可变。
状态管理
Flink状态不需要用户自己维护,更灵活丰富
Join
Flink Join支持更全,SS限制较多
分布式的流式数据流的运行时,是Flink的核心实现层,包括分窗、各种Time、一致性语义、任务管理和调度、执行计划等。一般用户不用关心此层代码,只需调用上层API即可满足开发需求。
建立在Core层之上,分别为流式处理和批处理API。需要注意的是,Flink的批处理屙屎建立在流式架构上的。
建立在API层上的一些高级应用lib包,如机器学习、关系型API等。
Flink集群主要有两个角色,即Master-JobManager和SlaveWorker-TaskManager。
ResourceManager
负责资源分配和回收,内部有SlotManager负责管理TaskManagerSlot
(最小资源分配和调度单位),整个Flink Cluster只有一个ResourceManager实例。
Flink在不同运行环境(Yarn、Mesos、K8S等)中实现了不同ResourceManager,如YarnResourceManager
。
主要方法:
Dispatcher
JobMaster
每个JobMaster仅负责管理一个Job
的执行,监控单个Job运行的所有Task。
多个job可同时在一个Flink集群中运行,每个Job拥有自己的JobMaster。
JobMaster主要职责有:
JobMaster主要组件有:
ExecutionVertex
向Flink ResourceManager申请Slot资源、获取到资源后创建ExecutionVertex部署信息发送给TM),管理调度策略、Failover策略、Slot分配等ExecutionGraph.schedulingTopology
来创建failover策略类FailoverStrategy
的实现类,比如RestartPipelinedRegionFailoverStrategy
,即当某个节点失败时,重启一个Region的所有节点。SchedulingStrategy
实现类,如EagerSchedulingStrategy
,这个实现类表示所有Task在同一时间一起调度。调度策略详情可见 2.10.3。ExecutionGraph
向本JM提起的Slot请求,当没有足够Slot时就向ResourceManager申请新的Slot。AllocationID
标记一个Flink集群有一个JobManager进程实例,一些HA场景可能有多个StandBy JM。
Job最大并行度 / 每个TaskManager分配的Slot数
。也就是说TM数量由Flink根据你的Job情况自动推算,-yn
启动参数失效了。在TM上实际执行的类为TaskExecutor
。
客户端虽然不是Flink运行时和作业执行时的一部分,但它是被用作准备和提交 dataflow 到 JobManager 。
Job提交完成之后,客户端可以断开连接(分离模式),也可以保持连接来接收Job执行进度报告(attach模式)。
Flink内部节点之间的通信是用Akka,比如JobManager、TaskManager、Yarn-ResourceManager之间的通信,比如状态从TaskManager发送到JobManager。
可参考:
算子之间基于Netty通信。
可参考:
Flink Yarn Session部署流程如下:
Client上传依赖jar到HDFS
开启一个新的Flink Yarn Session时,Client首先检查申请的资源(AM所需的内存和vcore)可用的是否足够,然后将包括Flink和配置的jar上传到HDFS。
Client为AM申请资源
Client为Flink所用的AM向Yarn RM申请Container资源。
Yarn启动AM(JobManager)
Yarn的RM收到申请后,为AM在NM上分配首个Container,用来启动Flink AM。具体来说,NM会先为该AM做一些Container分配准备工作,如下载资源(就包括刚才Client上传的Flink Jar等文件)。准备完成后,就启动了AM。
JobManager和AM运行在同一个Container中,所以AM知道JobManager的地址。所以在启动完成后,会为随后需要创建的TaskManager们生成一个新的Flink配置文件,该文件就包含了该JobManager的链接地址。该文件也会被上传到HDFS。
此时,Flink的Web服务也会在该Container开始运行。
注意,Yarn为app分配的所有端口都是临时端口,可使得用户并行执行多个Flink Yarn Session。
AM向RM申请Container资源以启动TaskManager
RM向拥有适合Container资源的NM发送分配指令,NM接到请求后先从HDFS下载相关的jar文件、配置文件,然后启动TaskManager。此时TaskManager就能正确运行,并连接到正确的JobManager。
此时一个Flink Yarn Session集群部署完毕,可以开始接受Job。
更多内容可参考:
可参考:
注意:本章节基于Flink 1.10,该版本有大量修改。
Flink通过严格控制其各种组件的内存使用情况,在JVM之上提供有效的工作负载。Flink开发者已经竭尽可能来设置最优默认配置,但还是有一些时候需要细粒度内存调优。
Flink内存使用情况:
Total Process Memory(taskmanager.memory.process.size
)
这个就是在容器环境(Yarn、Docker等)Flink程序请求的总内存大小。
Total Flink Memory(taskmanager.memory.flink.size
)
不建议同时设定此选项和Total Process Memory
,否则可能导致内存设定冲突
JVM Heap
Framework Heap
JVM堆内存中Flink框架本身专用部分,一般不调整
相关配置taskmanager.memory.framework.heap.size
Task Heap
JVM堆内存中运行算子和用户代码专用部分
相关配置taskmanager.memory.task.heap.size
。
Off-Heap Memory
Managed Memory
由Flink管理的本地内存,Batch jobs
用来排序、存放HashTable、中间结果的缓存;Streaming jobs
的RocksDB State Backend。这块内存堆皮处理算子提高处理效率很重要,有些操作可直接在原始数据进行而无需序列化过程,所以Flink会尽可能在不超过配额前提下分配Managed Memory,并当该内存不足时,优雅得将数据落盘,避免OOM。
相关配置taskmanager.memory.managed.size
和taskmanager.memory.managed.fraction
,都设置时size会覆盖fraction,如果都没设置则用默认值 0.4。
Direct Memory
Framework Off-heap Memory
运行Flink框架的堆外直接(或本地)内存,一般不调整
相关配置taskmanager.memory.managed.size
、taskmanager.memory.framework.off-heap.size
Task Off-heap Memory
运行算子的堆外直接、本地的task专用内存
相关配置taskmanager.memory.task.off-heap.size
Network Memory
Flink用来在Task之间交换数据(比如网络传输Buffer)的直接内存。
相关配置:taskmanager.memory.network.min
、taskmanager.memory.network.max
、taskmanager.memory.network.fraction
其他Off-Heap Memory
JVM metaspace
相关配置taskmanager.memory.jvm-metaspace.size
JVM Overhead
JVM其他开销使用的本地内存,比如线程栈、代码缓存、GC空间等。
相关配置:taskmanager.memory.jvm-overhead.min
、taskmanager.memory.jvm-overhead.max
、taskmanager.memory.jvm-overhead.fraction
参考
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup.html
flink 1.11在flink 1.10调整了内存模型基础上,1.11进一步调整,JM和TM内存模型更统一
必须设定以下某项,因为没有默认值
TaskManager
JobManager
taskmanager.memory.flink.size
jobmanager.memory.flink.size
taskmanager.memory.process.size
jobmanager.memory.process.size
taskmanager.memory.task.heap.size
taskmanager.memory.managed.size
jobmanager.memory.heap.size
如果使用低版本升级到高版本Flink,可参考迁移指南
注意上图中Off-heap
包括
Off-heap Memory用途
Flink大多默认配置已经足够好,一般只需要配置taskmanager.memory.process.size
或taskmanager.memory.flink.size
,再配置下taskmanager.memory.managed.fraction
控制Jvm堆和Managed Memory
比例。
以下配置适用于TaskManager和JobManager:
Key
Default
Type
Description
taskmanager.memory.process.size
(none)
MemorySize
TaskExecutors的所有内存大小即Total Process Memory
.
使用YarnConainter时应该和Contianer内存一致(不设本值,只指定yjm
或ytm
已经和Contianer内存一致)
taskmanager.memory.flink.size
(none)
MemorySize
TaskExecutor的总Flink内存大小即Total Flink Memory
,除去JVM Metaspace和JVM Overhead
taskmanager.memory.framework.heap.size
128 mb
MemorySize
堆内Flink Framework内存,不会分配给Task Slot
taskmanager.memory.framework.off-heap.size
128 mb
MemorySize
堆内Flink Framework内存,包括direct 和 native,不会分配给Task Slot。
Flink 计算 JVM 最大 direct memory
时会考虑本部分
taskmanager.memory.task.heap.size
(none)
MemorySize
Task Heap Memory
,JVM堆内存中运行算子和用户代码的task专用部分.
未指定本值时,等于Total Flink Memory
减去(Framework Heap Memory
加 Task Off-Heap Memory
加 Managed Memory
加 Network Memory
taskmanager.memory.task.off-heap.size
0 bytes
MemorySize
Task Off-heap Memory
,JVM堆外直接、本地内存中运行算子和用户代码的task专用部分.
Flink 计算 JVM 最大 direct memory
时会考虑本部分
taskmanager.memory.managed.size
(none)
MemorySize
位于堆外非直接内存中的Managed Memory
。Batch jobs
用来排序、存放HashTable、中间结果的缓存;Streaming jobs
的RocksDB State Backend Memory。
使用者即可以以MemorySegments的形式从内存管理器中分配内存,也可以从内存管理器中保留字节并将其内存使用率保持在该范围内。
如果未指定,则采用下一项配置来计算。
taskmanager.memory.managed.fraction
0.4
Float
位于堆外非直接内存中的Managed Memory
比例,如果没有配置具体大小时,该空间大小由Total Flink Memory
乘以本值得出。
taskmanager.memory.network.fraction
0.1
Float
(TaskManager Only)堆外直接内存中的Network Memory
,Flink用来在Task之间交换数据(比如网络传输Buffer),用Total Flink Memory
乘以本值来计算。但结果如果小于network.min就用min,大于max就用max。
taskmanager.memory.network.max
1 gb
MemorySize
network memory上限
taskmanager.memory.network.min
64 mb
MemorySize
network memory下限
taskmanager.memory.jvm-metaspace.size
96 mb
位于堆外的MemorySize
JVM Metaspace 内存
taskmanager.memory.jvm-overhead.fraction
0.1
Float
位于堆外的JVM其他开销使用的本地内存,比如线程栈、代码缓存、GC空间等。包括native内存但不包括direct内存,
Flink 计算 JVM 最大 direct memory
时不会考虑本部分.
jvm-overhead区具体大小用Total Process Memory
乘以本值来计算。但结果如果小于jvm-overhead.min就用min,大于max就用max。
taskmanager.memory.jvm-overhead.max
1 gb
MemorySize
jvm-overhead上限
taskmanager.memory.jvm-overhead.min
192 mb
MemorySize
jvm-overhead下限
本地ide直接运行且不运行集群时,只需做以下内存配置:
Memory component
配置项
本地运行时默认值
Task heap
taskmanager.memory.task.heap.size
无限大
Task off-heap
taskmanager.memory.task.off-heap.size
无限大
Managed memory
taskmanager.memory.managed.size
无限大
Network memory
taskmanager.memory.network.min
taskmanager.memory.network.max
无限大
以上配置不是必须的,未设定时采用默认值。
注意: 启动的本地进程的实际JVM堆大小不受Flink的控制,取决于您如何启动该进程。 如果您想控制JVM堆大小,则必须显式传递相应的JVM参数,例如 -Xmx,-Xms。
这种内存有三类设置情况,但必须在最大值和最小值之间,否则Flink会启动失败。以下以network
举例。
total Flink memory = 1000Mb,
network min = 64Mb,
network max = 128Mb,
network fraction = 0.1
此时network memory = 1000Mb x 0.1 = 100Mb,处于max/min之间.
total Flink memory = 1000Mb,
network min = 128Mb,
network max = 256Mb,
network fraction = 0.1
此时 network memory = 1000Mb x 0.1 = 100Mb < min,所以 network memory = 128Mb
total Flink memory = 1000Mb,
task heap = 100Mb,
network min = 64Mb,
network max = 256Mb,
network fraction = 0.1
此时network memory 是total Flink memory的剩余内存,但必须位于min-max之间,否则失败。
可参考:
Flink进程启动时根据配置或派生来的内存组件大小自动推断设置JVM参数
JVM Arguments
TaskManager Value
JobManager Value
-Xmx and -Xms
Framework Heap + Task Heap Memory
JVM Heap Memory
-XX:MaxDirectMemorySize
Framework Off-Heap + Task Off-Heap(计算时包括了用户代码使用的本地非直接内存) + Network Memory
Off-heap Memory (计算时包括了用户代码使用的本地非直接内存;jobmanager.memory.enable-jvm-direct-memory-limit为true时才会设置此JVM选项)
-XX:MaxMetaspaceSize
JVM Metaspace
JVM Metaspace
Java Heap
OffHeap Heap
包括JVM Direct Memory
和Native Memory
,当jobmanager.memory.enable-jvm-direct-memory-limit
为true
就开启对JVM Direct Memory内存大小限制,此时可通过-XX:MaxDirectMemorySize
/ jobmanager.memory.off-heap.size
限制。当发生了OutOfMemoryError: Direct buffer memory
异常时可调整该值。
如果不设,就从Total Flink Memory
减去JVM Heap
来推断该值。
managed memory
大小设为0,把尽可能多的内存分配给JVM Heap来运行用户代码。managed memory
,否则可能导致内存超限而使得应用被RM杀掉。详见Flink Application Execution
指一个用户程序,可通过main()
方法产生一个或多个Flink Job:
获取一个ExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment()
Load/create 初始数据
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
数据转换
val mapped = text.map { x => x.toInt }
指定计算结果输出位置,如各种Sink等
windowCounts.print()
触发程序执行
env.execute("Streaming WordCount")
每个Job由ExecutionEnvironment
提供的方法来控制job执行(如并行度)。
提交目的地有几个,他们的集群生命周期和资源隔离保证有所不同:
Flink Session Cluster
一个长期运行的Flink集群,可接受若干Flink Job运行。
该模式下的Flink集群生命周期与Job无关,以前该模式成为Flink Session Cluster。
Flink Job Cluster
只运行一个Flink Job的专用Flink Cluster。Flink On Yarn per-job就是该模式。
该模式下的Flink集群的生命周期和Job绑定。
需要较多时间用来申请资源和启动Flink Cluster,所以适合长期运行的Flink Job。
Flink Application Cluster
只运行一个Flink Application上提交的Job的专用Flink Cluster。Zeppelin per-job使用该模式。
该模式下的Flink集群的生命周期和Flink Application绑定。
该模式中,Flink应用的main()
方法在集群中运行,而不是在Client中。
提交job时不需要先启动Flink Session Cluster再提交job到该集群,而是将应用逻辑和依赖打到一个可执行的jar中,由ApplicationClusterEntryPoint
负责调用该jar的main
方法来提取JobGraph执行。
指LogicGraph的运行时表示,通过在Flink Application中调用execute
方法来创建和提交。
Logical Graph 是一个有向图,用来描述streaming程序的高层次逻辑。
图里的节点是Operator,边代表算子和敌营数据流(或数据集)的 input/output 关系。
逻辑计划也通常被称为dataflow graphs
(数据流图)
PhysicalGraph是将LogicalGraph翻译成在分布式运行时中运行的执行图的结果。
图里的节点是Task,边代表数据流(或数据集)之间的 input/output 关系或partition。
注意这个Partition不同于Kafka的Partition,这是Flink Partition概念!
Partition是整个数据流(或数据集)的独立子集,数据流中的每条record会被发到一个或多个partition。
Partition的消费者是Task。
如果一个算子改变了数据流的partition划分方式,则称为repartitioning
前面提到过,Flink资源调度单位为Slot
,每个TaskManager有一个或多个Slot,每个Slot可以运行多个不同JobVertex的并行Task实例组成的pipeline。一个pipeline由多个连续task组成,比如并行度为 n 的 MapFunction
和 并行度为 n 的 ReduceFunction
。
Pipeline内部各算子实例之间通过流水线交互数据,效率很高。
比如有一个job包含DataSource(并行度4)、MapFunction(并行度4)、ReduceFunction(并行度3),此时一个pipeline就由Source - Map - Reduce
序列组成。
如果当前有两个TM,每个TM包含3个Slot,则程序运行状况如下:
**上图中同一个颜色的就是一个pipeline!**在一个Slot中运行,多个pipeline之间是并发运行。
Flink内部使用SlotSharingGroup
和CoLocationGroup
来定义哪些task可以共享一个 Slot, 哪些task必须严格放到同一个slot。
切记,只有不同JobVertex的实例才能放到一个Slot进行Share。
job运行期间,JM跟踪分布式task,以决定何时调度下一个task,以及处理运行完成的task和执行失败。
我们先提一下一个重要的类Transformation
:
他是一个抽象类,表示创建了一个DataStream的特定算子。每个DataStream都对应有一个底层的Transformation,表示该Stream的发起者。
多个算子API会创建一棵Transformation树,当Stream程序提交运行的时候,这个树形结构图会被StreamGraphGenerator
翻译为StreamGraph
。
在运行时,一个Transformation不一定对应一个物理上的算子,因为某些算子只是逻辑上的,比如union/split/select
数据流、partition等。
比如有以下一个Transformation图:
会在运行时转换为如下物理算子图:
而分区信息、union、split/select等信息已经被编码到了Source到Map算子之间的边里。这些信息会在提交到集群JM后,被转为ExecutionGraph的ResultPartition和InputGate时使用。
JM接收参数为JobGraph
,他表示数据流,由作为顶点(JobVertex)的算子和中间结果IntermediateDataSet
构成。每个算子都有属性,如并行度、运行的代码、依赖的类库等。
JM将JobGraph转换为ExecutionGraph
。
ExecutionJobVertex
ExecutionGraph是一个并行版本的JobGraph,对于JobGraph的每个JobVertex来说,对应着多个表示每个并行子任务实例的ExecutionVertex
。比如并行度100的某个算子,则JobGraph中有1个表示该算子的JobVertex,ExecutionGraph中有100个表示该算子的ExecutionVertex。
ExecutionVertex的作用是跟踪特定子任务的执行状态。
ExecutionJobVertex
而所有对应一个JobVertex的ExecutionVertex被封装在一个ExecutionJobVertex
中,用来追踪该算子整体状态。
IntermediateResult
追踪JobGraph.IntermediateDataSet
状态
IntermediateResultPartition
追踪每个partition的IntermediateDataSet状态
每个ExecutionGraph都有一个与之相关的job状态信息,用来表示当前的job执行状态。
Created
Running
Finished
Failing
以撤销所有运行中Task。final
状态且配置为不可重启,则Job转为Failed
状态Restarting
,准备好后转为Created
Cancelling
,会撤销所有运行中Task。当所有Task转为final
状态后,转为Cancelled
Cancelled
、Failing
和Finished
状态会导致全局的终止状态,触发Job清理Suspended
与上面三个状态不同,只会触发本地终止
注意:由于一个TasK可能会被执行多次(比如在异常恢复后),所以ExecutionVertex
的执行是由Execution
来跟踪的,每个 ExecutionVertex 有Current Execution
和Prior Execution
。
Flink Job调度模型相关枚举类为ScheduleMode
,具体调度策略类为SchedulingStrategy
,决定执行图中的任务怎么开始。
EAGER
立刻调度所有task。
主要用于无界流数据。
LAZY_FROM_SOURCES
从Source开始,按拓扑顺序,一旦上游所有输入数据就绪,则下游task开始。即前驱任务全部执行完成后,才开始调度后续任务,后续任务会读取上游缓存的输出数据来进行计算。
适用于批处理作业。
LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST
和LAZY_FROM_SOURCES
类似,不同之处在于本选项会使用batch slot请求,这种模式可支持使用比请求的request更少的slot来执行job。
用户需要保证job不包含任何pipelined shuffle
(正在开发中的)Pipelined region based
以PipelinedRegion作为单位进行调度
为准备好运行的Task分配Slot资源申请核心为运行在JobManager的SlotProvider
,有两种分配模式:
CompletableFuture#getNow(Object)
来获取已分配的slotFlink ResourceManager
负责资源分配和回收,内部有SlotManager
负责管理TaskManagerSlot
(最小资源分配和调度单位)。
Flink在不同运行环境(Yarn、Mesos、K8S等)中实现了不同ResourceManager,如YarnResourceManager
。
主要方法:
SlotManager
属于ResourceManager,具体管理集群中已注册到JM的所有TaskManagerSlot的信息和状态,他们由TaskExecutor启动后注册到RM时提供。当JM为Task申请资源时,SlotManager就会从当前空闲的Slot 中按一定的匹配规则选择一个空闲的 Slot 分配给Job使用。
内部通过TaskExecutor到Flink ResourceManager的定时心跳(包括了该TM的所有Slot状态信息)来更新Slot状态
SlotPool
每个SlotPool实例属于某个Job的对应的JobMaster实例
ExecutionGraph
向本JM提起的Slot请求,当没有足够Slot时就向Flink ResourceManager申请新的Slot。
提交任务详细流程可见Flink-作业提交流程
TM启动后会连接、注册到RM,成功后会将本TM携带的所有Slot信息传递到RM
Flink RM收到TM的注册消息后,会有一个动态代理AkkaInvocationHandler
触发ResourceManager#sendSlotReport
方法将该TM注册到SlotManager,此后该TM上的Slot资源可由JM进行分配。
实际上就是注册到SlotManagerImpl
,会将这些新注册的Slot先尝试从SlotManagerImpl.pendingSlots
中匹配已有的Slot请求;否则就将这个状态为TaskManagerSlot.State.FREE
的Slot放入freeSlots
中保存,表示为可分配的空闲Slot资源。
如果TM连接断开,则会移除该TM注册的Slot。
到这里,TM已经成功连接并注册到RM,但需要注意的是,TM与JM的连接和注册要等到RM向TM提起Slot资源分配请求阶段。
每个Job会对应启动一个JM,他会连接RM,然后调度本Job,向SlotPool申请Slot。
当然最开始因为还没连接到RM所以无法分配,需要放入pending
队列等待分配
RM接收到JM的SlotRequest请求后,SlotManagerImpl
尝试为该请求分配Slot资源
如果findMatchingSlot(ResourceProfile)
有可用TaskManagerSlot,就先从freeSlots中移除要分配的Slot,然后使用SlotManagerImpl#allocateSlot
进行分配,此时就会利用AkkaInvocationHandler
动态代理发送RPC 调用TM#requestSlot
TM接收到requestSlot后,查询taskSlotTable
,若目标Slot空闲则调用allocateSlot
开始分配Slot,会连接、注册到该申请Slot的JM,最后通过RPC offerSlots将Slot分配信息告知JM
JM接收到分配成功的Slot信息,此后Scheduler组件开始deploy所有ExecutionVertex,最后将Task部署描述信息TaskDeploymentDescriptor
通过RPC发送给TaskExecutor,提交任务
注意,一旦JM申请到Slot,其SlotPool也会保存他们,这样即使ResourceManager挂了,依然可以分配这里已空闲的Slot;当Slot无人使用时,会被自动释放。
TM接收到JM发送的Task部署信息后,会初始化TM各种服务,利用收到的部署信息组装Task
,利用ExecutionGraph创建运行时图(为每个下游inputChannel(ExecutionEdge)生成一个ResultSubPartition
实例PipelinedSubpartition
、为每个输入ExecutionEdge
生成一个InputChannel实例,每个InputChannel都只从上游单个ResultSubPartition消费)
Task组装任务完成后,从taskSlotTable匹配分配的Slot和本TM拥有的Slot,匹配上后启动executingThread线程来执行核心工作
包括从PermanentBlobService
下载相关lib jar、创建拥有这些类的引用的ClassLoader、为该Subtask的每个ResultPartition实例分别创建一个BufferPool
实例(由该SubTask的所有ResultSubPartition
共享,属于Floating Buffers)、初始化用户代码并创建RuntimeEnvironment
(提供给执行代码访问,内容如Task的name、并行度、Configuration、数据流Reader和Writer,以及TM提供的一系列组件如MemoryManager、IO Manager等,最后执行用户定义的StreamTask
分配Slot核心源码可见SchedulerImpl#internalAllocateSlot
// slotSharingGroupId不为空就申请`SharedSlot`,否则申请SingleSlot
CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
allocateSingleSlot(slotRequestId, slotProfile, allocationTimeout) :
allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allocationTimeout);
allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
if (failure != null) {
cancelSlotRequest(
slotRequestId,
scheduledUnit.getSlotSharingGroupId(),
failure);
allocationResultFuture.completeExceptionally(failure);
} else {
allocationResultFuture.complete(slot);
}
});
这是一种Flink ResourceManager实现,具体如下:
跟上面图的区别是,TaskExecutor会在JM向Flink RM申请资源时发现SlotManager上的Slot不够,Flink RM才会向Yarn RM申请Container资源来启动TE来注册到RM,随后就是RM向TE请求Slot,最后TE Offer Slot给JM。
TM与RM和JM之间会定时发送心跳同步Slot状态,以保证分布式系统的一致性恢复。当某组件长时间未收到其他组件的心跳信息时,就会认为对方失效并进入FailOver流程。
Flink On Yarn分为:
Per-Job
Yarn-Session
Application
以上两种模式的共同问题是需要在Client执行用户代码以编译生成 Job Graph 才提交到集群运行,过程中需要下载相关 Jar 包、上传集群,客户端和网络负载压力容易成为瓶颈,尤其当多个用户共用一个客户端时。
1.11.0 中引入了 Application 模式(FLIP-85)来解决上述问题,按照 Application 粒度来启动一个集群,属于这个 Application 的所有 Job 在这个集群中运行。核心是 Job Graph 的生成以及作业的提交不再在客户端执行,而是转移到 JM 端执行,这样网络下载上传的负载也会分散到集群中,不再有上述 Client 单点上的瓶颈。
用户可以通过bin/flink run-application
来使用 Application 模式,目前 Yarn 和 Kubernetes(K8s)都已经支持这种模式。Yarn application 会在客户端将运行作业需要的依赖都通过Yarn Local Resource传递到 JM。K8s Application 允许用户构建包含用户 Jar 与依赖的镜像,同时会根据作业自动创建 TM,并在结束后销毁整个集群,相比 Session 模式具有更好的 隔离性。K8s 不再有严格意义上的 Per-Job 模式,Application 模式相当于 Per-Job 在集群进行提交作业的实现。
任何类型的数据都是作为事件流产生的,如信用卡交易、传感器数据、机器日志以及社交网站或者手机上的用户交互信息。数据可以分为无界和有界的。
无界数据有开端但没有定义结束。也就是说无界数据会源源不断的被生产,必须持续进行处理。处理无界数据通常要求以特定顺序(例如事件发生的顺序)摄取事件,以便能够推断出结果的完整性。
Flink通过时间和状态的精准控制能够在无界流上运行任何类型的应用程序。
有界数据有明确的开始和结束。可以在处理计算有界数据前先摄取到所有数据。与无界数据不同,处理有界流不需要有序地摄取,因为可以始终对有界数据集进行排序。 有界流的处理也称为批处理。
Flink通过算法和数据结构来对有界流进行内部处理,这些算法和数据结构专门针对固定大小的数据集而设计,从而产生出色的性能。
Flink中应用程序由用户自定义算子转换而来的流式 Dataflow 组成,这些Dataflow组成有向无环图DAG,包含若干Source和若干。
上图展示了一段Flink DataStream 程序转换为DataFlow的情况,图下方黄色圆内为一个算子。
大多数时候程序中的transformation
和算子一一对应,但也有例外,比如上图,一个transformation包含了三个算子。
Flink程序天生就是并行化和分布式的。在执行时,DataStream有一个或更多的StreamPartition,而且每个算子也有一个或多个Subtask(由不同线程甚至进程、机器节点执行)。
算子subtask个数就是该算子的并行度(operator.setParallelism(N)
),而一个Job的不同算子可能有不同并行度。
以下是实例程序的逻辑视图和并行视图:
和Spark一样,Flink既能以StandAlone方式部署,又可以跑在YARN Mesos Kubernetes等资源调度器上。
Flink在这些RM上运行时,关于提交和控制一个Application的所有通讯手段都是REST请求。
Flink可以根据应用配置的并行性来识别出所需的资源并向RM申请。为了预防任务失败,Flink会在container失败时重新申请新的资源。
Flink被设计来可以高效地运行任意规模的有状态的流式应用。应用程序可以并行拆分为数千个任务分布在集群中,同时执行。理论上来讲应用可以用到无限的资源。而且,Flink可以轻松维护非常大的应用程序状态。 Flink异步、增量的检查点(checkpoint)算法可以确保对处理延迟的影响最小化,同时能保证精确一次性(exactly once
)的状态一致性。
目前Flink应用广泛,很多公司使用它来处理巨大规模的应用,如:
Flink内的部分算子是有状态的,意味着一个新的事件来到,被处理的逻辑需要依赖于之前累积的事件的结果。这些装态可用来简单计算没短时间内的数量,也可以用在复杂场景,比如欺诈检测特征计算。
前面已经提到过,Flink算子可在不同线程并行执行。那么有状态算子的并行实例组在存储对应状态时是按key进行划分的,每个并行实例负责处理自己的那个key分组的事件和在本地维护这些key对应的状态。
状态总是在本地访问,可使得Flink程序达到高吞吐低延迟,利用内存达到极致性能。
有状态的Flink应用程序针对本地状态访问进行了特别优化。
任务状态信息始终保存在内存中,或是当状态信息超过可用内存时异步存储在高效的、位于磁盘的数据结构中。这样设计使得任务计算通常都是在内存中进行,延迟非常低。(是不是很熟悉,很SparkStreaming相似的思想)
前面说到过,Flink能保证就算出现故障时也拥有精准一次的状态一致性。
Flink的处理方式是周期性异步(异步原因是不阻塞正在进行的数据处理逻辑)获取状态并存储的检查点,来将本地状态持久化到存储,在出错时用以恢复,重放流。
状态快照的内容是
当发生故障时,就从最后一次成功存储的Checkpoint恢复状态,重置数据源,从状态中记录的消费offset开始重新消费。
必须要保证每个从Source发出的event只能精准一次地影响Sink,要求:
Stream
流,就是流式处理中的基本概念。虽然流数据分为各种不同特征的类型,但是Flink可以巧妙高效的进行处理:
Flink的设计哲学最擅长于处理无界数据,但是对于有界数据(其实就是批处理)也提供了高效的操作方式。
一般来说处理这类流数据的方式有两种:
数据流或数据集的基本组成元素就是Record。
算子和函数将Record作为输入和输出。
见这里
Time是流式应用状态中的一个重要概念。
大多流式数据本身就有时间语义,因为每个事件都是在特定的时间点上产生的。而且通常流式计算是以时间为基础的。在流式处理中很重要的方面是应用程序如何去测量时间即event-time
和processing-time
的差别。
在流处理中设定使用哪种时间,请参考Flink学习3-API介绍-DataStream-处理时间设定。这个设定决定了Source怎么表现(比如是否分配timestamp)以及时间窗口算子使用哪种时间作为计算标准。
EventTime是每个单独的Event在其生产设备上发生的时间,比如APP向服务端上报事件时在APP内该事件的发生时间,在APP内就会被嵌入到该event中。服务端可以从该event中提取EventTime时间戳。
说白了,EventTime是事件在现实世界中发生的时间,在之前就产生,和后来到达的Flink服务端时间无关。Flink可从每条记录中提取timestamp
。
处理具有EventTime语义的流的应用程序,是基于事件的时间戳来计算结果。 因此,无论是处理记录型还是实时的流事件,通过EventTime
处理都会得到准确和一致的结果。但由于事件往往乱序到达,所以不可能无线等待,只能等待有限时间。只要在有限等待时间内所有数据都达到Flink系统,则基于EventTime的处理就会产生正确和一致性结果,就算是乱序、事件迟到、重跑历史数据(比如一个月的Kafka历史数据,ProcessingTIme可能就只有几秒区间,这时就适合用EventTime处理)等场景。比如有1小时的EventTime时间窗口,则只要事件的时间戳落入该窗口时间,则不管顺序如何、何时被处理,都能被Flink正确处理。
当选择EventTime模式时有两个重要概念:
水位WaterMark
当使用EventTime时,程序中必须使用指定生成EvnetiTime WaterMark的方式。Flink将使用水位来推断EventTime应用中的时间,也就是说基于EventTime时必须指定如何生成WaterMark。
此外,水位也是一种灵活的机制,可以在结果的延迟和完整性间做出权衡。
延迟数据处理
当使用水位在event-time
模式下处理流时,可能发生在所有相关事件到达之前就已完成了计算,这类事件称为延迟事件。 Flink具有多种处理延迟事件的选项,如重新路由它们以及更新此前已经完成的结果。
请注意,有时当EventTime程序实时处理在线数据时,它们将使用一些ProcessingTime操作,以确保其及时进行。
数据到达Flink系统时间。
具体来说,是在Source算子处将每条记录都是用当前时间作为时间戳,后续时间算子就用该时间戳。
对比Processing Time,Ingestion Time代价较昂贵,但结果更可预测。因为Ingestion Time统一在source处理时一次性分配时间戳,因此对事件的时间窗口操作不会再有时间分配,所以不会受到如处理算子所在机器时间不同步或算子传输时网络延时造成计算结果不准确的问题;而Processing Time可能会有延时导致不准确的情况。
对比Event Time,Ingestion Time不能处理乱序事件和迟到的情况,但优势是可以不指定水印。
注意,在Flink内部实现中其实是将Ingestion Time当做类似Event Time来处理,但时间戳的分配和水印生成是Flink系统自动处理的,对用户透明。
概念
Processing Time指的是数据处理算子所在的机器的系统时钟时间。具体来说,流计算使用Processing Time时,所有基于时间的算子(如time windows)会使用具体算子所在的机器的系统时钟时间。
整点时间窗口区间
小时级的Processing Time Window将包括系统时钟指示整点小时之间到达特定算子的所有记录。例如,如果一个应用程序在9:15 am开始运行,则第一个每小时处理时间窗口将包括在9:15 am和10:00 am之间处理的事件,下一个窗口将包括在10:00 am和11:00 am之间处理的事件,依此类推。
优点
Processing Time是Flink默认采用的时间,不需要流和机器之间的协调,编码实现最为简单,它提供了最佳的性能和最低的延迟。
缺点
但也必须容忍不确定性(比如数据乱序、迟到、各节点时间不同步等)以及并不精确、近似的结果。
适用场景
Processing Time语义适用于具有严格的低延迟要求,但能容忍一定的不准确结果的应用。
大体来说 ,Flink的窗口分为两类:
而滑动窗口与滚动窗口的最大区别就是滑动窗口有重复的计算部分。
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
// 中国,就需要指定-8才是utc-0
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
例如,滑动窗口两个参数为(10分钟,5分钟)。这样,每5分钟会生成(滑动)一个窗口,包含生成时往前推10分钟内到达的事件,每次有5分钟时间内的数据重叠,如下图所示。
val input: DataStream[T] = ...
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
// 中国,调整时区,-8
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
activity session
来对事件进行分组。val input: DataStream[T] = ...
// event-time session windows with static gap
input
.keyBy(<key selector>)
// 可用Time.milliseconds(x), Time.seconds(x), Time.minutes(x)等
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
使用全局窗口时,会将所有拥有相同key的元素分配到相同的单个全局窗口中。
仅当您还指定自定义触发器时,使用全局窗口才有意义;否则,将不会执行任何计算,因为全局窗口没有可以处理聚合元素的自然尾端。
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>)
请点击Flink-水位
触发器决定在窗口的什么时间点上启动用户定义的数据处理任务。
触发器意义是解决水位迟到、早到引起的问题。
根据Source是否参与Checkpoint快照机制,不同Source时更新用户定义的State有不同语义:
而若需要在消费端exactly-once基础上进一步实现端到端的exactly-once,那就需要Sink端也参与到Checkpoint,在已保证Source 状态exactly-once语义的前提下,不同Sink的交付保证如下:
可参考:
可参考
Flink State 误用之痛,竟然 90% 以上的 Flink 开发都不懂
从性能和 TTL 两个维度来描述ValueState 中存 Map 与 MapState 有什么区别?
如果不懂这两者的区别,而且使用 ValueState 中存大对象,生产环境很可能会出现以下问题:
Flink 源码:从 KeyGroup 到 Rescale
阅读本文你能 get 到以下点:
有状态和无状态:
可举例使用状态场景如下:
Flink状态核心概念如下:
ManagedState和RawState
Operator State
算子状态,一般存于内存
Keyed State
KeyedState保存在内嵌的类KeyValue存储内,它是一种由Flink管理的分片式key value存储。
State Backend
决定状态怎样去存、存在哪,可配置。
有状态计算:
运行基本业务逻辑的任何应用程序都需要记下事件或中间结果,以便在以后的时间点访问它们:例如在收到下一个事件时或在特定持续时间之后。
应用程序的State(状态)是很重要的一个概念,Flink中有很多feature来处理状态。
多种多样的状态元语
Flink有多种数据结构来提供状态原语,例如原子值、列表或映射,我们可以根据这个function访问方式来选择合适的状态元语类型。
可插拔的StateBackend
应用的状态是由一个可插拔的StateBackend服务管理和设置检查点的,我们可以选择用内存或RocksDB
(一个高效的嵌入式磁盘数据存储)甚至是自定义的状态后端插件来存储应用状态。
精准一次ExactlyOnce的状态一致性
Flink的检查点和恢复算法可以保证在失败时应用状态的一致性。所以,这些失败对应用来说是透明的,不会影响正确性。
巨大的状态信息维护
Flink通过异步和增量检查点算法可维护TB级别的应用状态。
可扩展的应用
Flink通过对应用弹性分配worker数量来实现应用可扩展。
维护跨并行Task实例的状态
Managed State
Managed State 描述了已在Flink注册的应用程序的托管状态。
Apache Flink 会负责Managed State的持久化和重伸缩等工作。
KeyedState是一种由Flink管理的分片式key value存储,详见5.1.3
ValueState
就是一个Key对应的一个State,可向其包装的任何变量,以此添加容错功能。
ValueState是一个包装类型,有三个重要方法:
null
ListState
就是一个Key对应的一个State List,是状态多值ListState。
可参考org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
的ListState<byte[]> bucketStates
和 ListState<Long> maxPartCountersState
MapState
一个Key对应的一个Map State
OperatorState
KeyedState保存在内嵌的类KeyValue存储内,它是一种由Flink管理的分片式key value存储,有两个特点:
keyBy
的过程就没有办法使用 KeyedStream。对齐流键和状态键可确保所有状态更新都是本地操作,从而确保了一致性而没有事务开销。 这种对齐方式还允许Flink重新分配状态并透明地调整流分区。
KeyGroup
,是Flink重分布KeyedState的原子单元。
ValueState
存Key对应的状态单值。
可以通过update方法更新状态值,通过value()方法获取状态值。
如 WordCount可用 Word 当 Key,Count存为该Word对应的State。
MapState
类似Java中的Map,由键值对状态组成。
需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一个。
ListState
类似Java中的List,一个Key含有多个状态值。
可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。
ReducingState
状态为单值,存储用户传入的reduceFunction的reduce计算结果状态。
每次调用add
方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。
AggregatingState
AggregatingState
与ReducingState
的区别是:ReducingState 中 add(T)
和 T get()
的泛型元素为同类型,但在 AggregatingState 输入的 IN
,输出的是 OUT
。
Keyed State 随 Key 在实例间迁移Redistribute。
keyBy
创建 keyed stream 对 key 进行划分,这是使用 keyed state 的基本前提!main方法:
// 1. get streaming runtime
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 2. create a source from TransactionSource
// which creates infinite stream of credit card transactions.
// Each transaction contains an account ID (accountId),
// timestamp (timestamp) of when the transaction occurred, and US$ amount (amount).
val transactions: DataStream[Transaction] = env
.addSource(new TransactionSource)
// The name attached to the source is just for debugging purpose
.name("transactions")
// 3.define partitioning events
val alerts: DataStream[Alert] = transactions
// 按accountId划分task,相同accountId的会被分发到同一个task处理
.keyBy(transaction => transaction.getAccountId)
// process方法接收一个KeyedProcessFunction,应用到流中每个已分组的元素上
// KeyedProcessFunction有三个泛型,
.process(new FraudDetectorV2)
.name("fraud-detector")
// 4.define sink to output result to external system such as Apache Kafka
alerts
// AlertSink logs each Alert record with log level INFO for debug purpose.
.addSink(new AlertSink)
.name("send-alerts")
KeyedState只能用于RichFunction
// KeyedProcessFunction继承自AbstractRichFunction
class FraudDetectorV2 extends KeyedProcessFunction[Long, Transaction, Alert]
将State声明为实例级别变量
@transient private var flagState: ValueState[java.lang.Boolean] = _
在实现自RichFunction
的open
方法中创建State描述符,并为State赋值
/**
* 应该在方法开始处理数据之前注册State
* 所以在open()方法内使用ValueStateDescriptor注册state
* ValueStateDescriptor包含了告诉flink如何管理变量的元数据
*
* open方法由TaskManager的StreamTask在invoke方法的beforeInvoke内调用,
* 即在调用task主逻辑之前调用open方法一次
*/
@throws[Exception]
override def open(parameters: Configuration): Unit = {
// 1. 创建State描述符 StateDescriptor
// state名为flag;
// state类型为BOOLEAN
val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
// 2. 创建用ValueStateDescriptor定义的state
// getState(StateDescriptor) 方法初次调用时就是注册;restore时就是获取checkpoint里面状态last值
flagState = getRuntimeContext.getState(flagDescriptor)
// 创建timerState
val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
timerState = getRuntimeContext.getState(timerDescriptor)
}
读写State
// Get the current state for the current key
val lastTransactionWasSmall = flagState.value
// set the flag to true
flagState.update(true)
// clean up state
flagState.clear()
non-keyed state
,每一个 operator state 都仅与一个 operator 的实例绑定。source state
,例如记录当前 KafkaSource consumer 的 offsetOperator State 可用于所有算子,常用于 Source,例如 FlinkKafkaConsumer
.
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
CheckpointListener,
ResultTypeQueryable<T>,
CheckpointedFunction
round-robin
的模式均匀分配给新的Task即可需要自己实现 CheckpointedFunction
或 ListCheckpointed
接口.
fromElements
会调用FromElementsFunction
的类,其中就使用了类型为 list state 的 operator state
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
CheckpointListener,
ResultTypeQueryable<T>,
CheckpointedFunction {
@Override
public final void initializeState(FunctionInitializationContext context) throws Exception {
OperatorStateStore stateStore = context.getOperatorStateStore();
this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,
createStateSerializer(getRuntimeContext().getExecutionConfig())));
if (context.isRestored()) {
// 恢复时,从ListState中读出所有分区Offset信息放入Map准备恢复
restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// populate actual holder for restored state
for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
}
LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState);
} else {
LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
}
}
@Override
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
if (!running) {
LOG.debug("snapshotState() called on closed source");
} else {
// 先将旧状态清理
unionOffsetStates.clear();
final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (fetcher == null) {
// the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets or the assigned partitions
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
}
} else {
// 获取当前Consumer offset
HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}
// 将当前offset放入状态快照
for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
unionOffsetStates.add(
Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
}
}
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// truncate the map of pending offsets to commit, to prevent infinite growth
while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
pendingOffsetsToCommit.remove(0);
}
}
}
}
// 这一轮checkpoint完成时
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
if (!running) {
LOG.debug("notifyCheckpointComplete() called on closed source");
return;
}
final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (fetcher == null) {
LOG.debug("notifyCheckpointComplete() called on uninitialized source");
return;
}
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// only one commit operation must be in progress
if (LOG.isDebugEnabled()) {
LOG.debug("Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
}
try {
final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
if (posInMap == -1) {
LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}",
getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
return;
}
// 获取并移除本次已完成的checkpoint
@SuppressWarnings("unchecked")
Map<KafkaTopicPartition, Long> offsets =
(Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
// remove older checkpoints in map
for (int i = 0; i < posInMap; i++) {
pendingOffsetsToCommit.remove(0);
}
if (offsets == null || offsets.size() == 0) {
LOG.debug("Consumer subtask {} has empty checkpoint state.", getRuntimeContext().getIndexOfThisSubtask());
return;
}
// 将这次状态快照记录的offset提交到kafka
fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
} catch (Exception e) {
if (running) {
throw e;
}
// else ignore exception if we are no longer running
}
}
}
}
Flink通过联合使用StreamReplay
和Checkpoint
来实现容错。Checkpoint需要使用持久化存储来保存状态:
其中每次Checkpoint就是标记每个输入流中的特定点以及每个运算符的对应状态。通过恢复算子状态以及从Checkpoint开始重放数据记录,可以恢复数据流以及同时保持一致性(ExactlyOnce处理语义)。
Flink容错机制的具体做法就是不断给分布式数据流做Checkpoint快照,存放到可配的地方(通常是分布式文件系统中)。当由于如机器、网络、软件等错误导致程序出错时,就停止该数据流,并从最近一次成功的Checkpoint恢复和重启算子,输入流也被重置到该快照点。而在恢复过程中被重复消费的数据会被保证不造成影响。
而Checkpoint的间隔时间,就是一种在执行中用于Checkpoint的开销和恢复时间的权衡方法:
状态从本质上来说,是Flink算子子任务的一种本地数据,为了保证数据可恢复性,使用Checkpoint机制来将状态数据持久化输出到存储空间上。状态相关的主要逻辑有两项:
注意:
Keyed State对这两项内容做了更完善的封装,开发者可以开箱即用,也就是说只需要注册和使用这类状态,不需要管Checkpoint快照和恢复,Flink已经帮你做了。
对于Operator State来说,每个算子子任务管理自己的Operator State,或者说每个算子子任务上的数据流共享同一个状态,可以访问和修改该状态。
但Flink的算子子任务上的数据在程序重启、横向伸缩等场景下不能保证百分百的一致性。换句话说,重启Flink应用后,某个数据流元素不一定会和上次一样,还能流入该算子子任务上。因此,我们需要根据自己的业务场景来设计snapshot和restore的逻辑。为了实现这两个步骤,Flink提供了最为基础的CheckpointedFunction接口类。
public interface CheckpointedFunction {
// Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化
void snapshotState(FunctionSnapshotContext context) throws Exception;
// 初始化时会调用这个方法,向本地状态中填充数据
void initializeState(FunctionInitializationContext context) throws Exception;
}
可参考:
Chandy Lamport算法,可以想象就是对水管吹气,把里面的水全部吹出来,此时就是管道水排空时系统状态。
对应到Flink,就是CheckpointCoordinator定时对Source发送Checkpoint Barr
由于Flink中每个函数和算子都可以是有状态的,有状态函数可以在处理每个事件过程中存储数据,这使状态成为任何类型更复杂操作的关键构成。
Flink容错性的核心部分就是为分布式数据流和算子状态制作Checkpoint一致性快照,可以在出错时从Checkpoint中恢复状态和执行位置等。
相关详细内容还可参考
Checkpoint需要使用持久化存储来保存快照状态:
状态快照是指Flink Job的全局一致性镜像,一个快照包括:
Checkpoint为异步执行的,Checkpoint屏障不会在lock步骤中传播,并且算子可以异步地为其状态制作快照。
Flink 的 StateBackend 利用写时复制(copy-on-write)机制允许当异步生成旧版本的状态快照时,能够不受影响地继续流处理。
只有当快照被持久保存到StateBackend后,这些旧版本的状态才会被当做垃圾回收。
Flink1.11以前只支持对齐的Checkpoint,从1.11开始也可启用不对齐的Checkpoint了。
对齐的Checkpoint
每个算子需要等到已接收所有上游发送的 Barrier 对齐后才可以进行 本算子状态的Snapshot ,完成后继续向后发送 Barrier。这样,在出现反压的情况下,Barrier 从上游算子传送到下游算子可能需要很长的时间,从而导致 Checkpoint 超时的问题。
非对齐的Checkpoint
针对这一问题,Flink 1.11 新增了 Unaligned Checkpoint
机制,开启后一旦收到第一个上游Barrier就可以开始执行Checkpoint,并把上下游之间正在传输的数据也作为状态保存到快照中,这样 Checkpoint 的完成时间大大缩短,不再依赖于算子的处理能力,解决了反压场景下 Checkpoint 可能超时的问题。
可以通过 env.getCheckpointConfig().enableUnalignedCheckpoints();
开启unaligned Checkpoint
。
val env = StreamExecutionEnvironment.getExecutionEnvironment()
开启Checkpoint
Checkpoint默认关闭。需要用env.enableCheckpointing(n毫秒)
来开启,这里的n指开启两次checkpoint之间的间隔毫秒数时间。
env.enableCheckpointing(1000)
exactly once / at least once 语义
默认为exactly once
:
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
Checkpoint超时时间
如果某次Checkpoint超过此阈值还没完成,则将进行中的Checkpoint干掉作废,单位毫秒
env.getCheckpointConfig.setCheckpointTimeout(60000)
下次Checkpoint距离上一次Checkpoint完成后的最小时间间隔毫秒数。
注意,此时不会管已设置的Checkpoint时间间隔和每次Checkpoint持续时间。也就是说,此时间隔时间永不会比本参数时间还小了。还有一点就是,此时并发Checkpoint数设置失效,强制设为1。
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1500);
比如,我们通过env.enableCheckpointing
设定了是 2 个两次Checkpoint之间的间隔毫秒数时间为1000ms。那么如果一个Checkpoint耗时900ms,本来过100ms就应做下一个 Checkpoint,导致checkpoint过于频繁。这个时候,本设置让Checkpoint完成之后最少要等 500ms才开始下一个,可以防止 Checkpoint 太过于频繁而导致业务处理的速度下降。
所以有时候因为某次Checkpoint时间过长,可能导致采用时间间隔方式受影响导致上一个Checkpoint刚完成没多久又开始下一个Checkpoint,可采用本方式。
Checkpoint并发数
默认为1,即只能有1个Checkpoint同时进行。一个没完另一个不会触发。减少对Chcekpoint开销,不会对系统产生过多影响。
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
对于这种场景可以用并发Checkpoint:有确定的处理延迟(比如经常调用较好时的外部服务接口),但仍希望频繁Checkpoint,以期当发生错误时恢复很少的数据开始处理。
Checkpoint失败后是否导致flink任务失败
默认为true。
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
注:本方法已经废弃,建议改用以下方法,默认0即不容忍任何一个Checkpoint失败:
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(int)
是否开启外部检查点
默认情况下,Checkpoint 在默认的情况下仅用于恢复失败的作业,而并不会被保留,即当程序Cancel时 Checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业Fail或Cancel时不会被清除。
我们可以将周期性的Checkpoint配置为保存到外部存储。这种检查点会将元数据持久化到外部存储,而且即使job失败也不会自动清除。这样,当job失败时可直接从这个现成的Checkpoint恢复。详细参考:Externalized checkpoints 的部署文档
有两种选择:
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
当Cancel该job时也保留 Checkpoint。也就是说,我们必须在Cancel后需要手动删除Checkpoint文件。
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
当Cancel该job时删除Checkpoint。仅当Job失败时,Checkpoint 才会被保留。
为单个job设置Externalized方法:
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
在flink-conf.yaml中全局设置方法:
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
优先从 Checkpoint 恢复而不是Savepoint
默认为false。该属性确定 job 是否在最新的 checkpoint 回退,即使有更近的可以潜在地减少恢复时间的 savepoint 可用(因为checkpoint 恢复比 savepoint 恢复更快)
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
Checkpoint由运行在JM的组件CheckpointCoordinator
定时触发:
// checkpointInterval
long baseInterval = chkConfig.getCheckpointInterval();
if (baseInterval < minPauseBetweenCheckpoints) {
baseInterval = minPauseBetweenCheckpoints;
}
private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
return timer.scheduleAtFixedRate(
new ScheduledTrigger(),
// baseInterval就是我们配置的checkpointInterval
initDelay, baseInterval, TimeUnit.MILLISECONDS);
}
private final class ScheduledTrigger implements Runnable {
@Override
public void run() {
try {
triggerCheckpoint(true);
}
catch (Exception e) {
LOG.error("Exception while triggering checkpoint for job {}.", job, e);
}
}
}
分布式快照的核心组件被称为stream barrier
,他们会被注入到数据流中和数据记录一起在算子之间流动,并且他们永不会超过记录,因为他们是按严格线性流动的。
Barrier将记录分为流入当前快照和下一个快照的两部分,且Barrier会记录下当前快照ID。Checkpoint n
将包含每个算子的状态,这些状态是对应的算子消费了严格在Checkpoint barrier n
之前的所有事件,并且不包含在此后的任何事件后而生成的状态。
JM的CheckpointCoordinator定时触发Checkpoint流程
CheckpointBarrier被注入到Source阶段的并行数据流中,第n个快照的注入点Sn就是Source覆盖数据
比如在Kafka中,注入点就是该partition最后一条记录的Offset。 第n个快照的注入点位置Sn会被报告给JobManager的组件CheckpointCoordinator。
当Job 开始做Checkpoint barrier N 的时候可以理解为逐步将状态信息填充如下图左下角的表格。
比如上图Source收到Barrier N后将partition Offset(比如Source当前消费Kafka分区Offset)放入Source状态表格中。
Barrier在注入后,和其他数据一起向数据流下游流动。
当一个中间算子收到来自所有输入流的Snapshot N的对应Barrier N后,会将自己的状态异步写入持久化存储,并将Barrier N插入输出流发送给下游。
Barrier N流动到Operator 1时,就将属于Checkpoint N到Checkpoint N-1之间的所有数据反映到当前状态快照中。
一旦作为流程序DAG终点的某个Sink算子接收到所有输出流发来的Barrier N以后先填充自己的状态表格,随后就会认为快照N已经完成,此时会发送一个快照N已完成的ACK给CheckpointCoordinator。
当CheckpointCoordinator收到所有Sink算子发出的ACK后,快照N就被认为已经成功执行完成。
此后就不会再访问Sn以前的数据了,因为认为之前的数据已经走完整个数据拓扑。
最后 CheckPointCoordinator 会把整个 StateHandle 封装成 completed CheckPoint Meta,写入到 hdfs。
而如果直到Checkpoint超时,CheckPointCoordinator仍未收集完所有的 State Handle,CheckPointCoordinator会认为本次 CheckPoint 失败,将这次 CheckPoint 产生的所有状态数据全部删除。
当有多个输入流时,需要对齐输入流的同ID的Barrier。
当 job graph 中的每个 operator 都接收到该Barrier时,就会记录下自己的状态。
拥有两个输入流的 Operators(例如CoProcessFunction
)会执行Checkpoint Barrier对齐(barrier alignment) ,以便当前快照能包含已消费两个输入流 barrier 之前(但不超过)的所有 event 而产生的状态。
一旦算子从某个输入流收到BarrierN,那就不能再处理任何该输入流的数据,直到从其他所有输入流收到BarrierN。否则会导致将快照N和快照N+1的数据记录搞混。
暂时停止处理的流的数据继续收到放入InputBuffer等待处理。
一旦算子收集齐了BarrerN,就会立刻发送所有OutputBuffer中的PENDING状态的记录给下游,然后发送BarrierN。
算子发送BarrierN完毕后,在内存中将状态制作快照,然后恢复从输入流获取、处理数据
注意,会优先将InputBuffer中的数据处理完毕,然后再从输入流处理
最后,算子将内存中制作好的状态异步写入StateBackend
注意,所有拥有多路输入流和消费来自多个上游子任务输出流的shuffle后的算子都需要使用Checkpoint对齐。
特点:
算子状态属于快照一部分
只要你的算子包含任意格式的State,那就必须作为快照一部分保存。
算子状态快照时间点
当算子接收到所有输出流的BarrierN后,就在内存中将状态制作快照,然后恢复从输入流获取、处理数据,最后算子将内存中制作好的状态异步写入StateBackend。
在这个时间点上,所有在BarrierN之前的对State的更新已经做完,而在BarrierN之后对State的更新都还没做。
StateBackend
因为状态快照可能很大,不可能全放在内存,所以被存放到可配的StateBackend
。
默认在JM内存中,但生产中一般存放在分布式存储中,比如HDFS。
当状态快照被存储后,算子会确认该次Checkpoint完成,发送该次快照的 Barrier到输出到下游的流中。
状态快照包含内容
当失败时,直接从最近一次成功的Checkpoint恢复。
恢复时:
先从最新的全量快照启动算子,然后将后续的增量快照更新到算子状态上,得到最新状态。
Flink1.11以前只支持对齐的Checkpoint,从1.11开始也可启用不对齐的Checkpoint了。
对齐的Checkpoint
每个算子需要等到已接收所有上游发送的 Barrier 对齐后才可以进行 本算子状态的Snapshot ,完成后继续向后发送 Barrier。这样,在出现反压的情况下,Barrier 从上游算子传送到下游算子可能需要很长的时间,从而导致 Checkpoint 超时的问题。
非对齐的Checkpoint
针对这一问题,Flink 1.11 新增了 Unaligned Checkpoint
机制,开启后一旦收到第一个上游Barrier就可以开始执行Checkpoint,并把上下游之间正在传输的数据也作为状态保存到快照中,这样 Checkpoint 的完成时间大大缩短,不再依赖于算子的处理能力,解决了反压场景下 Checkpoint 可能超时的问题。
可以通过 env.getCheckpointConfig().enableUnalignedCheckpoints();
开启unaligned Checkpoint
。
特点:
非对齐Checkpoint核心思想就是只要in-flight缓存数据会成为OperatorState的一部分,那Checkpoint就可以超越这些数据。
因此,非对齐方式的Checkpoint算子仅短暂停止输入流处理以标记缓冲区、发送Barrier、创建其他状态的快照,不需要再像对齐Checkpoint那样等待所有所有输入流的Barrier。
非对齐Checkpoint可保证Barrier尽快到达Sink,所以特别适合某个流路径进展缓慢的情况,如果采用对齐Checkpoint甚至或延迟小时级别。
根据用户配置以及使用的集群,Flink有三种语义:
Exactly Once
数据既不丢失也不重复。注意这里的含义是保证引擎管理的状态更新只提交一次到持久的后端存储,而不是引擎只处理一次该数据。
5,3
会被重新处理,但他们仍然只会精确影响状态sum一次,所以结果依然正确。如果不需要可以设置CheckpointingMode.AT_LEAST_ONCE
,此时可以提高性能,但不再是精确一次了。
注意:Flink通过回退和重放Source数据流,Exactly Once并不意味着每个event只被精确处理一次,而是指每个event只会精确一次地影响由Flink管理的状态。
At Least Once
数据不丢,但可能重复
具体做法是失败即从源头重试执行,但事件可能被处理多次。比如CheckpointBarrier不对齐时就进行快照,则可能导致先到达的那个流的数据被处理多次。
At Most Once
保证数据或事件最多由应用程序中的所有算子处理一次
具体做法是不尝试从错误中恢复
At Least Once
Checkpoint对齐有时候可能会增加大量耗时,对于需要超级低延时、但又要求一致性的程序,可以关掉对齐。也就是说,算子一旦看到Checkpoint Barrier就开始生成Checkpoint快照。
当跳过Checkpoint对齐后,当一些输入流的BarrierN来到后,此时算子标记完成后可以继续处理所有输入,这就造成算子可能在CheckpointN完成之前又同时在处理属于CheckpointN+1的数据,因为这些后来的数据虽然在Checkpoint N Barrier之后,但会将其包含在这次Checkpoint备份的状态之中。
这就造成在恢复CheckpointN时这些记录重复出现,因为他们既在CheckpointN的状态快照之中(已经影响了Checkpoint N记录的状态)又会在恢复时作为Checkpoint N之后数据的一部分被重放导致重复计算!
Exactly Once
所以要想Exactly Once
,就要开启对齐,使用InputBuffer将对齐阶段继续接收的数据缓存,等待对齐完成后继续处理,此时如果从CheckpointN恢复时,自然其保存的状态就不会有CheckpointN后来的数据的干扰了。
注意:Checkpoint对齐只能用于拥有多个前驱结点的算子(如Join)和拥有多个后置节点的算子(repartitioning/shuaffle)。
所以只有一个后置或前驱结点的单并行算子(如map、flatMap、filter等)即使在at least once
模式也会表现为exactly once
。
Checkpoint Barrier对齐与不对齐例子可参考一文搞懂 Flink 的 Exactly Once 和 At Least Once
转自 一文搞懂 Flink 的 Exactly Once 和 At Least Once,作者范瑞
Savepoint 其实就是手动触发的 Checkpoint,它依靠常规的 checkpoint 机制获取程序的快照并将其写入StateBackend保存。
Savepoints 允许在不丢失任何状态的情况下升级程序和 Flink 集群。
Savepoint内部有一个类似map结构,为每个有状态算子保存了Operator ID -> State
的映射。
所以我们使用DataStreamApi时,尽量为每个有状态算子声明ID,方便恢复。如果不指定会自动生成ID,只要这些ID不变就能手动从savepoint恢复。这些ID取决于程序,对程序改动很敏感。
正例如下:
DataStream<String> stream = env.
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
// Stateful mapper with ID
.map(new StatefulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); // Auto-generated ID
此时savepoint生成的映射如下:
Operator ID | State
------------+------------------------
source-id | State of StatefulSource
mapper-id | State of StatefulMapper
可以看到,只生成了有状态算子的映射。而print
这样的无状态算子被忽略,不会成为状态一部分。
allowNonRestoredState
跳过无法恢复的状态将Checkpoint Barrier手动插入到所有Pipeline中从而产生分布式快照。
使用EventTime,所以每个事件总是放入同一个窗口,保证结果一致性。
可参考:
Checkpoint 会将 timer 以及有状态算子中的状态进行一致性快照保存, 包括Connector(比如KafkaConnector、HDFSConnecotr等),Window以及任何用户自定义State。
DataStream API涉及到的State有:
CheckpointedFunction
来 对存入State的值实现容错。 StreamingFileSink
实现该接口,使用状态存储了Bucket、Part-File,且实现了容错。State Backend管理的状态包括:
org.apache.flink.streaming.api.datastream.KeyedStream
使用的Keyed State
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
来直接管理的State开启Checkpoint后,可以将上述State Checkpoint持久化,防止数据丢失,出错时可一致性恢复。而具体选择的StateBackend
就决定了State内部如何组织表现,以及如何、在哪进行Checkpoint。
Checkpoint存储的可能位置包括JobManager内存、文件系统、数据库等(State默认存储在TaskManager内存中,而Checkpoint保存在JobManager内存中),具体取决于所配置的State Backend
(比如持久化巨型状态就不适合放在内存),当前有以下State Backend可用于存储 Checkpoint State:
Job级别的StateBackend配置方式如下:
StreamExecutionEnvironment.setStateBackend()
这定义了该Job保存State的数据结构以及Checkpoint数据存储位置。
全局StateBackend配置方式如下:
MemoryStateBackend为不配置时的默认选项。如果要修改默认配置,请修改flink-conf.yaml
文件中的state.backend
:
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
其中:
state.backend
可选内置的jobmanager
(MemoryStateBackend), filesystem
(FsStateBackend), rocksdb
(RocksDBStateBackend),或自己实现自StateBackendFactory
的类全限定名,如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
。
state.checkpoints.dir
定义了StateBackend写Checkpoint数据和元数据文件路径。目录结构如下:
/user-defined-checkpoint-dir
/{job-id}
|
+ --shared/
+ --taskowned/
+ --chk-1/
+ --chk-2/
+ --chk-3/
...
shared
目录保存了可能被多个Checkpoint引用的文件;taskowned
存储JM不能删除的文件;chk
开头的是每次checkpoint使用的文件,数字表示checkpointId。
MemoryStateBackend为不配置时的默认选项。
存储位置
异步快照
支持,且默认开启,强烈建议使用异步快照来防止数据流阻塞。
要关闭异步采用同步方式(官方建议只在Debug时关闭异步Checkpoint):
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
使用限制
每个单独的State默认最大5MB,可用MemoryStateBackend构造函数设置
env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE, false))
但是必须注意该参数谨慎调大,因为State Checkpoint时TaskManager会将State Checkpoint后的数据通过限制了大小的RPC方式发送给JobManager,而JobManager需要在内存保存来自各个TaskManager的状态数据。太大了可能导致OOM!
特点
调优
managed memory
内存设为0,因为RocksDB StateBackend才会使用该部分内存。这样设定可以使得用户代码可以使用JVM能提供的最大值来执行。
存储位置
异步快照
支持,且默认开启异步快照Checkpoint来防止数据流阻塞。
如果要关闭异步采用同步方式:
new FsStateBackend(path, false);
这里的Path
是Checkpoint持久化目录,如hdfs://namenode:40010/flink/checkpoints
或file:///data/flink/checkpoints
特点
适用场景
调优
managed memory
内存设为0,因为RocksDB StateBackend才会使用该部分内存。这样设定可以使得用户代码可以使用JVM能提供的最大值来执行。
可参考:
简介
RocksDB 是一个以日志合并树( LSM 树,Kudu、HBase都有使用)作为索引结构的 KV 存储引擎。当用于在 Flink 中存储 kv 状态时,Key由<Keygroup,Key,Namespace>
的序列化字节串组成,而Value由状态的序列化字节组成。
每次注册 kv 状态时,它都会映射到列族(column-family),并将键值对以字节存储在 RocksDB 中。这意味着每次读写(READ or WRITE)操作都必须对数据进行反序列化或者序列化,与 Flink 内置的 in-memory 状态后端相比,会有一些性能开销。
存储位置
仅支持异步快照
RocksDBStateBackend仅支持异步快照,且默认开启异步快照Checkpoint来防止数据流阻塞。
可支持增量Checkpoint
RocksDB是唯一可支持的
使用限制
由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持2^31
字节。 注意: RocksDB 使用merge算子的状态(例如ListState)累积数据量大小可能悄悄超过 2^31
字节,会在下一次获取数据时失败。这是当前 RocksDB JNI 的限制。
特点
可保留的State大小仅受可用磁盘空间量的限制。
与平时将State保留在内存中的FsStateBackend相比,RocksDBStateBackend可以保留非常大的状态。 但这也意味着若使用RocksDBStateBackend,则可以实现的最大吞吐量将降低。
每次读取状态都有序列化/反序列化开销
从RocksDBStateBackend进行的所有读/写都必须经过序列化/反序列化以检索和存储State对象,这也比基于堆的StateBackend开销大很多。
可使用增量快照
不受 Java 垃圾回收的影响,与 heap 对象相比,它的内存开销更低
RocksDB是唯一可支持增量Checkpoint的
适用场景
Flink花了很多功夫在内存管理上,以使得TM很好得利用内存,不至于在容器环境因为内存超限被杀掉,也不会因为内存利用率过低导致大量内存数据落盘或缓存命中率太低。
默认,RocksDB的可用内存配置为TM的一个Slot的内存量,一般用户无需调整细节,只需在不够时增加整体内存大小即可。state.backend.rocksdb.memory.managed
就开启了RocksDB使用Managed Memory
,Flink通过配置 RocksDB 来确保其使用的内存正好与 Flink 的Managed Memory预算相同,计算粒度是Per-Slot
。也就是说,Flink会为一个Slot上的所有 RocksDB 实例使用共享的 RocksDB cache
和 RocksDB write buffer manager
调整缓冲区:
state.backend.rocksdb.memory.write-buffer-ratio
即分配给写入缓存的比例,默认0.5即50%。state.backend.rocksdb.memory.high-prio-pool-ratio
即分配给写入缓存的比例,默认0.1即10%,这部分内存优先分配给RocksDB的索引和过滤器。但专业用户也可以手动为RocksDB每个列族(每个算子的每个State就对应了一个列族)分配内存,用户自己来确保总内存不会超限。
这里说的Timer是指用来触发窗口或回调ProcessFunction的类似操作,可基于事件时间或处理时间。
RocksDBStateBackend中Timer默认存在RocksDB,需要一定成本维护,所以也可以将Timer存在Java堆。
Timer较少时,可将state.backend.rocksdb.timer-service.factory
设为heap
,可获得更好的性能。但这样设置后,Timer状态就不能被异步快照存储了。
对于HeapKeyedStateBackend,有两种实现:
支持异步 Checkpoint(默认):存储格式CopyOnWriteStateMap
仅支持同步 Checkpoint:存储格式 NestedStateMap
namespace用来标注如属于哪个window
MemoryStateBackend
内使用HeapKeyedStateBackend
时,Checkpoint 序列化数据阶段默认有最大 5 MB数据的限制
RocksDBKeyedStateBackend
的每个 state 都存储在一个单独的 column family 内,使用了基于LSM树的磁盘、内存混合型DB 写入相关操作如下图
SSTable
读取时:
Active MemTable
。SSTable
bloom filter
减少大量磁盘访问,进行过滤增量时(可参考Managing Large State in Apache Flink: An Intro to Incremental Checkpointing以及 翻译版):
以下转自Apache Flink 管理大型状态之增量 Checkpoint 详解,作者Stefan Ricther & Chris Ward,翻译 邱从贤(山智)
关于控制便捷性与性能之间平衡的策略可以参考此文档:
可参考:
以下配置在conf/flink-conf.yaml
中配置,改变后需要重启Flink应用。
Key
Default
Type
Description
state.backend
(none)
String
保存和Checkpoint State的位置
state.backend.async
true
Boolean
StateBackend是否应在可能且可配置的情况下使用异步快照方法。 某StateBackend可能不支持/仅支持异步快照,会忽略此选项。
state.backend.fs.memory-threshold
1024
Integer
State数据文件大小的最小值。 所有大小小于该值的State Chunk都以内联方式存储在根Checkpoint元数据文件中。
state.backend.fs.write-buffer-size
4096
Intege
写入文件系统的Checkpoint流的写缓冲区的默认大小。 实际的写缓冲区大小为本选项和state.backend.fs.memory-threshold
的最大值
state.backend.incremental
false
Boolean
StateBackend是否应创建增量Checkpoint(如果可能)。 对于增量Checkpoint,仅存储与前一个Checkpoint的差异,而不存储完整的Checkpoint状态。 某些StateBackend可能不支持,会忽略此选项
state.backend.local-recovery
false
Boolean
默认禁用。为此StateBackend配置本地恢复。 当前,本地恢复仅涵盖Keyed-StateBackend。 注意,目前MemoryStateBackend不支持本地恢复,请忽略此选项。
state.checkpoints.dir
(none)
String
用于在Flink支持的文件系统中存储Checkpoint的数据文件和元数据的默认目录。 必须从所有参与的进程/节点(即所有TaskManager和JobManager节点)访问存储路径
state.checkpoints.num-retained
1
Integer
要保留的最大已完成Checkpoint数
state.savepoints.dir
(none)
String
Savepoint的默认存储目录。 由StateBackend用于将Savepoint写入文件系统(适用于MemoryStateBackend,FsStateBackend,RocksDBStateBackend)
taskmanager.state.local.root-dirs
(none)
String
定义用于存储基于文件的State以进行本地恢复的根目录。 当前,本地恢复仅涵盖Keyed-StateBackend。 当前,MemoryStateBackend不支持本地恢复,请忽略此选项
为了使Flink应用程序在规模很大时仍能可靠运行,必须满足两个条件:
Checkpoint关键指标:
checkpoint_start_delay
表示从Checkpoint被Checkpoint发起到算子真正开始执行Checkpoint时间 ,计算公式如下:
checkpoint_start_delay = end_to_end_duration - synchronous_duration - asynchronous_duration
如果该值持续高值,说明checkpoint barrier
从Source流到算子的时间过长,一般就意味着出现了长期背压的情况。
CheckpointBarrier对齐期间的数据缓存量
在开启exactly-once
语义后,多路input的算子收到一个input的Barrier就需要做对齐操作,在此期间就需要缓存数据。理想情况下缓存数据应该比较少,否则说明多个input的Bairrer长期不对齐。
当Checkpoint耗时过长,意味着Checkpoint耗费太多资源,算子进展太少,虽然是异步Checkpoint,但依然会对整个应用性能产生影响。
此时就应该使用StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints
调大Checkpoint之间最小间隔时间,该配置是定义了当前Checkpoint的结束与下一个Checkpoint的开始
之间必须经过的最短时间间隔,官网图例如下:
开启增量Checkpoint
将默认存在RocksDB中的Timer改为存在Java堆,谨慎使用!
Timer较少时,可将state.backend.rocksdb.timer-service.factory
设为heap
,可获得更好的性能。但这样设置后,Timer状态就不能被异步快照存储了。而且可能增加Checkpoint时间,大小也不能超过能存限制**,谨慎使用!**
Tuning RocksDB Memory
最直接的方式就是调大managed memory
,可以显著提高性能。
默认只有0.4比例,除非程序逻辑本身需要很多JavaHeap内存,否则可尽量多给managed memory
因为每个状态对应了一个RocksDB列族,每个列族都需要独有的write buffer,所以拥有很多状态的应用通常需要更多内存
可设state.backend.rocksdb.memory.managed:false
来对比性能
非托管模式中,除非使用了ColumnFamily
,否则上线公式为140MB * num-states-across-all-tasks * num-slots
(State包括Timer)
如果拥有很多状态的应用中观察到频繁地MemTable flushes
,说明写入有瓶颈了,如果你不能给RocksDB更多内存了,那此时可以配置state.backend.rocksdb.memory.write-buffer-ratio
将更多内存分配给write buffer。
专家可以尝试使用RocksDBOptionsFactory
来调整arena block size
, max background flush threads
等,以减少MemTable flushes
public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {@Override
public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
// increase the max background flush threads when we have many states in one operator,
// which means we would have many column families in one DB instance.
return currentOptions.setMaxBackgroundFlushes(4);
}
@Override
public ColumnFamilyOptions createColumnOptions(
ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
// decrease the arena block size from default 8MB to 1MB.
return currentOptions.setArenaBlockSize(1024 * 1024);
}
@Override
public OptionsFactory configure(Configuration configuration) {
return this;
}
}
要想是的Flink Job运行可靠,一般可遵循以下步骤规划资源:
savepoint
来重设并行度时的上限默认关闭。
目前只支持snappy compression algorithm (version 1.1.4):
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setUseSnapshotCompression(true);
注意,此选项不适用于增量快照,因为增量快照使用的是RocksDB的内部格式,始终使用开箱即用的snappy compression 。
Checkpoint压缩工作粒度为keyed state
下的key-group
,解压时也是此粒度,方便扩缩容。
Checkpoint时,每个Task都会生产一个状态快照,然后异步写入StateBackend。随后,每个Task会发送一个ACK给JM告知状态写入成功,该ACK是一个句柄,带有state在StateBackend中的位置。JM在收集完所有task的ACK后,将他们封装到一个Checkpoint对象中。
在恢复时,JM打开最近的Checkpoint,然后将其中的state文件句柄发回给各个对应的task以进行恢复。
使用分布式StateBackend的好处是容错性和方便扩缩容;坏处是必须通过网络远程读取访问,可能在大型状态场景导致很长的恢复时间,尽管可能只是因为很小的错误发生。
核心思想就是每次Checkpoint时,写一份主副本到远程StateBackend,再写一份副本到本地(磁盘或内存)。
这样,在恢复时大多数task只需要找到之前本地state就能从本地恢复无需从远程下载。
Checkpoint的StateBackend副本始终被认为是主副本,而本地副本是第二副本。而且他们格式不一定相同,比如存放在内存的本地状态就是Java对象。
Checkpoint时,主副本必须成功此次Checkpoint才能成功,而如果此时本地副本写入失败不会导致此次checkpoint失败。
主副本被JM确认和管理,而第二副本是由TM管理,生命周期可独立于主副本。比如保留3个最新主副本,而只保留一个本地最新副本。
恢复时,总是先尝试本地副本,如果失败就找远程主副本,如果还是失败就根据配置可找上一个checkpoint继续恢复。
更巧妙的是,即使Flink因为错误只写入部分状态到本地副本,也会尝试恢复这部分状态,然后再去远程副本恢复其他状态,因为远程状态总是完整的。
但如果TM挂了,则关联的本地状态也会全部丢失。
默认关闭。
配置
默认
类型
说明
state.backend.local-recovery
false
Boolean
默认禁用。为此StateBackend配置本地恢复。
当前,本地恢复仅涵盖Keyed-StateBackend。
注意,目前MemoryStateBackend不支持本地恢复,请忽略此选项。
注意,目前非对齐Checkpoint不支持本地Checkpoint副本恢复。
当前只支持KeyedStateBackend,未来会支持算子和timer的状态。
目前支持的StateBackend如下:
Task本地恢复模式中假定在错误后保留task调度,工作机制如下:
在减少Checkpoint花费的时间时,可首先考虑开启增量Checkpoint。 与完整Checkpoint相比,增量Checkpoint可以大大减少Checkpoint的时间,因为增量Checkpoint仅记录与先前完成的Checkpoint相比发生变化的部分,而不是生成完整数据。
一个增量checkpoint依赖之前的若干checkpoint构建。由于 RocksDB 内部存在 compaction 机制对 sst 文件进行合并,Flink 的增量快照也会定期重新设立起点(rebase),因此增量链条不会一直增长,旧快照包含的文件也会逐渐过期并被自动清理。
如果网络为瓶颈,则从增量Checkpoint恢复可能会较慢,因为需要读取更多delta增量文件;而如果CPU或IO为瓶颈,则从增量Checkpoint恢复更快,因为这种方式不会像全量或savepoint那样使用并解析Flink Key/Value 快照格式数据来重建本地RocksDB表。
增量Checkpoint开启方式:
new RocksDBStateBackend(path, TernaryBoolean.TRUE);
这里的Path
是Checkpoint持久化目录,如hdfs://namenode:40010/flink/checkpoints
或file:///data/flink/checkpoints
。
或使用配置state.backend.incremental: true
开启
需要注意的是,只要开启了增量Checkpoint,则在WebUI上显示的Checkpointed Data Size
就只代表增量Checkpoint数据大小,而不是整个State大小了。
是PhysicalGraph的节点,一个task是Job的基本组成单元,在Flink运行时被执行。
Task 精确封装了一个 Operator 或者 Operator Chain 的并行实例。
一个Sub-Task是指处理数据流的一个partition的task。
该术语强调同一个Operator 或者 Operator Chain 拥有多个并行实例由多个Task执行。
Operator即算子,是LogicalGraph的节点。
每个算子执行特定操作。
分布式计算中,Flink 将算子(operator)的 subtask 按一定规则链接成若干task,每个 task 由TaskManager上的一个线程执行。
把算子链接成 task 能够有效减少线程间切换和缓冲的开销,在降低延迟的同时提高了整体吞吐量。
下图的 dataflow 由5个 subtasks 执行,因此具有5个并行线程运行:
Operator Chain配置详情可参考:6.2.2
将两个连续的算子(即不会造成repatitioning的算子)连接即将他们放在同一个线程执行,同一个算子链内的算子直接交换数据,而不会进行序列化/反序列化或Flink网络相关操作,以获得更好的性能,优点:
Flink默认会尽可能将算子连接(比如两个map算组),但也可以自主控制:判断是否构成Chain的条件很苛刻,需要以下提交件全部都为true:
SlotSharingGroup
ChainingStrategy
不能是NEVER
ALWAYS
(可参与上下游连接,如FlatMap、Filter、Map)或HEAD
(只能作为上游与下游连接,但不能与下游连接,如Source)ALWAYS
ForwardPartitioner
,即发送和接收分区一一对应pipeline.operator-chaining
设置。下面这幅图,展示了Source并行度为1,FlatMap、KeyAggregation、Sink并行度均为2,最终以5个并行的Task线程来执行:
可以看到,Key Agg
和Sink因为符合Chain的条件被连接到一起,作为一个Task执行。
可在job级别禁用 Operator Chain:
StreamExecutionEnvironment.disableOperatorChaining()
细粒度控制:
开启新的Operator Chain
注意startNewChain
必须用在算子之间,不能直接放在stream后面。
以下例子filter是单独的task,但第一个map和第二个map组成了一个Operator Chain
someStream.filter(...).map(...).startNewChain().map(...)
算子级别禁用Operator Chain
someStream.map(...).disableChaining();
以下为入度为1,出度为2的一个OperatorChain示例。
OperatorChain
为一个算子链的呈现类
OperatorChain对外部来说就是一个整体,只需要将input数据传输给该算子链的HeadOperator即可
比如上图的算子链就可以看作是入度为1,出度为2的一个算子整体
上图中的实线就对应了JobEdge
上图中的虚线就是OperatorChain内部数据传输ChainingOutput
,不会经过序列化/反序列化、网络传输,而是直接通过方法传递处理。
OperatorChain.ChainingOutput如下
static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
// operatorChain的下游算子
protected final OneInputStreamOperator<T, ?> operator;
protected final Counter numRecordsIn;
protected final WatermarkGauge watermarkGauge = new WatermarkGauge();protected final StreamStatusProvider streamStatusProvider;
// 为一个算子标记side output
@Nullable
protected final OutputTag<T> outputTag;
public ChainingOutput(
OneInputStreamOperator<T, ?> operator,
StreamStatusProvider streamStatusProvider,
@Nullable OutputTag<T> outputTag) {
// operatorChain的下游算子
this.operator = operator;
...
this.streamStatusProvider = streamStatusProvider;
this.outputTag = outputTag;
}
@Override
public void collect(StreamRecord<T> record) {
if (this.outputTag != null) {
// we are not responsible for emitting to the main output.
return;
}
pushToOperator(record);
}
...
protected <X> void pushToOperator(StreamRecord<X> record) {
try {
// we know that the given outputTag matches our OutputTag so the record
// must be of the type that our operator expects.
@SuppressWarnings("unchecked")
StreamRecord<T> castRecord = (StreamRecord<T>) record;
numRecordsIn.inc();
operator.setKeyContextElement1(castRecord);
// 直接调用下游算子处理数据
operator.processElement(castRecord);
}
catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
}
...
}
首先,Source和Sink是特殊的算子,用来数据摄取和输出。
Operator 并行度配置可参考这里。
DataStream可以在两个算子间传输数据,有以下两种模式:
一对一
例如上图中Source
和 map()
算子之间。
可保留元素的分区和排序信息(也就是说map()
算子的1号实例可以相同顺序看到跟Source
算子的1号实例生产顺序相同的元素)。
重分发-类似MR Shuffle
例如上图中的 map()
和 keyBy/window
算子 之间,以及 keyBy/window
和 Sink
之间。
会更改数据所在Stream分区。注意此时只能保证一个算子subtask发到一个下游算子subtask的元素顺序性。如上图keyBy/window 的 subtask[2] 接收到的 map() 的 subtask[1]的数据有序,但发送到Sink的所有数据中,无法确定不同key的聚合结果的到达顺序。
每个算子subtask发送数据到不同的下游算子subtask,分发依据是具体的transformation
(相关方法在org.apache.flink.streaming.api.datastream.DataStream
):
keyBy
按照key的值hash后重分区到某个下游算子实例
broadcast
广播到所有下游算子实例分区
rebalance
轮询分配到下游算子实例分区
global
全部分配到第一个下游算子实例分区
shuffle
随机均匀分配到下游算子实例分区
forward
上下游并行度一致时,发送到对应的位于本地的下游算子分区
rescale
轮询方式将输出的元素均匀分发到下游分区的子集。
子集构建依赖于上游和下游算子的并行度。
每个TaskManager都是一个 JVM 进程,并且可以在不同的Slot线程中执行多个 subtasks,Slot控制 TaskManager 并发执行 task 的数量,每个TaskManager至少一个Slot。他们的关系示例如下图,此时作业基本并行度为2,一个taskManager的slot数量为3:
每个 Slot是TaskManager资源分配和调度最小单元,代表一份固定资源子集。例如,具有三个 slot 的 TaskManager 会将其管理的内存资源平均分成三等份给每个 slot。 划分资源意味着 subtask 之间不会竞争资源,但这也意味着它们只能拥有固定的划分了的资源。
注意Slot并没有实现 CPU 隔离,当前 Slot 之间只实现了内存资源隔离划分。
通过调整 slot 的数量,用户可以决定 subtask 的隔离方式:
每个 TaskManager 只有一个 slot
意味着每个 task 在一个单独的 JVM 中运行(即每个task独享一个TM进程) 。但可以注意到,目前Flink1.11版本已经支持多个Task线程共享一个Slot,所以以上结论已经不再适用。
每个 TaskManager 多个 slot
意味着多个 subtask 共享同一个 JVM,分别在各自slot线程运行。
在同一个 JVM 中运行的Task会共享 TCP 连接(通过IO多路复用)和心跳信息,可以减少数据的网络传输。它们还可能共享数据集和数据结构,从而降低每个 task 的性能开销。
默认情况下,Flink 允许 subtask们 共享 slot,即使它们是不同 task 的 subtasks,只要它们来自同一个 job就行。因此,一个 slot 在极端场景下甚至可能会负责这个 job 的整个执行pipeline!
允许 slot sharing 有两个好处:
Flink 集群的Slot数量总数需要与 job 中使用的最高并行度(highest parallelism
)完全相同。这样不需要计算作业总共包含多少个 tasks(具有不同并行度)。
更好的资源利用率
如果不能共享slot ,则简单的 subtask(比如source
/ map
等)将会占用和复杂的 subtask (如window
)一样多的资源。
而通过slot共享,将之前示例中的job最大并行度从 2 增加到 6 就可以完全利用 按slot分隔 的资源,同时确保开销大的 subtask 在 所有TaskManager上均匀分布:
一个Slot并不一定只有一个线程运行,比如上图中一个Task Slot内部就运行了2-3个线程!
这些线程共同分享Slot获得的分隔内存资源。
不同算子是否能运行在一个Slot取决于SlotSharingGroup
根据经验,合理的 slots 数量应该和 CPU 核数相同。在使用超线程(hyper-threading)时,每个 slot 将会占用 2 个或更多的硬件线程上下文(hardware thread contexts)。
Flink会将相同SlotSharingGroup
的算子放到相同的slot执行,而将SlotSharingGroup不同的算子放到其他slot,这可用来隔离slot。
如果所有input算子都在相同SlotSharingGroup,则下游算子的SlotSharingGroup继承自input算子。
请记住,SlotSharingGroup软定义了不同的task(JobVertex)是否可在一个Slot中运行
默认的slot sharing group
的名字是default
,算子也可以显示地放入指定名字的slot sharing group
:
someStream.filter(...).slotSharingGroup("name");
CoLocationGroup
是JobVertex组,硬性定义了某个JobVertex的第i个子任务必须和其他所有JobVertex的第i个子任务运行在相同TaskManager上。
SinkFunction
相关方法发送到目的地。当Flink某个任务失败挂掉时,会对失败任务和其他受影响任务重启来恢复整个作业。
Flink中主要有两类策略来控制任务重启:
可参考
如果Flink部署在yan上,则需要依赖Yarn来实现高可用,Yarn会对失败挂掉的JobManager(AM)重启,最大重试次数的配置是yarn-site.xml
的yarn.resourcemanager.am.max-attempts
。
另一方面,Flink还支持高可用配置。
优先级最高的是job自己指定的,然后才是flink-conf.xml中指定的:
采用restart-strategy
配置。
当不允许checkpoint时,采用的是none
(也可以填off
、disable
)策略,即不重启
可用配置项:
代码:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
开启checkpoint时,不配置重启策略时默认采用fixeddelay
(也可以填fixed-delay
、failure-rate
),是一种固定间隔重启的策略,默认会1秒间隔来进行Integer.MAX_VALUE次重启尝试,超过最大次数就会导致job最终失败了。
可用配置项:
fixed-delay
1 s
代码设置:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
))
failurerate
开启checkpoint时 还可选failurerate
, (也可以填failure-rate
)。按失败率重启。
失败率是指每个时间间隔内发生的失败次数。
当失败率超过设定阈值,则job最终失败了。
可用配置项:
1 s
,指定重启间隔时间,比如20 s
1 min
,指定计算失败率的时间间隔代码设置:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个时间间隔的最大故障次数
Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
Time.of(10, TimeUnit.SECONDS) // 延时
))
配置项为jobmanager.execution.failover-strategy
,Failover策略 相关接口为FailoverStrategy
可选值full
,重启job所有task。
RestartAllFailoverStrategy
默认值region
,当task失败时,重启所有可能被该出错task影响的所有task。
RestartPipelinedRegionFailoverStrategy
该策略将task分组为不同region,当任务失败时就计算必须重启以恢复job的最小范围的region set。
一个Region是指一个pipeline,是一些task的集合,这些task通过pipeline交换数据通讯:
ExecutionConfig
中配置的 ExecutionMode
决定。需要重启的 Region 的判断逻辑如下:
单点重启
可能无法保证一致性,但资源开销最小。
可参考
由JM保证TM恢复期间的跨TM一致性。
因为JM用来调度任务、管理资源,所以如果他挂了就导致整个程序失败了,这就是JM单点问题。
我们可以配置JM HA高可用,即当JM失败后进行恢复,适用于 Standalone和Yarn Cluster模式,这里我们主要分析Yarn Cluster模式。
相关源码在ZooKeeperHaServices
。
ZK存储的Flink相关数据结构如下:
high-availability.zookeeper.path.root
配置yarn-site.xml
配置yarn-site.xml
内的yarn.resourcemanager.am.max-attempts
即am重试次数,要算上初次启动,默认值为2,即当设置了JM HA后允许JM出错重启一次
flink-conf.yaml
ZK要保证JM服务自身在恢复期间的一致性,就必须使用第三方服务来存储少量的恢复用的元数据(比如最后提交的Checkpoint等),以及帮助选举哪个备选JM当Leader,避免脑裂。
high-availability
定义了HA模式,可选项如下。
NONE
,不启用HAZOOKEEPER
,ZK模式的HAhigh-availability.cluster-id
定义了Flink集群ID,用来隔离不同Flink集群。
需要为Standalone模式设置,会在YARN和Mesos中自动推断。
high-availability.storageDir
Flink持久化元数据的文件系统路径(URL)
high-availability.zookeeper.path.root
Flink存放集群节点元数据信息的ZK上的根目录路径,ZK上存了指向该数据的指针信息。
high-availability.zookeeper.quorum
Flink HA模式时使用的ZK法定节点数(quorum)
high-availability.jobmanager.port
更多Flink HA Zk高级配置可参考Advanced High-availability ZooKeeper Options
flink-conf.yaml
AM/JM重启配置flink-conf.yaml
内的yarn.application-attempts
和yarn.application-attempt-failures-validity-interval
。
Flink On Yarn时需要依赖Yarn来实现高可用,Yarn会对失败挂掉的JobManager(AM)重启,最大重试次数的配置是yarn-site.xml的yarn.resourcemanager.am.max-attempts
。
Flink的yarn-client有一些配置可以控制在container失败的情况下的行为,也可通过$FLINK_HOME/conf/flink-conf.yaml
或启动yarn-session.sh
时以-D
参数指定:
yarn.application-attempts
ApplicationMaster(运行着JobManager)及其TaskManager Container的最大失败重试次数。
当没有设置HA时默认值为1,此时若AM挂掉就直接导致整个flink yarn session失败了。
如果设为较高的值,使得可在失败时Yarn可多次尝试重启AM,但会导致整个Flink集群重启,而Yarn Client会丢失到该Flink Cluster的连接,JobManager的地址也会改变,所以需要在重启后手动设置JobManager的 host:port,所以推荐保留该值为1。
如果超过该阈值,不会再重启AM,该yarn session上提交的任务也会全部停止。比如设为5时,代表初始化1次、最大重试4次。
如果Yarn有一些额外操作(如抢占、硬件故障或reboot、NM resync等)需要重启Application,则不会计算在次数内,参考Jian Fang’s blog
注意该值不能超过yarn中yarn.resourcemanager.am.max-attempts
配置的最大重启次数。
yarn.application-attempt-failures-validity-interval
默认为10000,单位毫秒
定义yarn.application-attempts
统计的时间窗口。如果设为-1则为全局统计。
也就是说,如果定义的时间窗口内统计到的attempts达到了阈值,则会停止尝试重启,导致Job失败!
注意Yarn Container在不同版本中的行为不太相同,在大于等于2.6.0后应将yarn.application-attempt-failures-validity-interval
设置为Flink的Akka超时值。
相关连接:
flink-conf.yaml
:
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink
yarn.application-attempts: 10
如果在Flink WebUI上看到某个task发生back pressure warning
,那么通俗地说,这就意味着数据生产速度大于下游算子消费速度,俗称反压/背压。
数据在Flink中从Source到Sink向下流动,而反压是反向传播的。
就拿最简单的Source -> Sink
来说,如果观察到Source出现反压,则说明Sink消费速度已经跟不上Source生产速度了,所以向上游Source算子产生反压。
Flink JM会周期性地调用Task.isBackPressured()
方法,以从运行中的task中采样,监控反压指标。
默认每次采样会为每个task每50ms采样100次,可在WebUI观察该指标(60秒刷新一次,避免TM过载),比如0.01表示百分之1的样本发生反压。
该指标有几种情况:
OK: 0 <= Ratio <= 0.10
LOW: 0.10 < Ratio <= 0.5
HIGH: 0.5 < Ratio <= 1
@Override
public boolean isBackPressured() {
if (invokable == null || consumableNotifyingPartitionWriters.length == 0 || !isRunning()) {
return false;
}
final CompletableFuture>[] outputFutures = new CompletableFuture[consumableNotifyingPartitionWriters.length];
for (int i = 0; i < outputFutures.length; ++i) {
outputFutures[i] = consumableNotifyingPartitionWriters[i].getAvailableFuture();
}
return !CompletableFuture.allOf(outputFutures).isDone();
}
目前判断是否产生背压是通过output buffer
可用性,如果没有足够的buffer可用于输出说明该Taskb受到了来自下游的反压。
本章主要介绍了Flink的一些基本概念,下一章讲Flink的下载、安装和启动,请点击Flink系列2-安装和启动
手机扫一扫
移动阅读更方便
你可能感兴趣的文章