【翻译】了解Flink-数据管道和ETL -- Learn Flink - Data Pipelines & ETL
阅读原文时间:2023年07月09日阅读:3

目录

翻译来源-Learn Flink Data Pipelines & ETL

Apache Flink的一种非常常见的用例是实现ETL(提取,转换,加载)管道,该管道从一个或多个源获取数据,执行一些转换和/或扩充,然后将结果存储在某个地方。在本节中,我们将研究如何使用Flink的DataStream API来实现这种应用程序。

请注意,Flink的Table和SQL API 非常适合许多ETL用例。但是,无论您最终是否直接使用DataStream API,这里介绍的基础知识深刻的了解都会很有价值。

无状态转换

本节介绍map()和flatmap(),用于实现无状态转换的基本操作。本节中的示例假定您熟悉flink-training仓库中动手练习中使用的Taxi Ride数据 。

在第一个练习中,您过滤了出租车事件流。在同一代码库中,有一个 GeoUtils类提供了一种静态方法GeoUtils.mapToGridCell(float lon, float lat),该方法将位置(经度,纬度)映射到网格单元,该网格单元指的是大小约为100x100米的区域。

现在,通过向每个事件添加startCell和endCell字段来丰富我们的出租车乘车对象流。您可以创建一个EnrichedRide扩展对象TaxiRide,添加以下字段:

public static class EnrichedRide extends TaxiRide {
    public int startCell;
    public int endCell;

    public EnrichedRide() {}

    public EnrichedRide(TaxiRide ride) {
        this.rideId = ride.rideId;
        this.isStart = ride.isStart;
        ...
        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
    }

    public String toString() {
        return super.toString() + "," +
            Integer.toString(this.startCell) + "," +
            Integer.toString(this.endCell);
    }
}

然后,您可以创建一个转换这个流的应用程序

DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
    .filter(new RideCleansingSolution.NYCFilter())
    .map(new Enrichment());

enrichedNYCRides.print();

使用MapFunction:

public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {

    @Override
    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
        return new EnrichedRide(taxiRide);
    }
}

一个MapFunction仅在执行一对一转换时适用:对于每个进入的流元素,map()将发出一个转换后的元素。否则,您将要使用 flatmap()

DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
    .flatMap(new NYCEnrichment());

enrichedNYCRides.print();

使用FlatMapFunction:

public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {

    @Override
    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
        if (valid.filter(taxiRide)) {
            out.collect(new EnrichedRide(taxiRide));
        }
    }
}

使用此接口中提供的Collector,该flatmap()方法可以发射任意数量的流元素,包括none。

keyed 流

围绕流的一个属性对流进行分区往往很有用,以便属性相同的所有事件分组到一块。例如,假设您要查找从每个网格单元开始的最长的出租车车程。从SQL查询的角度考虑,这将意味着使用进行某种GROUP BY startCell,而在Flink中,这是通过 keyBy(KeySelector)

rides
    .flatMap(new NYCEnrichment())
    .keyBy(enrichedRide -> enrichedRide.startCell)

每一个keyBy都会导致网络混洗,从而对流进行重新分区。通常,这非常昂贵,因为它涉及网络通信以及序列化和反序列化。

KeySelector不仅限于从事件中提取key。他们可以,而是计算在你想要的任何方式的key,只要所产生的key是确定的,并具有有效的hashCode()和equals()实现。此限制排除了生成随机数或返回Array或Enums的KeySelector,但是例如,只要它们的元素遵循这些相同的规则,就可以使用Tuples或POJO来使用复合键。

keys必须以确定性的方式生成,因为它们会在需要时重新计算,而不是附加到流记录中。

例如,不是创建一个新EnrichedRide类,而是用一个字段,我们通过以下代码使用startCell字段,

keyBy(enrichedRide -> enrichedRide.startCell)

我们按上面做,而不是按照下面:

keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat))

此代码段创建一个新的元组流,其中包含startCell和每个乘坐结束事件的时长(以分钟为单位):

import org.joda.time.Interval;

DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {

        @Override
        public void flatMap(EnrichedRide ride,
                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
            if (!ride.isStart) {
                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
                Minutes duration = rideInterval.toDuration().toStandardMinutes();
                out.collect(new Tuple2<>(ride.startCell, duration));
            }
        }
    });

现在,可以生成一个流,流中仅包含每一个startCell看到(到该点为止)的最长的乘坐。

