Flink DataStream API 中的多面手——Process Function详解
阅读原文时间:2023年07月12日阅读:1

之前熟悉的流处理API中的转换算子是无法访问事件的时间戳信息和水位线信息的。例如:MapFunction 这样的map转换算子就无法访问时间戳或者当前事件的时间。

然而,在一些场景下,又需要访问这些信息。基于此,DataStream API提供了一系列的 Low-Level转换算子。

这些算子支持访问时间戳、watermark 以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。

ProcessFunction 用来构建事件驱动的应用(支持带有事件时间的窗口操作)以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。

例如:Flink SQL就是使用 Process Function实现的。

Flink提供了8个Process Function:

1、ProcessFunction

2、KeyedProcessFunction

3、CoProcessFunction

4、ProcessJoinFunction

5、BroadcastProcessFunction

6、KeyedBroadcastProcessFunction

7、ProcessWindowFunction

8、ProcessAllWindowFunction

从上面的继承关系中可以看出,都实现了RichFunction接口,所以支持使用 open()、close()、getRuntimeContext() 等方法的调用。从名字上可以看出,这些函数都有

不同的适用场景,但是基本的功能是类似的,下面会以 KeyedProcessFunction为例来讨论这些函数的通用功能。

一、KeyedProcessFunction

由于KeyedStream 是最常用的,而KeyedProcessFunction又用来操作KeyedStream,所以这里重点介绍 KeyedProcessFunction。

KeyedProcessFunction 会处理流的每一个元素,输出为0个,1个或者多个元素。所有的Process Function都继承自RichFunction接口,所以都有

open()、close() 和 getRuntimeContext() 等方法。而 KeyedProcessFunction[KEY,IN,OUT] 还提供了另外两个方法:

1、processElement(value:IN,ctx:Context,out:Collector[OUT]),流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。

Context可以访问元素的时间戳,元素的key,以及TimeService时间服务。Context还可以将结果输出到别的流(side outputs)。

2、onTimer(timestamp:Long,ctx:OntimerContext,out:Collector[OUT]) 是一个回调函数。

当之前注册的定时器触发时调用。参数 timestamp为定时器所设定的触发的时间戳。Collector 为输出结果的集合。

OnTimerContext和processElement的Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)

public abstract class KeyedProcessFunction extends AbstractRichFunction {
private static final long serialVersionUID = 1L;

public KeyedProcessFunction() {  
}

public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;

public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {  
}

public abstract class OnTimerContext extends KeyedProcessFunction<K, I, O>.Context {  
    public OnTimerContext() {  
        super();  
    }

    public abstract TimeDomain timeDomain();

    public abstract K getCurrentKey();  
}

public abstract class Context {  
    public Context() {  
    }

    public abstract Long timestamp();

    public abstract TimerService timerService();

    public abstract <X> void output(OutputTag<X> var1, X var2);

    public abstract K getCurrentKey();  
}  

}

上面的源码中,主要有两个方法,分析如下:

  • processElement(I value, Context ctx, Collector out)

该方法会对流中的每条记录都调用一次,输出0个或者多个元素,类似于FlatMap的功能,通过Collector将结果发出。除此之外,该函数有一个Context 参数,用户可以通过Context 访问时间戳、当前记录的key值以及TimerService(关于TimerService,下面会详细解释)。另外还可以使用output方法将数据发送到side output,实现分流或者处理迟到数据的功能。

  • onTimer(long timestamp, OnTimerContext ctx, Collector out)

该方法是一个回调函数,当在TimerService中注册的计时器(timers)被触发时,会回调该函数。其中@param timestamp参数表示触发计时器(timers)的时间戳,Collector可以将记录发出。细心的你可能会发现,这两个方法都有一个上下文参数,上面的方法传递的是Context 参数,onTimer方法传递的是OnTimerContext参数,这两个参数对象可以实现相似的功能。OnTimerContext还可以返回触发计时器的时间域(EVENT_TIME与PROCESSING_TIME)。

@PublicEvolving
public interface TimerService {
String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

long currentProcessingTime();

long currentWatermark();

void registerProcessingTimeTimer(long var1);

void registerEventTimeTimer(long var1);

void deleteProcessingTimeTimer(long var1);

void deleteEventTimeTimer(long var1);  

}

TimerService提供了以下几种方法:

  • currentProcessingTime()

返回当前的处理时间

  • currentWatermark()

返回当前event-time水位线(watermark)时间戳

  • registerProcessingTimeTimer(long time)

针对当前key,注册一个processing time计时器(timers),当processing time的时间等于该计时器时钟时会被调用

  • registerEventTimeTimer(long time)

针对当前key,注册一个event time计时器(timers),当水位线时间戳大于等于该计时器时钟时会被调用

  • deleteProcessingTimeTimer(long time)

针对当前key,删除一个之前注册过的processing time计时器(timers),如果这个timer不存在,那么该方法不会起作用

  • deleteEventTimeTimer(long time)

针对当前key,删除一个之前注册过的event time计时器(timers),如果这个timer不存在,那么该方法不会起作用

当计时器触发时,会回调onTimer()函数,系统对于ProcessElement()方法和onTimer()方法的调用是同步的

注意:上面的源码中有两个Error 信息,这就说明计时器只能在keyed streams上使用,常见的用途是在某些key值不在使用后清除keyed state,或者实现一些基于时间的自定义窗口逻辑。如果要在一个非KeyedStream上使用计时器,可以使用KeySelector返回一个固定的分区值(比如返回一个常数),这样所有的数据只会发送到一个分区。

本文首先介绍了Flink提供的几种底层Process Function API,这些API可以访问时间戳和水位线,同时支持注册一个计时器,进行调用回调函数onTimer()。接着从源码的角度解读了这些API的共同部分,详细解释了每个方法的具体含义和使用方式。最后,给出了一个Process Function常见使用场景案例,使用其实现分流处理。除此之外,用户还可以使用这些函数,通过注册计时器,在回调函数中定义处理逻辑,使用非常的灵活。

/**