5.3、Flink流处理(Stream API)- Connector(连接器) 之 Apache Kafka Connector
阅读原文时间:2021年04月26日阅读:1

目录

安装Kafka

Kafka 1.0.0+ Connector

兼容性

从0.11迁移到通用的 Kafka Connector上

用法

Kafka Consumer

反序列化类 DeserializationSchema

消费者开始消费位置配置

Kafka 消费和容错

Kafka消费者主题及Partition Discovery

Partition discovery

Topic discovery

Kafka 消费者偏移量提交时的配置

Kafka 消费者和时间戳 Extraction/Watermark Emission

Kafka Producer

Kafka生成器分区方案

Kafka 生产者和容错

Kafka 0.8

Kafka 0.9 and 0.10

Kafka 0.11 and newer

Using Kafka timestamps and Flink event time in Kafka 0.10

Kafka Connector metrics

Enabling Kerberos Authentication (for versions 0.9+ and above only)

排错

数据丢失

未知 Topic 或 Partition 异常


Flink 提供了特殊的 Kafka 连接器用来读写来自 Kafka Topic 的数据。Flink Kafka消费者与Flink的 Checkpoint 机制集成,以提供精确的一次处理语义。为了精确一次语义,Flink 并不完全依赖 Kafka 的消费者组偏移跟踪,而是在内部跟踪和检查这些 offset。

需要提供一个 Maven 包,对于大多用户,使用的是 FlinkKafkaConsumer08 (flink-connector-kafka 的一部分):

Maven Dependency

Supported since

Consumer and
Producer Class name

Kafka version

Notes

flink-connector-kafka-0.8_2.11

1.0.0

FlinkKafkaConsumer08
FlinkKafkaProducer08

0.8.x

在内部使用 Kafka 的 SimpleConsumer API。偏移量由 Flink 提交给 ZK。

flink-connector-kafka-0.9_2.11

1.0.0

FlinkKafkaConsumer09
FlinkKafkaProducer09

0.9.x

Uses the new Consumer API Kafka.

flink-connector-kafka-0.10_2.11

1.2.0

FlinkKafkaConsumer010
FlinkKafkaProducer010

0.10.x

这个连接器支持 Kafka 消息,它具有用于生产和消费的时间戳。

flink-connector-kafka-0.11_2.11

1.4.0

FlinkKafkaConsumer011
FlinkKafkaProducer011

0.11.x

Kafka 从 0.11.x 之后不支持 Scala 2.10。该连接器支持Kafka事务性消息传递,为生产者提供准确的一次语义。

flink-connector-kafka_2.11

1.7.0

FlinkKafkaConsumer
FlinkKafkaProducer

>= 1.0.0

这个通用的Kafka连接器试图跟踪Kafka客户机的最新版本。它使用的客户机版本可能在Flink版本之间发生变化。从Flink 1.9发行版开始,它使用Kafka 2.2.0客户机。现代Kafka客户机向后兼容代理版本0.10.0或更高版本。

然而对于 0.11.x 和 0.10.x,建议分别使用专用的 flink-connector-kafka-0.11_2.11 和 flink-connector-kafka-0.10_2.11。

导入下面的 maven 包,根据不同的 kafka 版本,修改 artifactId:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.9.0</version>
</dependency>

注意:还需要配置以下依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.9.0</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.9.0</version>
  <scope>provided</scope>
</dependency>

 安装Kafka

  • 按照 Kafka 的 quickstart 中的说明下载 tar包并启动服务器(每次启动应用程序之前都需要启动 Zookeeper 和 Kafka Server)。
  • 如果 Kafka 和 Zookeeper 服务器运行在远程机器上,则 config/server.properties 属性文件中 advertised.host.name 必须设置为远程机器的IP地址。

Kafka 1.0.0+ Connector

从 Flink 1.7开始,有一个新的通用 Kafka 连接器,它不跟踪特定的 Kafka 主版本。相反,它跟踪的是 Flink 发行时最新版本的Kafka。

