Flink学习1-基础概念
阅读原文时间:2021年04月26日阅读:8

Flink学习1-基础概念

Flink系列文章

摘要

本文是作者学习Flink的一些文档整理、记录和心得体会,希望与大家共同学习探讨。

1 Flink简介

1.1 概念

Apache Flink是一个开源的分布式流式处理框架,高性能高可用,他有强大的流式和批处理能力,通过语义保证数据处理精确性。流式处理方面,Flink能对有界、无界数据流做有状态的计算(stateful computations)。

1.2 特点

他有如下特点:

  • 能同时支持高吞吐和低事件延迟(亚秒级)

  • 真实时流处理

  • 基于数据流模型,支持DataStream API中的event time和无序处理

  • 优雅的Java和Scala API,集成了较丰富的streaming operator,自定义operator也较为方便,并且可以直接调用API完成stream的split和join,可以完整的表达DAG图。

  • 跨不同时间语义(event timeprocessing time)的弹性window(时间,计数,会话,自定义触发器)

  • 支持Flink托管的State

  • 具有Exactly Once处理保证的容错能力

  • 支持多种window语义,如Session, Tumbling, Sliding window,方便实时统计

  • 流式程序中自然背压

  • 支持流、批处理,且将批当作流的特例,最终实现批流统一。

  • Flink自主实现多层次内存管理而不完全依赖于JVM,可以在一定程度上避免大量Full-GC问题。

    在此基础上,批处理的lib包可支持图运算和机器学习(FlinkML);流式处理的lib支持复杂事件处理;流批统一的关系型SQL&Table API

  • 支持迭代计算

  • 程序可自动优化

  • 内置支持DataSet(批处理)API中的迭代程序(BSP)

  • 自定义内存管理,可在内存和核外数据处理算法之间实现高效,可靠的切换。在JVM层面实现了内存管理和优化

  • 可兼容Apache Hadoop MapReduce和Apache Storm

  • 可集成YARN,HDFS,HBase和Apache Hadoop生态系统的其他组件

1.3 示例

1.3.1 流处理

下面这个例子展示了用scala语言写的一段对5秒时间窗口内的数据进行流式word count的流处理程序:

case class WordWithCount(word: String, count: Long)
val text = env.socketTextStream(host, port, '\n')
val windowCounts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy("word")
  .timeWindow(Time.seconds(5))
  .sum("count")
windowCounts.print()

1.3.2 批处理

下面这个例子展示了用scala语言写的对数据进行word count批处理程序:

case class WordWithCount(word: String, count: Long)
val text = env.readTextFile(path)
val counts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .groupBy("word")
  .sum("count")
counts.writeAsCsv(outputPath)

1.4 Flink对比其他

1.4.1 Storm

开发API较复杂,不支持状态托管, 用户必须自己处理状态持久化和一致性保证,如果某个挂了状态很难恢复

可参考

  • 流计算框架 Flink 与 Storm 的性能对比
    • 吞吐
      Storm 单线程吞吐约为 8.7 万条/秒,Flink 单线程吞吐可达 35 万条/秒。Flink 吞吐约为 Storm 的 3-5 倍。
    • 要求消息投递语义为 Exactly Once 的场景下只能选择Flink,Storm只支持“至多一次” (At Most Once) 和“至少一次” (At Least Once) 。
    • Flink支持状态管理、窗口统计等功能

1.4.2 JStorm

阿里巴巴fork,现在开始采用 flink的思想

1.4.3 Spark Streaming

微批,非真正的实时流式处理

1.4.4 Structured Streaming

同样是纯实时思想. 可参考

比较:

  • 都支持丰富时间语义,不过SS不支持IngestionTime

  • 处理模式
    SS除了定时触发、连续批触发以外,还支持了跟flink类似的连续处理模式,不再使用批处理引擎,而是类似Flink的持续处理模式,端到端延迟最低可达1ms。

  • Structured Streaming将实时数据当做被连续追加的表,流上的每一条数据都类似于将一行新数据添加到表中。
    Structured Streaming定义了无界表的概念,即每个流的数据源从逻辑上来说看做一个不断增长的动态表(无界表),从数据源不断流入的每个数据项可以看作为新的一行数据追加到动态表中。用户可以通过静态结构化数据的批处理查询方式(SQL查询),对数据进行实时查询。

  • 都可以注册流表,进行SQL分析

  • 运行架构
    类似,SS主节点是Driver,Flink是JobManager

  • 异步IO和维表Join
    FlinkSql支持异步维表Join;

    Structured Streaming不直接支持与维表的join操作,但是可以使用map、flatmap及udf等来实现该功能,所有的这些都是同步算子,不支持异步IO操作。但是Structured Streaming可直接与静态数据集join,也可以帮助实现维表的join功能,当然维表要不可变。

  • 状态管理
    Flink状态不需要用户自己维护,更灵活丰富

  • Join
    Flink Join支持更全,SS限制较多

2 Flink架构和模型

2.1 项目架构

2.1.1 Deploy-部署方式

  • Local
    单JVM,本地调试
  • Cluster-Standalone
    Flinke Standalone模式,不依赖其他资源
  • Cluster-Yarn
    最常用的模式,依赖Yarn
  • Cloud

2.1.2 Core

分布式的流式数据流的运行时,是Flink的核心实现层,包括分窗、各种Time、一致性语义、任务管理和调度、执行计划等。一般用户不用关心此层代码,只需调用上层API即可满足开发需求。

2.1.3 DataStream和DataSet API

建立在Core层之上,分别为流式处理和批处理API。需要注意的是,Flink的批处理屙屎建立在流式架构上的。

2.1.4 Libraries

建立在API层上的一些高级应用lib包,如机器学习、关系型API等。

2.2 Flink集群架构

2.2.1 概述






Flink集群主要有两个角色,即Master-JobManager和SlaveWorker-TaskManager。

2.2.2 JobManager-Master

2.2.2.1 职责

  • 协调分布式计算
  • 部署在yarn上时,负责申请container资源以启动TaskManager
  • Scheduler-调度任务
  • CheckpointCoordinator 触发、协调checkpoint
  • 故障故障

2.2.2.2 主要组件

  • ResourceManager
    负责资源分配和回收,内部有SlotManager负责管理TaskManagerSlot(最小资源分配和调度单位),整个Flink Cluster只有一个ResourceManager实例。

    Flink在不同运行环境(Yarn、Mesos、K8S等)中实现了不同ResourceManager,如YarnResourceManager

    主要方法:

    • registerJobManager(JobMasterId, ResourceID, String, JobID, Time)
      向ResourceManager注册一个JobMaster
    • requestSlot(JobMasterId, SlotRequest, Time)
      向ResourceManager申请一个Slot
  • Dispatcher

    • 为用户提供了Rest接口来提交执行Flink应用
    • 一个FlinkCluster只有一个Dispatcher实例
    • 持久化Job
    • 为每个提交的job开启一个新的JobManager以运行Job
    • 在master出错时恢复Job
    • 运行Flink WebUI以提供job运行界面
  • JobMaster

    每个JobMaster仅负责管理一个Job的执行,监控单个Job运行的所有Task。

    多个job可同时在一个Flink集群中运行,每个Job拥有自己的JobMaster。

    JobMaster主要职责有:

    • 单个Job生命周期管理,以及Job状态查询
    • 该Job对应Slot资源申请和管理
    • 调度Task执行
    • Failover容错
    • CheckpointCoordinator触发分布式Checkpoint快照

    JobMaster主要组件有:

    • RPC Endpoint
      基于Akka的RPC Server,组件之间通讯使用
    • Scheduler 使用JobGraph创建ExecutionGraph,并调度ExecutionGraph(包括为每个ExecutionVertex向Flink ResourceManager申请Slot资源、获取到资源后创建ExecutionVertex部署信息发送给TM),管理调度策略、Failover策略、Slot分配等
      • 会利用创建好的ExecutionGraph.schedulingTopology来创建failover策略类FailoverStrategy的实现类,比如RestartPipelinedRegionFailoverStrategy,即当某个节点失败时,重启一个Region的所有节点。
      • 创建调度策略SchedulingStrategy实现类,如EagerSchedulingStrategy,这个实现类表示所有Task在同一时间一起调度。调度策略详情可见 2.10.3。
    • CheckpointCoordinator
      负责定时触发(发送Checkpoint快照任务给各相关Task)算子和状态的分布式快照Checkpoint,管理整个分布式Checkpoint快照流程,搜集TM上报的状态快照ACK信息等。
    • SlotPool
      • 作用是服务由ExecutionGraph向本JM提起的Slot请求,当没有足够Slot时就向ResourceManager申请新的Slot。
      • 一旦申请到Slot,SlotPool也会保存他们,这样即使ResourceManager挂了,依然可以分配这里已空闲的Slot。
      • 当Slot无人使用时,会释放。
      • 所有Slot分配由一个自增的AllocationID标记

2.2.2.3 其他重要概念

一个Flink集群有一个JobManager进程实例,一些HA场景可能有多个StandBy JM。

2.2.3 TaskManagers-Slave worker

2.2.3.1 职责

  • 启动后,连接JobManager,等待分配任务
  • 执行 DataFowGraph中的 tasks(准确来说是 subtasks )
  • 缓存
  • 和其他TaskManager交换数据流

2.2.3.2 重要知识点

  • TaskManager至少一个
    每个 Job 至少会有一个 TaskManager。
  • 最小资源分配单位为Slot
    最小资源分配单位为Slot,他决定了并发能力。
  • TaskManagers个数
    Flink on YARN时,TaskManagers个数 = Job最大并行度 / 每个TaskManager分配的Slot数。也就是说TM数量由Flink根据你的Job情况自动推算,-yn启动参数失效了。

2.2.3.3 TaskExecutor

在TM上实际执行的类为TaskExecutor

2.2.4 客户端

客户端虽然不是Flink运行时和作业执行时的一部分,但它是被用作准备和提交 dataflow 到 JobManager 。

Job提交完成之后,客户端可以断开连接(分离模式),也可以保持连接来接收Job执行进度报告(attach模式)。

2.3 网络通讯架构

2.3.1 节点之间-Akka

Flink内部节点之间的通信是用Akka,比如JobManager、TaskManager、Yarn-ResourceManager之间的通信,比如状态从TaskManager发送到JobManager。

  • Akka优点
    • Akka是基于协程的,性能不容置疑
    • 基于scala的偏函数,易用性良好
  • Akka缺点
    • 只支持RPC通信
    • Akka版本可能跟用户Akka版本冲突,因为Akka不同版本之间无法互相通信,导致用户Akka版本无法升级。
    • Flink的Akka配置是针对Flink自身来调优的,可能跟用户自己代码中的Akka配置冲突。
    • 如果只是为了通信,没有必要用Actor Model。Actor Model重要的一点是解决了并发带来的问题。

可参考:

2.3.2 算子之间-Netty

算子之间基于Netty通信。

  • Netty优点
    Netty相比Akka而言更加底层,可以为不同的应用层通信协议(RPC,FTP,HTTP等)提供支持。

可参考:


Flink Yarn Session部署流程如下:

  1. Client上传依赖jar到HDFS
    开启一个新的Flink Yarn Session时,Client首先检查申请的资源(AM所需的内存和vcore)可用的是否足够,然后将包括Flink和配置的jar上传到HDFS。

  2. Client为AM申请资源
    Client为Flink所用的AM向Yarn RM申请Container资源。

  3. Yarn启动AM(JobManager)
    Yarn的RM收到申请后,为AM在NM上分配首个Container,用来启动Flink AM。具体来说,NM会先为该AM做一些Container分配准备工作,如下载资源(就包括刚才Client上传的Flink Jar等文件)。准备完成后,就启动了AM。

    JobManager和AM运行在同一个Container中,所以AM知道JobManager的地址。所以在启动完成后,会为随后需要创建的TaskManager们生成一个新的Flink配置文件,该文件就包含了该JobManager的链接地址。该文件也会被上传到HDFS。

    此时,Flink的Web服务也会在该Container开始运行。

    注意,Yarn为app分配的所有端口都是临时端口,可使得用户并行执行多个Flink Yarn Session。

  4. AM向RM申请Container资源以启动TaskManager
    RM向拥有适合Container资源的NM发送分配指令,NM接到请求后先从HDFS下载相关的jar文件、配置文件,然后启动TaskManager。此时TaskManager就能正确运行,并连接到正确的JobManager。

  5. 此时一个Flink Yarn Session集群部署完毕,可以开始接受Job。

更多内容可参考:

2.5.1 概述

可参考:

注意:本章节基于Flink 1.10,该版本有大量修改。

Flink通过严格控制其各种组件的内存使用情况,在JVM之上提供有效的工作负载。Flink开发者已经竭尽可能来设置最优默认配置,但还是有一些时候需要细粒度内存调优。