可以使用多种方式表示要用作key的字段。前面您看到了一个带有EnrichedRide POJO的示例,其中用作键的字段是用其名称指定的。下面的情况涉及Tuple2对象,并且元组中的索引(从0开始)用于指定key。

minutesByStartCell
  .keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
  .maxBy(1) // duration
  .print();

输出流包含每个键的记录,在每个key的每次持续时间达到新的最大值时会输出,如此处50797格子中所示:

4> (64549,5M)

4> (46298,18M)

1> (51549,14M)

1> (53043,13M)

1> (56031,22M)

1> (50797,6M)

1> (50797,8M)

1> (50797,11M)

1> (50797,12M)

这是此手册中涉及有状态流的第一个示例。尽管状态是透明处理的,但是Flink必须跟踪每个不同key的最大持续时间。

每当应用程序涉及到状态,都应该考虑该状态可能会变大。只要key空间是无界的,那么Flink需要的状态量也跟key空间一样。

当使用流时,通常聚合在有限窗口而不是在整个流上多考虑。

上面使用的maxBy()仅仅是Flink KeyedStream上许多可用的聚合器功能的一个示例。reduce()您还可以使用一个更通用的功能来实现自己的自定义聚合。

有状态的Transformations

为什么Flink参与管理状态?

您的应用程序当然有能力使用状态,而无需Flink参与管理状态-但是Flink为它管理的状态提供了一些引人注目的功能:

  • 本地:Flink状态被保存在处理该状态的机器本地,并且可以以内存速度访问

  • 经久耐用:Flink状态是容错的,即定期自动保存检查点,并在发生故障时恢复

  • 垂直可扩展:Flink状态可以保留在嵌入式RocksDB实例中,该实例可以通过添加更多本地磁盘来扩容

  • 水平可伸缩:随着群集的增长和收缩,Flink状态将重新分配

  • queryable:可通过Queryable State API在外部查询Flink状态。

    在本节中,您将学习如何使用Flink的API管理keyed状态。

此时,您已经看到Flink的几个功能接口,包括 FilterFunction,MapFunction,和FlatMapFunction。这些都是“单一抽象方法”模式的示例。

对于每个接口,Flink还提供了一个称为“丰富”变种,例如 RichFlatMapFunction,它具有一些其他方法,包括:

  • open(Configuration c)

  • close()

  • getRuntimeContext()

    在算子初始化期间,open()被调用一次。例如,这是加载一些静态数据或打开与外部服务的连接的时机。

    getRuntimeContext() 提供对整套可能感兴趣东西的访问,尤其是如何创建和访问Flink管理的状态。

在这个例子中,假设有一个要删除重复事件的流,因此只保留每个键的第一个事件。这是一个使用RichFlatMapFunction的应用,Deduplicator是RichFlatMapFunction实现:

private static class Event {
    public final String key;
    public final long timestamp;
    ...
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(new EventSource())
        .keyBy(e -> e.key)
        .flatMap(new Deduplicator())
        .print();

    env.execute();
}

为此,Deduplicator需要以某种方式记住对于每个键是否已存在该键的事件。它将使用Flink的keyed状态接口来记忆。

当您使用这样的keyed流时,Flink将为所管理的每个状态项维护一个键/值存储。

Flink支持几种不同类型的keyed状态,此示例使用最简单的一种,即ValueState。这意味着Flink将为每个键存储一个对象-在这个例子中,将存储一个类型对象Boolean。

Deduplicator类有两个方法:open()和flatMap()。通过定义ValueStateDescriptor,open方法引用并管理状态。构造函数的参数指定了keyed状态项的名称("keyHasBeenSeen"),并提供可用于序列化状态对象的信息(在本例中为Types.BOOLEAN)。

public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
    ValueState<Boolean> keyHasBeenSeen;
    @Override
    public void open(Configuration conf) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
        keyHasBeenSeen = getRuntimeContext().getState(desc);
    }
    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        if (keyHasBeenSeen.value() == null) {
            out.collect(event);
            keyHasBeenSeen.update(true);
        }
    }
}

当flatMap方法调用keyHasBeenSeen.value()时,Flink在上下文中查找该key这部分状态的值,并且只有在这个值是null时,才会讲事件收集到输出中。在这种情况下,它也会更新keyHasBeenSeen的值为true。