如果您的 Kafka 版本是1.0.0或更新版本,应该使用这个 Kafka 连接器。如果使用 Kafka 的旧版本(0.11、0.10、0.9或0.8),则应该使用与版本对应的连接器。

兼容性

通过 Kafka 客户机 API 和代理的兼容性保证,通用 Kafka 连接器与较老的和较新的 Kafka 兼容。它与版本0.11.0或更新版本是否兼容,取决于 maven 依赖。

从0.11迁移到通用的 Kafka Connector上

为了执行迁移,查看升级程序和 Flink 版本

  • 整个过程使用 Flink 1.9 或更新版本。
  • 不要同时升级 Flink 和算子。
  • 确保在您的工作中使用的 Kafka Consumer 和/或 Kafka Producer 已经分配了唯一标识符(uid)
  • 使用 stop with savepoint 特性获取 savepoint (例如,使用 stop --withSavepoint )CLI命令。

用法

要使用通用Kafka连接器,请添加一个依赖项:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.9.0</version>
</dependency>

然后实例化新的 source (FlinkKafkaConsumer)和sink (FlinkKafkaProducer)。除了从模块和类名中删除特定的 Kafka 版本之外,该 API 向后兼容 Kafka 0.11连接器。

Kafka Consumer

Flink 的 Kafka 消费者称为 FlinkKafkaConsumer08 (或Kafka 0.9.0.x的值为FlinkKafkaConsumer09)等等,或者只是Kafka >= 1.0.0版本的 FlinkKafkaConsumer )。它提供对一个或多个 Kafka 主题的访问。

构造函数接受以下参数:

  1. topic 名称或名称列表
  2. 用于反序列化 Kafka 数据的反序列化类 DeserializationSchema / KafkaDeserializationSchema
  3. KafkaConsumer 的配置,包含以下属性:
  • bootstrap.servers:用逗号分隔的 Kafka broker 列表
  • zookeeper.connect:用逗号分隔的 Zookeeper server 列表,Kafka0.8
  • group.id:消费者组id

样例:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

反序列化类 DeserializationSchema

Flink Kafka 消费者需要知道如何将Kafka中的二进制数据转换成 Java/Scala 对象。DeserializationSchema 允许用户指定这样的模式。为每个 Kafka 消息调用 T deserialize(byte[] message) 方法,并从 Kafka 传递值。

AbstractDeserializationSchema 开始说,它负责将生成的 Java/Scala 类型转化为Flink的系统类型。实现普通 DeserializationSchema 的用户需要自己实现 getProducedType(...) 方法。

为了访问 Kafka 消息的键、值和元数据,KafkaDeserializationSchema 有以下反序列化方法T deserialize(ConsumerRecord<byte[], byte[]> record)

为了方便起见,Flink提供了以下模式:

  1. TypeInformationSerializationSchema (和 TypeInformationKeyValueSerializationSchema):它根据 Flink 的 TypeInformation 创建模式。应用与Flink写或读的数据。此模式是与其他通用序列化方法相比性能较高。
  2. JsonDeserializationSchema (和 JSONKeyValueDeserializationSchema):它将序列化的 JSON 转换为 ObjectNode 对象,可以使用 objectNode.get("field").as(Int/String/...)() 访问数据。该对象包含所有KV的字段,以及一个 metedata 字段,包含了 offset/partition/topic 信息。
  3. AvroDeserializationSchema:使用Avro读取序列化之后的数据。它可以从Avro生成的类(AvroDeserializationSchema.forSpecific(…))获取,也可以使用手动提供的模式(AvroDeserializationSchema.forGeneric(…))处理 GenericRecords。这个反序列化模式期望序列化的记录不包含嵌入式模式。
  • 还有一个版本的模式可用,可以在Confluent schema Registry中查找编写器的模式(用于编写记录的模式)。使用这些反序列化模式记录时,将使用从模式注册表检索并转换为静态提供的模式读取(通过ConfluentRegistryAvroDeserializationSchema.forGeneric(...) or ConfluentRegistryAvroDeserializationSchema.forSpecific(...))。