Flink内存使用情况:

  • Total Process Memory(taskmanager.memory.process.size
    这个就是在容器环境(Yarn、Docker等)Flink程序请求的总内存大小。

    • Total Flink Memory(taskmanager.memory.flink.size
      不建议同时设定此选项和Total Process Memory,否则可能导致内存设定冲突

      • JVM Heap

        • Framework Heap
          JVM堆内存中Flink框架本身专用部分,一般不调整

          相关配置taskmanager.memory.framework.heap.size

        • Task Heap
          JVM堆内存中运行算子和用户代码专用部分

          相关配置taskmanager.memory.task.heap.size

      • Off-Heap Memory

        • Managed Memory
          由Flink管理的本地内存,Batch jobs用来排序、存放HashTable、中间结果的缓存;Streaming jobs的RocksDB State Backend。这块内存堆皮处理算子提高处理效率很重要,有些操作可直接在原始数据进行而无需序列化过程,所以Flink会尽可能在不超过配额前提下分配Managed Memory,并当该内存不足时,优雅得将数据落盘,避免OOM。

          相关配置taskmanager.memory.managed.sizetaskmanager.memory.managed.fraction,都设置时size会覆盖fraction,如果都没设置则用默认值 0.4

          还可参考https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_tuning.html#configure-memory-for-state-backends

        • Direct Memory

          • Framework Off-heap Memory
            运行Flink框架的堆外直接(或本地)内存,一般不调整

            相关配置taskmanager.memory.managed.sizetaskmanager.memory.framework.off-heap.size

          • Task Off-heap Memory
            运行算子的堆外直接、本地的task专用内存

            相关配置taskmanager.memory.task.off-heap.size

          • Network Memory
            Flink用来在Task之间交换数据(比如网络传输Buffer)的直接内存。

            相关配置:taskmanager.memory.network.mintaskmanager.memory.network.maxtaskmanager.memory.network.fraction

    • 其他Off-Heap Memory

      • JVM metaspace
        相关配置taskmanager.memory.jvm-metaspace.size

      • JVM Overhead
        JVM其他开销使用的本地内存,比如线程栈、代码缓存、GC空间等。

        相关配置:taskmanager.memory.jvm-overhead.mintaskmanager.memory.jvm-overhead.maxtaskmanager.memory.jvm-overhead.fraction

2.5.2.1 概述

参考
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup.html

flink 1.11在flink 1.10调整了内存模型基础上,1.11进一步调整,JM和TM内存模型更统一

必须设定以下某项,因为没有默认值

TaskManager

JobManager

taskmanager.memory.flink.size

jobmanager.memory.flink.size

taskmanager.memory.process.size

jobmanager.memory.process.size

taskmanager.memory.task.heap.size
taskmanager.memory.managed.size

jobmanager.memory.heap.size

如果使用低版本升级到高版本Flink,可参考迁移指南

注意上图中Off-heap包括

  • Direct memory
  • Native memory

2.5.2.2 TaskManager内存模型


2.5.2.3 JobManager内存模型



Off-heap Memory用途

  • Flink框架,如Akka网络通信开销
  • 在作业提交时(例如一些特殊的批处理 Source)或是 Checkpoint 完成后的回调函数中执行的用户代码

2.5.2 内存所有配置

Flink大多默认配置已经足够好,一般只需要配置taskmanager.memory.process.sizetaskmanager.memory.flink.size,再配置下taskmanager.memory.managed.fraction控制Jvm堆和Managed Memory比例。

以下配置适用于TaskManager和JobManager:

Key

Default

Type

Description

taskmanager.memory.process.size

(none)

MemorySize

TaskExecutors的所有内存大小即Total Process Memory.
使用YarnConainter时应该和Contianer内存一致(不设本值,只指定yjmytm已经和Contianer内存一致)

taskmanager.memory.flink.size

(none)

MemorySize

TaskExecutor的总Flink内存大小即Total Flink Memory,除去JVM Metaspace和JVM Overhead

taskmanager.memory.framework.heap.size

128 mb

MemorySize

堆内Flink Framework内存,不会分配给Task Slot

taskmanager.memory.framework.off-heap.size

128 mb

MemorySize

堆内Flink Framework内存,包括direct 和 native,不会分配给Task Slot。
Flink 计算 JVM 最大 direct memory时会考虑本部分

taskmanager.memory.task.heap.size

(none)

MemorySize

Task Heap Memory,JVM堆内存中运行算子和用户代码的task专用部分.
未指定本值时,等于Total Flink Memory 减去(Framework Heap MemoryTask Off-Heap MemoryManaged MemoryNetwork Memory

taskmanager.memory.task.off-heap.size

0 bytes

MemorySize

Task Off-heap Memory,JVM堆外直接、本地内存中运行算子和用户代码的task专用部分.
Flink 计算 JVM 最大 direct memory时会考虑本部分

taskmanager.memory.managed.size

(none)

MemorySize

位于堆外非直接内存中的Managed MemoryBatch jobs用来排序、存放HashTable、中间结果的缓存;Streaming jobs的RocksDB State Backend Memory。
使用者即可以以MemorySegments的形式从内存管理器中分配内存,也可以从内存管理器中保留字节并将其内存使用率保持在该范围内。
如果未指定,则采用下一项配置来计算。

taskmanager.memory.managed.fraction

0.4

Float

位于堆外非直接内存中的Managed Memory比例,如果没有配置具体大小时,该空间大小由Total Flink Memory乘以本值得出。

taskmanager.memory.network.fraction

0.1

Float

(TaskManager Only)堆外直接内存中的Network Memory,Flink用来在Task之间交换数据(比如网络传输Buffer),用Total Flink Memory乘以本值来计算。但结果如果小于network.min就用min,大于max就用max。

taskmanager.memory.network.max

1 gb

MemorySize

network memory上限

taskmanager.memory.network.min

64 mb

MemorySize

network memory下限

taskmanager.memory.jvm-metaspace.size

96 mb

位于堆外的MemorySize

JVM Metaspace 内存

taskmanager.memory.jvm-overhead.fraction

0.1

Float

位于堆外的JVM其他开销使用的本地内存,比如线程栈、代码缓存、GC空间等。包括native内存但不包括direct内存,
Flink 计算 JVM 最大 direct memory时不会考虑本部分.
jvm-overhead区具体大小用Total Process Memory 乘以本值来计算。但结果如果小于jvm-overhead.min就用min,大于max就用max。

taskmanager.memory.jvm-overhead.max

1 gb

MemorySize

jvm-overhead上限

taskmanager.memory.jvm-overhead.min

192 mb

MemorySize

jvm-overhead下限

2.5.3 本地运行内存配置

本地ide直接运行且不运行集群时,只需做以下内存配置:

Memory component

配置项

本地运行时默认值

Task heap

taskmanager.memory.task.heap.size

无限大

Task off-heap

taskmanager.memory.task.off-heap.size

无限大

Managed memory

taskmanager.memory.managed.size

无限大

Network memory

taskmanager.memory.network.min
taskmanager.memory.network.max

无限大

以上配置不是必须的,未设定时采用默认值。

注意: 启动的本地进程的实际JVM堆大小不受Flink的控制,取决于您如何启动该进程。 如果您想控制JVM堆大小,则必须显式传递相应的JVM参数,例如 -Xmx,-Xms。

2.5.4 上/下界内存配置

2.5.4.1 概述

  • Network Memory可以是Total Flink Memory的一小部分
  • JVM overhead可以是Total Process Memory的一小部分

这种内存有三类设置情况,但必须在最大值和最小值之间,否则Flink会启动失败。以下以network举例。

2.5.4.2 fraction计算后处于区间

total Flink memory = 1000Mb,
network min = 64Mb,
network max = 128Mb,
network fraction = 0.1

此时network memory = 1000Mb x 0.1 = 100Mb,处于max/min之间.

2.5.4.3 fraction计算后处于区间之外

total Flink memory = 1000Mb,
network min = 128Mb,
network max = 256Mb,
network fraction = 0.1

此时 network memory = 1000Mb x 0.1 = 100Mb < min,所以 network memory = 128Mb

total Flink memory = 1000Mb,
task heap = 100Mb,
network min = 64Mb,
network max = 256Mb,
network fraction = 0.1

此时network memory 是total Flink memory的剩余内存,但必须位于min-max之间,否则失败。

2.5.5 JVM调优

可参考:

Flink进程启动时根据配置或派生来的内存组件大小自动推断设置JVM参数

JVM Arguments

TaskManager Value

JobManager Value

-Xmx and -Xms

Framework Heap + Task Heap Memory

JVM Heap Memory

-XX:MaxDirectMemorySize

Framework Off-Heap + Task Off-Heap(计算时包括了用户代码使用的本地非直接内存) + Network Memory

Off-heap Memory (计算时包括了用户代码使用的本地非直接内存;jobmanager.memory.enable-jvm-direct-memory-limit为true时才会设置此JVM选项)

-XX:MaxMetaspaceSize

JVM Metaspace

JVM Metaspace

2.5.6 内存调优指南

2.5.6.1 TaskManager

  • Framework Heap / OffHeap Framework Heap
    Framework Memory包括堆内和非堆两部分,一般不需要调优,除非确定Flink需要更多内存来存放内部数据结构或算子,以及特殊场景或数据结构,比如高并行度场景。

2.5.6.2 JobManager

  • Java Heap

  • OffHeap Heap
    包括JVM Direct MemoryNative Memory,当jobmanager.memory.enable-jvm-direct-memory-limittrue就开启对JVM Direct Memory内存大小限制,此时可通过-XX:MaxDirectMemorySize/ jobmanager.memory.off-heap.size限制。当发生了OutOfMemoryError: Direct buffer memory异常时可调整该值。

    如果不设,就从Total Flink Memory减去JVM Heap来推断该值。

2.5.6.3 TaskManager StateBackend

  • Heap StateBackend
    运行无状态Job或使用Heap StateBackend(包括Memory和FS),此时可将managed memory大小设为0,把尽可能多的内存分配给JVM Heap来运行用户代码。
  • RocksDB StateBackend 使用本地内存,并且默认状态下会限制不超过managed memory,否则可能导致内存超限而使得应用被RM杀掉。详见

2.6 Task和Operator Chain

详见Tasks and Operator Chains

Flink Application Execution指一个用户程序,可通过main()方法产生一个或多个Flink Job:

  1. 获取一个ExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
  2. Load/create 初始数据

    val text: DataStream[String] = env.readTextFile("file:///path/to/file")
  3. 数据转换

    val mapped = text.map { x => x.toInt }
  4. 指定计算结果输出位置,如各种Sink等

    windowCounts.print()
  5. 触发程序执行

    env.execute("Streaming WordCount")

每个Job由ExecutionEnvironment提供的方法来控制job执行(如并行度)。

提交目的地有几个,他们的集群生命周期和资源隔离保证有所不同:

  • Flink Session Cluster
    一个长期运行的Flink集群,可接受若干Flink Job运行。

    该模式下的Flink集群生命周期与Job无关,以前该模式成为Flink Session Cluster。

  • Flink Job Cluster
    只运行一个Flink Job的专用Flink Cluster。Flink On Yarn per-job就是该模式。

    该模式下的Flink集群的生命周期和Job绑定。

    需要较多时间用来申请资源和启动Flink Cluster,所以适合长期运行的Flink Job。

  • Flink Application Cluster
    只运行一个Flink Application上提交的Job的专用Flink Cluster。Zeppelin per-job使用该模式。

    该模式下的Flink集群的生命周期和Flink Application绑定。

    该模式中,Flink应用的main()方法在集群中运行,而不是在Client中。

    提交job时不需要先启动Flink Session Cluster再提交job到该集群,而是将应用逻辑和依赖打到一个可执行的jar中,由ApplicationClusterEntryPoint负责调用该jar的main方法来提取JobGraph执行。

指LogicGraph的运行时表示,通过在Flink Application中调用execute方法来创建和提交。

2.8 执行计划

2.8.1 逻辑执行计划 - LogicalGraph(源码中的JobGraph)

Logical Graph 是一个有向图,用来描述streaming程序的高层次逻辑。

图里的节点是Operator,边代表算子和敌营数据流(或数据集)的 input/output 关系。

逻辑计划也通常被称为dataflow graphs(数据流图)

2.8.2 物理执行计划 - ExecutionGraphw / PhysicalGraph

PhysicalGraph是将LogicalGraph翻译成在分布式运行时中运行的执行图的结果。

图里的节点是Task,边代表数据流(或数据集)之间的 input/output 关系或partition。

2.9 Partition

  • 注意这个Partition不同于Kafka的Partition,这是Flink Partition概念!
    Partition是整个数据流(或数据集)的独立子集,数据流中的每条record会被发到一个或多个partition。

  • Partition的消费者是Task。

  • 如果一个算子改变了数据流的partition划分方式,则称为repartitioning

2.10.1 Pipeline与Slot

2.10.1.1 基础概念

前面提到过,Flink资源调度单位为Slot,每个TaskManager有一个或多个Slot,每个Slot可以运行多个不同JobVertex的并行Task实例组成的pipeline。一个pipeline由多个连续task组成,比如并行度为 n 的 MapFunction 和 并行度为 n 的 ReduceFunction

Pipeline内部各算子实例之间通过流水线交互数据,效率很高。

比如有一个job包含DataSource(并行度4)、MapFunction(并行度4)、ReduceFunction(并行度3),此时一个pipeline就由Source - Map - Reduce序列组成。

如果当前有两个TM,每个TM包含3个Slot,则程序运行状况如下:

**上图中同一个颜色的就是一个pipeline!**在一个Slot中运行,多个pipeline之间是并发运行。

Flink内部使用SlotSharingGroupCoLocationGroup来定义哪些task可以共享一个 Slot, 哪些task必须严格放到同一个slot。

切记,只有不同JobVertex的实例才能放到一个Slot进行Share。

2.10.1.2 好处

  • Pipeline中的算子实例之间数据交换在内存中进行,比通过网络交换数据效率高很多
  • 同类型算子均匀分布到各个Slot,避免较重的算子全部挤到一个Slot执行

2.10.2 JobManager数据结构

2.10.2.1 概述

job运行期间,JM跟踪分布式task,以决定何时调度下一个task,以及处理运行完成的task和执行失败。

2.10.2.2 Transformation

我们先提一下一个重要的类Transformation

  • 他是一个抽象类,表示创建了一个DataStream的特定算子。每个DataStream都对应有一个底层的Transformation,表示该Stream的发起者。

  • 多个算子API会创建一棵Transformation树,当Stream程序提交运行的时候,这个树形结构图会被StreamGraphGenerator翻译为StreamGraph

  • 在运行时,一个Transformation不一定对应一个物理上的算子,因为某些算子只是逻辑上的,比如union/split/select数据流、partition等。

    比如有以下一个Transformation图:

    会在运行时转换为如下物理算子图:

    而分区信息、union、split/select等信息已经被编码到了Source到Map算子之间的边里。这些信息会在提交到集群JM后,被转为ExecutionGraph的ResultPartition和InputGate时使用。

2.10.2.3 DataStream Job提交流程

详见Flink-作业提交流程

2.10.2.2 JobGraph

JM接收参数为JobGraph,他表示数据流,由作为顶点(JobVertex)的算子和中间结果IntermediateDataSet构成。每个算子都有属性,如并行度、运行的代码、依赖的类库等。

2.10.2.3 ExecutionGraph

JM将JobGraph转换为ExecutionGraph

  • ExecutionJobVertex
    ExecutionGraph是一个并行版本的JobGraph,对于JobGraph的每个JobVertex来说,对应着多个表示每个并行子任务实例的ExecutionVertex。比如并行度100的某个算子,则JobGraph中有1个表示该算子的JobVertex,ExecutionGraph中有100个表示该算子的ExecutionVertex。

    ExecutionVertex的作用是跟踪特定子任务的执行状态。

  • ExecutionJobVertex
    而所有对应一个JobVertex的ExecutionVertex被封装在一个ExecutionJobVertex中,用来追踪该算子整体状态。

  • IntermediateResult
    追踪JobGraph.IntermediateDataSet状态

  • IntermediateResultPartition
    追踪每个partition的IntermediateDataSet状态

2.10.2.4 JobGraph和ExecutionGraph


每个ExecutionGraph都有一个与之相关的job状态信息,用来表示当前的job执行状态。

2.10.2.5 Job状态机


  • Job初始为Created
  • Job开始运行后转为Running
  • Job开始正常结束后转为Finished
  • Job出错时,先转为Failing以撤销所有运行中Task。
  • 当所有JobVertex转为final状态且配置为不可重启,则Job转为Failed状态
  • 如果Job重启,则转为Restarting,准备好后转为Created
  • 用户撤销job时,变为Cancelling,会撤销所有运行中Task。当所有Task转为final状态后,转为Cancelled
  • CancelledFailingFinished状态会导致全局的终止状态,触发Job清理
  • Suspended与上面三个状态不同,只会触发本地终止
    只会在配置了JM HA时发生。意味着该Job只是在当前JM结束,但可由新拉起的JM使用持久化HA存储来找到该Job进行恢复和重启。总之,该状态的Job不会被完全清理!

2.10.2.6 并行Task实例状态机


注意:由于一个TasK可能会被执行多次(比如在异常恢复后),所以ExecutionVertex的执行是由Execution来跟踪的,每个 ExecutionVertex 有Current ExecutionPrior Execution

2.10.3 ScheduleMode

Flink Job调度模型相关枚举类为ScheduleMode,具体调度策略类为SchedulingStrategy,决定执行图中的任务怎么开始。

  • EAGER
    立刻调度所有task。

    主要用于无界流数据。

  • LAZY_FROM_SOURCES
    从Source开始,按拓扑顺序,一旦上游所有输入数据就绪,则下游task开始。即前驱任务全部执行完成后,才开始调度后续任务,后续任务会读取上游缓存的输出数据来进行计算。

    适用于批处理作业。

  • LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST
    LAZY_FROM_SOURCES类似,不同之处在于本选项会使用batch slot请求,这种模式可支持使用比请求的request更少的slot来执行job。

    用户需要保证job不包含任何pipelined shuffle

  • (正在开发中的)Pipelined region based
    以PipelinedRegion作为单位进行调度

2.11 资源架构

2.11.1 概述


为准备好运行的Task分配Slot资源申请核心为运行在JobManager的SlotProvider,有两种分配模式:

  • 立刻分配
    立刻满足。可调用CompletableFuture#getNow(Object)来获取已分配的slot
  • 排队分配
    将申请请求排队,并返回一个future,当有一个slot可用时变得可用

2.11.2 资源管理组件

  • Flink ResourceManager
    负责资源分配和回收,内部有SlotManager负责管理TaskManagerSlot(最小资源分配和调度单位)。

    Flink在不同运行环境(Yarn、Mesos、K8S等)中实现了不同ResourceManager,如YarnResourceManager

    主要方法:

    • registerJobManager(JobMasterId, ResourceID, String, JobID, Time)
      向ResourceManager注册一个JobMaster
    • requestSlot(JobMasterId, SlotRequest, Time)
      向ResourceManager申请一个Slot
  • SlotManager
    属于ResourceManager,具体管理集群中已注册到JM的所有TaskManagerSlot的信息和状态,他们由TaskExecutor启动后注册到RM时提供。当JM为Task申请资源时,SlotManager就会从当前空闲的Slot 中按一定的匹配规则选择一个空闲的 Slot 分配给Job使用。

    内部通过TaskExecutor到Flink ResourceManager的定时心跳(包括了该TM的所有Slot状态信息)来更新Slot状态

  • SlotPool
    每个SlotPool实例属于某个Job的对应的JobMaster实例

    • 作用是服务和缓存由ExecutionGraph向本JM提起的Slot请求,当没有足够Slot时就向Flink ResourceManager申请新的Slot。
    • 一旦申请到Slot,SlotPool也会保存他们,这样即使ResourceManager挂了,依然可以分配这里已空闲的Slot。
    • 申请到Slot后,SlotPool从缓存的Slot请求中选择对应请求进行分配和Job下发执行。
    • 当Slot无人使用时,会释放。
    • 所有Slot分配由一个自增的AllocationID标记

2.11.3 分配流程


提交任务详细流程可见Flink-作业提交流程

  1. TM启动后会连接、注册到RM,成功后会将本TM携带的所有Slot信息传递到RM

  2. Flink RM收到TM的注册消息后,会有一个动态代理AkkaInvocationHandler触发ResourceManager#sendSlotReport方法将该TM注册到SlotManager,此后该TM上的Slot资源可由JM进行分配。

    实际上就是注册到SlotManagerImpl,会将这些新注册的Slot先尝试从SlotManagerImpl.pendingSlots中匹配已有的Slot请求;否则就将这个状态为TaskManagerSlot.State.FREE的Slot放入freeSlots中保存,表示为可分配的空闲Slot资源。

    如果TM连接断开,则会移除该TM注册的Slot。

    到这里,TM已经成功连接并注册到RM,但需要注意的是,TM与JM的连接和注册要等到RM向TM提起Slot资源分配请求阶段。

  3. 每个Job会对应启动一个JM,他会连接RM,然后调度本Job,向SlotPool申请Slot。
    当然最开始因为还没连接到RM所以无法分配,需要放入pending队列等待分配

  4. RM接收到JM的SlotRequest请求后,SlotManagerImpl尝试为该请求分配Slot资源
    如果findMatchingSlot(ResourceProfile)有可用TaskManagerSlot,就先从freeSlots中移除要分配的Slot,然后使用SlotManagerImpl#allocateSlot进行分配,此时就会利用AkkaInvocationHandler动态代理发送RPC 调用TM#requestSlot

  5. TM接收到requestSlot后,查询taskSlotTable,若目标Slot空闲则调用allocateSlot开始分配Slot,会连接、注册到该申请Slot的JM,最后通过RPC offerSlots将Slot分配信息告知JM

  6. JM接收到分配成功的Slot信息,此后Scheduler组件开始deploy所有ExecutionVertex,最后将Task部署描述信息TaskDeploymentDescriptor通过RPC发送给TaskExecutor,提交任务
    注意,一旦JM申请到Slot,其SlotPool也会保存他们,这样即使ResourceManager挂了,依然可以分配这里已空闲的Slot;当Slot无人使用时,会被自动释放。

  7. TM接收到JM发送的Task部署信息后,会初始化TM各种服务,利用收到的部署信息组装Task,利用ExecutionGraph创建运行时图(为每个下游inputChannel(ExecutionEdge)生成一个ResultSubPartition实例PipelinedSubpartition、为每个输入ExecutionEdge生成一个InputChannel实例,每个InputChannel都只从上游单个ResultSubPartition消费)

  8. Task组装任务完成后,从taskSlotTable匹配分配的Slot和本TM拥有的Slot,匹配上后启动executingThread线程来执行核心工作
    包括从PermanentBlobService下载相关lib jar、创建拥有这些类的引用的ClassLoader、为该Subtask的每个ResultPartition实例分别创建一个BufferPool实例(由该SubTask的所有ResultSubPartition共享,属于Floating Buffers)、初始化用户代码并创建RuntimeEnvironment(提供给执行代码访问,内容如Task的name、并行度、Configuration、数据流Reader和Writer,以及TM提供的一系列组件如MemoryManager、IO Manager等,最后执行用户定义的StreamTask

分配Slot核心源码可见SchedulerImpl#internalAllocateSlot

// slotSharingGroupId不为空就申请`SharedSlot`,否则申请SingleSlot
CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
    allocateSingleSlot(slotRequestId, slotProfile, allocationTimeout) :
    allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allocationTimeout);

allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
    if (failure != null) {
        cancelSlotRequest(
            slotRequestId,
            scheduledUnit.getSlotSharingGroupId(),
            failure);
        allocationResultFuture.completeExceptionally(failure);
    } else {
        allocationResultFuture.complete(slot);
    }
});
  1. 执行完成后,无论成功或失败,TM都将Slot标记为已占用但未执行任务的状态
  2. JM的SlotPool 缓存该Slot,但不立刻释放,以便如果任务异常需要重启时需要重启Slot的问题,可以通过延时释放Slot实现尽快调度FailOver task。
  3. 超过一定时间如果Slot仍未被使用,则SlotPool通知TM释放Slot
  4. TM得到通知后,通知RM Slot释放

2.11.4 YarnResourceManager

这是一种Flink ResourceManager实现,具体如下:

跟上面图的区别是,TaskExecutor会在JM向Flink RM申请资源时发现SlotManager上的Slot不够,Flink RM才会向Yarn RM申请Container资源来启动TE来注册到RM,随后就是RM向TE请求Slot,最后TE Offer Slot给JM。

2.11.5 心跳

TM与RM和JM之间会定时发送心跳同步Slot状态,以保证分布式系统的一致性恢复。当某组件长时间未收到其他组件的心跳信息时,就会认为对方失效并进入FailOver流程。

2.12 作业模式

Flink On Yarn分为:

  • Per-Job

  • Yarn-Session

  • Application
    以上两种模式的共同问题是需要在Client执行用户代码以编译生成 Job Graph 才提交到集群运行,过程中需要下载相关 Jar 包、上传集群,客户端和网络负载压力容易成为瓶颈,尤其当多个用户共用一个客户端时。

    1.11.0 中引入了 Application 模式(FLIP-85)来解决上述问题,按照 Application 粒度来启动一个集群,属于这个 Application 的所有 Job 在这个集群中运行。核心是 Job Graph 的生成以及作业的提交不再在客户端执行,而是转移到 JM 端执行,这样网络下载上传的负载也会分散到集群中,不再有上述 Client 单点上的瓶颈。

    用户可以通过bin/flink run-application来使用 Application 模式,目前 Yarn 和 Kubernetes(K8s)都已经支持这种模式。Yarn application 会在客户端将运行作业需要的依赖都通过Yarn Local Resource传递到 JM。K8s Application 允许用户构建包含用户 Jar 与依赖的镜像,同时会根据作业自动创建 TM,并在结束后销毁整个集群,相比 Session 模式具有更好的 隔离性。K8s 不再有严格意义上的 Per-Job 模式,Application 模式相当于 Per-Job 在集群进行提交作业的实现。

3 Flink设计理念

3.1 处理无界和有界的数据

任何类型的数据都是作为事件流产生的,如信用卡交易、传感器数据、机器日志以及社交网站或者手机上的用户交互信息。数据可以分为无界和有界的。

3.1.1 无界数据

无界数据有开端但没有定义结束。也就是说无界数据会源源不断的被生产,必须持续进行处理。处理无界数据通常要求以特定顺序(例如事件发生的顺序)摄取事件,以便能够推断出结果的完整性。

Flink通过时间和状态的精准控制能够在无界流上运行任何类型的应用程序。

3.1.2 有界数据

有界数据有明确的开始和结束。可以在处理计算有界数据前先摄取到所有数据。与无界数据不同,处理有界流不需要有序地摄取,因为可以始终对有界数据集进行排序。 有界流的处理也称为批处理。

Flink通过算法和数据结构来对有界流进行内部处理,这些算法和数据结构专门针对固定大小的数据集而设计,从而产生出色的性能。

3.1.3 同时处理有界和无界数据

3.2 Dataflow

3.2.1 概述

Flink中应用程序由用户自定义算子转换而来的流式 Dataflow 组成,这些Dataflow组成有向无环图DAG,包含若干Source和若干。

上图展示了一段Flink DataStream 程序转换为DataFlow的情况,图下方黄色圆内为一个算子。

大多数时候程序中的transformation和算子一一对应,但也有例外,比如上图,一个transformation包含了三个算子。

3.2.2 Dataflow并行度

Flink程序天生就是并行化和分布式的。在执行时,DataStream有一个或更多的StreamPartition,而且每个算子也有一个或多个Subtask(由不同线程甚至进程、机器节点执行)。

算子subtask个数就是该算子的并行度(operator.setParallelism(N)),而一个Job的不同算子可能有不同并行度。

以下是实例程序的逻辑视图和并行视图:

3.3 Flink部署方式灵活

和Spark一样,Flink既能以StandAlone方式部署,又可以跑在YARN Mesos Kubernetes等资源调度器上。

Flink在这些RM上运行时,关于提交和控制一个Application的所有通讯手段都是REST请求。

Flink可以根据应用配置的并行性来识别出所需的资源并向RM申请。为了预防任务失败,Flink会在container失败时重新申请新的资源。

3.4 Flink可运行任意规模的应用

Flink被设计来可以高效地运行任意规模的有状态的流式应用。应用程序可以并行拆分为数千个任务分布在集群中,同时执行。理论上来讲应用可以用到无限的资源。而且,Flink可以轻松维护非常大的应用程序状态。 Flink异步、增量的检查点(checkpoint)算法可以确保对处理延迟的影响最小化,同时能保证精确一次性(exactly once)的状态一致性。

目前Flink应用广泛,很多公司使用它来处理巨大规模的应用,如:

  • 处理万亿事件/天的应用
  • 处理多个TB级的状态信息的应用
  • 运行在数千个核上的应用

3.5 有装态流处理

Flink内的部分算子是有状态的,意味着一个新的事件来到,被处理的逻辑需要依赖于之前累积的事件的结果。这些装态可用来简单计算没短时间内的数量,也可以用在复杂场景,比如欺诈检测特征计算。

前面已经提到过,Flink算子可在不同线程并行执行。那么有状态算子的并行实例组在存储对应状态时是按key进行划分的,每个并行实例负责处理自己的那个key分组的事件和在本地维护这些key对应的状态。


状态总是在本地访问,可使得Flink程序达到高吞吐低延迟,利用内存达到极致性能。
有状态的Flink应用程序针对本地状态访问进行了特别优化。

任务状态信息始终保存在内存中,或是当状态信息超过可用内存时异步存储在高效的、位于磁盘的数据结构中。这样设计使得任务计算通常都是在内存中进行,延迟非常低。(是不是很熟悉,很SparkStreaming相似的思想)

3.6 Exactly Once与容错性

前面说到过,Flink能保证就算出现故障时也拥有精准一次的状态一致性。

Flink的处理方式是周期性异步(异步原因是不阻塞正在进行的数据处理逻辑)获取状态并存储的检查点,来将本地状态持久化到存储,在出错时用以恢复,重放流。

状态快照的内容是

  • 捕获整个分布式pipeline的状态
  • 记录消费数据源的offset
  • 记录消费数据到达该点时的整个JobGraph中的状态

当发生故障时,就从最后一次成功存储的Checkpoint恢复状态,重置数据源,从状态中记录的消费offset开始重新消费。

3.7 端到端的Exactly Once

必须要保证每个从Source发出的event只能精准一次地影响Sink,要求:

  • Source端必须是可重放的
  • Sink必须是事务化或幂等的

4 Flink重要概念

4.1 Streams

Stream流,就是流式处理中的基本概念。虽然流数据分为各种不同特征的类型,但是Flink可以巧妙高效的进行处理:

4.1.1 有界流和无界流


Flink的设计哲学最擅长于处理无界数据,但是对于有界数据(其实就是批处理)也提供了高效的操作方式。

4.1.2 实时流和记录型流

一般来说处理这类流数据的方式有两种:

  • 数据一旦生成就立刻实时处理
  • 先将流数据持久化到存储系统(如文件系统等),稍后再处理它们。

4.1.3 Record

数据流或数据集的基本组成元素就是Record。

算子和函数将Record作为输入和输出。

4.2 State

这里

4.3 Time

4.3.1 概述

Time是流式应用状态中的一个重要概念。

大多流式数据本身就有时间语义,因为每个事件都是在特定的时间点上产生的。而且通常流式计算是以时间为基础的。在流式处理中很重要的方面是应用程序如何去测量时间即event-timeprocessing-time的差别。

在流处理中设定使用哪种时间,请参考Flink学习3-API介绍-DataStream-处理时间设定。这个设定决定了Source怎么表现(比如是否分配timestamp)以及时间窗口算子使用哪种时间作为计算标准。

4.3.2 Event Time

EventTime是每个单独的Event在其生产设备上发生的时间,比如APP向服务端上报事件时在APP内该事件的发生时间,在APP内就会被嵌入到该event中。服务端可以从该event中提取EventTime时间戳。

说白了,EventTime是事件在现实世界中发生的时间,在之前就产生,和后来到达的Flink服务端时间无关。Flink可从每条记录中提取timestamp

处理具有EventTime语义的流的应用程序,是基于事件的时间戳来计算结果。 因此,无论是处理记录型还是实时的流事件,通过EventTime处理都会得到准确和一致的结果。但由于事件往往乱序到达,所以不可能无线等待,只能等待有限时间。只要在有限等待时间内所有数据都达到Flink系统,则基于EventTime的处理就会产生正确和一致性结果,就算是乱序、事件迟到、重跑历史数据(比如一个月的Kafka历史数据,ProcessingTIme可能就只有几秒区间,这时就适合用EventTime处理)等场景。比如有1小时的EventTime时间窗口,则只要事件的时间戳落入该窗口时间,则不管顺序如何、何时被处理,都能被Flink正确处理。

当选择EventTime模式时有两个重要概念:

  • 水位WaterMark
    当使用EventTime时,程序中必须使用指定生成EvnetiTime WaterMark的方式。Flink将使用水位来推断EventTime应用中的时间,也就是说基于EventTime时必须指定如何生成WaterMark。

    此外,水位也是一种灵活的机制,可以在结果的延迟和完整性间做出权衡。

  • 延迟数据处理
    当使用水位在event-time模式下处理流时,可能发生在所有相关事件到达之前就已完成了计算,这类事件称为延迟事件。 Flink具有多种处理延迟事件的选项,如重新路由它们以及更新此前已经完成的结果。

请注意,有时当EventTime程序实时处理在线数据时,它们将使用一些ProcessingTime操作,以确保其及时进行。

4.3.3 Ingestion Time

数据到达Flink系统时间。

具体来说,是在Source算子处将每条记录都是用当前时间作为时间戳,后续时间算子就用该时间戳。

对比Processing Time,Ingestion Time代价较昂贵,但结果更可预测。因为Ingestion Time统一在source处理时一次性分配时间戳,因此对事件的时间窗口操作不会再有时间分配,所以不会受到如处理算子所在机器时间不同步或算子传输时网络延时造成计算结果不准确的问题;而Processing Time可能会有延时导致不准确的情况。

对比Event Time,Ingestion Time不能处理乱序事件和迟到的情况,但优势是可以不指定水印。

注意,在Flink内部实现中其实是将Ingestion Time当做类似Event Time来处理,但时间戳的分配和水印生成是Flink系统自动处理的,对用户透明。

4.3.4 Processing Time

  • 概念
    Processing Time指的是数据处理算子所在的机器的系统时钟时间。具体来说,流计算使用Processing Time时,所有基于时间的算子(如time windows)会使用具体算子所在的机器的系统时钟时间。

  • 整点时间窗口区间
    小时级的Processing Time Window将包括系统时钟指示整点小时之间到达特定算子的所有记录。例如,如果一个应用程序在9:15 am开始运行,则第一个每小时处理时间窗口将包括在9:15 am和10:00 am之间处理的事件,下一个窗口将包括在10:00 am和11:00 am之间处理的事件,依此类推。

  • 优点
    Processing Time是Flink默认采用的时间,不需要流和机器之间的协调,编码实现最为简单,它提供了最佳的性能和最低的延迟。

  • 缺点
    但也必须容忍不确定性(比如数据乱序、迟到、各节点时间不同步等)以及并不精确、近似的结果。

  • 适用场景
    Processing Time语义适用于具有严格的低延迟要求,但能容忍一定的不准确结果的应用。

4.4 窗口

4.4.1 概述

大体来说 ,Flink的窗口分为两类:

  • Time Window
    以某一指定的时间长度来衡量窗口大小
  • Count Window
    以某一指定的Event数量来衡量窗口大小

    Flink的窗口又能细分为:
  • 翻滚窗口(Tumbling Window,无重叠)
  • 滑动窗口(Sliding Window,有重叠)
  • 会话窗口(Session Window,活动间隙)

而滑动窗口与滚动窗口的最大区别就是滑动窗口有重复的计算部分。

4.4.2 Tumbling Window-滚动窗口

4.4.2.1 概述

  • 滚动窗口分配器将每个元素分配给固定大小的窗口
  • 每个窗口之间无重叠部分
  • 窗口大小可按Time或Event Count衡量
  • 如果滚动窗口的大小指定为5分钟,则将每5分钟启动一个新窗口,如上图所示

4.4.2.2 源码

Flink-时间窗口源码分析

4.4.2.3 实例

val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    // 中国,就需要指定-8才是utc-0
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

4.4.3 Sliding Window-滑动窗口

4.4.3.1 概述

  • 滑动窗口分配器将所有元素分配给固定大小的窗口
  • 滑动窗口有两个参数:
    1. 窗口大小
    2. 滑动间隔(步长)
  • 有重叠
    因此,如果滑动间隔小于窗口大小,那么滑动窗口会有重叠部分。此时,元素会被分配到多个窗口。
  • 窗口大小可按Time或Event Count衡量
  • 适合求最近时间的统计,比如BI常用这种模式窗口

例如,滑动窗口两个参数为(10分钟,5分钟)。这样,每5分钟会生成(滑动)一个窗口,包含生成时往前推10分钟内到达的事件,每次有5分钟时间内的数据重叠,如下图所示。

4.4.3.2 实例

val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    // 中国,调整时区,-8
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

4.4.4 Session Window-会话窗口

4.4.4.1 概述

  • Session Window分配器通过activity session来对事件进行分组。
  • 无固定的时间窗口大小对齐
  • 起点和终点不固定
    没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时会关闭,例如,不活动的间隙时。
  • 无重叠
    与滚动窗口和滑动窗口相比,会话窗口不会重叠
  • Gap可配
    会话窗口分配器配置会话间隙,定义所需的不活动时间长度(defines how long is the required period of inactivity)。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。
  • Gap可配静态固定的或动态提取的
    用来定义gap周期长度。当gap过期后,当前session关闭,后来的事件分配到下一个SessionWindow.
  • 实现原理
    SessionWindow算子为每个到来的事件都创建一个新的Window,只要符合规则就merge到一个窗口。一个SessionWindow算子为了被merge,需要一个Merge TriggerWindow Function(如ReduceFunction, AggregateFunction, ProcessWindowFunction ,而FoldFunction 不能被合并.)
  • 适合线上用户行为分析
    如分析某个登录用户一段时间内的行为统计

4.4.4.2 实例

val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    // 可用Time.milliseconds(x), Time.seconds(x), Time.minutes(x)等
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)


// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

4.4.5 Global Window-全局窗口

4.4.5.1 概述

使用全局窗口时,会将所有拥有相同key的元素分配到相同的单个全局窗口中。

仅当您还指定自定义触发器时,使用全局窗口才有意义;否则,将不会执行任何计算,因为全局窗口没有可以处理聚合元素的自然尾端。

4.4.5.2 实例

val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)

4.5 Watermarker(水印/水位)和EventTime

请点击Flink-水位

4.6 触发器

触发器决定在窗口的什么时间点上启动用户定义的数据处理任务。

触发器意义是解决水位迟到、早到引起的问题。

4.7 Flink数据类型

4.8 转换

根据Source是否参与Checkpoint快照机制,不同Source时更新用户定义的State有不同语义:

而若需要在消费端exactly-once基础上进一步实现端到端的exactly-once,那就需要Sink端也参与到Checkpoint,在已保证Source 状态exactly-once语义的前提下,不同Sink的交付保证如下:

  • 比如HDFS Sink就是两阶段提交,开始Checkpoint快照就将当前写入的文件变为PENDING,待通知Checkpoint成功后就变为FINISHED对读可见,这样就能实现精确一次的语义了。

5 状态、Checkpoint、Savepoint

可参考:

5.1 状态

5.1.1 概述

