flink实战--ProcessFunction
阅读原文时间:2021年04月26日阅读:1

想象这样一种情景,我们想在算子或者函数中获取数据流中Watermark的时间戳,或者在时间上前后穿梭,我们该如何办?ProcessFunction系列函数给我们提供了这样子的能力,它们是Flink体系中最底层的API,提供了对数据流更细粒度的操作权限。Flink SQL是基于这些函数实现的,一些需要高度个性化的业务场景也需要使用这些函数。这个系列函数主要包括KeyedProcessFunctionProcessFunctionCoProcessFunctionKeyedCoProcessFunctionProcessJoinFunction 等多种函数,这些函数都继承于RichFunction,可以获取状态信息,另外都有定时器,可以在时间维度上设计复杂的业务逻辑。

ProcessFunction介绍

ProcessFunction是flink提供面向使用者low-level层级的api,通过ProcessFunction可以访问state、注册处理时间/事件时间定时器来帮助我们完成一些比较复杂的操作,但是只能用使用在keyedStream中,这是因为根据getRuntimeContext 得到的StreamingRuntimeContext只提供了KeyedStateStore的访问许可权,所以只能访问keyd state,;另外注册的定时器必须是与key相关,也就解释了在ProcessFunction中只能在keyedStream做定时器注册。ProcessFunction源码中定义如下:

// org.apache.flink.streaming.api.functions.ProcessFunction
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;
    // 处理每一条数据的逻辑
    public abstract void processElement(I value, Context ctx, Collector<O> out) 
        throws Exception;

    // 当定义的timer触发时候进行回调
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 
        throws Exception {}

    public abstract class Context {
        public abstract Long timestamp();
        public abstract TimerService timerService(); // 可以注册一个定时器
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    public abstract class OnTimerContext extends Context { 
        public abstract TimeDomain timeDomain();
    }
}

state

ProcessFunction继承了AbstractRichFunction,可以访问Flink中的keyed state,可以通过其访问 RuntimeContext,获取相应的状态信息。

Timer

定时器允许应用程序对processing timeevent_time的变化作出反应。每次调用该函数processElement(...)都会获得一个Context对象,该对象可以访问元素的事件时间戳和TimerService,这也是区别于FlatMapRichFunction等普通函数的地方,可以通过改Context获取时间戳,设置Timer,TimerService可用于事件时间/处理时间实例注册回调。

onTimer

当注册了事件定时器,达到计时器的特定时间时,方法onTimer(...)将会被自动调用。在该调用期间,所有状态再次限定为创建计时器的key的状态,允许计时器操纵keyed state

综上,processFunction使用模式一般为:

stream.keyBy(...).process(new MyProcessFunction())

实战

引用官网的一个例子,统计每个key的计数,并且每一分钟发出一个没有更新key的key/count对。

// main
import com.flink.transformation.CountWithTimestampProcessFunction;
import com.flink.transformation.LineSplitMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProcessFunctionMain {
    public static String HOST = "127.0.0.1";
    public static Integer PORT = 8823;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> stream = env.socketTextStream(HOST, PORT);
        // 输入: key value
        SingleOutputStreamOperator<Tuple2<String, Long>> processResult =
                stream.map(new LineSplitMapFunction()).
                        keyBy(0).
                        process(new CountWithTimestampProcessFunction());

        processResult.print();
        env.execute("Flink word-count-process-function example");
    }
}


// transformation
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class LineSplitMapFunction implements MapFunction<String, Tuple2<String, String>> {

    @Override
    public Tuple2<String, String> map(String s) throws Exception {
        String[] arr = s.split(" ");
        return new Tuple2<>(arr[0], arr[1]);
    }
}


// process function
import com.flink.bean.CountWithTimestamp;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class CountWithTimestampProcessFunction
        extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    private ValueState<CountWithTimestamp> countWithTimestampValueState;

    public void open(Configuration parameters) throws Exception {
        // 状态保存,设置状态过期时间,设置24小时,自动清理状态
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(24))
                .cleanupFullSnapshot()
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        // 初始化ValueState
        ValueStateDescriptor<CountWithTimestamp> stateDescriptor =
                new ValueStateDescriptor("userComicReadInfoValueState",
                        TypeInformation.of(new TypeHint<CountWithTimestamp>() {}));
        stateDescriptor.enableTimeToLive(ttlConfig);
        countWithTimestampValueState = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        CountWithTimestamp current = countWithTimestampValueState.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.setKey(value.f0);
        }

        current.setCount(current.getCount() + 1);
        current.setLastModified(System.currentTimeMillis());

        countWithTimestampValueState.update(current);
        ctx.timerService().registerProcessingTimeTimer(current.getLastModified() + 10000);
    }

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        CountWithTimestamp result = countWithTimestampValueState.value();
        if (result.getLastModified() + 10000 == timestamp) {
            out.collect(new Tuple2<>(result.getKey(), result.getCount()));
        }
    }
}

让我们来分析下程序:首先程序从socket stream中获取一行数据,并切分为key-value的Tuple2的datastream,然后按照第一个slot进行分组,然后每条数据都通过自定义的ProcessFunction处理,ProcessFunction可以获取状态信息,我们存储了每个key的信息, modifyTime按照处理的时间来进行设置,官网上的实例是从ctx里面获取timestamp,但是如果设置TimeCharacteristic是ProcessTime则会返回null,这个在源码中有体现,所以要注意这个地方:

/**
 * Timestamp of the element currently being processed or timestamp of a firing timer.
 *
 * <p>This might be {@code null}, for example if the time characteristic of your program
 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
 */
public abstract Long timestamp();

当数据流数据到达时候,更新count和lastModifiedTime,然后更新我们记录的状态信息,并且通过ctx.timerService.registerEventTimeTimer注册一个基于ProcessTime(或者EventTime)的定时器,当到达触发条件时候就会触发定时任务执行onTimer方法,然后执行判断并且输出。

其他场景

  1. 监控报警,采集机器信息,宕机时候进行报警
  2. 我们在做一些时间相关的处理的时候,可能需要缓存一部分数据,但这些数据不能一直去缓存下去,所以需要有一些过期的机制,我们可以通过 timer 去设定这么一个时间,指定某一些数据可能在将来的某一个时间点过期,从而把它从状态里删除掉。所有的这些和时间相关的逻辑在 Flink 内部都是由自己的 Time Service(时间服务)完成的。

参考

  1. https://blog.icocoro.me/2019/06/20/1906-ProcessFunction/
  2. https://www.jianshu.com/p/e6297fac67cb
  3. https://juejin.im/post/5e3a7ebfe51d4526e03f8f8f
  4. https://www.jishuwen.com/d/paSr/zh-tw
  5. https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章