本文已收录至Github,推荐阅读 Java随想录
微信公众号:Java随想录
目录
接前面上篇,此为下篇。
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中,但是当task挂掉,那么这个task所对应的状态都会被清空,造成了数据丢失,无法保证结果的正确性,哪怕想要得到正确结果,所有数据都要重新计算一遍,效率很低。想要保证 At -least-once 和 Exactly-once,需要把数据状态持久化到更安全的存储介质中,Flink提供了堆内内存、堆外内存、HDFS、RocksDB等存储介质。
先来看下Flink提供的状态有哪些,Flink中状态分为两种类型:
Keyed State
基于KeyedStream上的状态,这个状态是跟特定的Key绑定,KeyedStream流上的每一个Key都对应一个State,每一个Operator可以启动多个Thread处理,但是相同Key的数据只能由同一个Thread处理,因此一个Keyed状态只能存在于某一个Thread中,一个Thread会有多个Keyed state。
Non-Keyed State(Operator State)
Operator State与Key无关,而是与Operator绑定,整个Operator只对应一个State。比如:Flink中的Kafka Connector就使用了Operator State,它会在每个Connector实例中,保存该实例消费Topic的所有(partition, offset)映射。
Flink针对Keyed State提供了以下可以保存State的数据结构
<IN, OUT>
:保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState
相反的是, 聚合类型可能与添加到状态的元素的类型不同。使用 add(IN)
添加的元素会调用用户指定的 AggregateFunction
进行聚合。ReducingState
相反,聚合类型可能与添加到状态的元素类型不同。 使用add(T)
添加的元素会调用用户指定的 FoldFunction
折叠成聚合值。案例1:使用ValueState keyed state检查车辆是否发生了急加速
object ValueStateTest {
case class CarInfo(carId: String, speed: Long)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01", 8888)
stream.map(data => {
val arr = data.split(" ")
CarInfo(arr(0), arr(1).toLong)
}).keyBy(_.carId)
.map(new RichMapFunction[CarInfo, String]() {
//保存上一次车速
private var lastTempState: ValueState[Long] = _
override def open(parameters: Configuration): Unit = {
val lastTempStateDesc = new ValueStateDescriptor[Long]("lastTempState", createTypeInformation[Long])
lastTempState = getRuntimeContext.getState(lastTempStateDesc)
}
override def map(value: CarInfo): String = {
val lastSpeed = lastTempState.value()
this.lastTempState.update(value.speed)
if ((value.speed - lastSpeed).abs > 30 && lastSpeed != 0)
"over speed" + value.toString
else
value.carId
}
}).print()
env.execute()
}
}
案例2:使用 MapState 统计单词出现次数
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//MapState 实现 WordCount
object KeyedStateTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromCollection(List("I love you","hello spark","hello flink","hello hadoop"))
val pairStream = stream.flatMap(_.split(" ")).map((_,1)).keyBy(_._1)
pairStream.map(new RichMapFunction[(String,Int),(String,Int)] {
private var map:MapState[String,Int] = _
override def open(parameters: Configuration): Unit = {
//定义map state存储的数据类型
val desc = new MapStateDescriptor[String,Int]("sum",createTypeInformation[String],createTypeInformation[Int])
//注册map state
map = getRuntimeContext.getMapState(desc)
}
override def map(value: (String, Int)): (String, Int) = {
val key = value._1
val v = value._2
if(map.contains(key)){
map.put(key,map.get(key) + 1)
}else{
map.put(key,1)
}
val iterator = map.keys().iterator()
while (iterator.hasNext){
val key = iterator.next()
println("word:" + key + "\t count:" + map.get(key))
}
value
}
}).setParallelism(3)
env.execute()
}
}
案例3:使用ReducingState统计每辆车的速度总和
import com.msb.state.ValueStateTest.CarInfo
import org.apache.flink.api.common.functions.{ReduceFunction, RichMapFunction}
import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//统计每辆车的速度总和
object ReduceStateTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01", 8888)
stream.map(data => {
val arr = data.split(" ")
CarInfo(arr(0), arr(1).toLong)
}).keyBy(_.carId)
.map(new RichMapFunction[CarInfo, CarInfo] {
private var reduceState: ReducingState[Long] = _
override def map(elem: CarInfo): CarInfo = {
reduceState.add(elem.speed)
println("carId:" + elem.carId + " speed count:" + reduceState.get())
elem
}
override def open(parameters: Configuration): Unit = {
val reduceDesc = new ReducingStateDescriptor[Long]("reduceSpeed", new ReduceFunction[Long] {
override def reduce(value1: Long, value2: Long): Long = value1 + value2
}, createTypeInformation[Long])
reduceState = getRuntimeContext.getReducingState(reduceDesc)
}
})
env.execute()
}
}
案例4:使用AggregatingState统计每辆车的速度总和
import com.msb.state.ValueStateTest.CarInfo
import org.apache.flink.api.common.functions.{AggregateFunction, ReduceFunction, RichMapFunction}
import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor, ReducingState, ReducingStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//统计每辆车的速度总和
object ReduceStateTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01", 8888)
stream.map(data => {
val arr = data.split(" ")
CarInfo(arr(0), arr(1).toLong)
}).keyBy(_.carId)
.map(new RichMapFunction[CarInfo, CarInfo] {
private var aggState: AggregatingState[Long,Long] = _
override def map(elem: CarInfo): CarInfo = {
aggState.add(elem.speed)
println("carId:" + elem.carId + " speed count:" + aggState.get())
elem
}
override def open(parameters: Configuration): Unit = {
val aggDesc = new AggregatingStateDescriptor[Long,Long,Long]("agg",new AggregateFunction[Long,Long,Long] {
//初始化累加器值
override def createAccumulator(): Long = 0
//往累加器中累加值
override def add(value: Long, acc: Long): Long = acc + value
//返回最终结果
override def getResult(accumulator: Long): Long = accumulator
//合并两个累加器值
override def merge(a: Long, b: Long): Long = a+b
},createTypeInformation[Long])
aggState = getRuntimeContext.getAggregatingState(aggDesc)
}
})
env.execute()
}
}
有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同“读档”一样。
默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境的enableCheckpointing()方法就可以开启检查点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
env.enableCheckpointing(1000);
这里传入的参数是检查点的间隔时间,单位为毫秒。
除了检查点之外,Flink 还提供了“保存点”(savepoint)的功能。保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照;保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复。
检查点具体的持久化存储位置,取决于“检查点存储”(CheckpointStorage)的设置。默认情况下,检查点存储在 JobManager 的堆(heap)内存中。而对于大状态的持久化保存,Flink也提供了在其他存储位置进行保存的接口,这就是 CheckpointStorage。具体可以通过调用检查点配置的 setCheckpointStorage()来配置,需要传入一个CheckpointStorage 的实现类。Flink 主要提供了两种 CheckpointStorage:作业管理器的堆内存(JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)。对于实际生产应用,我们一般会将 CheckpointStorage 配置为高可用的分布式文件系统(HDFS,S3 等)。
Flink中基于异步轻量级的分布式快照技术提供了Checkpoint容错机制,分布式快照可以将同一时间点Task/Operator的状态数据全局统一快照处理,包括上面提到的用户自定义使用的Keyed State和Operator State,当未来程序出现问题,可以基于保存的快照容错。
Flink会在输入的数据集上间隔性地生成checkpoint barrier,通过栅栏(barrier)将间隔时间段内的数据划分到相应的checkpoint中。当程序出现异常时,Operator就能够从上一次快照中恢复所有算子之前的状态,从而保证数据的一致性。例如在KafkaConsumer算子中维护offset状态,当系统出现问题无法从Kafka中消费数据时,可以将offset记录在状态中,当任务重新恢复时就能够从指定的偏移量开始消费数据。
默认情况Flink不开启检查点,用户需要在程序中通过调用方法配置和开启检查点,另外还可以调整其他相关参数
Checkpoint开启和时间间隔指定
开启检查点并且指定检查点时间间隔为1000ms,根据实际情况自行选择,如果状态比较大,则建议适当增加该值
env.enableCheckpointing(1000)
exactly-ance和at-least-once语义选择
选择exactly-once语义保证整个应用内端到端的数据一致性,这种情况比较适合于数据要求比较高,不允许出现丢数据或者数据重复,与此同时,Flink的性能也相对较弱,而at-least-once语义更适合于时廷和吞吐量要求非常高但对数据的一致性要求不高的场景。如下通过setCheckpointingMode()方法来设定语义模式,默认情况下使用的是exactly-once模式
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
Checkpoint超时时间
超时时间指定了每次Checkpoint执行过程中的上限时间范围,一旦Checkpoint执行时间超过该阈值,Flink将会中断Checkpoint过程,并按照超时处理。该指标可以通过setCheckpointTimeout方法设定,默认为10分钟
env.getCheckpointConfig.setCheckpointTimeout(5 * 60 * 1000)
Checkpoint之间最小时间间隔
该参数主要目的是设定两个Checkpoint之间的最小时间间隔,防止Flink应用密集地触发Checkpoint操作,会占用了大量计算资源而影响到整个应用的性能
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(600)
最大并行执行的Checkpoint数量
在默认情况下只有一个检查点可以运行,根据用户指定的数量可以同时触发多个Checkpoint,进而提升Checkpoint整体的效率
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
任务取消后,是否删除Checkpoint中保存的数据
设置为RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留CheckPoint数据,以便根据实际需要恢复到指定的CheckPoint
设置为DELETE_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会删除CheckPoint数据,只有Job执行失败的时候才会保存CheckPoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
容忍的检查的失败数
设置可以容忍的检查的失败数,超过这个数量则系统自动关闭和停止任务
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(1)
Savepoints 是检查点的一种特殊实现,底层实现其实也是使用Checkpoints的机制。Savepoints是用户以手工命令的方式触发Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从而无法实现从端到端的 Excatly-Once 语义保证。
要使用Savepoints,需要按照以下步骤进行:
配置状态后端: 在Flink中,状态可以保存在不同的后端存储中,例如内存、文件系统或分布式存储系统(如HDFS)。要启用Savepoint,您需要在Flink配置文件中配置合适的状态后端。通常,使用分布式存储系统作为状态后端是比较常见的做法,因为它可以提供更好的可靠性和容错性。
生成Savepoint: 在您的Flink应用程序运行时,可以通过以下方式手动触发生成Savepoint:
bin/flink savepoint <jobID> [targetDirectory]
其中,<jobID>
是您要保存状态的Flink作业的Job ID,[targetDirectory]
是可选的目标目录,用于保存Savepoint数据。如果没有提供targetDirectory
,Savepoint将会保存到Flink配置中所配置的状态后端中。
恢复Savepoint: 要恢复到Savepoint状态,可以通过以下方式提交作业:
bin/flink run -s :savepointPath [:runArgs]
其中,savepointPath
是之前生成的Savepoint的路径,runArgs
是您提交作业时的其他参数。
确保应用程序状态的兼容性: 在使用Savepoints时,应用程序的状态结构和代码必须与生成Savepoint的版本保持兼容。这意味着在更新应用程序代码后,可能需要做一些额外的工作来保证状态的向后兼容性,以便能够成功恢复到旧的Savepoint。
在Flink中提供了StateBackend来存储和管理状态数据
Flink一共实现了三种类型的状态管理器:MemoryStateBackend、FsStateBackend、RocksDBStateBackend
基于内存的状态管理器将状态数据全部存储在JVM堆内存中。基于内存的状态管理具有非常快速和高效的特点,但也具有非常多的限制,最主要的就是内存的容量限制,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个应用的正常运行。同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法恢复任务中的状态数据。因此从数据安全的角度建议用户尽可能地避免在生产环境中使用MemoryStateBackend。
Flink将MemoryStateBackend作为默认状态后端管理器
env.setStateBackend(new MemoryStateBackend(100*1024*1024))
注意:聚合类算子的状态会同步到JobManager内存中,因此对于聚合类算子比较多的应用会对JobManager的内存造成一定的压力,进而影响集群。
和MemoryStateBackend有所不同,FsStateBackend是基于文件系统的一种状态管理器,这里的文件系统可以是本地文件系统,也可以是HDFS分布式文件系统
env.setStateBackend(new FsStateBackend("path",true))
如果path是本地文件路径,其格式:file:///
如果path是HDFS文件路径,格式为:hdfs://
第二个参数代表是否异步保存状态数据到HDFS,异步方式能够尽可能避免checkpoint的过程中影响流式计算任务。FsStateBackend更适合任务量比较大的应用,例如:包含了时间范围非常长的窗口计算,或者状态比较大的场景。
RocksDBStateBackend是Flink中内置的第三方状态管理器,和前面的状态管理器不同,RocksDBStateBackend需要单独引入相关的依赖包到工程中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.2</version>
</dependency>
env.setStateBackend(new RocksDBStateBackend("hdfs://"))
RocksDBStateBackend采用异步的方式进行状态数据的Snapshot,任务中的状态数据首先被写入本地RockDB中,这样在RockDB仅会存储正在进行计算的热数据,而需要进行CheckPoint的时候,会把本地的数据直接复制到远端的FileSystem中。
与FsStateBackend相比,RocksDBStateBackend在性能上要比FsStateBackend高一些,主要是因为借助于RocksDB在本地存储了最新热数据,然后通过异步的方式再同步到文件系统中,但RocksDBStateBackend和MemoryStateBackend相比性能就会较弱一些。RocksDB克服了State受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产中使用。
全局配置需要需改集群中的配置文件,修改flink-conf.yaml
配置FsStateBackend
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
配置MemoryStateBackend
state.backend: jobmanager
配置RocksDBStateBackend
state.backend.rocksdb.checkpoint.transfer.thread.num: 1 同时操作RocksDB的线程数
state.backend.rocksdb.localdir: 本地path RocksDB存储状态数据的本地文件路径
在流处理中,我们往往需要面对的是连续不断、无休无止的无界流,不可能等到所有数据都到齐了才开始处理。所以聚合计算其实在实际应用中,我们往往更关心一段时间内数据的统计结果,比如在过去的 1 分钟内有多少用户点击了网页。在这种情况下,我们就可以定义一个窗口,收集最近一分钟内的所有用户点击数据,然后进行聚合统计,最终输出一个结果就可以了。
说白了窗口就是将无界流通过窗口切割成一个个的有界流,窗口是左开右闭的。
Flink中的窗口分为两类:基于时间的窗口(Time-based Window)和基于数量的窗口(Count-based Window)。
时间窗口中又包含了:滚动时间窗口(Tumbling Window)、滑动时间窗口(Sliding Window)、会话窗口(Session Window)。
计数窗口包含了:滚动计数窗口和滑动计数窗口。
时间窗口、计数窗口只是对窗口的一个大致划分。在具体应用时,还需要定义更加精细的规则,来控制数据应该划分到哪个窗口中去。不同的分配数据的方式,就可以由不同的功能应用。
根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。
滚动窗口每个窗口的大小固定,且相邻两个窗口之间没有重叠。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有窗口大小,我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。
基于时间的滚动窗口:
DataStream<T> input = ...
// tumbling event-time windows
input
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<window function> (...)
// tumbling processing-time windows
input
.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<window function> (...)
在上面的代码中,我们使用了TumblingEventTimeWindows
和TumblingProcessingTimeWindows
来创建基于Event Time或Processing Time的滚动时间窗口。窗口的长度可以用org.apache.flink.streaming.api.windowing.time.Time
中的seconds
、minutes
、hours
和days
来设置。
基于计数的滚动窗口:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TumblingCountWindowExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
input
.keyBy(value -> 1)
.countWindow(3)
.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
})
.print();
env.execute();
}
}
在上面的代码中,我们使用了countWindow
方法来创建一个基于数量的滚动窗口,窗口大小为3个元素。当窗口中的元素数量达到3时,窗口就会触发计算。在这个例子中,我们使用了reduce
函数来对窗口中的元素进行求和。
滑动窗口的大小固定,但窗口之间不是首尾相接,而有部分重合。同样,滑动窗口也可以基于时间和计算定义。
滑动窗口的参数有两个:窗口大小和滑动步长。滑动步长是固定的。
基于时间的滑动窗口:
DataStream<T> input = ...
// sliding event-time windows
input
.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function> (...)
基于计数的滑动窗口:
DataStream<T> input = ...
input
.keyBy(...)
.countWindow(10, 5)
.<window function> (...)
countWindow
方法来创建一个基于计数的滑动窗口,窗口大小为10个元素,滑动步长为5个元素。当窗口中的元素数量达到10时,窗口就会触发计算。
会话窗口是Flink中一种基于时间的窗口类型,每个窗口的大小不固定,且相邻两个窗口之间没有重叠。“会话”终止的标志就是隔一段时间没有数据来:
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
DataStream<T> input = ...
input
.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<window function> (...)
在上面的代码中,使用了EventTimeSessionWindows
来创建基于Event Time的会话窗口。withGap
方法用来设置会话窗口之间的间隔时间,当两个元素之间的时间差超过这个值时,它们就会被分配到不同的会话窗口中。
在Flink中,数据流可以按键分区(keyed)或非按键分区(non-keyed)。按键分区是指将数据流根据特定的键值进行分区,使得相同键值的元素被分配到同一个分区中。这样可以保证相同键值的元素由同一个worker实例处理。只有按键分区的数据流才能使用键分区状态和计时器。
非按键分区是指数据流没有根据特定的键值进行分区。这种情况下,数据流中的元素可以被任意分配到不同的分区中。
在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)来开窗,还是直接在没有按键分区的DataStream上开窗。也就是在调用窗口算子之前是否有keyBy操作。
按键分区窗口:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class KeyedWindowExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
input
.keyBy(value -> 1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
})
.print();
env.execute();
}
}
在上面的代码中,使用了keyBy
方法来对数据流进行按键分区,然后使用window
方法来创建一个基于Event Time的滚动时间窗口。在这个例子中,我们使用了reduce
函数来对窗口中的元素进行求和。
非按键分区窗口:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class NonKeyedWindowExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
AllWindowedStream<Long, ?> windowedStream = input.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
windowedStream.reduce(new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}).print();
env.execute();
}
}
在上面的代码中,使用了windowAll
方法来对非按键分区的数据流进行窗口操作。windowAll
方法接受一个WindowAssigner
参数,用来指定窗口类型。然后使用了reduce
函数来对窗口中的元素进行求和。
按键分区窗口(Keyed Windows)经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。
非按键分区(Non-Keyed Windows)如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。所以在实际应用中一般不推荐使用这种方式
所谓的“窗口函数”(window functions),就是定义窗口如何进行计算的操作。
窗口函数根据处理的方式可以分为两类:增量聚合函数和全量聚合函数。
增量聚合函数每来一条数据就立即进行计算,中间保持着聚合状态;但是不立即输出结果。等到窗口到了结束时间需要输出计算结果的时候,取出之前聚合的状态直接输出。
常见的增量聚合的函数有:reduce(reduceFunction)、aggregate(aggregateFunction)、sum()、min()、max()。
下面是一个使用增量聚合函数的Java代码示例:
DataStream<Tuple2<String, Integer>> input = ...
input.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
})
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t0, Tuple2<String, Integer> t1) throws Exception {
return new Tuple2<>(t0.f0, t0.f1 + t1.f1);
}
});
这段代码首先使用keyBy
方法按照Tuple2中的第一个元素(f0)进行分组。然后,它定义了一个5秒的时间窗口,并使用reduce
方法对每个窗口内的数据进行聚合操作。在这个例子中,聚合操作是将具有相同key(即f0相同)的元素的第二个元素(f1)相加。最终,这段代码将输出一个包含每个key在每个5秒窗口内f1值之和的数据流。
另外还有一个常用的函数是聚合函数(AggregateFunction),ReduceFunction和AggregateFunction都是增量聚合函数,但它们之间有一些区别。AggregateFunction则更加灵活,ReduceFunction的输入类型、输出类型和中间状态类型必须相同,而AggregateFunction则允许这三种类型不同。
例如,如果我们希望计算一组数据的平均值,应该怎样做聚合呢?这时我们需要计算两个状态量:数据的总和(sum),以及数据的个数(count),而最终输出结果是两者的商(sum/count)。如果用ReduceFunction,那么我们应该先把数据转换成二元组 (sum, count)的形式,然后进行归约聚合,最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务,可我们却需要 map-reduce-map 三步操作,这显然不够高效。而使用AggregateFunction则可以更加简单地实现这个需求。
下面是使用AggregateFunction计算平均值的代码示例:
DataStream<Tuple2<String, Double>> input = ...
input
.keyBy(new KeySelector<Tuple2<String, Double>, String>() {
@Override
public String getKey(Tuple2<String, Double> value) throws Exception {
return value.f0;
}
})
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new AggregateFunction<Tuple2<String, Double>, Tuple2<Double, Integer>, Double>() {
@Override
public Tuple2<Double, Integer> createAccumulator() {
return new Tuple2<>(0.0, 0);
}
@Override
public Tuple2<Double, Integer> add(Tuple2<String, Double> value, Tuple2<Double, Integer> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2<Double, Integer> accumulator) {
return accumulator.f0 / accumulator.f1;
}
@Override
public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
});
这段代码首先使用keyBy
方法按照Tuple2中的第一个元素(f0)进行分组。然后,它定义了一个5秒的翻滚事件时间窗口,并使用aggregate
方法对每个窗口内的数据进行聚合操作。在这个例子中,聚合操作是计算具有相同key(即f0相同)的元素的第二个元素(f1)的平均值。最终,这段代码将输出一个包含每个key在每个5秒窗口内f1值平均值的数据流。
全量聚合函数(Full Window Functions)是指在整个窗口中的所有数据都准备好后才进行计算。Flink中的全窗口函数有两种:WindowFunction和ProcessWindowFunction。
与增量聚合函数不同,全窗口函数可以访问窗口中的所有数据,因此可以执行更复杂的计算。例如,可以计算窗口中数据的中位数,或者对窗口中的数据进行排序。
WindowFunction接收一个Iterable类型的输入,其中包含了窗口中所有的数据。ProcessWindowFunction则更加强大,它不仅可以访问窗口中的所有数据, 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。WindowFunction作用可以被 ProcessWindowFunction 全覆盖。一般在实际应用,用 ProcessWindowFunction比较多,直接使用 ProcessWindowFunction 就可以了。
下面是使用WindowFunction计算窗口内数据总和的代码示例:
public class SumWindowFunction extends WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
@Override
public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
int sum = 0;
for (Tuple2<String, Integer> value : input) {
sum += value.f1;
}
out.collect(new Tuple2<>(key, sum));
}
}
DataStream<Tuple2<String, Integer>> input = ...
input.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
})
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new SumWindowFunction());
下面是一个使用ProcessWindowFunction统计网站1天UV的代码示例。在这个例子中,我们使用了状态来存储每个窗口中访问过网站的用户ID,以便在窗口结束时计算UV。此外,我们还使用了定时器,在窗口结束时触发计算UV的操作。我们还使用了context对象来获取窗口的开始时间和结束时间,并将它们输出到结果中:
public class UVProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, String>, Tuple3<String, Long, Integer>, String, TimeWindow> {
private ValueState<Set<String>> userIdState; // 状态,用来存储每个窗口中访问过网站的用户ID
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 初始化状态
ValueStateDescriptor<Set<String>> stateDescriptor = new ValueStateDescriptor<>("userIdState", new SetTypeInfo<>(Types.STRING));
userIdState = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void process(String key, Context context, Iterable<Tuple2<String, String>> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
Set<String> userIds = userIdState.value();
if (userIds == null) {
userIds = new HashSet<>();
}
for (Tuple2<String, String> value : input) {
userIds.add(value.f0); // 将用户ID添加到状态中
}
userIdState.update(userIds);
context.timerService().registerEventTimeTimer(context.window().getEnd()); // 注册定时器,在窗口结束时触发计算UV的操作
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
super.onTimer(timestamp, ctx, out);
Set<String> userIds = userIdState.value();
if (userIds != null) {
long windowStart = ctx.window().getStart();
out.collect(new Tuple3<>(ctx.getCurrentKey(), windowStart, userIds.size())); // 计算UV并输出结果,包括窗口的开始时间和结束时间
userIdState.clear(); // 清空状态
}
}
}
DataStream<Tuple2<String, String>> input = ... // 输入数据流,其中第一个字段为用户ID,第二个字段为网站URL
input.keyBy(new KeySelector<Tuple2<String, String>, String>() {
@Override
public String getKey(Tuple2<String, String> value) throws Exception {
return value.f1; // 按照网站URL分组
}
})
.window(TumblingEventTimeWindows.of(Time.days(1))) // 设置窗口大小为1天
.process(new UVProcessWindowFunction());
全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。所以运行效率较低,很少直接单独使用,往往会和增量聚合函数结合在一起,共同实现窗口的处理计算。
增量聚合的优点:高效,输出更加实时。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。
全窗口的优点:提供更多的信息,可以认为是更加“通用”的窗口操作。
它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。
在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。
之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者ProcessWindowFunction。
// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function)
// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function)
// AggregateFunction 与 WindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)
// AggregateFunction 与 ProcessWindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。
下面我们举一个具体的实例来说明。在网站的各种统计指标中,一个很重要的统计指标就是热门的链接,想要得到热门的 url,前提是得到每个链接的“热门度”。一般情况下,可以用url 的浏览量(点击量)表示热门度。我们这里统计 10 秒钟的 url 浏览量,每 5 秒钟更新一次;另外为了更加清晰地展示,还应该把窗口的起始结束时间一起输出。我们可以定义滑动窗口,并结合增量聚合函数和全窗口函数来得到统计结果:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class UrlCountViewExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().setAutoWatermarkInterval(100);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
//乱序流的watermark生成
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
stream.print("input");
//统计每个url的访问量
stream.keyBy(data -> data.url)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new UrlViewCountAgg(),new UrlViewCountResult())
.print();
env.execute();
}
//增量聚合,来一条数据 + 1
public static class UrlViewCountAgg implements AggregateFunction<Event,Long,Long>{
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event value, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return null;
}
}
//包装窗口信息,输出UrlViewCount
public static class UrlViewCountResult extends ProcessWindowFunction<Long,UrlViewCount,String, TimeWindow>{
@Override
public void process(String s, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
Long start = context.window().getStart();
Long end = context.window().getEnd();
Long count = elements.iterator().next();
out.collect(new UrlViewCount(s,count,start,end));
}
}
}
为了方便处理,单独定义了一个POJO类,来表示输出结果的数据类型
public class UrlViewCount {
public String url;
public Long count;
public Long windowStart;
public Long windowEnd;
public UrlViewCount() {
}
public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
this.url = url;
this.count = count;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
}
@Override
public String toString() {
return "UrlViewCount{" +
"url='" + url + '\'' +
", count=" + count +
", windowStart=" + new Timestamp(windowStart) +
", windowEnd=" + new Timestamp(windowEnd) +
'}';
}
}
代码中用一个 AggregateFunction 来实现增量聚合,每来一个数据就计数加一,得到的结果交给 ProcessWindowFunction,结合窗口信息包装成我们想要的 UrlViewCount,最终输出统计结果。
窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出,这样的结合兼具了两种窗口函数的优势,在保证处理性能和实时性的同时支持了更加丰富的应用场景。
窗口重叠是指在使用滑动窗口时,多个窗口之间存在重叠部分。这意味着同一批数据可能会被多个窗口同时处理。
例如,假设我们有一个数据流,它包含了0到9的整数。我们定义了一个大小为5的滑动窗口,滑动距离为2。那么,我们将会得到以下三个窗口:
在这个例子中,窗口1和窗口2之间存在重叠部分,即2, 3, 4。同样,窗口2和窗口3之间也存在重叠部分,即4, 5, 6。
enableOptimizeWindowOverlap
方法是用来启用Flink的窗口重叠优化功能的。它可以减少计算重叠窗口时的计算量。
在我之前给出的代码示例中,我没有使用enableOptimizeWindowOverlap
方法来启用窗口重叠优化功能。这意味着Flink不会尝试优化计算重叠窗口时的计算量。
如果你想使用窗口重叠优化功能,你可以在你的代码中添加以下行:
env.getConfig().enableOptimizeWindowOverlap();
这将启用窗口重叠优化功能,Flink将尝试优化计算重叠窗口时的计算量。
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
基于 WindowedStream 调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())
Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间窗口,默认的触发器都是EventTimeTrigger;类似还有 ProcessingTimeTrigger 和 CountTrigger。所以一般情况下是不需要自定义触发器的,这块了解一下即可。
在 Apache Flink 中,移除器(Evictor)是用于在滚动窗口或会话窗口中控制数据保留和清理的组件。它可以根据特定的策略从窗口中删除一些数据,以确保窗口中保留的数据量不超过指定的限制。移除器通常与窗口分配器一起使用,窗口分配器负责确定数据属于哪个窗口,而移除器则负责清理窗口中的数据。
以下是一个使用 Flink 移除器的代码示例,演示如何在滚动窗口中使用基于计数的移除器。
javaCopy codeimport org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class FlinkEvictorExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建一个包含整数和时间戳的流
DataStream<Tuple2<Integer, Long>> dataStream = env.fromElements(
Tuple2.of(1, System.currentTimeMillis()),
Tuple2.of(2, System.currentTimeMillis() + 1000),
Tuple2.of(3, System.currentTimeMillis() + 2000),
Tuple2.of(4, System.currentTimeMillis() + 3000),
Tuple2.of(5, System.currentTimeMillis() + 4000),
Tuple2.of(6, System.currentTimeMillis() + 5000)
);
// 在滚动窗口中使用基于计数的移除器,保留最近3个元素
dataStream
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.trigger(CountTrigger.of(3))
.evictor(CountEvictor.of(3))
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())
.print();
env.execute("Flink Evictor Example");
}
// 自定义聚合函数
private static class MyAggregateFunction implements AggregateFunction<Tuple2<Integer, Long>, Integer, Integer> {
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(Tuple2<Integer, Long> value, Integer accumulator) {
return accumulator + 1;
}
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
}
// 自定义处理窗口函数
private static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, String, Integer, TimeWindow> {
private transient ListState<Integer> countState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>("countState", Integer.class);
countState = getRuntimeContext().getListState(descriptor);
}
@Override
public void process(Integer key, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
int count = elements.iterator().next();
countState.add(count);
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
String result = "Window: " + windowStart + " to " + windowEnd + ", Count: " + countState.get().iterator().next();
out.collect(result);
}
}
}
在上述示例中,创建了一个包含整数和时间戳的数据流,并使用基于计数的移除器将滚动窗口的大小限制为最近的3个元素。在聚合函数中,我们简单地将元素的数量累加起来,并在处理窗口函数中收集结果。最后,我们打印窗口的开始时间、结束时间和元素数量。
Flink定义了三类时间
Flink流式计算的时候需要显示定义时间语义,根据不同的时间语义来处理数据,比如指定的时间语义是事件时间,那么我们就要切换到事件时间的世界观中,窗口的起始与终止时间都是以事件时间为依据
在Flink中默认使用的是Process Time,如果要使用其他的时间语义,在执行环境中可以设置
//设置时间语义为Ingestion Time
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
//设置时间语义为Event Time 我们还需要指定一下数据中哪个字段是事件时间(下文会讲)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
基于事件时间的Window操作
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object EventTimeWindow {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.socketTextStream("node01", 8888).assignAscendingTimestamps(data => {
val splits = data.split(" ")
splits(0).toLong
})stream
.flatMap(x=>x.split(" ").tail)
.map((_, 1))
.keyBy(_._1)
// .timeWindow(Time.seconds(10))
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce((v1: (String, Int), v2: (String, Int)) => {
(v1._1, v1._2 + v2._2)
})
.print()env.execute()
}
}
Watermark本质就是时间戳,说白了Watermark就是来处理迟到数据的。
在使用Flink处理数据的时候,数据通常都是按照事件产生的时间(事件时间)的顺序进入到Flink,但是在遇到特殊情况下,比如遇到网络延迟或者使用Kafka(多分区) 很难保证数据都是按照事件时间的顺序进入Flink,很有可能是乱序进入。
如果使用的是事件时间这个语义,数据一旦是乱序进入,那么在使用Window处理数据的时候,就会出现延迟数据不会被计算的问题
举例: Window窗口长度10s,滚动窗口
001 zs 2020-04-25 10:00:01
001 zs 2020-04-25 10:00:02
001 zs 2020-04-25 10:00:03
001 zs 2020-04-25 10:00:11 窗口触发执行
001 zs 2020-04-25 10:00:05 延迟数据,不会被上一个窗口所计算导致计算结果不正确
Watermark+Window可以很好的解决延迟数据的问题。
Flink窗口计算的过程中,如果数据全部到达就会到窗口中的数据做处理,如果过有延迟数据,那么窗口需要等待全部的数据到来之后,再触发窗口执行,需要等待多久?不可能无限期等待,我们用户可以自己来设置延迟时间,这样就可以尽可能保证延迟数据被处理。
根据用户指定的延迟时间生成水印(Watermak = 最大事件时间-指定延迟时间),当Watermak 大于等于窗口的停止时间,这个窗口就会被触发执行。
举例:Window窗口长度10s(01~10),滚动窗口,指定延迟时间3s
001 ls 2020-04-25 10:00:01 wm:2020-04-25 09:59:58
001 ls 2020-04-25 10:00:02 wm:2020-04-25 09:59:59
001 ls 2020-04-25 10:00:03 wm:2020-04-25 10:00:00
001 ls 2020-04-25 10:00:09 wm:2020-04-25 10:00:06
001 ls 2020-04-25 10:00:12 wm:2020-04-25 10:00:09
001 ls 2020-04-25 10:00:08 wm:2020-04-25 10:00:05 延迟数据
001 ls 2020-04-25 10:00:13 wm:2020-04-25 10:00:10
如果没有Watermark在倒数第三条数据来的时候,就会触发执行,那么倒数第二条的延迟数据就不会被计算,那么有了水印可以处理延迟3s内的数据。
注意:如果数据不会乱序进入Flink,没必要使用Watermark
DataStream API提供了自定义水印生成器和内置水印生成器。
周期性水印(Periodic Watermark)根据事件或者处理时间周期性的触发水印生成器(Assigner),默认100ms,每隔100毫秒自动向流里注入一个Watermark
周期性水印API 1:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(100)
val stream = env.socketTextStream("node01", 8888).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(3)) {
override def extractTimestamp(element: String): Long = {
element.split(" ")(0).toLong
}
})
周期性水印API 2:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object EventTimeDelayWindow {
class MyTimestampAndWatermarks(delayTime:Long) extends AssignerWithPeriodicWatermarks[String] {var maxCurrentWatermark: Long = _
//水印=最大事件时间-延迟时间 后被调用 水印是递增,小于上一个水印不会被发射出去
override def getCurrentWatermark: Watermark = {
//产生水印
new Watermark(maxCurrentWatermark - delayTime)
}
//获取当前的时间戳 先被调用
override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
val currentTimeStamp = element.split(" ")(0).toLong
maxCurrentWatermark = math.max(currentTimeStamp,maxCurrentWatermark)
currentTimeStamp
}
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(100)
val stream = env.socketTextStream("node01", 8888).assignTimestampsAndWatermarks(new MyTimestampAndWatermarks(3000L))stream
.flatMap(x => x.split(" ").tail)
.map((_, 1))
.keyBy(_._1)
// .timeWindow(Time.seconds(10))
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
val start = context.window.getStart
val end = context.window.getEnd
var count = 0
for (elem <- elements) {
count += elem._2
}
println("start:" + start + " end:" + end + " word:" + key + " count:" + count)
}
})
.print()
env.execute()
}
}
间歇性水印生成器
间歇性水印(Punctuated Watermark)在观察到事件后,会依据用户指定的条件来决定是否发射水印。
比如,在车流量的数据中,001卡口通信经常异常,传回到服务器的数据会有延迟问题,其他的卡口都是正常的,那么这个卡口的数据需要打上水印。
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
object PunctuatedWatermarkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//卡口号、时间戳
env.socketTextStream("node01", 8888)
.map(data => {
val splits = data.split(" ")
(splits(0), splits(1).toLong)
})
.assignTimestampsAndWatermarks(new myWatermark(3000))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
.reduce((v1: (String, Long), v2: (String, Long)) => {
(v1._1 + "," + v2._1, v1._2 + v2._2)
}).print()env.execute()
}
class myWatermark(delay: Long) extends AssignerWithPunctuatedWatermarks[(String, Long)] {
var maxTimeStamp:Long = _override def checkAndGetNextWatermark(elem: (String, Long), extractedTimestamp: Long): Watermark = {
maxTimeStamp = extractedTimestamp.max(maxTimeStamp)
if ("001".equals(elem._1)) {
new Watermark(maxTimeStamp - delay)
} else {
new Watermark(maxTimeStamp)
}
}
override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
element._2
}
}
}
Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了、本该被丢弃的数据。
基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同。
sideOutputLateData() 方法,传入一个输出标签,用来标记分治的迟到数据流
DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的DataStream,调用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。
SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
.aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);
这里注意,getSideOutput()是 SingleOutputStreamOperator 的方法,获取到的侧输出流数据类型应该和 OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同。
在Flink实际开发过程中,可能会遇到source 进来的数据,需要连接数据库里面的字段,再做后面的处理,比如,想要通过id获取对应的地区名字,这时候需要通过id查询地区维度表,获取具体的地区名。
对于不同的应用场景,关联维度表的方式不同
场景1:维度表信息基本不发生改变,或者发生改变的频率很低。
实现方案:采用Flink提供的CachedFile。
Flink提供了一个分布式缓存(CachedFile),类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在TaskManager节点中,防止task重复拉取。 此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。 当程序执行,Flink自动将文件或者目录复制到所有TaskManager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从TaskManager节点的本地文件系统访问它。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerCachedFile("/root/id2city","id2city")
val socketStream = env.socketTextStream("node01",8888)
val stream = socketStream.map(_.toInt)
stream.map(new RichMapFunction[Int,String] {private val id2CityMap = new mutable.HashMap[Int,String]()
override def open(parameters: Configuration): Unit = {
val file = getRuntimeContext().getDistributedCache().getFile("id2city")
val str = FileUtils.readFileUtf8(file)
val strings = str.split("\r\n")
for(str <- strings){
val splits = str.split(" ")
val id = splits(0).toInt
val city = splits(1)
id2CityMap.put(id,city)
}
}
override def map(value: Int): String = {
id2CityMap.getOrElse(value,"not found city")
}
}).print()
env.execute()
在集群中查看对应TaskManager的log日志,发现注册的file会被拉取到各个TaskManager的工作目录区。
场景2:对于维度表更新频率比较高并且对于查询维度表的实时性要求比较高
实现方案:使用定时器,定时加载外部配置文件或者数据库
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.socketTextStream("node01",8888)stream.map(new RichMapFunction[String,String] {
private val map = new mutable.HashMap[String,String]()
override def open(parameters: Configuration): Unit = {
println("init data ...")
query()
val timer = new Timer(true)
timer.schedule(new TimerTask {
override def run(): Unit = {
query()
}
//1s后,每隔2s执行一次
},1000,2000)
}
def query()={
val source = Source.fromFile("D:\\code\\StudyFlink\\data\\id2city","UTF-8")
val iterator = source.getLines()
for (elem <- iterator) {
val vs = elem.split(" ")
map.put(vs(0),vs(1))
}
}
override def map(key: String): String = {
map.getOrElse(key,"not found city")
}
}).print()
env.execute()</code></pre></li>
场景3:对于维度表更新频率高并且对于查询维度表的实时性要求高
实现方案:将更改的信息同步值Kafka配置Topic中,然后将kafka的配置流信息变成广播流,广播到业务流的各个线程中。
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置连接kafka的配置信息
val props = new Properties()
//注意 sparkstreaming + kafka(0.10之前版本) receiver模式 zookeeper url(元数据)
props.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")
props.setProperty("group.id","flink-kafka-001")
props.setProperty("key.deserializer",classOf[StringSerializer].getName)
props.setProperty("value.deserializer",classOf[StringSerializer].getName)
val consumer = new FlinkKafkaConsumer[String]("configure",new SimpleStringSchema(),props)
//从topic最开始的数据读取
// consumer.setStartFromEarliest()
//从最新的数据开始读取
consumer.setStartFromLatest()
//动态配置信息流
val configureStream = env.addSource(consumer)
//业务流
val busStream = env.socketTextStream("node01",8888)
val descriptor = new MapStateDescriptor[String, String]("dynamicConfig",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
//设置广播流的数据描述信息
val broadcastStream = configureStream.broadcast(descriptor)
//connect关联业务流与配置信息流,broadcastStream流中的数据会广播到下游的各个线程中
busStream.connect(broadcastStream)
.process(new BroadcastProcessFunction[String,String,String] {
override def processElement(line: String, ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext, out: Collector[String]): Unit = {
val broadcast = ctx.getBroadcastState(descriptor)
val city = broadcast.get(line)
if(city == null){
out.collect("not found city")
}else{
out.collect(city)
}
} //kafka中配置流信息,写入到广播流中
override def processBroadcastElement(line: String, ctx: BroadcastProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = {
val broadcast = ctx.getBroadcastState(descriptor)
//kafka中的数据
val elems = line.split(" ")
broadcast.put(elems(0),elems(1))
}
}).print()
env.execute()
在Spark中有DataFrame这样的关系型编程接口,因其强大且灵活的表达能力,能够让用户通过非常丰富的接口对数据进行处理,有效降低了用户的使用成本。Flink也提供了关系型编程接口Table API以及基于Table API的SQL API,让用户能够通过使用结构化编程接口高效地构建Flink应用。同时Table API以及SQL能够统一处理批量和实时计算业务,无须切换修改任何应用代码就能够基于同一套API编写流式应用和批量应用,从而达到真正意义的批流统一
在 Flink 1.8 架构里,如果用户需要同时流计算、批处理的场景下,用户需要维护两套业务代码,开发人员也要维护两套技术栈,非常不方便。 Flink 社区很早就设想过将批数据看作一个有界流数据,将批处理看作流计算的一个特例,从而实现流批统一,阿里巴巴的 Blink 团队在这方面做了大量的工作,已经实现了 Table API & SQL 层的流批统一。阿里巴巴已经将 Blink 开源回馈给 Flink 社区。
在 Flink 1.9 中,Table 模块迎来了核心架构的升级,引入了阿里巴巴Blink团队贡献的诸多功能,取名叫: Blink Planner。在使用Table API和SQL开发Flink应用之前,通过添加Maven的依赖配置到项目中,在本地工程中引入相应的依赖库,库中包含了Table API和SQL接口。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
和DataStream API一样,Table API和SQL中具有相同的基本编程模型。首先需要构建对应的TableEnviroment创建关系型编程环境,才能够在程序中使用Table API和SQL来编写应用程序,另外Table API和SQL接口可以在应用中同时使用,Flink SQL基于Apache Calcite框架实现了SQL标准协议,是构建在Table API之上的更高级接口。
首先需要在环境中创建TableEnvironment对象,TableEnvironment中提供了注册内部表、执行Flink SQL语句、注册自定义函数等功能。根据应用类型的不同,TableEnvironment创建方式也有所不同,但是都是通过调用create()方法创建
流计算环境下创建TableEnviroment:
//创建流式计算的上下文环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//创建Table API的上下文环境
val tableEvn =StreamTableEnvironment.create(env)
Table API 顾名思义,就是基于“表”(Table)的一套 API,专门为处理表而设计的,它提供了关系型编程模型,可以用来处理结构化数据,支持表和视图的概念。在此基础上,Flink 还基于 Apache Calcite 实现了对 SQL 的支持。这样一来,我们就可以在 Flink 程序中直接写 SQL 来实现处理需求了,非常实用。
下面是一个简单的例子,它使用Java编写了一个Flink程序,该程序使用Table API从CSV文件中读取数据,然后执行简单的查询并将结果写入到另一个CSV文件中。
首先我们需要导入maven依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
</dependency>
代码示例如下:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
public class TableAPIExample {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
DataSet<Tuple2<String, Integer>> data = env.readCsvFile("input.csv")
.includeFields("11")
.types(String.class, Integer.class);
Table table = tableEnv.fromDataSet(data, "name, age");
tableEnv.createTemporaryView("people", table);
Table result = tableEnv.sqlQuery("SELECT name, age FROM people WHERE age > 30");
DataSet<Tuple2<String, Integer>> output = tableEnv.toDataSet(result, Tuple2.class);
output.writeAsCsv("output.csv");
env.execute();
}
}
在这个例子中,使用readCsvFile
方法从CSV文件中读取数据,并使用includeFields
和types
方法指定要包含的字段和字段类型。接下来,使用fromDataSet
方法将数据集转换为表,并使用createTemporaryView
方法创建一个临时视图。然后,使用sqlQuery
方法执行SQL查询,并使用toDataSet
方法将结果转换为数据集。最后,使用writeAsCsv
方法将结果写入到CSV文件中,并使用execute
方法启动执行。
除了上面这种写法外,我们还有下面2种写法:
//这里每个方法的参数都是一个“表达式”(Expression),用方法调用的形式直观地说明
//“$”符号用来指定表中的一个字段。代码和直接执行SQL是等效的。
Table maryClickTable = eventTable.where($("user").isEqual("Alice")).select($("url"),$("user"))
//这其实是一种简略的写法,我们将 Table 对象名 eventTable 直接以字符串拼接的形式添加到 SQL 语句中,在解析时会自动注册一个同名的虚拟表到环境中,这样就省略了创建虚拟视图的步骤。
Table clickTable = tableEnvironment.sqlQuery("select url, user from " +eventTable);
在环境中注册之后,我们就可以在 SQL 中直接使用这张表进行查询转换了。
Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");
得到的 newTable 是一个中间转换结果,如果之后又希望直接使用这个表执行 SQL,又该怎么做呢?由于 newTable 是一个 Table 对象,并没有在表环境中注册;所以我们还需要将这个中间结果表注册到环境中,才能在 SQL 中使用:
tableEnv.createTemporaryView("NewTable", newTable);
这里的注册其实是创建了一个“虚拟表”(Virtual Table)。这个概念与 SQL 语法中的视图(View)非常类似,所以调用的方法也叫作创建“虚拟视图” (createTemporaryView)。
// 将表转换成数据流,并打印
tableEnv.toDataStream(aliceVisitTable).print();
// 将数据流转换成表。
// 另外,我们还可以在 fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置:
Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"),$("url"));
在Flink中,动态表(Dynamic Tables)是一种特殊的表,它可以随时间变化。它们通常用于表示无限流数据,例如事件流或服务器日志。与静态表不同,动态表可以在运行时插入、更新和删除行。
动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的 SQL 查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作持续查询(Continuous Query)。
下面是一个简单的例子,它使用Java编写了一个Flink程序,该程序使用Table API从Kafka主题中读取数据,然后执行持续查询并将结果写入到另一个Kafka主题中。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class DynamicTableExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
tableEnv.executeSql("CREATE TABLE input (" +
" name STRING," +
" age INT" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'input-topic'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")");
tableEnv.executeSql("CREATE TABLE output (" +
" name STRING," +
" age INT" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'output-topic'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")");
Table result = tableEnv.sqlQuery("SELECT name, age FROM input WHERE age > 30");
tableEnv.toAppendStream(result, Row.class).print();
result.executeInsert("output");
env.execute();
}
}
在这个例子中,首先创建了一个StreamExecutionEnvironment
来设置执行环境,并使用StreamTableEnvironment.create
方法创建了一个StreamTableEnvironment
。然后,使用executeSql
方法创建了两个Kafka表:一个用于读取输入数据,另一个用于写入输出数据。接下来,使用sqlQuery
方法执行持续查询,并使用toAppendStream
方法将结果转换为数据流。最后,使用executeInsert
方法将结果写入到输出表中,并使用execute
方法启动执行。
在 Table API编写的 Flink 程序中,可以在创建表的时候用 WITH 子句指定连接器(connector),这样就可以连接到外部系统进行数据交互了。
其中最简单的当然就是连接到控制台打印输出:
CREATE TABLE ResultTable (
user STRING,
cnt BIGINT
WITH (
'connector' = 'print'
);
需要导入maven依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
创建一个连接到 Kafka 表,需要在 CREATE TABLE 的 DDL 中在 WITH 子句里指定连接器为 Kafka,并定义必要的配置参数。
CREATE TABLE KafkaTable (
`user` STRING,
`url` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
创建 JDBC 表的方法与前面Kafka 大同小异:
-- 创建一张连接到 MySQL 的 表
CREATE TABLE MyTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
-- 将另一张表 T 的数据写入到 MyTable 表中
INSERT INTO MyTable
SELECT id, name, age, status FROM T;
在Flink中创建一张表有两种方法:
Table API中已经提供了TableSource从外部系统获取数据,例如常见的数据库、文件系统和Kafka消息队列等外部系统。
从文件中创建Table(静态表)
Flink允许用户从本地或者分布式文件系统中读取和写入数据,在Table API中可以通过CsvTableSource类来创建,只需指定相应的参数即可。但是文件格式必须是CSV格式的。其他文件格式也支持(在Flink还有Connector的来支持其他格式或者自定义TableSource)
//创建流式计算的上下文环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//创建Table API的上下文环境
val tableEvn = StreamTableEnvironment.create(env)val source = new CsvTableSource("D:\\code\\StudyFlink\\data\\tableexamples"
, Array[String]("id", "name", "score")
, Array(Types.INT, Types.STRING, Types.DOUBLE)
)
//将source注册成一张表 别名:exampleTab
tableEvn.registerTableSource("exampleTab",source)
tableEvn.scan("exampleTab").printSchema()</code></pre>
代码最后不需要env.execute(),这并不是一个流式计算任务
从DataStream中创建Table(动态表)
前面已经知道Table API是构建在DataStream API和DataSet API之上的一层更高级的抽象,因此用户可以灵活地使用Table API将Table转换成DataStream或DataSet数据集,也可以将DataSteam或DataSet数据集转换成Table,这和Spark中的DataFrame和RDD的关系类似
Flink支持把自定义POJOs类的所有case类的属性名字变成字段名,也可以通过基于字段偏移位置和字段名称两种方式重新修改:
//导入table库中的隐式转换
import org.apache.flink.table.api.scala._
// 基于位置重新指定字段名称为"field1", "field2", "field3"
val table = tStreamEnv.fromDataStream(stream, 'field1, 'field2, 'field3)
// 将DataStream转换成Table,并且将字段名称重新成别名
val table: Table = tStreamEnv.fromDataStream(stream, 'rowtime as 'newTime, 'id as 'newId,'variable as 'newVariable)
注意:要导入隐式转换。如果使用as 修改字段,必须修改表中所有的字段。
在Table对象上使用select操作符查询需要获取的指定字段,也可以使用filter或where方法过滤字段和检索条件,将需要的数据检索出来。
object TableAPITest {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//初始化Table API的上下文环境
val tableEvn =StreamTableEnvironment.create(streamEnv)
//导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
val data = streamEnv.socketTextStream("hadoop101",8888)
.map(line=>{
var arr =line.split(",")
new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
})
val table: Table = tableEvn.fromDataStream(data)
//查询
tableEvn.toAppendStream[Row](
table.select('sid,'callType as 'type,'callTime,'callOut))
.print()
//过滤查询
tableEvn.toAppendStream[Row](
table.filter('callType==="success") //filter
.where('callType==="success")) //where
.print()
tableEvn.execute("sql")
}
其中toAppendStream函数是吧Table对象转换成DataStream对象。
举例:我们统计每个基站的日志数量。
val table: Table = tableEvn.fromDataStream(data)
tableEvn.toRetractStream[Row](
table.groupBy('sid).select('sid, 'sid.count as 'logCount))
.filter(_._1==true) //返回的如果是true才是Insert的数据
.print()
在代码中可以看出,使用toAppendStream和toRetractStream方法将Table转换为DataStream[T]数据集,T可以是Flink自定义的数据格式类型Row,也可以是用户指定的数据格式类型。在使用toRetractStream方法时,返回的数据类型结果为DataStream[(Boolean,T)],Boolean类型代表数据更新类型,True对应INSERT操作更新的数据,False对应DELETE操作更新的数据。
用户可以在Table API中自定义函数类,常见的抽象类和接口是:
案例:使用Table完成基于流的WordCount
object TableAPITest2 {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//初始化Table API的上下文环境
val tableEvn =StreamTableEnvironment.create(streamEnv)
//导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
val stream: DataStream[String] = streamEnv.socketTextStream("hadoop101",8888)
val table: Table = tableEvn.fromDataStream(stream,'words)
var my_func =new MyFlatMapFunction()//自定义UDF
val result: Table = table.flatMap(my_func('words)).as('word, 'count)
.groupBy('word) //分组
.select('word, 'count.sum as 'c) //聚合
tableEvn.toRetractStream[Row](result)
.filter(_._1==true)
.print()
tableEvn.execute("table_api")
}
//自定义UDF
class MyFlatMapFunction extends TableFunction[Row]{
//定义类型
override def getResultType: TypeInformation[Row] = {
Types.ROW(Types.STRING, Types.INT)
}
//函数主体
def eval(str:String):Unit ={
str.trim.split(" ")
.foreach({word=>{
var row =new Row(2)
row.setField(0,word)
row.setField(1,1)
collect(row)
}})
}
}
}
Flink支持ProcessTime、EventTime和IngestionTime三种时间概念,针对每种时间概念,Flink Table API中使用Schema中单独的字段来表示时间属性,当时间字段被指定后,就可以在基于时间的操作算子中使用相应的时间属性。
在Table API中通过使用.rowtime来定义EventTime字段,在ProcessTime时间字段名后使用.proctime后缀来指定ProcessTime时间属性.
案例:统计最近5秒钟,每个基站的呼叫数量
object TableAPITest {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//指定EventTime为时间语义
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(1)
//初始化Table API的上下文环境
val tableEvn =StreamTableEnvironment.create(streamEnv)
//导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
val data = streamEnv.socketTextStream("hadoop101",8888)
.map(line=>{
var arr =line.split(",")
new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
})
.assignTimestampsAndWatermarks( //引入Watermark
new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(2)){//延迟2秒
override def extractTimestamp(element: StationLog) = {
element.callTime
}
})
//设置时间属性
val table: Table = tableEvn.fromDataStream(data,'sid,'callOut,'callIn,'callType,'callTime.rowtime)
//滚动Window ,第一种写法
val result: Table = table.window(Tumble over 5.second on 'callTime as 'window)
//第二种写法
val result: Table = table.window(Tumble.over("5.second").on("callTime").as("window"))
.groupBy('window, 'sid)
.select('sid, 'window.start, 'window.end, 'window.rowtime, 'sid.count)
//打印结果
tableEvn.toRetractStream[Row](result)
.filter(_._1==true)
.print()
tableEvn.execute("sql")
}
}
上面的案例是滚动窗口,如果是滑动窗口也是一样,代码如下:
//滑动窗口,窗口大小为:10秒,滑动步长为5秒 :第一种写法
table.window(Slide over 10.second every 5.second on 'callTime as 'window)
//滑动窗口第二种写法 table.window(Slide.over("10.second").every("5.second").on("callTime").as("window"))
企业中Flink SQL比Table API用的多。
Flink SQL 是 Apache Flink 提供的一种使用 SQL 查询和处理数据的方式。它允许用户通过 SQL 语句对数据流或批处理数据进行查询、转换和分析,无需编写复杂的代码。Flink SQL 提供了一种更直观、易于理解和使用的方式来处理数据,同时也可以与 Flink 的其他功能无缝集成。
Flink SQL 支持 ANSI SQL 标准,并提供了许多扩展和优化来适应流式处理和批处理场景。它能够处理无界数据流,具备事件时间和处理时间的语义,支持窗口、聚合、连接等常见的数据操作,还提供了丰富的内置函数和扩展插件机制。
下面是一个简单的 Flink SQL 代码示例,展示了如何使用 Flink SQL 对流式数据进行查询和转换。
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class FlinkSqlExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // 设置并行度为1,方便观察输出结果
// 创建 Kafka 数据源
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-consumer");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), properties);
DataStream<String> sourceStream = env.addSource(kafkaConsumer);
// 注册数据源表
env.createTemporaryView("source_table", sourceStream, "message");
// 执行 SQL 查询和转换
String query = "SELECT message, COUNT(*) AS count FROM source_table GROUP BY message";
DataStream<Result> resultStream = env.sqlQuery(query).map(value -> new Result(value.getField(0), value.getField(1)));
// 打印结果
resultStream.print();
env.execute("Flink SQL Example");
}
// 自定义结果类
public static class Result {
public String message;
public Long count;
public Result() {}
public Result(String message, Long count) {
this.message = message;
this.count = count;
}
@Override
public String toString() {
return "Result{" +
"message='" + message + '\'' +
", count=" + count +
'}';
}
}
}
在上述示例中,我们使用 Apache Kafka 作为数据源,并创建了一个消费者从名为 "input-topic" 的 Kafka 主题中读取数据。然后,我们将数据流注册为名为 "source_table" 的临时表。
接下来,我们使用 Flink SQL 执行 SQL 查询和转换。在这个例子中,我们查询 "source_table" 表,对 "message" 字段进行分组并计算每个消息出现的次数。查询结果会映射到自定义的 Result
类,并最终通过 print()
方法打印到标准输出。
最后,我们通过调用 env.execute()
方法来启动 Flink 作业的执行。
复杂事件处理(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的事件关系序列库,并利用过滤、关联、聚合等技术,最终由简单事件产生高级事件,并通过模式规则的方式对重要信息进行跟踪和分析,从实时数据中发掘有价值的信息。复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。Flink基于DataStrem API提供了FlinkCEP组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息。
CEP(Complex Event Processing)就是在无界事件流中检测事件模式,让我们掌握数据中重要的部分。flink CEP是在flink中实现的复杂事件处理库。
在使用FlinkCEP组件之前,需要将FlinkCEP的依赖库引入项目工程中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
复杂事件中事件与事件之间包含多种类型关系,常见的有时序关系、聚合关系、层次关系、依赖关系及因果关系等。
Flink CEP中提供了Pattern API用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果。包含四个步骤:
定义Pattern可以是单次执行模式,也可以是循环执行模式。单词执行模式一次只接受一个事件,循环执行模式可以接收一个或者多个事件。通常情况下,可以通过指定循环次数将单次执行模式变为循环执行模式。每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过where方法进行叠加。每个Pattern都是通过begin方法定义的
val start = Pattern.begin[Event]("start_pattern")
下一步通过Pattern.where()方法在Pattern上指定Condition,只有当Condition满足之后,当前的Pattern才会接受事件。
start.where(_.getCallType == "success")
对于已经创建好的Pattern,可以指定循环次数,形成循环执行的Pattern。
times:可以通过times指定固定的循环执行次数。
//指定循环触发4次
start.times(4);
//可以执行触发次数范围,让循环执行次数在该范围之内
start.times(2, 4);
optional:也可以通过optional关键字指定要么不触发要么触发指定的次数。
start.times(4).optional();
start.times(2, 4).optional();
greedy:可以通过greedy将Pattern标记为贪婪模式,在Pattern匹配成功的前提下,会尽可能多地触发。
//触发2、3、4次,尽可能重复执行
start.times(2, 4).greedy();
//触发0、2、3、4次,尽可能重复执行
start.times(2, 4).optional().greedy();
oneOrMore:可以通过oneOrMore方法指定触发一次或多次。
// 触发一次或者多次
start.oneOrMore();
//触发一次或者多次,尽可能重复执行
start.oneOrMore().greedy();
// 触发0次或者多次
start.oneOrMore().optional();
// 触发0次或者多次,尽可能重复执行
start.oneOrMore().optional().greedy();
timesOrMore:通过timesOrMore方法可以指定触发固定次数以上,例如执行两次以上。
// 触发两次或者多次
start.timesOrMore(2);
// 触发两次或者多次,尽可能重复执行
start.timesOrMore(2).greedy();
// 不触发或者触发两次以上,尽可能重复执行
start.timesOrMore(2).optional().greedy();
每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,便进行下一步操作。在FlinkCFP中通过pattern.where()、pattern.or()及pattern.until()方法来为Pattern指定条件,且Pattern条件有Simple Conditions及Combining Conditions等类型。
简单条件:Simple Condition继承于Iterative Condition类,其主要根据事件中的字段信息进行判断,决定是否接受该事件。
// 把通话成功的事件挑选出来
start.where(_.getCallType == "success")
组合条件:组合条件是将简单条件进行合并,通常情况下也可以使用where方法进行条件的组合,默认每个条件通过AND逻辑相连。如果需要使用OR逻辑,直接使用or方法连接条件即可。
// 把通话成功,或者通话时长大于10秒的事件挑选出来
val start = Pattern.beginStationLog
.where(.callType=="success")
.or(.duration>10)
终止条件:如果程序中使用了oneOrMore或者oneOrMore().optional()方法,则必须指定终止条件,否则模式中的规则会一直循环下去,如下终止条件通过until()方法指定。
pattern.oneOrMore.until(_.callOut.startsWith("186"))
将相互独立的模式进行组合然后形成模式序列。模式序列基本的编写方式和独立模式一致,各个模式之间通过邻近条件进行连接即可,其中有严格邻近、宽松邻近、非确定宽松邻近三种邻近连接条件。
val strict: Pattern[Event] = start.next("middle").where(...)
val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)
val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)
.notNext() —— 不想让某个事件严格紧邻前一个事件发生。
.notFollowedBy() —— 不想让某个事件在两个事件之间发生。
注意:
所有模式序列必须以 .begin() 开始
模式序列不能以 .notFollowedBy() 结束
“not” 类型的模式不能被 optional 所修饰
此外,还可以为模式指定时间约束,用来要求在多长时间内匹配有效
//指定模式在10秒内有效
pattern.within(Time.seconds(10));
调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream
//cep 做模式检测
val patternStream = CEP.pattern[EventLog](dataStream.keyBy(_.id),pattern)
得到PatternStream类型的数据集后,接下来数据获取都基于PatternStream进行。该数据集中包含了所有的匹配事件。目前在FlinkCEP中提供select和flatSelect两种方法从PatternStream提取事件结果事件。
通过Select Funciton抽取正常事件
可以通过在PatternStream的Select方法中传入自定义Select Funciton完成对匹配事件的转换与输出。其中Select Funciton的输入参数为Map[String, Iterable[IN]],Map中的key为模式序列中的Pattern名称,Value为对应Pattern所接受的事件集合,格式为输入事件的数据类型。
def selectFunction(pattern : Map[String, Iterable[IN]]): OUT = {
//获取pattern中的startEvent
val startEvent = pattern.get("start_pattern").get.next
//获取Pattern中middleEvent
val middleEvent = pattern.get("middle").get.next
//返回结果
OUT(startEvent, middleEvent)}
通过Flat Select Funciton抽取正常事件
Flat Select Funciton和Select Function相似,不过Flat Select Funciton在每次调用可以返回任意数量的结果。因为Flat Select Funciton使用Collector作为返回结果的容器,可以将需要输出的事件都放置在Collector中返回。
def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = { //获取pattern中startEvent
val startEvent = pattern.get("start_pattern").get.next
//获取Pattern中middleEvent
val middleEvent = pattern.get("middle").get.next
//并根据startEvent的Value数量进行返回
for (i <- 0 to startEvent.getValue) {
collector.collect(OUT(startEvent, middleEvent))
}}
通过Select Funciton抽取超时事件
如果模式中有within(time),那么就很有可能有超时的数据存在,通过PatternStream. Select方法分别获取超时事件和正常事件。首先需要创建OutputTag来标记超时事件,然后在PatternStream.select方法中使用OutputTag,就可以将超时事件从PatternStream中抽取出来。
// 通过CEP.pattern方法创建
PatternStream val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) //创建OutputTag,并命名为timeout-output
val timeoutTag = OutputTag[String]("timeout-output")
//调用PatternStream select()并指定timeoutTag val result: SingleOutputStreamOperator[NormalEvent] = patternStream.select(timeoutTag){
//超时事件获取
(pattern: Map[String, Iterable[Event]], timestamp: Long) =>
TimeoutEvent()//返回异常事件
} {
//正常事件获取
pattern: Map[String, Iterable[Event]] =>
NormalEvent()
//返回正常事件
}
//调用getSideOutput方法,并指定timeoutTag将超时事件输出val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(timeoutTag)
在大数据领域,大多数开源框架(Hadoop、Spark、Flink)都是基于JVM运行,但是JVM的内存管理机制往往存在着诸多类似OutOfMemoryError的问题,主要是因为创建过多的对象实例而超过JVM的最大堆内存限制,却没有被有效回收掉,这在很大程度上影响了系统的稳定性,尤其对于大数据应用,面对大量的数据对象产生,仅仅靠JVM所提供的各种垃圾回收机制很难解决内存溢出的问题。在开源框架中有很多框架都实现了自己的内存管理,例如Apache Spark的Tungsten项目,在一定程度上减轻了框架对JVM垃圾回收机制的依赖,从而更好地使用JVM来处理大规模数据集。
Flink也基于JVM实现了自己的内存管理,将JVM根据内存区分为Unmanned Heap、Flink Managed Heap、Network Buffers三个区域。在Flink内部对Flink Managed Heap进行管理,在启动集群的过程中直接将堆内存初始化成Memory Pages Pool,也就是将内存全部以二进制数组的方式占用,形成虚拟内存使用空间。新创建的对象都是以序列化成二进制数据的方式存储在内存页面池中,当完成计算后数据对象Flink就会将Page置空,而不是通过JVM进行垃圾回收,保证数据对象的创建永远不会超过JVM堆内存大小,也有效地避免了因为频繁GC导致的系统稳定性问题。
JobManager在Flink系统中主要承担管理集群资源、接收任务、调度Task、收集任务状态以及管理TaskManager的功能,JobManager本身并不直接参与数据的计算过程中,因此JobManager的内存配置项不是特别多,只要指定JobManager堆内存大小即可。
jobmanager.heap.size:设定JobManager堆内存大小,默认为1024MB。
TaskManager作为Flink集群中的工作节点,所有任务的计算逻辑均执行在TaskManager之上,因此对TaskManager内存配置显得尤为重要,可以通过以下参数配置对TaskManager进行优化和调整。
taskmanager.heap.size:设定TaskManager堆内存大小,默认值为1024M,如果在Yarn的集群中,TaskManager取决于Yarn分配给TaskManager Container的内存大小,且Yarn环境下一般会减掉一部分内存用于Container的容错。
taskmanager.jvm-exit-on-oom:设定TaskManager是否会因为JVM发生内存溢出而停止,默认为false,当TaskManager发生内存溢出时,也不会导致TaskManager停止。
taskmanager.memory.size:设定TaskManager内存大小,默认为0,如果不设定该值将会使用taskmanager.memory.fraction作为内存分配依据。
taskmanager.memory.fraction:设定TaskManager堆中去除Network Buffers内存后的内存分配比例。该内存主要用于TaskManager任务排序、缓存中间结果等操作。例如,如果设定为0.8,则代表TaskManager保留80%内存用于中间结果数据的缓存,剩下20%的内存用于创建用户定义函数中的数据对象存储。注意,该参数只有在taskmanager.memory.size不设定的情况下才生效。
taskmanager.memory.off-heap:设置是否开启堆外内存供Managed Memory或者Network Buffers使用。
taskmanager.memory.preallocate:设置是否在启动TaskManager过程中直接分配TaskManager管理内存。
taskmanager.numberOfTaskSlots:每个TaskManager分配的slot数量。
Flink将JVM堆内存切分为三个部分,其中一部分为Network Buffers内存。Network Buffers内存是Flink数据交互层的关键内存资源,主要目的是缓存分布式数据处理过程中的输入数据。。通常情况下,比较大的Network Buffers意味着更高的吞吐量。如果系统出现“Insufficient number of network buffers”的错误,一般是因为Network Buffers配置过低导致,因此,在这种情况下需要适当调整TaskManager上Network Buffers的内存大小,以使得系统能够达到相对较高的吞吐量。
目前Flink能够调整Network Buffer内存大小的方式有两种:一种是通过直接指定Network Buffers内存数量的方式,另外一种是通过配置内存比例的方式。
直接设定Nework Buffer数量需要通过如下公式计算得出:
NetworkBuffersNum = total-degree-of-parallelism \* intra-node-parallelism * n
其中total-degree-of-parallelism表示每个TaskManager的总并发数量,intra-node-parallelism表示每个TaskManager输入数据源的并发数量,n表示在预估计算过程中Repar-titioning或Broadcasting操作并行的数量。intra-node-parallelism通常情况下与Task-Manager的所占有的CPU数一致,且Repartitioning和Broadcating一般下不会超过4个并发。可以将计算公式转化如下:
NetworkBuffersNum = <slots-per-TM>^2 \* < TMs>* 4
其中slots-per-TM是每个TaskManager上分配的slots数量,TMs是TaskManager的总数量。对于一个含有20个TaskManager,每个TaskManager含有8个Slot的集群来说,总共需要的Network Buffer数量为8^2**20*4=5120个,因此集群中配置Network Buffer内存的大小约为160M较为合适。
计算完Network Buffer数量后,可以通过添加如下两个参数对Network Buffer内存进行配置。其中segment-size为每个Network Buffer的内存大小,默认为32KB,一般不需要修改,通过设定numberOfBuffers参数以达到计算出的内存大小要求。
taskmanager.network.numberOfBuffers:指定Network堆栈Buffer内存块的数量。
taskmanager.memory.segment-size:内存管理器和Network栈使用的内存Buffer大小,默认为32KB。
从1.3版本开始,Flink就提供了通过指定内存比例的方式设置Network Buffer内存大小。
taskmanager.network.memory.fraction:JVM中用于Network Buffers的内存比例。
taskmanager.network.memory.min:最小的Network Buffers内存大小,默认为64MB。
taskmanager.network.memory.max:最大的Network Buffers内存大小,默认1GB。
taskmanager.memory.segment-size:内存管理器和Network栈使用的Buffer大小,默认为32KB。
本篇文章就到这里,感谢阅读,如果本篇博客有任何错误和建议,欢迎给我留言指正。文章持续更新,可以关注公众号第一时间阅读。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章