可参考

  • Flink State 误用之痛,竟然 90% 以上的 Flink 开发都不懂
    从性能和 TTL 两个维度来描述ValueState 中存 Map 与 MapState 有什么区别?

    如果不懂这两者的区别,而且使用 ValueState 中存大对象,生产环境很可能会出现以下问题:

    • CPU 被打满
    • 吞吐上不去
  • Flink 源码:从 KeyGroup 到 Rescale
    阅读本文你能 get 到以下点:

    • KeyGroup、KeyGroupRange 介绍
    • maxParallelism 介绍及采坑记
    • 数据如何映射到每个 subtask 上?
    • 任务改并发时,KeyGroup rescale 的过程

有状态和无状态:

  • 每个有价值的流式应用一般都是有状态的。Flink中的状态是指中间状态,如算子的状态。该状态托管到Flink系统内部
    有状态算子需要跨多个事件记录信息用于后续计算,比如Window相关算子
  • 对应无状态的就比如函数,输入到输出无状态。

可举例使用状态场景如下:

Flink状态核心概念如下:

  • ManagedState和RawState

  • Operator State
    算子状态,一般存于内存

  • Keyed State
    KeyedState保存在内嵌的类KeyValue存储内,它是一种由Flink管理的分片式key value存储。

  • State Backend
    决定状态怎样去存、存在哪,可配置。

有状态计算:

  • Spark的做法是,在调用算子之前就提取状态,然后和数据一起调用无状态算子,最大的好处就是使用无状态算子就能实现有状态计算。
  • Flink的做法是,某些需要状态的算子,可将状态保存在StateBackend,数据调用算子时可以使用算子含有的状态,对计算结果造成影响。

运行基本业务逻辑的任何应用程序都需要记下事件或中间结果,以便在以后的时间点访问它们:例如在收到下一个事件时或在特定持续时间之后。

应用程序的State(状态)是很重要的一个概念,Flink中有很多feature来处理状态。

  • 多种多样的状态元语
    Flink有多种数据结构来提供状态原语,例如原子值、列表或映射,我们可以根据这个function访问方式来选择合适的状态元语类型。

  • 可插拔的StateBackend
    应用的状态是由一个可插拔的StateBackend服务管理和设置检查点的,我们可以选择用内存或RocksDB(一个高效的嵌入式磁盘数据存储)甚至是自定义的状态后端插件来存储应用状态。

  • 精准一次ExactlyOnce的状态一致性
    Flink的检查点和恢复算法可以保证在失败时应用状态的一致性。所以,这些失败对应用来说是透明的,不会影响正确性。

  • 巨大的状态信息维护
    Flink通过异步和增量检查点算法可维护TB级别的应用状态。

  • 可扩展的应用
    Flink通过对应用弹性分配worker数量来实现应用可扩展。

  • 维护跨并行Task实例的状态

  • Managed State
    Managed State 描述了已在Flink注册的应用程序的托管状态。

    Apache Flink 会负责Managed State的持久化和重伸缩等工作。

  • KeyedState是一种由Flink管理的分片式key value存储,详见5.1.3

    • ValueState
      就是一个Key对应的一个State,可向其包装的任何变量,以此添加容错功能。

      ValueState是一个包装类型,有三个重要方法:

      • update
        设置状态
      • value
        获取当前值。在初始或clear后该方法返回null
      • clear
        删除内容
    • ListState
      就是一个Key对应的一个State List,是状态多值ListState。

      可参考org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkListState<byte[]> bucketStatesListState<Long> maxPartCountersState

    • MapState
      一个Key对应的一个Map State

  • OperatorState

5.1.3 Keyed State

5.1.3.1 概念

KeyedState保存在内嵌的类KeyValue存储内,它是一种由Flink管理的分片式key value存储,有两个特点:

  • 只能应用于 KeyedStream 的函数与操作中,例如 Keyed UDF, window state
    Flink严格地将KeyedState与有KeyedState的运算符读取的流一起进行分区和分发。因此,只有KeyedStream才能访问KeyValue的Keyed State,并且仅限于与当前事件的键关联的值,即在整个程序中没有keyBy的过程就没有办法使用 KeyedStream。
  • keyed state 是已经分区/划分好的,每一个 key 只能属于某一个 keyed state
    每个 Key 对应一个 State,即一个 Operator 实例处理多个 Key,访问相应的多个 State,并由此就衍生了 Keyed State。

对齐流键和状态键可确保所有状态更新都是本地操作,从而确保了一致性而没有事务开销。 这种对齐方式还允许Flink重新分配状态并透明地调整流分区。

  • 每个Key映射到一个有状态算子实例保存
  • 每个Operator一般有多个并行实例,同key数据由同一个Operator实例处理
  • 一个KeyedState只会对应一个Operator实例,由该实例所在TM负责保存,但一个Operator实例会有多个Keyed state(因为一个Operator实例有多个Key,每个Key一个Keyed State)
  • KeyedState又被组织为KeyGroup,是Flink重分布KeyedState的原子单元。
    KeyGroup的个数和Job的task最大并行度相同。每个KeyedOperator实例会处理一个或多个KeyGroup。

5.1.3.2 分类


  • ValueState
    存Key对应的状态单值。

    可以通过update方法更新状态值,通过value()方法获取状态值。

    如 WordCount可用 Word 当 Key,Count存为该Word对应的State。

  • MapState
    类似Java中的Map,由键值对状态组成。

    需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一个。

  • ListState
    类似Java中的List,一个Key含有多个状态值。

    可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。

  • ReducingState
    状态为单值,存储用户传入的reduceFunction的reduce计算结果状态。

    每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。

  • AggregatingState
    AggregatingStateReducingState的区别是:ReducingState 中 add(T)T get()的泛型元素为同类型,但在 AggregatingState 输入的 IN,输出的是 OUT

5.1.3.3 并行度改变时

Keyed State 随 Key 在实例间迁移Redistribute。

5.1.3.4 API使用

5.1.3.4.1 例子1

  • keyBy创建 keyed stream 对 key 进行划分,这是使用 keyed state 的基本前提!
  • sum 方法会调用内置的 StreamGroupedReduce 实现

5.1.3.4.2 官方反欺诈例子
  1. main方法:

    // 1. get streaming runtime
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. create a source from TransactionSource
    // which creates infinite stream of credit card transactions.
    // Each transaction contains an account ID (accountId),
    // timestamp (timestamp) of when the transaction occurred, and US$ amount (amount).
    val transactions: DataStream[Transaction] = env
    .addSource(new TransactionSource)
    // The name attached to the source is just for debugging purpose
    .name("transactions")

    // 3.define partitioning events
    val alerts: DataStream[Alert] = transactions
    // 按accountId划分task,相同accountId的会被分发到同一个task处理
    .keyBy(transaction => transaction.getAccountId)
    // process方法接收一个KeyedProcessFunction,应用到流中每个已分组的元素上
    // KeyedProcessFunction有三个泛型,key类型,输入元素的类型,输出元素的类型.
    .process(new FraudDetectorV2)
    .name("fraud-detector")

    // 4.define sink to output result to external system such as Apache Kafka
    alerts
    // AlertSink logs each Alert record with log level INFO for debug purpose.
    .addSink(new AlertSink)
    .name("send-alerts")

  1. KeyedState只能用于RichFunction

    // KeyedProcessFunction继承自AbstractRichFunction
    class FraudDetectorV2 extends KeyedProcessFunction[Long, Transaction, Alert]
  2. 将State声明为实例级别变量

    @transient private var flagState: ValueState[java.lang.Boolean] = _
  3. 在实现自RichFunctionopen方法中创建State描述符,并为State赋值

    /**
     * 应该在方法开始处理数据之前注册State
     * 所以在open()方法内使用ValueStateDescriptor注册state
     * ValueStateDescriptor包含了告诉flink如何管理变量的元数据
     *
     * open方法由TaskManager的StreamTask在invoke方法的beforeInvoke内调用,
     * 即在调用task主逻辑之前调用open方法一次
     */
     @throws[Exception]
     override def open(parameters: Configuration): Unit = {
       // 1. 创建State描述符 StateDescriptor
       // state名为flag;
       // state类型为BOOLEAN
       val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
    
       // 2. 创建用ValueStateDescriptor定义的state
       // getState(StateDescriptor) 方法初次调用时就是注册;restore时就是获取checkpoint里面状态last值
       flagState = getRuntimeContext.getState(flagDescriptor)
    
       // 创建timerState
       val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
       timerState = getRuntimeContext.getState(timerDescriptor)
     }
  4. 读写State

    // Get the current state for the current key
    val lastTransactionWasSmall = flagState.value
    
    // set the flag to true
    flagState.update(true)
    
    // clean up state
    flagState.clear()

5.1.4 Operator State

5.1.4.1 概述

  • Operator State又称为non-keyed state,每一个 operator state 都仅与一个 operator 的实例绑定。
  • 常见的 operator state 是source state,例如记录当前 KafkaSource consumer 的 offset

Operator State 可用于所有算子,常用于 Source,例如 FlinkKafkaConsumer.

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction

5.1.4.2 分类

  • ListState
  • UnionList State
  • BroadState State

5.1.4.3 并行度改变时

  • ListState

    均匀分配,即将原来的State中的元素按照round-robin的模式均匀分配给新的Task即可
  • UnionList State

    先将所有 State 合并为全量 State ,再分发给每个task实例。由用户来决定恢复时怎么做,比如特定某个并发做特定某个事情。
  • BroadState State

    此时本来每个Task实例的State就相同,新的每个task也拿一份即可。

5.1.4.4 API使用

需要自己实现 CheckpointedFunctionListCheckpointed 接口.

5.1.4.4.1 例1


fromElements会调用FromElementsFunction的类,其中就使用了类型为 list state 的 operator state

5.1.4.4.1 例2
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction {

    @Override
    public final void initializeState(FunctionInitializationContext context) throws Exception {

        OperatorStateStore stateStore = context.getOperatorStateStore();

        this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,
            createStateSerializer(getRuntimeContext().getExecutionConfig())));

        if (context.isRestored()) {
            // 恢复时,从ListState中读出所有分区Offset信息放入Map准备恢复
            restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

            // populate actual holder for restored state
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }

            LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState);
        } else {
            LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
        }
    }

    @Override
    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            // 先将旧状态清理
            unionOffsetStates.clear();

            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                // the fetcher has not yet been initialized, which means we need to return the
                // originally restored offsets or the assigned partitions
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
                }

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
            } else {
                // 获取当前Consumer offset
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                // 将当前offset放入状态快照
                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
                }
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
        }
    }

    // 这一轮checkpoint完成时
    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!running) {
            LOG.debug("notifyCheckpointComplete() called on closed source");
            return;
        }

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
            return;
        }

        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // only one commit operation must be in progress
            if (LOG.isDebugEnabled()) {
                LOG.debug("Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
                    getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
            }

            try {
                final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
                if (posInMap == -1) {
                    LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}",
                        getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
                    return;
                }

                // 获取并移除本次已完成的checkpoint
                @SuppressWarnings("unchecked")
                Map<KafkaTopicPartition, Long> offsets =
                    (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

                // remove older checkpoints in map
                for (int i = 0; i < posInMap; i++) {
                    pendingOffsetsToCommit.remove(0);
                }

                if (offsets == null || offsets.size() == 0) {
                    LOG.debug("Consumer subtask {} has empty checkpoint state.", getRuntimeContext().getIndexOfThisSubtask());
                    return;
                }
                // 将这次状态快照记录的offset提交到kafka
                fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
            } catch (Exception e) {
                if (running) {
                    throw e;
                }
                // else ignore exception if we are no longer running
            }
        }
    }
}

5.1.5 State持久化

5.1.5.1概述

Flink通过联合使用StreamReplayCheckpoint来实现容错。Checkpoint需要使用持久化存储来保存状态:

  • 可回放重复消费记录的数据源
    比如消息队列(如Kafka)、文件系统(如HDFS)
  • 可存放状态的持久化存储
    HDFS等分布式文件系统

其中每次Checkpoint就是标记每个输入流中的特定点以及每个运算符的对应状态。通过恢复算子状态以及从Checkpoint开始重放数据记录,可以恢复数据流以及同时保持一致性(ExactlyOnce处理语义)。

Flink容错机制的具体做法就是不断给分布式数据流做Checkpoint快照,存放到可配的地方(通常是分布式文件系统中)。当由于如机器、网络、软件等错误导致程序出错时,就停止该数据流,并从最近一次成功的Checkpoint恢复和重启算子,输入流也被重置到该快照点。而在恢复过程中被重复消费的数据会被保证不造成影响。

而Checkpoint的间隔时间,就是一种在执行中用于Checkpoint的开销和恢复时间的权衡方法:

  • 间隔越长,恢复时的记录数越多,恢复耗时更长
  • 间隔越短,一定时间内用于Checkpoint的时间百分比越长,恢复耗时更短

5.1.5.2 KeyedState和OperatorState Checkpoint区别

状态从本质上来说,是Flink算子子任务的一种本地数据,为了保证数据可恢复性,使用Checkpoint机制来将状态数据持久化输出到存储空间上。状态相关的主要逻辑有两项:

  • 将算子子任务本地内存数据在Checkpoint时将状态快照后持久化到状态后端StateBackend;
  • 初始化或重启应用时,以一定的逻辑从StateBackend中读出对应状态数据,并转为算子子任务的本地内存数据。

注意:

  • Keyed State对这两项内容做了更完善的封装,开发者可以开箱即用,也就是说只需要注册和使用这类状态,不需要管Checkpoint快照和恢复,Flink已经帮你做了。

  • 对于Operator State来说,每个算子子任务管理自己的Operator State,或者说每个算子子任务上的数据流共享同一个状态,可以访问和修改该状态。

    但Flink的算子子任务上的数据在程序重启、横向伸缩等场景下不能保证百分百的一致性。换句话说,重启Flink应用后,某个数据流元素不一定会和上次一样,还能流入该算子子任务上。因此,我们需要根据自己的业务场景来设计snapshot和restore的逻辑。为了实现这两个步骤,Flink提供了最为基础的CheckpointedFunction接口类。

    public interface CheckpointedFunction {

    // Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    // 初始化时会调用这个方法,向本地状态中填充数据
    void initializeState(FunctionInitializationContext context) throws Exception;

    }

5.2 Checkpoint

可参考:

5.2.1 概述

5.2.1.1 Chandy Lamport


Chandy Lamport算法,可以想象就是对水管吹气,把里面的水全部吹出来,此时就是管道水排空时系统状态。

对应到Flink,就是CheckpointCoordinator定时对Source发送Checkpoint Barr

5.2.1.2 怎么保证精确一次容错?

  • 简单场景

    每条数据处理后就记录下该数据位置对应当时的状态。如果发生fail,就从最后一次成功的状态快照进行恢复,从而保证精确一次。
  • 分布式场景

5.2.1.3 到底是什么的状态?

由于Flink中每个函数和算子都可以是有状态的,有状态函数可以在处理每个事件过程中存储数据,这使状态成为任何类型更复杂操作的关键构成。

Flink容错性的核心部分就是为分布式数据流和算子状态制作Checkpoint一致性快照,可以在出错时从Checkpoint中恢复状态和执行位置等。

