flink既能处理离线数据,也能处理实时数据,在1.12.0版本以前,批数据返回的数据集合是dataSet,对应一套dataSet的api,从1.12.0版本以后,flink实现了api的流批一体化处理。DataStream新增一个执行模式(execution mode),通过设置不同的执行模式,即可实现流处理与批处理之前的切换,这样一来,dataSet基本就被废弃了
流执行模式
这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流,默认情况下,程序使用的就是STREAMING执行模式
批执行模式
专门用于批处理的执行模式,这种模式下,flink处理作业的方式类似于MapReduce框架。对于不会持续计算的有界数据,我们用这种模式处理会更加方便;
自动模式
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
1.12.0以前
流处理:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
批处理:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
1.12.0以后
流处理:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
批处理:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//指定批处理运行模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
把数据作为有界流处理,一次性处理完所有数据,再进行结果的展示输出。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStreamSource<String> batchSource = env.fromElements(" hello java", "hello c", "hello c++", "hello c#", "java world", "hello world");
batchSource.flatMap(new FlatMapFunction<String, Tuple2<String,Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String,Long>> collector) throws Exception {
for(String word: value.split(" ")){
collector.collect(Tuple2.of(word,1l));
}
}
}).keyBy( data -> data.f0 ).sum(1).print();
env.execute();
}
数据是无界流,来一个数据处理一个数据,每个处理过程都会打印出来
代码与上面一致,只是使用默认的执行模式RuntimeExecutionMode.STREAMING
手机扫一扫
移动阅读更方便
你可能感兴趣的文章