要使用这个反序列化模式,必须添加以下附加依赖项:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.9.0</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro-confluent-registry</artifactId>
  <version>1.9.0</version>
</dependency>

当遇到一个损坏的消息,不能反序列化操作,有两种选择——要么抛出异常的 deserialize(...)方法,将导致工作失败重新启动,或返回 null 允许 Flink Kafka consumer 跳过损坏的消息。

注意:由于消费者的容错能力(有关详细信息,请参阅下面的部分),对损坏的消息执行失败将使消费者再次尝试反序列化消息。因此,如果反序列化仍然失败,消费者将陷入对已损坏消息不停止的重启和失败循环。

消费者开始消费位置配置

Flink Kafka Consumer 允许配置Kafka分区的消费开始位置:

样例:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

DataStream<String> stream = env.addSource(myConsumer);
...

Flink Kafka Consumer 的所有版本都有上面的初始消费位置配置方法。

  • setStartFromGroupOffsets (default behaviour):开始从消费者组(group.id)读取分区。在Kafka brokers(或Kafka 0.8的Zookeeper)中提交的偏移量。如果无法为分区找到偏移量,将使用属性中的重置设置 auto.offset.reset
  • setStartFromEarliest() / setStartFromLatest():从最早/最近的记录开始。在这些模式下,Kafka中提交的偏移量将被忽略,不用做开始位置。
  • setStartFromTimestamp(long):从指定的时间戳开始。对于每个分区,其时间戳大于或等于指定时间戳的记录将用作开始位置。如果分区的最新记录比时间戳更早,则只从最新记录读取分区。在此模式下,Kafka 中提交的偏移量将被忽略,不用作开始位置。

您还可以为每个分区自定义消费者应该从哪个偏移量开始:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

上面的示例将消费者配置为从主题myTopic的0、1和2分区的指定偏移量开始消费。偏移量值应该是消费者应该为每个分区读取的下一条记录。注意,如果消费者需要读取在提供的偏移量映射中没有指定偏移量的分区,那么它将返回到该特定分区的默认组偏移量行为(即使用 setStartFromGroupOffsets())。

注意,当作业从故障中自动恢复或使用 savepoint 手动恢复时,这些启动位置配置方法不影响启动位置。在恢复时,每个 Kafka 分区的起始位置由存储在 savepoint 或 savepoint 中的偏移量决定(有关 checkpointing 的信息,请参阅下面 Kafka 消费者和容错,以便为消费者启用容错)。

Kafka 消费和容错

启用 Flink 的 checkpointing 后,Flink Kafka 消费者将消费来自 topic 的记录,并以一致的方式定期检查所有 Kafka 偏移量以及其他操作。如果作业失败,Flink将把流程序恢复到最新 checkpoint 的状态,并从 checkpoint 中存储的偏移量开始重新使用Kafka的数据。

因此,设置 checkpoints 的间隔定义了程序在发生故障时最多需要返回多少偏移量。

要使用容错 Kafka 消费者,需要在执行环境中启用 checkpointing:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs

要注意的是:只有在有足够的处理 solts 可用来重新启动拓扑时,Flink 才能重新启动拓扑。因此,如果拓扑由于 TaskManager 的丢失而失败,那么之后必须仍然有足够的 slots 可用。Flink on YARN 支持自动重启丢失的 YARN 容器。

如果没有启用 checkpointing,Kafka消费者者将定期向 Zookeeper 提交偏移量。

Kafka消费者主题及Partition Discovery

Partition discovery

Flink Kafka 消费支持动态创建的 Kafka 分区,并使用它们时 exactly-once 严格的保证。在初始检索分区元数据(即,当作业开始运行时)将从尽可能早的偏移量开始消费数。

默认情况下,partition discovery 是禁用的。要启用它,请在提供的属性配置中为 flink.partition-discovery.interval-millis 设置一个非负值,表示以毫秒为单位的时间间隔。