相关详细内容还可参考

5.2.1.4 Checkpoint持久化了什么?

Checkpoint需要使用持久化存储来保存快照状态:

  • 可回放重复消费记录的数据源
    比如消息队列(如Kafka)、文件系统(如HDFS)
  • 可存放状态的持久化存储
    HDFS等分布式文件系统

状态快照是指Flink Job的全局一致性镜像,一个快照包括:

  • 一个指向每个DataSource的指针(如Kafka的offset)
  • 每个有状态算子的状态副本,该副本是处理了 sources offset 之前所有的事件后而生成的状态。

5.2.2 Checkpoint异步性

Checkpoint为异步执行的,Checkpoint屏障不会在lock步骤中传播,并且算子可以异步地为其状态制作快照。

Flink 的 StateBackend 利用写时复制(copy-on-write)机制允许当异步生成旧版本的状态快照时,能够不受影响地继续流处理。

只有当快照被持久保存到StateBackend后,这些旧版本的状态才会被当做垃圾回收。

5.2.3 Checkpoint Aligned和Unaligned

Flink1.11以前只支持对齐的Checkpoint,从1.11开始也可启用不对齐的Checkpoint了。

  • 对齐的Checkpoint
    每个算子需要等到已接收所有上游发送的 Barrier 对齐后才可以进行 本算子状态的Snapshot ,完成后继续向后发送 Barrier。这样,在出现反压的情况下,Barrier 从上游算子传送到下游算子可能需要很长的时间,从而导致 Checkpoint 超时的问题。

  • 非对齐的Checkpoint
    针对这一问题,Flink 1.11 新增了 Unaligned Checkpoint机制,开启后一旦收到第一个上游Barrier就可以开始执行Checkpoint,并把上下游之间正在传输的数据也作为状态保存到快照中,这样 Checkpoint 的完成时间大大缩短,不再依赖于算子的处理能力,解决了反压场景下 Checkpoint 可能超时的问题。

    可以通过 env.getCheckpointConfig().enableUnalignedCheckpoints();开启unaligned Checkpoint

5.2.4 Checkpoint配置

val env = StreamExecutionEnvironment.getExecutionEnvironment()
  • 开启Checkpoint
    Checkpoint默认关闭。需要用env.enableCheckpointing(n毫秒)来开启,这里的n指开启两次checkpoint之间的间隔毫秒数时间。

    env.enableCheckpointing(1000)
  • exactly once / at least once 语义
    默认为exactly once:

    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  • Checkpoint超时时间
    如果某次Checkpoint超过此阈值还没完成,则将进行中的Checkpoint干掉作废,单位毫秒

    env.getCheckpointConfig.setCheckpointTimeout(60000)
  • 下次Checkpoint距离上一次Checkpoint完成后的最小时间间隔毫秒数。
    注意,此时不会管已设置的Checkpoint时间间隔和每次Checkpoint持续时间。也就是说,此时间隔时间永不会比本参数时间还小了。还有一点就是,此时并发Checkpoint数设置失效,强制设为1。

    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1500);

    比如,我们通过env.enableCheckpointing设定了是 2 个两次Checkpoint之间的间隔毫秒数时间为1000ms。那么如果一个Checkpoint耗时900ms,本来过100ms就应做下一个 Checkpoint,导致checkpoint过于频繁。这个时候,本设置让Checkpoint完成之后最少要等 500ms才开始下一个,可以防止 Checkpoint 太过于频繁而导致业务处理的速度下降。

    所以有时候因为某次Checkpoint时间过长,可能导致采用时间间隔方式受影响导致上一个Checkpoint刚完成没多久又开始下一个Checkpoint,可采用本方式。

  • Checkpoint并发数
    默认为1,即只能有1个Checkpoint同时进行。一个没完另一个不会触发。减少对Chcekpoint开销,不会对系统产生过多影响。

    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    对于这种场景可以用并发Checkpoint:有确定的处理延迟(比如经常调用较好时的外部服务接口),但仍希望频繁Checkpoint,以期当发生错误时恢复很少的数据开始处理。

  • Checkpoint失败后是否导致flink任务失败
    默认为true。

    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)

    注:本方法已经废弃,建议改用以下方法,默认0即不容忍任何一个Checkpoint失败:

    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(int)
  • 是否开启外部检查点
    默认情况下,Checkpoint 在默认的情况下仅用于恢复失败的作业,而并不会被保留,即当程序Cancel时 Checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业Fail或Cancel时不会被清除。

    我们可以将周期性的Checkpoint配置为保存到外部存储。这种检查点会将元数据持久化到外部存储,而且即使job失败也不会自动清除。这样,当job失败时可直接从这个现成的Checkpoint恢复。详细参考:Externalized checkpoints 的部署文档

    有两种选择:

    • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
      当Cancel该job时也保留 Checkpoint。也就是说,我们必须在Cancel后需要手动删除Checkpoint文件。

    • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
      当Cancel该job时删除Checkpoint。仅当Job失败时,Checkpoint 才会被保留。

      为单个job设置Externalized方法:

      env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    在flink-conf.yaml中全局设置方法:

    execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
  • 优先从 Checkpoint 恢复而不是Savepoint
    默认为false。该属性确定 job 是否在最新的 checkpoint 回退,即使有更近的可以潜在地减少恢复时间的 savepoint 可用(因为checkpoint 恢复比 savepoint 恢复更快)

    env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

5.2.5 Checkpoint原理

Checkpoint由运行在JM的组件CheckpointCoordinator定时触发:

// checkpointInterval
long baseInterval = chkConfig.getCheckpointInterval();
if (baseInterval < minPauseBetweenCheckpoints) {
    baseInterval = minPauseBetweenCheckpoints;
}

private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
    return timer.scheduleAtFixedRate(
        new ScheduledTrigger(),
        // baseInterval就是我们配置的checkpointInterval
        initDelay, baseInterval, TimeUnit.MILLISECONDS);
}

private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint for job {}.", job, e);
        }
    }
}

5.2.5.1 CheckpointBarrier

分布式快照的核心组件被称为stream barrier,他们会被注入到数据流中和数据记录一起在算子之间流动,并且他们永不会超过记录,因为他们是按严格线性流动的。

Barrier将记录分为流入当前快照和下一个快照的两部分,且Barrier会记录下当前快照ID。Checkpoint n 将包含每个算子的状态,这些状态是对应的算子消费了严格在Checkpoint barrier n之前的所有事件,并且不包含在此后的任何事件后而生成的状态。

  1. JM的CheckpointCoordinator定时触发Checkpoint流程

  2. CheckpointBarrier被注入到Source阶段的并行数据流中,第n个快照的注入点Sn就是Source覆盖数据
    比如在Kafka中,注入点就是该partition最后一条记录的Offset。 第n个快照的注入点位置Sn会被报告给JobManager的组件CheckpointCoordinator。

    当Job 开始做Checkpoint barrier N 的时候可以理解为逐步将状态信息填充如下图左下角的表格。

    比如上图Source收到Barrier N后将partition Offset(比如Source当前消费Kafka分区Offset)放入Source状态表格中。

  3. Barrier在注入后,和其他数据一起向数据流下游流动。

  4. 当一个中间算子收到来自所有输入流的Snapshot N的对应Barrier N后,会将自己的状态异步写入持久化存储,并将Barrier N插入输出流发送给下游。

    Barrier N流动到Operator 1时,就将属于Checkpoint N到Checkpoint N-1之间的所有数据反映到当前状态快照中。

  5. 一旦作为流程序DAG终点的某个Sink算子接收到所有输出流发来的Barrier N以后先填充自己的状态表格,随后就会认为快照N已经完成,此时会发送一个快照N已完成的ACK给CheckpointCoordinator。

  6. 当CheckpointCoordinator收到所有Sink算子发出的ACK后,快照N就被认为已经成功执行完成。
    此后就不会再访问Sn以前的数据了,因为认为之前的数据已经走完整个数据拓扑。

  7. 最后 CheckPointCoordinator 会把整个 StateHandle 封装成 completed CheckPoint Meta,写入到 hdfs。

  8. 而如果直到Checkpoint超时,CheckPointCoordinator仍未收集完所有的 State Handle,CheckPointCoordinator会认为本次 CheckPoint 失败,将这次 CheckPoint 产生的所有状态数据全部删除。

5.2.5.2 Checkpoint Barrier对齐过程

当有多个输入流时,需要对齐输入流的同ID的Barrier。

当 job graph 中的每个 operator 都接收到该Barrier时,就会记录下自己的状态。

拥有两个输入流的 Operators(例如CoProcessFunction)会执行Checkpoint Barrier对齐(barrier alignment) ,以便当前快照能包含已消费两个输入流 barrier 之前(但不超过)的所有 event 而产生的状态。

  1. 一旦算子从某个输入流收到BarrierN,那就不能再处理任何该输入流的数据,直到从其他所有输入流收到BarrierN。否则会导致将快照N和快照N+1的数据记录搞混。

    暂时停止处理的流的数据继续收到放入InputBuffer等待处理。

  2. 一旦算子收集齐了BarrerN,就会立刻发送所有OutputBuffer中的PENDING状态的记录给下游,然后发送BarrierN。

  3. 算子发送BarrierN完毕后,在内存中将状态制作快照,然后恢复从输入流获取、处理数据
    注意,会优先将InputBuffer中的数据处理完毕,然后再从输入流处理

  4. 最后,算子将内存中制作好的状态异步写入StateBackend

注意,所有拥有多路输入流和消费来自多个上游子任务输出流的shuffle后的算子都需要使用Checkpoint对齐。

特点:

  • 当若干快照并发执行时,数据流中会同时存在多个Barrier。
  • 需要注意的是,其实Barrier不会中断数据流,所以非常轻量级。

5.2.5.3 算子状态快照

  • 算子状态属于快照一部分
    只要你的算子包含任意格式的State,那就必须作为快照一部分保存。

  • 算子状态快照时间点
    当算子接收到所有输出流的BarrierN后,就在内存中将状态制作快照,然后恢复从输入流获取、处理数据,最后算子将内存中制作好的状态异步写入StateBackend。

    在这个时间点上,所有在BarrierN之前的对State的更新已经做完,而在BarrierN之后对State的更新都还没做。

  • StateBackend
    因为状态快照可能很大,不可能全放在内存,所以被存放到可配的StateBackend

    默认在JM内存中,但生产中一般存放在分布式存储中,比如HDFS。

    当状态快照被存储后,算子会确认该次Checkpoint完成,发送该次快照的 Barrier到输出到下游的流中。

  • 状态快照包含内容

    • 对于每个并行流Source,是快照启动时流中的offset/位置
    • 对于每个算子实例,是一个指向作为快照一部分的状态的指针

5.2.5.4 普通Checkpoint恢复

当失败时,直接从最近一次成功的Checkpoint恢复。

恢复时:

  1. 重新部署整个分布式数据流
  2. 将Checkpoint中包含的状态给Operator
  3. Source会从Checkpoint中包含的位置K开始读取数据流
    比如Kafka中就是记录下的偏移量K。

5.2.5.5 增量Checkpoint恢复

先从最新的全量快照启动算子,然后将后续的增量快照更新到算子状态上,得到最新状态。

5.2.6 Unaligned Checkpoint

5.2.6.1 概述

Flink1.11以前只支持对齐的Checkpoint,从1.11开始也可启用不对齐的Checkpoint了。

  • 对齐的Checkpoint
    每个算子需要等到已接收所有上游发送的 Barrier 对齐后才可以进行 本算子状态的Snapshot ,完成后继续向后发送 Barrier。这样,在出现反压的情况下,Barrier 从上游算子传送到下游算子可能需要很长的时间,从而导致 Checkpoint 超时的问题。

  • 非对齐的Checkpoint
    针对这一问题,Flink 1.11 新增了 Unaligned Checkpoint机制,开启后一旦收到第一个上游Barrier就可以开始执行Checkpoint,并把上下游之间正在传输的数据也作为状态保存到快照中,这样 Checkpoint 的完成时间大大缩短,不再依赖于算子的处理能力,解决了反压场景下 Checkpoint 可能超时的问题。

    可以通过 env.getCheckpointConfig().enableUnalignedCheckpoints();开启unaligned Checkpoint

特点:

  • 优点
    CheckpointBarrier可以直接越过in-flight数据,所以与吞吐不再有关联
  • 限制
    • 不能直接对非对齐Checkpoint Job进行rescale或修改JobGraph,必须在rescale前进行Savepoint(会对齐)
    • 目前不支持并发非对齐Checkpoint(本模式时间更短,所以其实不太需要并发)
    • 在恢复过程中,未对齐的检查点会中断有关水印的隐含保证,参考Unaligned checkpoints

5.2.6.2 原理

非对齐Checkpoint核心思想就是只要in-flight缓存数据会成为OperatorState的一部分,那Checkpoint就可以超越这些数据。

  1. 算子收到第一个Barrier后,存到输入缓存InputBuffer中
  2. 算子对缓存中的Barrier立刻处理,将该Barrier放入输出OutputBuffer末尾以立刻发送给下游算子
  3. 算子将所有已超越的记录(上图中超过了 3、2、1、d、c、b、a、z、y、x)标记为异步存储,并据此创建其自身状态的快照。

因此,非对齐方式的Checkpoint算子仅短暂停止输入流处理以标记缓冲区、发送Barrier、创建其他状态的快照,不需要再像对齐Checkpoint那样等待所有所有输入流的Barrier。

非对齐Checkpoint可保证Barrier尽快到达Sink,所以特别适合某个流路径进展缓慢的情况,如果采用对齐Checkpoint甚至或延迟小时级别。

5.2.6.3 Unaligned Checkpoint恢复

  1. 算子首先恢复in-flight数据
  2. 然后再开始从上游算子消费处理数据,重新部署整个分布式数据流
  3. 将Checkpoint中包含的状态给Operator
  4. Source会从Checkpoint中包含的位置K开始读取数据流
    比如Kafka中就是记录下的偏移量K。

5.2.6.4 小结

  • 使用场景
    • 多上游中的一个路径进展很慢
      非对齐方式的Checkpoint确保Barrier尽快到达Sink,特别适合至少有一个数据移动很缓慢的路径的应用,这种场景下对齐等待时间可能达到小时级。
    • 背压导致Checkpoint持续时间过长
      采用本方式后,大多数情况下Checkpoint持续时间和端到端延迟无关了
  • 不适用场景
    但问题是会增加额外的IO压力,在写入StateBackend的IO是瓶颈时不适用。
  • Savepoint只能对齐Checkpoint,不能非对齐Checkpoint。

5.2.7 Exactly Once / At Least Once / At Most Once

根据用户配置以及使用的集群,Flink有三种语义:

  • Exactly Once
    数据既不丢失也不重复。注意这里的含义是保证引擎管理的状态更新只提交一次到持久的后端存储,而不是引擎只处理一次该数据。

    • Checkpoint
      Flink精确一次是由Checkpoint Barrier对齐实现的,通过这种机制,流应用程序中每个算子的所有状态都会定期一致性地做 checkpoint。一旦程序在运行中发生失败,就把所有算子都统一回滚到最后一次成功执行的全局一致Checkpoint。回滚期间,会STW。回滚后,Source源也回滚到了这个Checkpoint时的offset。也就是说,整个流应用程序基本上是回到最近一次的一致状态,然后程序可以从该状态重新启动。

      可以看到T3回滚后,Source offset已经重置,5,3会被重新处理,但他们仍然只会精确影响状态sum一次,所以结果依然正确。

    如果不需要可以设置CheckpointingMode.AT_LEAST_ONCE,此时可以提高性能,但不再是精确一次了。

      注意:Flink通过回退和重放Source数据流,Exactly Once并不意味着每个event只被精确处理一次,而是指每个event只会精确一次地影响由Flink管理的状态。
    • 至少一次事件传递和对重复数据去重
      对每个算子实现至少一次事件传递 + 数据去重,要求为每个算子维护一个事务日志
  • At Least Once
    数据不丢,但可能重复

    具体做法是失败即从源头重试执行,但事件可能被处理多次。比如CheckpointBarrier不对齐时就进行快照,则可能导致先到达的那个流的数据被处理多次。

  • At Most Once
    保证数据或事件最多由应用程序中的所有算子处理一次

    具体做法是不尝试从错误中恢复

5.2.8 Checkpoint Barrier对齐与Exactly Once 、 At Least Once

  • At Least Once
    Checkpoint对齐有时候可能会增加大量耗时,对于需要超级低延时、但又要求一致性的程序,可以关掉对齐。也就是说,算子一旦看到Checkpoint Barrier就开始生成Checkpoint快照。

    当跳过Checkpoint对齐后,当一些输入流的BarrierN来到后,此时算子标记完成后可以继续处理所有输入,这就造成算子可能在CheckpointN完成之前又同时在处理属于CheckpointN+1的数据,因为这些后来的数据虽然在Checkpoint N Barrier之后,但会将其包含在这次Checkpoint备份的状态之中。

    这就造成在恢复CheckpointN时这些记录重复出现,因为他们既在CheckpointN的状态快照之中(已经影响了Checkpoint N记录的状态)又会在恢复时作为Checkpoint N之后数据的一部分被重放导致重复计算!

  • Exactly Once
    所以要想Exactly Once,就要开启对齐,使用InputBuffer将对齐阶段继续接收的数据缓存,等待对齐完成后继续处理,此时如果从CheckpointN恢复时,自然其保存的状态就不会有CheckpointN后来的数据的干扰了。

