目录
翻译来源-Learn Flink Data Pipelines & ETL
Apache Flink的一种非常常见的用例是实现ETL(提取,转换,加载)管道,该管道从一个或多个源获取数据,执行一些转换和/或扩充,然后将结果存储在某个地方。在本节中,我们将研究如何使用Flink的DataStream API来实现这种应用程序。
请注意,Flink的Table和SQL API 非常适合许多ETL用例。但是,无论您最终是否直接使用DataStream API,这里介绍的基础知识深刻的了解都会很有价值。
本节介绍map()和flatmap(),用于实现无状态转换的基本操作。本节中的示例假定您熟悉flink-training仓库中动手练习中使用的Taxi Ride数据 。
在第一个练习中,您过滤了出租车事件流。在同一代码库中,有一个 GeoUtils类提供了一种静态方法GeoUtils.mapToGridCell(float lon, float lat),该方法将位置(经度,纬度)映射到网格单元,该网格单元指的是大小约为100x100米的区域。
现在,通过向每个事件添加startCell和endCell字段来丰富我们的出租车乘车对象流。您可以创建一个EnrichedRide扩展对象TaxiRide,添加以下字段:
public static class EnrichedRide extends TaxiRide {
public int startCell;
public int endCell;
public EnrichedRide() {}
public EnrichedRide(TaxiRide ride) {
this.rideId = ride.rideId;
this.isStart = ride.isStart;
...
this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
}
public String toString() {
return super.toString() + "," +
Integer.toString(this.startCell) + "," +
Integer.toString(this.endCell);
}
}
然后,您可以创建一个转换这个流的应用程序
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
DataStream<EnrichedRide> enrichedNYCRides = rides
.filter(new RideCleansingSolution.NYCFilter())
.map(new Enrichment());
enrichedNYCRides.print();
使用MapFunction:
public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
@Override
public EnrichedRide map(TaxiRide taxiRide) throws Exception {
return new EnrichedRide(taxiRide);
}
}
一个MapFunction仅在执行一对一转换时适用:对于每个进入的流元素,map()将发出一个转换后的元素。否则,您将要使用 flatmap()
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
DataStream<EnrichedRide> enrichedNYCRides = rides
.flatMap(new NYCEnrichment());
enrichedNYCRides.print();
使用FlatMapFunction:
public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
@Override
public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
if (valid.filter(taxiRide)) {
out.collect(new EnrichedRide(taxiRide));
}
}
}
使用此接口中提供的Collector,该flatmap()方法可以发射任意数量的流元素,包括none。
围绕流的一个属性对流进行分区往往很有用,以便属性相同的所有事件分组到一块。例如,假设您要查找从每个网格单元开始的最长的出租车车程。从SQL查询的角度考虑,这将意味着使用进行某种GROUP BY startCell,而在Flink中,这是通过 keyBy(KeySelector)
rides
.flatMap(new NYCEnrichment())
.keyBy(enrichedRide -> enrichedRide.startCell)
每一个keyBy都会导致网络混洗,从而对流进行重新分区。通常,这非常昂贵,因为它涉及网络通信以及序列化和反序列化。
KeySelector不仅限于从事件中提取key。他们可以,而是计算在你想要的任何方式的key,只要所产生的key是确定的,并具有有效的hashCode()和equals()实现。此限制排除了生成随机数或返回Array或Enums的KeySelector,但是例如,只要它们的元素遵循这些相同的规则,就可以使用Tuples或POJO来使用复合键。
keys必须以确定性的方式生成,因为它们会在需要时重新计算,而不是附加到流记录中。
例如,不是创建一个新EnrichedRide类,而是用一个字段,我们通过以下代码使用startCell字段,
keyBy(enrichedRide -> enrichedRide.startCell)
我们按上面做,而不是按照下面:
keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))
此代码段创建一个新的元组流,其中包含startCell和每个乘坐结束事件的时长(以分钟为单位):
import org.joda.time.Interval;
DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
.flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {
@Override
public void flatMap(EnrichedRide ride,
Collector<Tuple2<Integer, Minutes>> out) throws Exception {
if (!ride.isStart) {
Interval rideInterval = new Interval(ride.startTime, ride.endTime);
Minutes duration = rideInterval.toDuration().toStandardMinutes();
out.collect(new Tuple2<>(ride.startCell, duration));
}
}
});
现在,可以生成一个流,流中仅包含每一个startCell看到(到该点为止)的最长的乘坐。
可以使用多种方式表示要用作key的字段。前面您看到了一个带有EnrichedRide POJO的示例,其中用作键的字段是用其名称指定的。下面的情况涉及Tuple2对象,并且元组中的索引(从0开始)用于指定key。
minutesByStartCell
.keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
.maxBy(1) // duration
.print();
输出流包含每个键的记录,在每个key的每次持续时间达到新的最大值时会输出,如此处50797格子中所示:
…
4> (64549,5M)
4> (46298,18M)
1> (51549,14M)
1> (53043,13M)
1> (56031,22M)
1> (50797,6M)
…
1> (50797,8M)
…
1> (50797,11M)
…
1> (50797,12M)
这是此手册中涉及有状态流的第一个示例。尽管状态是透明处理的,但是Flink必须跟踪每个不同key的最大持续时间。
每当应用程序涉及到状态,都应该考虑该状态可能会变大。只要key空间是无界的,那么Flink需要的状态量也跟key空间一样。
当使用流时,通常聚合在有限窗口而不是在整个流上多考虑。
上面使用的maxBy()仅仅是Flink KeyedStream上许多可用的聚合器功能的一个示例。reduce()您还可以使用一个更通用的功能来实现自己的自定义聚合。
为什么Flink参与管理状态?
您的应用程序当然有能力使用状态,而无需Flink参与管理状态-但是Flink为它管理的状态提供了一些引人注目的功能:
本地:Flink状态被保存在处理该状态的机器本地,并且可以以内存速度访问
经久耐用:Flink状态是容错的,即定期自动保存检查点,并在发生故障时恢复
垂直可扩展:Flink状态可以保留在嵌入式RocksDB实例中,该实例可以通过添加更多本地磁盘来扩容
水平可伸缩:随着群集的增长和收缩,Flink状态将重新分配
queryable:可通过Queryable State API在外部查询Flink状态。
在本节中,您将学习如何使用Flink的API管理keyed状态。
此时,您已经看到Flink的几个功能接口,包括 FilterFunction,MapFunction,和FlatMapFunction。这些都是“单一抽象方法”模式的示例。
对于每个接口,Flink还提供了一个称为“丰富”变种,例如 RichFlatMapFunction,它具有一些其他方法,包括:
open(Configuration c)
close()
getRuntimeContext()
在算子初始化期间,open()被调用一次。例如,这是加载一些静态数据或打开与外部服务的连接的时机。
getRuntimeContext() 提供对整套可能感兴趣东西的访问,尤其是如何创建和访问Flink管理的状态。
在这个例子中,假设有一个要删除重复事件的流,因此只保留每个键的第一个事件。这是一个使用RichFlatMapFunction的应用,Deduplicator是RichFlatMapFunction实现:
private static class Event {
public final String key;
public final long timestamp;
...
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicator())
.print();
env.execute();
}
为此,Deduplicator需要以某种方式记住对于每个键是否已存在该键的事件。它将使用Flink的keyed状态接口来记忆。
当您使用这样的keyed流时,Flink将为所管理的每个状态项维护一个键/值存储。
Flink支持几种不同类型的keyed状态,此示例使用最简单的一种,即ValueState。这意味着Flink将为每个键存储一个对象-在这个例子中,将存储一个类型对象Boolean。
Deduplicator类有两个方法:open()和flatMap()。通过定义ValueStateDescriptor,open方法引用并管理状态。构造函数的参数指定了keyed状态项的名称("keyHasBeenSeen"),并提供可用于序列化状态对象的信息(在本例中为Types.BOOLEAN)。
public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}
@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (keyHasBeenSeen.value() == null) {
out.collect(event);
keyHasBeenSeen.update(true);
}
}
}
当flatMap方法调用keyHasBeenSeen.value()时,Flink在上下文中查找该key这部分状态的值,并且只有在这个值是null时,才会讲事件收集到输出中。在这种情况下,它也会更新keyHasBeenSeen的值为true。
这种访问和更新keyed-partitioned状态的机制看起来似乎很神奇,因为在我们Deduplicator实现中该键不是显式可见的。当Flink运行时调用RichFlatMapFunction的open方法时并没有事件,此时上下文中也没有key。但当调用flatMap方法时,正在处理事件的key是可用的,并且key在后台用于确定操作Flink的状态后端中的哪一个entry。
当部署到分布式集群时,将有很多Deduplicator实例,每个实例负责整个键空间的不相交子集。因此,当您看到ValueState的单个条目时,例如
ValueState keyHasBeenSeen;
请注意理解,这不仅代表单个布尔值,而且代表分布式的,分片的键/值存储。
上面的示例存在一个可能的问题:如果key空间是无界的,将会发生什么?Flink将使用在某个地方为每个不同key的实例存储一个Boolean实例。如果有一组有限的keys,那会很好,但是在keys无限增长的应用程序中,有必要清除不再需要的keys的状态。这是通过在状态对象上调用clear()来完成的,如下所示:
keyHasBeenSeen.clear()
您可能要在某个key闲置一段时间后执行此操作。在事件驱动的应用程序部分中了解关于ProcessFunction的内容时,您将看到如何使用Timers来执行此操作。
还可以使用状态生存时间(TTL)选项进行配置,该选项可以使用状态描述符进行配置,该描述符指定何时自动清除老的密钥状态。
也可以在非keyed上下文中管理状态。有时称为算子状态。所涉及的接口稍有不同,并且由于non-keyed状态的用户定义函数是不常见,因此此处不进行介绍。此功能最常用于源和接收器的实现中。
有时,并不是像这样应用预定义的转换:
你想能够动态修改转换的某些方面-通过流中的阈值,规则或其他参数。Flink中支持此功能的模式被称为“连接流”,其中单个运算符具有两个输入流,如下所示:
被连接的流还可以用于实现流join。
在此示例中,名称为control的流用于指定从streamOfWords流中过滤掉的单词。一个名称为ControlFunction的 RichCoFlatMapFunction应用于连接流以完成此操作。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);
control
.connect(datastreamOfWords)
.flatMap(new ControlFunction())
.print();
env.execute();
}
请注意,所连接的两个流必须以兼容的方式进行keyed设置。keyBy的作用是对流的数据进行分区,并且在连接keyed流时,必须以相同的方式对它们进行分区。这样可以确保两个流中具有相同key的事件都发送到同一实例。例如,这使得可以将两个流join到一个key上。
在这种情况下,两个流都是类型DataStream,并且两个流都由字符串作为键。正如将在下面看到的,这个RichCoFlatMapFunction是在keyed状态下存储一个布尔值,并且该布尔值由两个流共享。
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private ValueState<Boolean> blocked;
@Override
public void open(Configuration config) {
blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}
@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}
一个RichCoFlatMapFunction是FlatMapFunction的一种,可以应用于一对连接流,它可以访问丰富的函数接口。这意味着可以将其设置为有状态。
名称为blocked的Boolean被用来记住在control流上所提及的键(在这个例子中是单词),并且记住从streamOfWords流过滤掉出的那些词。这是键控状态,并且在两个流之间共享,这就是两个流必须共享相同键空间的原因。
flatMap1和flatMap2被Flink运行时调用,Flink带有来自每个连接流的元素-在这个例子下,来自control流的元素被传入入flatMap1,来自streamOfWords的元素被传递到flatMap2。这取决于control.connect(datastreamOfWords)代码对两个流连接的顺序。
重要的是要认识到flatMap1和 flatMap2回调的调用顺序是无法控制的。这两个输入流彼此竞争,并且Flink运行时将就消费一个或另一个流中的事件做它想要做的功能。如果时间和/或顺序很重要,您可能会发现有必要在管理的Flink状态下缓冲事件,直到您的应用程序准备好处理它们。(注意:如果您真的很绝望,则可以使用实现InputSelectable 接口的自定义运算符,对双输入算子消费输入元素的的顺序进行一些有限的控制 。)
本部分附带的动手练习是 乘车和票价练习。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章