局限:当消费者在Flink 1.3之前从Flink版本的 savepoint 恢复时。在还原运行时不能启用 partition discovery。如果启用,恢复将失败,但有一个例外。在这种情况下,为了使用 partition discovery,请首先在Flink 1.3中取一个 savepoint。然后再还原。

Topic discovery

在更高的级别上,Flink Kafka 消费者还能够使用正则表达式基于主题名称的模式匹配 discovering topics。请看下面的例子:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String](
  java.util.regex.Pattern.compile("test-topic-[0-9]"),
  new SimpleStringSchema,
  properties)

val stream = env.addSource(myConsumer)
...

在上面的示例中,当作业开始运行时,消费者将订阅所有名称与指定正则表达式匹配的主题(以test-topic开头,以个位数结尾)。

要允许消费者在作业开始运行后发现动态创建的主题,请为flink.part -discovery. intervali -millis设置一个非负值。这允许消费者发现具有与指定模式匹配的新主题分区。

Kafka 消费者偏移量提交时的配置

Flink Kafka 消费者允许配置如何将偏移量提交回Kafka Brokers(或0.8中的Zookeeper)。

请注意:Flink Kafka 消费者并不依赖提交的偏移量来保证容错。提交的偏移量只是一种方法,用于公开消费者的进度以便进行监视。

配置偏移提交的方法不同,取决于是否为作业启用了 checkpointing。

  • Checkpointing disabled: Flink Kafka 消费者依赖于内部使用的 Kafka 客户端自动定期提交偏移量的能力。因此,要禁用或启用偏移提交,只需将 enable.auto.commit (或Kafka 0.8的 auto.commit.enable ) /  auto.commit.interval.ms 设置为适当值。
  • Checkpointing enabled:当 checkpoints 完成时,Flink Kafka 消费者将提交存储在 checkpointed 状态中的偏移量。这确保Kafka代理中提交的偏移量与 checkpointed 状态中的偏移量一致。用户可以通过调用消费者上的 setCommitOffsetsOnCheckpoints(boolean) 方法来禁用或启用偏移提交(默认情况下,为true)。

注意:在这个场景中,属性中的自动周期性偏移提交设置完全被忽略。

Kafka 消费者和时间戳 Extraction/Watermark Emission

在许多场景中,记录的时间戳(显式或隐式)嵌入到记录本身中。此外,用户可能希望定期或以不规则的方式发出 watermarks,例如基于Kafka流中包含当前 event-time watermark 的特殊记录。对于这些案例,Flink Kafka Consumer 允许指定 AssignerWithPeriodicWatermarks 或&nbsp;AssignerWithPunctuatedWatermarks。

可以指定自定义的或者预定义的 extractor/watermark emitter,然后通过一下方式配置给消费者:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
stream = env
    .addSource(myConsumer)
    .print()

在内部,每个Kafka分区执行一个 assigner 实例。像这样指定一个 assigner,从Kafka读取的每一条记录,调用 extractTimestamp(T element, long previousElementTimestamp) 方法都会给每条记录分配一个时间戳,调用&nbsp;Watermark getCurrentWatermark() 或者&nbsp;Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) 方法确定当产生新的 watermark 时,使用哪个时间戳。

注意:如果 watermark assigner 依赖于从Kafka读取的记录来推进 watermark assigner(通常是这种情况),那么所有主题和分区都需要有连续的记录流。否则,整个应用程序的水印将无法前进,所有基于时间的操作(例如时间窗口或带有计时器的函数)也无法前进。一个空闲的Kafka分区会导致这种行为。Flink改进计划防止这种情况发生(查阅-英文版:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html),与此同时,一种可行解决方案是向所有已使用的分区发送心跳消息,这些分区将推进空闲分区的 watermarks。

Kafka Producer 

FlinkKafkaProducer的版本跟消费者的一样,具体使用见下:

val stream: DataStream[String] = ...

val myProducer = new FlinkKafkaProducer011[String](
        "localhost:9092",         // broker list
        "my-topic",               // target topic
        new SimpleStringSchema)   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true)