注意:Checkpoint对齐只能用于拥有多个前驱结点的算子(如Join)和拥有多个后置节点的算子(repartitioning/shuaffle)。
所以只有一个后置或前驱结点的单并行算子(如map、flatMap、filter等)即使在at least once模式也会表现为exactly once

Checkpoint Barrier对齐与不对齐例子可参考一文搞懂 Flink 的 Exactly Once 和 At Least Once

5.2.9 QA

转自 一文搞懂 Flink 的 Exactly Once 和 At Least Once,作者范瑞

5.3 Savepoint

5.3.1 概述

Savepoint 其实就是手动触发的 Checkpoint,它依靠常规的 checkpoint 机制获取程序的快照并将其写入StateBackend保存。

Savepoints 允许在不丢失任何状态的情况下升级程序和 Flink 集群。

5.3.2 Operator ID与Savepoint状态

Savepoint内部有一个类似map结构,为每个有状态算子保存了Operator ID -> State的映射。

所以我们使用DataStreamApi时,尽量为每个有状态算子声明ID,方便恢复。如果不指定会自动生成ID,只要这些ID不变就能手动从savepoint恢复。这些ID取决于程序,对程序改动很敏感。

正例如下:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

此时savepoint生成的映射如下:

Operator ID | State
------------+------------------------
source-id   | State of StatefulSource
mapper-id   | State of StatefulMapper

可以看到,只生成了有状态算子的映射。而print这样的无状态算子被忽略,不会成为状态一部分。

5.3.3 Savepoint与Operator改动

  • Savepoint后,在Job内增加了新有状态算子,再从Savepoint恢复
    可以,心有状态算子在没有状态情况下初始化
  • Savepoint后,在Job内删除了有状态算子,再从Savepoint恢复
    此时会因为Savepoint默认会恢复状态内的所有算子给新Job,此时会导致无法恢复,只能加上allowNonRestoredState跳过无法恢复的状态
  • Savepoint后,在Job内对有状态算子重新排序,再从Savepoint恢复
    如果有状态算子指定了uid,则不受影响;如果没有,则可能导致有状态算子自动生成的ID发生变化,导致无法恢复从以前的Savepoint恢复
  • Savepoint后,在Job内对无状态算子添加、删除或重排序,再从Savepoint恢复
    如果有状态算子指定了uid,则不受影响;如果没有,则可能导致有状态算子自动生成的ID发生变化,导致无法恢复从以前的Savepoint恢复
  • Savepoint后,改变程序并行度,再从Savepoint恢复
    没问题,恢复后使用新的并行度即可
  • 可以移动savepoint内的文件吗?
    目前不行,因为metadata文件内使用的是绝对路径。非要改的话,就编辑该文件内容。

5.3.4 Savepoint对比Checkpoint

  • Checkpoint在Flink Job执行期间定期在 TaskManager 节点上创建快照并生成 Checkpoint,所以在恢复时Flink 仅需要最后成功完成的Checkpoint即可。也就是说,一旦成功完成了新的Checkpoint,旧的就可以被丢弃。
  • 管理者不同
    • Savepoints 由用户触发、管理、删除,并在新的 checkpoint 完成时不会自动过期,可通过命令行或在Cancel一个 Job 的同时通过 REST API 来创建 Savepoint。
    • Checkpoint由Flink负责管理、删除等。
  • Checkpoint可以使用RocksDB增量模式,比Savepoint更轻量级
  • Savepoint的数据以标准格式输出存储,允许版本升级或配置变更时使用

5.3.5 原理

将Checkpoint Barrier手动插入到所有Pipeline中从而产生分布式快照。


使用EventTime,所以每个事件总是放入同一个窗口,保证结果一致性。

5.4 State Backend 状态后端

可参考:

5.4.1 概述


Checkpoint 会将 timer 以及有状态算子中的状态进行一致性快照保存, 包括Connector(比如KafkaConnector、HDFSConnecotr等),Window以及任何用户自定义State。

DataStream API涉及到的State有:

  • Window在触发计算前收集、聚合的元素
  • 转换函数可能使用 key/value State 来存储值
  • 转换继承CheckpointedFunction来 对存入State的值实现容错。
    比如StreamingFileSink实现该接口,使用状态存储了Bucket、Part-File,且实现了容错。

State Backend管理的状态包括:

  • org.apache.flink.streaming.api.datastream.KeyedStream使用的Keyed State
  • 实现org.apache.flink.streaming.api.checkpoint.CheckpointedFunction来直接管理的State

开启Checkpoint后,可以将上述State Checkpoint持久化,防止数据丢失,出错时可一致性恢复。而具体选择的StateBackend就决定了State内部如何组织表现,以及如何、在哪进行Checkpoint。

Checkpoint存储的可能位置包括JobManager内存、文件系统、数据库等(State默认存储在TaskManager内存中,而Checkpoint保存在JobManager内存中),具体取决于所配置的State Backend(比如持久化巨型状态就不适合放在内存),当前有以下State Backend可用于存储 Checkpoint State:

  • MemoryStateBackend(默认)
  • FsStateBackend
  • RocksDBStateBackend

Job级别的StateBackend配置方式如下:

StreamExecutionEnvironment.setStateBackend()

这定义了该Job保存State的数据结构以及Checkpoint数据存储位置。

全局StateBackend配置方式如下:

  • MemoryStateBackend为不配置时的默认选项。如果要修改默认配置,请修改flink-conf.yaml文件中的state.backend

    state.backend: filesystem
    
    state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

    其中:

    • state.backend
      可选内置的jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend),或自己实现自StateBackendFactory的类全限定名,如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

    • state.checkpoints.dir
      定义了StateBackend写Checkpoint数据和元数据文件路径。目录结构如下:

      /user-defined-checkpoint-dir
          /{job-id}
              |
              + --shared/
              + --taskowned/
              + --chk-1/
              + --chk-2/
              + --chk-3/
              ...

      shared目录保存了可能被多个Checkpoint引用的文件;taskowned存储JM不能删除的文件;chk开头的是每次checkpoint使用的文件,数字表示checkpointId。

5.4.2 MemoryStateBackend


  • MemoryStateBackend为不配置时的默认选项。

  • 存储位置

    • 平时
      以Java Object形式放在TaskManager的JVM内存中。具体来说, Key/value state 和 Window算子持有HashTable来存储State值和触发器等。
    • Checkpoint时
      将State进行快照,然后将它作为Checkpoint消息一部分发送给JobManager,然后放在JobManager的JVM内存中。
  • 异步快照
    支持,且默认开启,强烈建议使用异步快照来防止数据流阻塞。

    要关闭异步采用同步方式(官方建议只在Debug时关闭异步Checkpoint):

    new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
  • 使用限制

    • 每个单独的State默认最大5MB,可用MemoryStateBackend构造函数设置

      env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE, false))

    但是必须注意该参数谨慎调大,因为State Checkpoint时TaskManager会将State Checkpoint后的数据通过限制了大小的RPC方式发送给JobManager,而JobManager需要在内存保存来自各个TaskManager的状态数据。太大了可能导致OOM!

    • 同时由于Flink 节点间采用Akka通信,所以单状态大小受限于Akka Frame Size限制。
    • JobManager的JVM内存必须能存下所有聚合后的State
  • 特点

    • 这种方式轻量级无需其他依赖,但受JVM内存限制,只能对小型State做Checkpoint,适用于
      • 本地开发测试
      • 计数器
      • 几乎无状态的Job,比如仅由每次只处理一条记录的算子(Map、FlatMap、Filter等)组成
      • KafkaConsumer,只需要很少的State用于记录消费offset情况
    • 每次运算值需要读取状态时使用Java object 读写,代价较小;
  • 调优
    managed memory内存设为0,因为RocksDB StateBackend才会使用该部分内存。这样设定可以使得用户代码可以使用JVM能提供的最大值来执行。

5.4.3 FsStateBackend


  • 存储位置

    • 平时
      以Java Object形式放在TaskManager的JVM内存中,读写代价小
    • Checkpoint发生时
      以Java Object形式将状态的Checkpoint序列化后保存到分布式文件系统,还有极小的State元数据存储在JobManager内存中(或是在高可用模式下将元数据存放在元数据Checkpoint中)。
  • 异步快照
    支持,且默认开启异步快照Checkpoint来防止数据流阻塞。

    如果要关闭异步采用同步方式:

    new FsStateBackend(path, false);

    这里的Path是Checkpoint持久化目录,如hdfs://namenode:40010/flink/checkpointsfile:///data/flink/checkpoints

  • 特点

    • 当使用分布式文件系统如HDFS或Alluxio时,可保证单节点故障时也不会丢状态的数据,使得Flink程序高可用、强一致性。
    • 读写本地State时直接使用Java Object读写,开销较小
    • 当Checkpoint发生时需要将状态序列化再保存到HDFS,有一定开销。
  • 适用场景

    • 大型State,很大的key/value State
    • 很宽的Window
    • 需要高可用
    • 同时对读写性能也有较高要求
    • 注意,不支持增量Checkpoint
  • 调优
    managed memory内存设为0,因为RocksDB StateBackend才会使用该部分内存。这样设定可以使得用户代码可以使用JVM能提供的最大值来执行。

5.4.4 RocksDBStateBackend

可参考:

5.4.4.1 概述


  • 简介
    RocksDB 是一个以日志合并树( LSM 树,Kudu、HBase都有使用)作为索引结构的 KV 存储引擎。当用于在 Flink 中存储 kv 状态时,Key由<Keygroup,Key,Namespace> 的序列化字节串组成,而Value由状态的序列化字节组成。

    每次注册 kv 状态时,它都会映射到列族(column-family),并将键值对以字节存储在 RocksDB 中。这意味着每次读写(READ or WRITE)操作都必须对数据进行反序列化或者序列化,与 Flink 内置的 in-memory 状态后端相比,会有一些性能开销。

  • 存储位置

    • 平时
      将运行时State数据保存在RocksDB数据库中,RocksDB 数据库默认将数据存储在 TaskManager 节点磁盘上的的数据目录。
    • Checkpoint发生时
      将整个RocksDB数据库Checkpoint到配置的文件系统目录。同样的,还有极小的State元数据存储在JobManager内存中(或是在高可用模式下将元数据存放在元数据Checkpoint中)。
  • 仅支持异步快照
    RocksDBStateBackend仅支持异步快照,且默认开启异步快照Checkpoint来防止数据流阻塞。

  • 可支持增量Checkpoint
    RocksDB是唯一可支持的

  • 使用限制
    由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持2^31 字节。 意: RocksDB 使用merge算子的状态(例如ListState)累积数据量大小可能悄悄超过 2^31 字节,会在下一次获取数据时失败。这是当前 RocksDB JNI 的限制。

  • 特点

    • 可保留的State大小仅受可用磁盘空间量的限制。

      与平时将State保留在内存中的FsStateBackend相比,RocksDBStateBackend可以保留非常大的状态。 但这也意味着若使用RocksDBStateBackend,则可以实现的最大吞吐量将降低。

    • 每次读取状态都有序列化/反序列化开销
      从RocksDBStateBackend进行的所有读/写都必须经过序列化/反序列化以检索和存储State对象,这也比基于堆的StateBackend开销大很多。

    • 可使用增量快照

    • 不受 Java 垃圾回收的影响,与 heap 对象相比,它的内存开销更低

    • RocksDB是唯一可支持增量Checkpoint的

  • 适用场景

    • 巨型State,巨大的key/value State
    • 很宽的Window,如天级窗口聚合计算
    • 需要高可用
    • 对计算性能要求较低,因为RocksDB读写有序列化/反序列化开销

Flink花了很多功夫在内存管理上,以使得TM很好得利用内存,不至于在容器环境因为内存超限被杀掉,也不会因为内存利用率过低导致大量内存数据落盘或缓存命中率太低。

默认,RocksDB的可用内存配置为TM的一个Slot的内存量,一般用户无需调整细节,只需在不够时增加整体内存大小即可。state.backend.rocksdb.memory.managed就开启了RocksDB使用Managed Memory,Flink通过配置 RocksDB 来确保其使用的内存正好与 Flink 的Managed Memory预算相同,计算粒度是Per-Slot。也就是说,Flink会为一个Slot上的所有 RocksDB 实例使用共享的 RocksDB cacheRocksDB write buffer manager

调整缓冲区:

  • 写入缓冲内存不够时(现象为频繁flush)可调整state.backend.rocksdb.memory.write-buffer-ratio即分配给写入缓存的比例,默认0.5即50%。
  • 读取缓冲命中率低时可调整state.backend.rocksdb.memory.high-prio-pool-ratio即分配给写入缓存的比例,默认0.1即10%,这部分内存优先分配给RocksDB的索引和过滤器。

但专业用户也可以手动为RocksDB每个列族(每个算子的每个State就对应了一个列族)分配内存,用户自己来确保总内存不会超限。

5.4.4.3 Timer(Heap vs RocksDB)

这里说的Timer是指用来触发窗口或回调ProcessFunction的类似操作,可基于事件时间或处理时间。

RocksDBStateBackend中Timer默认存在RocksDB,需要一定成本维护,所以也可以将Timer存在Java堆。

Timer较少时,可将state.backend.rocksdb.timer-service.factory设为heap,可获得更好的性能。但这样设置后,Timer状态就不能被异步快照存储了。

5.4.4.4 列族(ColumnFamily)级别的预定义选项

5.4.4.5 通过 RocksDBOptionsFactory 配置 RocksDB 选项

5.4.5 存储原理

5.4.5.1 概述

5.4.5.2 HeapKeyedStateBackend

对于HeapKeyedStateBackend,有两种实现:

  • 支持异步 Checkpoint(默认):存储格式CopyOnWriteStateMap

  • 仅支持同步 Checkpoint:存储格式 NestedStateMap

  • namespace用来标注如属于哪个window

  • MemoryStateBackend 内使用HeapKeyedStateBackend时,Checkpoint 序列化数据阶段默认有最大 5 MB数据的限制

5.4.5.3 RocksDBKeyedStateBackend

  • RocksDBKeyedStateBackend的每个 state 都存储在一个单独的 column family 内,使用了基于LSM树的磁盘、内存混合型DB

写入相关操作如下图

  1. 将数据写入内存中的Active MemTable
  2. MemTable会被后台线程周期性Flush到磁盘,生成按Key排序的只读的不可变文件SSTable
    当 MemTable 写满时,它将成为 READ ONLY MemTable,并被一个新申请的 MemTable 替换。
  3. 刷到磁盘的 SSTable 文件会被后台线程多路归,并实现进一步的整合为大文件,提升读取效率
    合并后的 sstable 包含所有的键值对,RocksDB 会删除合并前的 sstable。
  4. 每个注册状态都是一个列族,这意味着每个状态都包含独享的 MemTables 和 SSTables 集。

读取时:

  • 首先访问Active MemTable
  • 若找不到,则访问SSTable
    • 优先查RocksDB BlockCache
    • 没有则查操作系统缓存PageCache
    • 在没有就查本地磁盘
    • 在此过程中可使用bloom filter减少大量磁盘访问,进行过滤

增量时(可参考Managing Large State in Apache Flink: An Intro to Incremental Checkpointing以及 翻译版):

  • Flink 将所有新生成的 sstable 备份到持久化存储(比如 HDFS,S3),并在新的 checkpoint 中引用。Flink 并不备份前一个 checkpoint 中已经存在的 sstable,而是在需要时仅引用他们,并记录下引用次数。Flink 还能够保证所有的 checkpoint 都不会引用已经删除的文件,因为 RocksDB 中文件删除是由压缩完成的,压缩后会将原来的内容合并写成一个新的 sstable。因此,Flink 增量 checkpoint 能够切断 checkpoint 历史。


以下转自Apache Flink 管理大型状态之增量 Checkpoint 详解,作者Stefan Ricther & Chris Ward,翻译 邱从贤(山智)


关于控制便捷性与性能之间平衡的策略可以参考此文档:

5.4.6 高级用法

  • KeyedStateCheckpointOutputStream
  • OperatorStateCheckpointOutputStream

5.4.7 性能对比

  • Windowed Word Count Flink 不同 StateBackends 吞吐量对比

    此时使用 FileSystem 和 Memory 的吞吐差异不大,而使用 RocksDB 的吞吐仅其余两者的十分之一左右。
  • Windowed Word Count Flink 不同 StateBackends 延迟对比

    可见FileSystem 和 Memory 作为 Backends 时,延迟基本一致且较低,而RocksDB在OnYarn模式下 QPS 2000以后延迟指数级上升