这种访问和更新keyed-partitioned状态的机制看起来似乎很神奇,因为在我们Deduplicator实现中该键不是显式可见的。当Flink运行时调用RichFlatMapFunction的open方法时并没有事件,此时上下文中也没有key。但当调用flatMap方法时,正在处理事件的key是可用的,并且key在后台用于确定操作Flink的状态后端中的哪一个entry。

当部署到分布式集群时,将有很多Deduplicator实例,每个实例负责整个键空间的不相交子集。因此,当您看到ValueState的单个条目时,例如

ValueState keyHasBeenSeen;

请注意理解,这不仅代表单个布尔值,而且代表分布式的,分片的键/值存储。

清除状态

上面的示例存在一个可能的问题:如果key空间是无界的,将会发生什么?Flink将使用在某个地方为每个不同key的实例存储一个Boolean实例。如果有一组有限的keys,那会很好,但是在keys无限增长的应用程序中,有必要清除不再需要的keys的状态。这是通过在状态对象上调用clear()来完成的,如下所示:

keyHasBeenSeen.clear()

您可能要在某个key闲置一段时间后执行此操作。在事件驱动的应用程序部分中了解关于ProcessFunction的内容时,您将看到如何使用Timers来执行此操作。

还可以使用状态生存时间(TTL)选项进行配置,该选项可以使用状态描述符进行配置,该描述符指定何时自动清除老的密钥状态。

也可以在非keyed上下文中管理状态。有时称为算子状态。所涉及的接口稍有不同,并且由于non-keyed状态的用户定义函数是不常见,因此此处不进行介绍。此功能最常用于源和接收器的实现中。

连接流

有时,并不是像这样应用预定义的转换:

你想能够动态修改转换的某些方面-通过流中的阈值,规则或其他参数。Flink中支持此功能的模式被称为“连接流”,其中单个运算符具有两个输入流,如下所示:

被连接的流还可以用于实现流join。

在此示例中,名称为control的流用于指定从streamOfWords流中过滤掉的单词。一个名称为ControlFunction的 RichCoFlatMapFunction应用于连接流以完成此操作。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
    DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);

    control
        .connect(datastreamOfWords)
        .flatMap(new ControlFunction())
        .print();

    env.execute();
}

请注意,所连接的两个流必须以兼容的方式进行keyed设置。keyBy的作用是对流的数据进行分区,并且在连接keyed流时,必须以相同的方式对它们进行分区。这样可以确保两个流中具有相同key的事件都发送到同一实例。例如,这使得可以将两个流join到一个key上。

在这种情况下,两个流都是类型DataStream,并且两个流都由字符串作为键。正如将在下面看到的,这个RichCoFlatMapFunction是在keyed状态下存储一个布尔值,并且该布尔值由两个流共享。

public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
    private ValueState<Boolean> blocked;

    @Override
    public void open(Configuration config) {
        blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
    }

    @Override
    public void flatMap1(String control_value, Collector<String> out) throws Exception {
        blocked.update(Boolean.TRUE);
    }

    @Override
    public void flatMap2(String data_value, Collector<String> out) throws Exception {
        if (blocked.value() == null) {
            out.collect(data_value);
        }
    }
}

一个RichCoFlatMapFunction是FlatMapFunction的一种,可以应用于一对连接流,它可以访问丰富的函数接口。这意味着可以将其设置为有状态。

名称为blocked的Boolean被用来记住在control流上所提及的键(在这个例子中是单词),并且记住从streamOfWords流过滤掉出的那些词。这是键控状态,并且在两个流之间共享,这就是两个流必须共享相同键空间的原因。

flatMap1和flatMap2被Flink运行时调用,Flink带有来自每个连接流的元素-在这个例子下,来自control流的元素被传入入flatMap1,来自streamOfWords的元素被传递到flatMap2。这取决于control.connect(datastreamOfWords)代码对两个流连接的顺序。

重要的是要认识到flatMap1和 flatMap2回调的调用顺序是无法控制的。这两个输入流彼此竞争,并且Flink运行时将就消费一个或另一个流中的事件做它想要做的功能。如果时间和/或顺序很重要,您可能会发现有必要在管理的Flink状态下缓冲事件,直到您的应用程序准备好处理它们。(注意:如果您真的很绝望,则可以使用实现InputSelectable 接口的自定义运算符,对双输入算子消费输入元素的的顺序进行一些有限的控制 。)

动手

本部分附带的动手练习是 乘车和票价练习

进一步阅读