stream.addSink(myProducer)

上面的示例演示了创建 Flink Kafka Producer 的基本用法,将流写入单个Kafka主题。还有其他的构造方法还提供了以下用法:

  • Providing custom properties:提供自定义属性配置,参考Kafka官方配置文档。
  • Custom partitioner:要将记录分配到特定分区,可以向构造函数提供 FlinkKafkaPartitioner 的自定义实现类。这个分区器将被流中的每个记录调用,以确定记录应该发送到目标主题的哪个确切分区。详细看下方。
  • Advanced serialization schema:与消费者类似,生产者还允许使用高级序列化模式KeyedSerializationSchema,该模式允许分别序列化键和值。它还允许覆盖目标主题,以便一个生产者实例可以向多个主题发送数据。

Kafka生成器分区方案

默认情况下,如果没有为 Flink Kafka Producer 指定自定义 partitioner,那么该生成器将使用 FlinkFixedPartitioner 将每个Flink Kafka Producer 的并行子任务映射到单个Kafka分区(即,接收子任务接收到的所有记录最终将位于相同的Kafka分区中)。

可以通过实现 FlinkKafkaPartitioner 类来实现自定义分区器。所有 Kafka 版本的构造函数都允许在实例化生成器时提供自定义分区器。

注意:分区器实现必须是可序列化的,因为它们将在 Flink 节点之间传输。此外,请记住分区器中的任何状态都将在作业失败时丢失,因为分区器不是生产者的 Checkpoint 状态的一部分。

也可以避免使用分区器,并简单的让 Kafka 根据 key 对记录进行分区(根据提供的序列化模式进行分区),为此在传值的时候给个 null,则默认使用 FlinkFixedPartitioner。

Kafka 生产者和容错

Kafka 0.8

在0.9之前,Kafka没有提供任何机制来保证至少一次或完全一次的语义。

Kafka 0.9 and 0.10

启用了 Flink 的 Checkpoint 后,FlinkKafkaProducer09 和 FlinkKafkaProducer010 至少可以提供精确一次语义的保证。

除了启用Flink的 Checkpoint 之外,也需要使用 setLogFailuresOnly(boolean) 和&nbsp;setFlushOnCheckpoint(boolean)

  • setLogFailuresOnly(boolean):默认为false,启用之后生产者只记录失败的情况,而不是捕获或抛出失败情况。就算没有写入到 Kafka topic 中,也算是成功,前提是关闭 at-least-once。
  • setFlushOnCheckpoint(boolean):默认为true,启用后,Flink的 Checkpoint 将等待Kafka在 Checkpoint 成功之前确认 Checkpoint 时的任何非法记录。这确保 Checkpoint 之前的所有记录都已写入Kafka。前提启用 at-least-once。

总之,Kafka producer 默认情况下至少为0.9和0.10版本提供精准一次语义保证,前提是 setLogFailureOnly 设置为 false, setFlushOnCheckpoint 设置为true。

注意:默认情况下,重试次数设置为“0”。这意味着,当 setLogFailuresOnly 被设置为 false 时,生产者会在出现错误时立即失败,包括 leader 的更改。该值默认设置为“0”,以避免重试导致目标 topic 中产生重复消息。对于大多数 borker 更改频繁的生产环境,我们建议将重试次数设置为较高的值。

注意:目前还没有 Kafka 的事务生成器,所以 Flink 不能保证准确地将数据一次传递到Kafka topic。 

Kafka 0.11 and newer

启用了 Flink checkpoint,FlinkKafkaProducer011(Kafka版本大于等于1.0.0)提供了精确一次语义的保证。