5.4.8 更多配置

可参考:

以下配置在conf/flink-conf.yaml中配置,改变后需要重启Flink应用。

Key

Default

Type

Description

state.backend

(none)

String

保存和Checkpoint State的位置

state.backend.async

true

Boolean

StateBackend是否应在可能且可配置的情况下使用异步快照方法。 某StateBackend可能不支持/仅支持异步快照,会忽略此选项。

state.backend.fs.memory-threshold

1024

Integer

State数据文件大小的最小值。 所有大小小于该值的State Chunk都以内联方式存储在根Checkpoint元数据文件中。

state.backend.fs.write-buffer-size

4096

Intege

写入文件系统的Checkpoint流的写缓冲区的默认大小。 实际的写缓冲区大小为本选项和state.backend.fs.memory-threshold的最大值

state.backend.incremental

false

Boolean

StateBackend是否应创建增量Checkpoint(如果可能)。 对于增量Checkpoint,仅存储与前一个Checkpoint的差异,而不存储完整的Checkpoint状态。 某些StateBackend可能不支持,会忽略此选项

state.backend.local-recovery

false

Boolean

默认禁用。为此StateBackend配置本地恢复。 当前,本地恢复仅涵盖Keyed-StateBackend。 注意,目前MemoryStateBackend不支持本地恢复,请忽略此选项。

state.checkpoints.dir

(none)

String

用于在Flink支持的文件系统中存储Checkpoint的数据文件和元数据的默认目录。 必须从所有参与的进程/节点(即所有TaskManager和JobManager节点)访问存储路径

state.checkpoints.num-retained

1

Integer

要保留的最大已完成Checkpoint数

state.savepoints.dir

(none)

String

Savepoint的默认存储目录。 由StateBackend用于将Savepoint写入文件系统(适用于MemoryStateBackend,FsStateBackend,RocksDBStateBackend)

taskmanager.state.local.root-dirs

(none)

String

定义用于存储基于文件的State以进行本地恢复的根目录。 当前,本地恢复仅涵盖Keyed-StateBackend。 当前,MemoryStateBackend不支持本地恢复,请忽略此选项

5.5.1 概述

为了使Flink应用程序在规模很大时仍能可靠运行,必须满足两个条件:

  • 应用程序必须能够可靠地运行Checkpoint
  • 发生故障后,程序重启后的资源必须足够赶上输入数据流

5.5.2 监控状态和Checkpoint

可参考Monitoring Checkpointing

Checkpoint关键指标:

  • checkpoint_start_delay
    表示从Checkpoint被Checkpoint发起到算子真正开始执行Checkpoint时间 ,计算公式如下:
    checkpoint_start_delay = end_to_end_duration - synchronous_duration - asynchronous_duration

    如果该值持续高值,说明checkpoint barrier从Source流到算子的时间过长,一般就意味着出现了长期背压的情况。

  • CheckpointBarrier对齐期间的数据缓存量
    在开启exactly-once语义后,多路input的算子收到一个input的Barrier就需要做对齐操作,在此期间就需要缓存数据。理想情况下缓存数据应该比较少,否则说明多个input的Bairrer长期不对齐。

5.5.3 Checkpoint调优

当Checkpoint耗时过长,意味着Checkpoint耗费太多资源,算子进展太少,虽然是异步Checkpoint,但依然会对整个应用性能产生影响。

此时就应该使用StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints调大Checkpoint之间最小间隔时间,该配置是定义了当前Checkpoint的结束与下一个Checkpoint的开始之间必须经过的最短时间间隔,官网图例如下:

  • 图一为正常Checkpoint
  • 图二为Checkpoint执行时间超过了设定的间隔,导致每次Checkpoint刚结束就开始下一个Checkpoint
  • 图三表示采用了minPause限制,即使Checkpoint执行时间过久,也必须等到minPause后才能跑下一个Checkpoint

5.5.4 RocksDB调优

  • 开启增量Checkpoint

  • 将默认存在RocksDB中的Timer改为存在Java堆,谨慎使用!
    Timer较少时,可将state.backend.rocksdb.timer-service.factory设为heap,可获得更好的性能。但这样设置后,Timer状态就不能被异步快照存储了。而且可能增加Checkpoint时间,大小也不能超过能存限制**,谨慎使用!**

  • Tuning RocksDB Memory

    1. 最直接的方式就是调大managed memory,可以显著提高性能。
      默认只有0.4比例,除非程序逻辑本身需要很多JavaHeap内存,否则可尽量多给managed memory

    2. 因为每个状态对应了一个RocksDB列族,每个列族都需要独有的write buffer,所以拥有很多状态的应用通常需要更多内存

    3. 可设state.backend.rocksdb.memory.managed:false来对比性能
      非托管模式中,除非使用了ColumnFamily,否则上线公式为140MB * num-states-across-all-tasks * num-slots(State包括Timer)

    4. 如果拥有很多状态的应用中观察到频繁地MemTable flushes,说明写入有瓶颈了,如果你不能给RocksDB更多内存了,那此时可以配置state.backend.rocksdb.memory.write-buffer-ratio将更多内存分配给write buffer。

    5. 专家可以尝试使用RocksDBOptionsFactory来调整arena block sizemax background flush threads等,以减少MemTable flushes

          public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
      @Override
      public DBOptions createDBOptions(DBOptions currentOptions, Collection&lt;AutoCloseable&gt; handlesToClose) {
          // increase the max background flush threads when we have many states in one operator,
          // which means we would have many column families in one DB instance.
          return currentOptions.setMaxBackgroundFlushes(4);
      }
      
      @Override
      public ColumnFamilyOptions createColumnOptions(
          ColumnFamilyOptions currentOptions, Collection&lt;AutoCloseable&gt; handlesToClose) {
          // decrease the arena block size from default 8MB to 1MB. 
          return currentOptions.setArenaBlockSize(1024 * 1024);
      }
      
      @Override
      public OptionsFactory configure(Configuration configuration) {
          return this;
      }
      }

5.5.5 资源规划

要想是的Flink Job运行可靠,一般可遵循以下步骤规划资源:

  1. 正常运行应具有足够的容量,但应在恒定的背压下无法运行。
  2. 在无故障且无背压运行程序所需的资源的基础上,还需提供一些额外的资源,以备在发生错误恢复时有足够资源来追上恢复任务期间累积的未处理数据。
    具体取决于任务恢复速度以及具体场景需要多快恢复
  3. 短暂的背压是没问题的,他是负载高峰或要写入的外部系统突然变慢时的必要流控手段。
  4. 一些算子(如巨大的window)会在每次触发时给下游算子带来负载高峰,所以下游算子并行度规划时需要考虑window大小、频率以及需要处理完成的速度
  5. Job最大并行度应该设置足够,决定了使用savepoint来重设并行度时的上限

5.5.6 Checkpoint压缩

默认关闭。

目前只支持snappy compression algorithm (version 1.1.4):

ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setUseSnapshotCompression(true);

注意,此选项不适用于增量快照,因为增量快照使用的是RocksDB的内部格式,始终使用开箱即用的snappy compression 。

Checkpoint压缩工作粒度为keyed state下的key-group,解压时也是此粒度,方便扩缩容。

5.5.7 Task本地恢复

5.5.7.1 现存问题-大状态任务恢复慢

Checkpoint时,每个Task都会生产一个状态快照,然后异步写入StateBackend。随后,每个Task会发送一个ACK给JM告知状态写入成功,该ACK是一个句柄,带有state在StateBackend中的位置。JM在收集完所有task的ACK后,将他们封装到一个Checkpoint对象中。

在恢复时,JM打开最近的Checkpoint,然后将其中的state文件句柄发回给各个对应的task以进行恢复。

使用分布式StateBackend的好处是容错性和方便扩缩容;坏处是必须通过网络远程读取访问,可能在大型状态场景导致很长的恢复时间,尽管可能只是因为很小的错误发生。

5.5.7.2 Task本地恢复

5.5.7.2.1 概述

核心思想就是每次Checkpoint时,写一份主副本到远程StateBackend,再写一份副本到本地(磁盘或内存)。

这样,在恢复时大多数task只需要找到之前本地state就能从本地恢复无需从远程下载。

5.5.7.2.2 两个副本间的关系

Checkpoint的StateBackend副本始终被认为是主副本,而本地副本是第二副本。而且他们格式不一定相同,比如存放在内存的本地状态就是Java对象。

Checkpoint时,主副本必须成功此次Checkpoint才能成功,而如果此时本地副本写入失败不会导致此次checkpoint失败。

主副本被JM确认和管理,而第二副本是由TM管理,生命周期可独立于主副本。比如保留3个最新主副本,而只保留一个本地最新副本。

恢复时,总是先尝试本地副本,如果失败就找远程主副本,如果还是失败就根据配置可找上一个checkpoint继续恢复。

更巧妙的是,即使Flink因为错误只写入部分状态到本地副本,也会尝试恢复这部分状态,然后再去远程副本恢复其他状态,因为远程状态总是完整的。

但如果TM挂了,则关联的本地状态也会全部丢失。

5.5.7.2.3 本地Checkpoint副本配置

默认关闭。

配置

默认

类型

说明

state.backend.local-recovery

false

Boolean

默认禁用。为此StateBackend配置本地恢复。
当前,本地恢复仅涵盖Keyed-StateBackend。
注意,目前MemoryStateBackend不支持本地恢复,请忽略此选项。

注意,目前非对齐Checkpoint不支持本地Checkpoint副本恢复。

5.5.7.2.4 不同StateBackend的本地Checkpoint副本

当前只支持KeyedStateBackend,未来会支持算子和timer的状态。

目前支持的StateBackend如下:

  • FsStateBackend
    将状态复制到一个本地文件,会带来一定写入和磁盘空间开销。未来也许会做一个内存副本版本。
  • RocksDBStateBackend
5.5.7.2.5 保留分配的task调度

Task本地恢复模式中假定在错误后保留task调度,工作机制如下:

  • 每个task记住之前的分配,请求同样的slot来开始恢复
  • 如果该slot不可用,则task向RM请求一个全新的slot
    这样,即使一个TM不再可用,一个不能回归到之前位置的task不会再驱动其他正在恢复的任务从之前的slot中移出
  • 这样做的好处是使得从本地恢复状态的task数量最大化,避免了夺取其他task之前的slot带来的级联效应(互相夺取slot)。

5.6 增量Checkpoint

5.6.1 概述

在减少Checkpoint花费的时间时,可首先考虑开启增量Checkpoint。 与完整Checkpoint相比,增量Checkpoint可以大大减少Checkpoint的时间,因为增量Checkpoint仅记录与先前完成的Checkpoint相比发生变化的部分,而不是生成完整数据。

一个增量checkpoint依赖之前的若干checkpoint构建。由于 RocksDB 内部存在 compaction 机制对 sst 文件进行合并,Flink 的增量快照也会定期重新设立起点(rebase),因此增量链条不会一直增长,旧快照包含的文件也会逐渐过期并被自动清理。

如果网络为瓶颈,则从增量Checkpoint恢复可能会较慢,因为需要读取更多delta增量文件;而如果CPU或IO为瓶颈,则从增量Checkpoint恢复更快,因为这种方式不会像全量或savepoint那样使用并解析Flink Key/Value 快照格式数据来重建本地RocksDB表。

增量Checkpoint开启方式:

new RocksDBStateBackend(path, TernaryBoolean.TRUE);

这里的Path是Checkpoint持久化目录,如hdfs://namenode:40010/flink/checkpointsfile:///data/flink/checkpoints
或使用配置state.backend.incremental: true开启

需要注意的是,只要开启了增量Checkpoint,则在WebUI上显示的Checkpointed Data Size就只代表增量Checkpoint数据大小,而不是整个State大小了。

5.6.2 原理

  • 全量Checkpoint
    全量Checkpoint会在每个节点做备份数据时将数据遍历一遍,然后再写到外部存储中,这种情况会影响备份性能。下图ss是不可变的数据文件,manifest是元数据文件
  • 增量Checkpoint
    此时只会将新产生的文件持久化,而不用管旧文件,提高Checkpoint性能。

5.6.3 调优

5.7 状态最佳实践

6 Task和Operator

6.1 Task

6.1.1 概述

是PhysicalGraph的节点,一个task是Job的基本组成单元,在Flink运行时被执行。

Task 精确封装了一个 Operator 或者 Operator Chain 的并行实例。

6.1.1 Sub-Task

一个Sub-Task是指处理数据流的一个partition的task。

该术语强调同一个Operator 或者 Operator Chain 拥有多个并行实例由多个Task执行。

6.2 Operator

6.2.1 概述

Operator即算子,是LogicalGraph的节点。

每个算子执行特定操作。

6.2.2 Operator与Task

分布式计算中,Flink 将算子(operator)的 subtask 按一定规则链接成若干task,每个 task 由TaskManager上的一个线程执行。

把算子链接成 task 能够有效减少线程间切换和缓冲的开销,在降低延迟的同时提高了整体吞吐量。

下图的 dataflow 由5个 subtasks 执行,因此具有5个并行线程运行:

  • 上方那个Operator Chain是整个算子链的概要图
    可以看到source和map组成了一个chain在一个task执行;keyBy是一个单独的task;sink也是一个单独task
  • 下方是加入了并行概念后的算子链视图
    因为source/map/keyBy 并行度都为2,所以共有4个Task;sink并行度为1;所以总共有5个task。

Operator Chain配置详情可参考:6.2.2

6.2.3 Operator Chain

6.2.3.1 概述

将两个连续的算子(即不会造成repatitioning的算子)连接即将他们放在同一个线程执行,同一个算子链内的算子直接交换数据,而不会进行序列化/反序列化或Flink网络相关操作,以获得更好的性能,优点:

  • 减少线程切换
  • 内部传输过程不会进行序列化/反序列化或Flink网络相关操作
  • 减少数据在缓冲区的交换
  • 减少了延迟的同时提高整体的吞吐量

Flink默认会尽可能将算子连接(比如两个map算组),但也可以自主控制:判断是否构成Chain的条件很苛刻,需要以下提交件全部都为true:

  • 下游StreamNode的入边数为1,即只有一个input,而不是多个输入
  • 上游StreamNode和下游StreamNode同在一个SlotSharingGroup
  • 上游的ChainingStrategy不能是NEVER
    即只能是ALWAYS(可参与上下游连接,如FlatMap、Filter、Map)或HEAD(只能作为上游与下游连接,但不能与下游连接,如Source)
  • 下游必须是ALWAYS
  • 该StreamEdge的Partitioner必须是ForwardPartitioner,即发送和接收分区一一对应
  • 该StreamEdge的ShuffleMode不能为BATCH,该模式下生产者先生产完整数据,然后消费者才开始消费数据;还有个模式是PIPELINED,此时生产者消费者同时完成生产消费;UNDEFINED的含义是由Flink推断到底采用以上哪种模式。
  • 上下游并行度相同
  • StreamGraph允许Chain机制。默认true,可通过pipeline.operator-chaining设置。

下面这幅图,展示了Source并行度为1,FlatMap、KeyAggregation、Sink并行度均为2,最终以5个并行的Task线程来执行:

可以看到,Key Agg和Sink因为符合Chain的条件被连接到一起,作为一个Task执行。

6.2.3.2 配置

可在job级别禁用 Operator Chain:

StreamExecutionEnvironment.disableOperatorChaining()

细粒度控制:

  • 开启新的Operator Chain
    注意startNewChain必须用在算子之间,不能直接放在stream后面。

    以下例子filter是单独的task,但第一个map和第二个map组成了一个Operator Chain

    someStream.filter(...).map(...).startNewChain().map(...)
  • 算子级别禁用Operator Chain

    someStream.map(...).disableChaining();

6.2.3.3 实现原理

以下为入度为1,出度为2的一个OperatorChain示例。

  • OperatorChain为一个算子链的呈现类

  • OperatorChain对外部来说就是一个整体,只需要将input数据传输给该算子链的HeadOperator即可
    比如上图的算子链就可以看作是入度为1,出度为2的一个算子整体

  • 上图中的实线就对应了JobEdge

  • 上图中的虚线就是OperatorChain内部数据传输ChainingOutput,不会经过序列化/反序列化、网络传输,而是直接通过方法传递处理。

    OperatorChain.ChainingOutput如下

    static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
        // operatorChain的下游算子
        protected final OneInputStreamOperator<T, ?> operator;
        protected final Counter numRecordsIn;
        protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
    protected final StreamStatusProvider streamStatusProvider;
    
    // 为一个算子标记side output
    @Nullable
    protected final OutputTag&lt;T&gt; outputTag;
    
    public ChainingOutput(
            OneInputStreamOperator&lt;T, ?&gt; operator,
            StreamStatusProvider streamStatusProvider,
            @Nullable OutputTag&lt;T&gt; outputTag) {
        // operatorChain的下游算子   
        this.operator = operator;
        ...
        this.streamStatusProvider = streamStatusProvider;
        this.outputTag = outputTag;
    }
    
    @Override
    public void collect(StreamRecord&lt;T&gt; record) {
        if (this.outputTag != null) {
            // we are not responsible for emitting to the main output.
            return;
        }
    
        pushToOperator(record);
    }
    ...
    protected &lt;X&gt; void pushToOperator(StreamRecord&lt;X&gt; record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator expects.
            @SuppressWarnings("unchecked")
            StreamRecord&lt;T&gt; castRecord = (StreamRecord&lt;T&gt;) record;
    
            numRecordsIn.inc();
            operator.setKeyContextElement1(castRecord);
            // 直接调用下游算子处理数据
            operator.processElement(castRecord);
        }
        catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
    ...
    }

首先,Source和Sink是特殊的算子,用来数据摄取和输出。

6.2.5 Operator 并行度

Operator 并行度配置可参考这里

6.2.6 Operator数据交换与分区策略

DataStream可以在两个算子间传输数据,有以下两种模式:

  • 一对一
    例如上图中Sourcemap()算子之间。

    可保留元素的分区和排序信息(也就是说map()算子的1号实例可以相同顺序看到跟Source算子的1号实例生产顺序相同的元素)。

  • 重分发-类似MR Shuffle
    例如上图中的 map()keyBy/window算子 之间,以及 keyBy/windowSink 之间。

    会更改数据所在Stream分区。注意此时只能保证一个算子subtask发到一个下游算子subtask的元素顺序性。如上图keyBy/window 的 subtask[2] 接收到的 map() 的 subtask[1]的数据有序,但发送到Sink的所有数据中,无法确定不同key的聚合结果的到达顺序。

    每个算子subtask发送数据到不同的下游算子subtask,分发依据是具体的transformation(相关方法在org.apache.flink.streaming.api.datastream.DataStream):

    • keyBy
      按照key的值hash后重分区到某个下游算子实例

    • broadcast
      广播到所有下游算子实例分区

    • rebalance
      轮询分配到下游算子实例分区

    • global
      全部分配到第一个下游算子实例分区

    • shuffle
      随机均匀分配到下游算子实例分区

    • forward
      上下游并行度一致时,发送到对应的位于本地的下游算子分区

    • rescale
      轮询方式将输出的元素均匀分发到下游分区的子集。

      子集构建依赖于上游和下游算子的并行度。

      • 比如上游算子并行度2,下游为4,此时每个上游算子轮询各自分发到下游的两个算子。
      • 如果上游并行度4,下游为2,此时每两个上游算子分发到一个下游算子。
      • 如果不是倍数,则下游分发的源头数目不一致

6.3 TaskSlot 和 资源

6.3.1 概述

每个TaskManager都是一个 JVM 进程,并且可以在不同的Slot线程中执行多个 subtasks,Slot控制 TaskManager 并发执行 task 的数量,每个TaskManager至少一个Slot。他们的关系示例如下图,此时作业基本并行度为2,一个taskManager的slot数量为3:

6.3.2 Slot与资源隔离

每个 Slot是TaskManager资源分配和调度最小单元,代表一份固定资源子集。例如,具有三个 slot 的 TaskManager 会将其管理的内存资源平均分成三等份给每个 slot。 划分资源意味着 subtask 之间不会竞争资源,但这也意味着它们只能拥有固定的划分了的资源。

注意Slot并没有实现 CPU 隔离,当前 Slot 之间只实现了内存资源隔离划分。

通过调整 slot 的数量,用户可以决定 subtask 的隔离方式:

  • 每个 TaskManager 只有一个 slot
    意味着每个 task 在一个单独的 JVM 中运行(即每个task独享一个TM进程) 。但可以注意到,目前Flink1.11版本已经支持多个Task线程共享一个Slot,所以以上结论已经不再适用。

  • 每个 TaskManager 多个 slot
    意味着多个 subtask 共享同一个 JVM,分别在各自slot线程运行。

    在同一个 JVM 中运行的Task会共享 TCP 连接(通过IO多路复用)和心跳信息,可以减少数据的网络传输。它们还可能共享数据集和数据结构,从而降低每个 task 的性能开销。

6.3.3 Subtask共享slot

默认情况下,Flink 允许 subtask们 共享 slot,即使它们是不同 task 的 subtasks,只要它们来自同一个 job就行。因此,一个 slot 在极端场景下甚至可能会负责这个 job 的整个执行pipeline!

允许 slot sharing 有两个好处:

  • Flink 集群的Slot数量总数需要与 job 中使用的最高并行度(highest parallelism)完全相同。这样不需要计算作业总共包含多少个 tasks(具有不同并行度)。

  • 更好的资源利用率
    如果不能共享slot ,则简单的 subtask(比如source/ map等)将会占用和复杂的 subtask (如window)一样多的资源。

    而通过slot共享,将之前示例中的job最大并行度从 2 增加到 6 就可以完全利用 按slot分隔 的资源,同时确保开销大的 subtask 在 所有TaskManager上均匀分布:

6.3.4 slot与线程

一个Slot并不一定只有一个线程运行,比如上图中一个Task Slot内部就运行了2-3个线程!

这些线程共同分享Slot获得的分隔内存资源。

不同算子是否能运行在一个Slot取决于SlotSharingGroup

6.3.5 slot数量设置

根据经验,合理的 slots 数量应该和 CPU 核数相同。在使用超线程(hyper-threading)时,每个 slot 将会占用 2 个或更多的硬件线程上下文(hardware thread contexts)。

6.3.6 SlotSharingGroup

6.3.6.1 概述

Flink会将相同SlotSharingGroup的算子放到相同的slot执行,而将SlotSharingGroup不同的算子放到其他slot,这可用来隔离slot。

如果所有input算子都在相同SlotSharingGroup,则下游算子的SlotSharingGroup继承自input算子。

请记住,SlotSharingGroup软定义了不同的task(JobVertex)是否可在一个Slot中运行

6.3.6.2 配置

默认的slot sharing group的名字是default,算子也可以显示地放入指定名字的slot sharing group

someStream.filter(...).slotSharingGroup("name");

6.3.7 CoLocationGroup

CoLocationGroup是JobVertex组,硬性定义了某个JobVertex的第i个子任务必须和其他所有JobVertex的第i个子任务运行在相同TaskManager上。

6.4 Task运行模型

6.4.1 概述

  • Task运行在一个独占线程中,多个Task线程可以共享一份Slot资源
  • Source Task使用SourceFunction生产数据
  • 运算Task负责从InputGate读取数据,调用OperatorChain,最终结果输出到ResultSubPartition,通过Netty发送给下游算子的InputChannel。
  • 最后是Sink的InputChannel接收到数据后,通过调用SinkFunction相关方法发送到目的地。

6.4.2 概述

7 容错性和高可用

7.1 概述

当Flink某个任务失败挂掉时,会对失败任务和其他受影响任务重启来恢复整个作业。

Flink中主要有两类策略来控制任务重启:

  • 重启策略
    决定了是否重启、何时重启失败和受影响任务
  • Failover策略
    决定了job中的哪些任务应该被重启。

可参考

如果Flink部署在yan上,则需要依赖Yarn来实现高可用,Yarn会对失败挂掉的JobManager(AM)重启,最大重试次数的配置是yarn-site.xmlyarn.resourcemanager.am.max-attempts

另一方面,Flink还支持高可用配置。

7.2 重启策略

优先级最高的是job自己指定的,然后才是flink-conf.xml中指定的:

采用restart-strategy配置。

7.2.1 不允许Checkpoint时默认采用none

当不允许checkpoint时,采用的是none(也可以填offdisable)策略,即不重启

可用配置项:

  • restart-strategy: none

代码:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())

7.2.2 开启Checkpoint时默认fixeddelay

开启checkpoint时,不配置重启策略时默认采用fixeddelay(也可以填fixed-delayfailure-rate),是一种固定间隔重启的策略,默认会1秒间隔来进行Integer.MAX_VALUE次重启尝试,超过最大次数就会导致job最终失败了。

可用配置项:

  • restart-strategy
    默认fixed-delay
  • restart-strategy.fixed-delay.attempts
    默认1,即当job失败时重试一次
  • restart-strategy.fixed-delay.delay
    重试间隔,默认1 s

代码设置:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
))

7.2.3 开启Checkpoint时还可选failurerate

开启checkpoint时 还可选failurerate, (也可以填failure-rate)。按失败率重启。

失败率是指每个时间间隔内发生的失败次数。

当失败率超过设定阈值,则job最终失败了。

可用配置项:

  • restart-strategy
    failure-rate
  • restart-strategy.failure-rate.delay
    默认 1 s,指定重启间隔时间,比如20 s
  • restart-strategy.failure-rate.failure-rate-interval
    默认1 min,指定计算失败率的时间间隔
  • restart-strategy.failure-rate.max-failures-per-interval
    默认 1, 失败率最大值

代码设置:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // 每个时间间隔的最大故障次数
  Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
  Time.of(10, TimeUnit.SECONDS) // 延时
))

7.3 Failover策略

配置项为jobmanager.execution.failover-strategy,Failover策略 相关接口为FailoverStrategy

  • 可选值full,重启job所有task。
    RestartAllFailoverStrategy

  • 默认值region,当task失败时,重启所有可能被该出错task影响的所有task。
    RestartPipelinedRegionFailoverStrategy

    该策略将task分组为不同region,当任务失败时就计算必须重启以恢复job的最小范围的region set。

    一个Region是指一个pipeline,是一些task的集合,这些task通过pipeline交换数据通讯:

    • DataStream 和 流式 Table/SQL job的所有数据交换都是 Pipelined 形式的。
    • 批处理式 Table/SQL job的所有数据交换默认都是 Batch 形式的。
    • DataSet job中的数据交换形式会根据 ExecutionConfig 中配置的 ExecutionMode决定。

    需要重启的 Region 的判断逻辑如下:

    • 出错 Task 所在 Region 需要重启。
    • 如果要重启的 Region 需要消费的数据有部分无法访问(丢失或损坏),产出该部分数据的 Region 也需要重启。
    • 需要重启的 Region 的下游 Region 也需要重启。这是出于保障数据一致性的考虑,因为一些非确定性的计算或者记录分发可导致同一个 ResultPartition 每次产生时包含的数据都不相同。
  • 单点重启
    可能无法保证一致性,但资源开销最小。

7.4 TaskManager和JobManager高可用

可参考

7.4.1 TaskManager HA

由JM保证TM恢复期间的跨TM一致性。

7.4.2 JobManager单点问题与HA

因为JM用来调度任务、管理资源,所以如果他挂了就导致整个程序失败了,这就是JM单点问题。

我们可以配置JM HA高可用,即当JM失败后进行恢复,适用于 Standalone和Yarn Cluster模式,这里我们主要分析Yarn Cluster模式。

7.4.3 Yarn Cluster模式 JM ZK HA

7.4.3.1 ZK HA概述

相关源码在ZooKeeperHaServices

ZK存储的Flink相关数据结构如下:

  • flink
    ZK存放Flink数据的根目录,通过high-availability.zookeeper.path.root配置
  • cluster_id
    某个Flink集群的数据根节点。包括Standalone模式和Flink集群模式的Cluster
  • job-id
    Flink集群上运行的job信息,包括Checkpoint信息
  • checkpoints
    触发Checkpoint快照后,每个TM会生成Snapshot,并把对应的句柄传递给JM,JM会创建全局Checkpoint并持久化,最后将文件句柄写入ZK。
  • persisted_job_graph
    JM接收到客户端提交的JobGraph后会生成并持久化ExecutionGraph。具体来说,JM会先将ExecutionGraph持久化到ZK,再开始调度Task到TM执行。

7.4.3.2 配置yarn-site.xml

配置yarn-site.xml内的yarn.resourcemanager.am.max-attempts

即am重试次数,要算上初次启动,默认值为2,即当设置了JM HA后允许JM出错重启一次

要保证JM服务自身在恢复期间的一致性,就必须使用第三方服务来存储少量的恢复用的元数据(比如最后提交的Checkpoint等),以及帮助选举哪个备选JM当Leader,避免脑裂。

  • high-availability
    定义了HA模式,可选项如下。

    • 默认NONE,不启用HA
    • ZOOKEEPER,ZK模式的HA
    • 恢复工厂类的全限定名的HA
  • high-availability.cluster-id
    定义了Flink集群ID,用来隔离不同Flink集群。

    需要为Standalone模式设置,会在YARN和Mesos中自动推断。

  • high-availability.storageDir
    Flink持久化元数据的文件系统路径(URL)

  • high-availability.zookeeper.path.root
    Flink存放集群节点元数据信息的ZK上的根目录路径,ZK上存了指向该数据的指针信息。

  • high-availability.zookeeper.quorum
    Flink HA模式时使用的ZK法定节点数(quorum)

  • high-availability.jobmanager.port

更多Flink HA Zk高级配置可参考Advanced High-availability ZooKeeper Options

配置flink-conf.yaml内的yarn.application-attemptsyarn.application-attempt-failures-validity-interval

Flink On Yarn时需要依赖Yarn来实现高可用,Yarn会对失败挂掉的JobManager(AM)重启,最大重试次数的配置是yarn-site.xml的yarn.resourcemanager.am.max-attempts

Flink的yarn-client有一些配置可以控制在container失败的情况下的行为,也可通过$FLINK_HOME/conf/flink-conf.yaml或启动yarn-session.sh时以-D参数指定:

  • yarn.application-attempts
    ApplicationMaster(运行着JobManager)及其TaskManager Container的最大失败重试次数。

    当没有设置HA时默认值为1,此时若AM挂掉就直接导致整个flink yarn session失败了。

    如果设为较高的值,使得可在失败时Yarn可多次尝试重启AM,但会导致整个Flink集群重启,而Yarn Client会丢失到该Flink Cluster的连接,JobManager的地址也会改变,所以需要在重启后手动设置JobManager的 host:port,所以推荐保留该值为1。

    如果超过该阈值,不会再重启AM,该yarn session上提交的任务也会全部停止。比如设为5时,代表初始化1次、最大重试4次。

    如果Yarn有一些额外操作(如抢占、硬件故障或reboot、NM resync等)需要重启Application,则不会计算在次数内,参考Jian Fang’s blog

    注意该值不能超过yarn中yarn.resourcemanager.am.max-attempts配置的最大重启次数。

  • yarn.application-attempt-failures-validity-interval
    默认为10000,单位毫秒

    定义yarn.application-attempts统计的时间窗口。如果设为-1则为全局统计。

    也就是说,如果定义的时间窗口内统计到的attempts达到了阈值,则会停止尝试重启,导致Job失败!

    注意Yarn Container在不同版本中的行为不太相同,在大于等于2.6.0后应将yarn.application-attempt-failures-validity-interval设置为Flink的Akka超时值。

    相关连接:

flink-conf.yaml:

high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink
yarn.application-attempts: 10

8 BackPressure背压/反压

8.1 概述

如果在Flink WebUI上看到某个task发生back pressure warning,那么通俗地说,这就意味着数据生产速度大于下游算子消费速度,俗称反压/背压。

数据在Flink中从Source到Sink向下流动,而反压是反向传播的。

就拿最简单的Source -> Sink来说,如果观察到Source出现反压,则说明Sink消费速度已经跟不上Source生产速度了,所以向上游Source算子产生反压。

8.2 反压指标采样

Flink JM会周期性地调用Task.isBackPressured()方法,以从运行中的task中采样,监控反压指标。

默认每次采样会为每个task每50ms采样100次,可在WebUI观察该指标(60秒刷新一次,避免TM过载),比如0.01表示百分之1的样本发生反压。

该指标有几种情况:

  • OK: 0 <= Ratio <= 0.10

  • LOW: 0.10 < Ratio <= 0.5

  • HIGH: 0.5 < Ratio <= 1

    @Override
    public boolean isBackPressured() {
    if (invokable == null || consumableNotifyingPartitionWriters.length == 0 || !isRunning()) {
    return false;
    }
    final CompletableFuture[] outputFutures = new CompletableFuture[consumableNotifyingPartitionWriters.length];
    for (int i = 0; i < outputFutures.length; ++i) {
    outputFutures[i] = consumableNotifyingPartitionWriters[i].getAvailableFuture();
    }
    return !CompletableFuture.allOf(outputFutures).isDone();
    }

目前判断是否产生背压是通过output buffer可用性,如果没有足够的buffer可用于输出说明该Taskb受到了来自下游的反压。

8.3 JM配置

  • web.backpressure.refresh-interval
    有效的反压结果被废弃并重新进行采样的时间间隔 (默认: 60000ms, 即1 min)。
  • web.backpressure.num-samples
    用于确定反压采样的样本数 (默认: 100)。
  • web.backpressure.delay-between-samples
    用于确定反压采样的间隔时间 (默认: 50 (ms))。

总结

本章主要介绍了Flink的一些基本概念,下一章讲Flink的下载、安装和启动,请点击Flink系列2-安装和启动

好文推荐

参考文档

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器