除了启用 Flink checkpoint 以外,也可以给传给&nbsp;FlinkKafkaProducer011 不同的参数来选择三种不同的操作模式:

  • Semantic.NONE:Flink 不保证任何事情,产生的数据可能丢失,也可能重复发送。
  • Semantic.AT_LEAST_ONCE (默认值):类似于 FlinkKafkaProducer010 中的 setFlushOnCheckpoint(true)。这保证不会丢失任何记录(但有可能重复发送)。
  • Semantic.EXACTLY_ONCE:使用 Kafka 事务提供精确一次语义。当使用事务写 Kafka 时,不要忘记给接 Kafka 的应用程序设置 isolation.level(read_committed 或 read_uncommitted,后者是默认值)

说明:

Semantic.EXACTLY_ONCE 模式依赖于提交事务,checkpoint 之前、从 checkpoint 恢复记录之后。如果 Flink 应用程序失败以后和完成重启之间的时间超过 Kafka 的事务允许的时间范围,就会出现数据丢失(Kafka 将自动终止超时的事务)。请跟据失败重启时间配置事务的超时。

transaction.max.timeout.ms 默认为15分钟,且不允许事务超时时间大于改值。FlinkKafkaProducer011 设置&nbsp;transaction.timeout.ms 默认为1个小时,使用&nbsp;Semantic.EXACTLY_ONCE 模式之前应该适当增加&nbsp;transaction.max.timeout.ms 的值。

read_committed 模式的&nbsp;KafkaConsumer,未完成的事务将会屏蔽数据读取,对于给定的 Kafka topic 之前未完成的事务。换句话说,在以下一系列事件之后:

  1. 事务1开启并写入一些数据
  2. 事务2开启并写入一些数据
  3. 用户提交了事务2

即使已经提交了来自事务2的记录,在提交或中止transaction1之前,他们也不会被消费。有两个影响:

  • 首先,在Flink应用程序的正常工作期间,用户可以指定消息记录生产的可消费延迟,等于完成检查点之间的平均时间。
  • 其次,在Flink应用程序失败的情况下,应用程序重新启动或经过配置的事务超时时间之前,这个应用程序正在写数据的 topic 将被阻塞以供消费者消费。这句话只适用于多个代理/应用程序写同一个 Kafka topic 的情况。

注意:Semantic.EXACTLY_ONCE 模式为每个&nbsp;FlinkKafkaProducer011 实例分类固定大小的池。producer 和 checkpoint 是严格一对一的关系,如果并发的 checkpoint 的数量大于池的大小,FlinkKafkaProducer011 将抛出异常并导致程序失败,所以需要合理配置池的大小和并发 checkpoint 的大小。

注意:Semantic.EXACTLY_ONCE 避免了任何会阻止消费者消费数据的遗留事务。但是,Flink程序在第一次 checkpoint 之前失败,重启程序之后,系统中没有之前的池大小的信息。因此,在第一个 checkpoint 完成之前指定比 FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR 的值更大值的&nbsp;Flink 程序是不安全的, 

Using Kafka timestamps and Flink event time in Kafka 0.10 

从 Apache Kafka 0.10 版本开始, Kafka 的消息可以带有时间戳,指示事件发生的时间或消息被写入 Kafka broker 的时间。 

如果将 Flink 中的时间特性设置为 TimeCharacteristic.EventTime (StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)),FlinkKafkaConsumer010 将发出带有时间戳的记录。

Kafka consumer 不会发出 watermarker。使用 assignTimestampsAndWatermarks 方法进行“Kafka consumer and Timestamp Extraction/Watermark Emission”中描述的相同机制也适用于 watermarkers 的产生。

使用Kafka中的时间戳时,不需要定义时间戳提取器。extractTimestamp() 方法的 previousElementTimestamp 参数包含 Kafka 消息携带的时间戳。

Kafka消费者的时间戳提取器应该是这样的:

public long extractTimestamp(Long element, long previousElementTimestamp) {
    return previousElementTimestamp;
}

如果设置了 setWriteTimestampToKafka(true), FlinkKafkaProducer010 只会发出记录时间戳。

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);

Kafka Connector metrics

Flink’s Kafka connectors 通过 Flink metrics 提供一些 metrics,用来分析连接器的行为。生产者通过 Flink metrics 系统为所有支持的版本导出 Kafka 内部的 metrics。消费者导出从Kafka 0.9版本开始的所有 metrics。Kafka 文档列出了文档中导出的所有 metrics。

除了这些 metrics 之外,所有消费者还公开每个主题分区的当前偏移量和提交偏移量。current-offsets 是指分区中当前的 offset,指的是我们成功检索和发出的最后一个元素的偏移量。committed-offsets 是最后提交的 offset。Flink 中的 Kafka 消费者将偏移量提交回 Zookeeper (Kafka 0.8)或Kafka broker(Kafka 0.9+)。如果禁用了 checkpoint,offset 将定期提交。使用检查点,一旦流拓扑中的所有算子都确认创建了状态检查点,提交就会发生。这为用户提供了至少一次的语义。对于 Flink 的 offset,系统提供精确一次的语义。

提交给ZK或代理的 offset 也可以用来跟踪 Kafka 消费者的消费进度。每个分区中提交的偏移量和最近偏移量之间的差异称为消费者滞后。如果 Flink 拓扑消耗来自主题的数据的速度比添加新数据的速度慢,那么延迟将会增加,消费者将会落后。对于大型生产部署,我们建议监视该指标,以避免增加延迟。

Enabling Kerberos Authentication (for versions 0.9+ and above only)

Flink 提供了 Kafka connector 对 Kerberos 的配置项,只需要在 Flink 的 flink-conf.yaml 配置文件中配置,如下所示:

1、通过设置以下内容来配置Kerberos凭证

  • security.kerberos.login.use-ticket-cache:默认为 true,Flink 将尝试在 kinit 管理的票据缓存中使用 Kerberos 凭证。注意,当在部署在 YARN 上的 Flink 作业中使用 Kafka connector 时,使用票据缓存的 Kerberos 授权将不起作用。使用 Mesos 进行部署时也是如此,因为 Mesos 部署不支持使用票证缓存进行授权。
  • security.kerberos.login.keytab and security.kerberos.login.principal:要使用 Kerberos keytabs,请为这两个属性设置值。

2、追加 KafkaClient 到 security.kerberos.login.contexts:在使用 Kafka 凭证来加载 Kafka 上下文时,Flink 提供 Kerberos 票据。

启用基于 kerberos 的 Flink 安全性之后,您可以使用 Flink Kafka Consumer 或 Producer 对 Kafka 进行身份验证,只需在传递给内部 Kafka 客户机的属性配置中包含以下两个设置:

  • security.protocol to SASL_PLAINTEXT (default NONE):用于与Kafka代理通信的协议。当使用独立的Flink部署时,还可以使用SASL_SSL。
  • sasl.kerberos.service.name to kafka (default kafka):它的值应该与用于Kafka代理配置的sasl.kerberos.service.name匹配。客户机和服务器配置之间的服务名称不匹配将导致身份验证失败。

有关以上的详细配置,请查阅Flink部署运维(Deployment & Operations)- Configuration(Flink配置项)

排错

注意:Flink 只对 Kafka 的 Consumer 和 Producer 做了封装,如果出现了问题,可以通过升级 Kafka Brokers、重新配置Kafka Brokers 或在中重新配置 Kafkanconsumer 或 KafkaProducer 来解决。下面列出了常见问题的示例。

数据丢失

依赖 Kafka 的配置,即使在 Kafka ack之后,仍然可能有数据丢失。特别要记住Kafka配置中的以下属性:

  • acks
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.flush.*

上述选项的默认值很容易导致数据丢失。详细请查阅:Kafka源码解读——kafka配置文件_Configs

未知 Topic 或 Partition 异常

错误原因有可能是新的 leader 选举正在进行,例如 Kafka broker 重启期间或重启之后。这是一个可检索的异常,因此 Flink job 应该能够重新启动并恢复正常作业。它还可以通过更改 Producer 设置中的重试属性来规避。然而可能导致消息重新排序,可以通过设置 max.in.flight.requests.per.connection 为 1 来绕过它。