Kafka是一个分布表示实时数据流平台,可独立部署在单台服务器上,也可部署在多台服务器上构成集群。它提供了发布与订阅的功能,用户可以发送数据到Kafka集群中,也可以从Kafka集群中读取数据。之前在Kafka 2.8.0版本时,Kafka社区提出了KRaft协议的概念,现在社区发布了Kafka 3.0,里面涉及优化和新增了很多功能,其中就包含KRaft协议的改机。今天,笔者就给大家介绍一下Kafka 3.0新增了哪些特性以及优化了哪些功能。
在 Kafka 3.0 中包含了许多重要的新功能,其中比较显著的变化如下所示:
在Kafka 3.0中,社区对于Zookeeper的版本已经升级到3.6.3了,其中我们可以预览 KRaft 模式,但是无法从 2.8 或者更早的版本升级到该模式。许多实现依赖 jar 现在在运行时类路劲中可用,而不是在编译和运行时类路劲中。升级后的编译错误可以通过显示添加缺少的依赖 jar 或更新应用程序以不使用内部类来修复。
消费者配置的默认值 session.timeout.ms 从10 秒增加到了45 秒,而Broker配置 log.message.format.version 和 Topic 配置 message.format.version 已经被启用。两种配置的值始终假定为 3.0 或者更高,通过 inter.broker.protocol.version 来配置。如果设置了 log.message.format.version 或者 message.format.version 建议在升级到 3.0的同时清理掉这两个属性,同时设置 inter.broker.protocol.version 值为 3.0 。
Streams API 删除了在 2.5.0 或者更早版本中弃用的所有弃用 API,Kafka Streams 不再对“connect:json”模块有编译时的依赖,依赖此传递依赖项的项目必须明确声明它。
现在,通过指定的自定义主体构建起实现 principal.builder.class 现在必须实现 KafkaPrincipalSerde 接口以允许Broker 之间的转发。另外,一些过时的类,方法和工具以及从clients、connect、core、和tools模块进行了删除。
该Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId)方法已被弃用。请使用 Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata)来替换,ConsumerGroupMetadata 可以通过检索KafkaConsumer#groupMetadata()更强大的语义。需要注意的是,完整的消费者组元数据集只有 Brokers 或 2.5 或更高版本才能支持,因此你必须升级你的 Kafka 集群以获得更强的语义。否则,你可以通过new ConsumerGroupMetadata(consumerGroupId)与较老版本的Broker进行交互。
连接器中 internal.key.converter 和 internal.value.converter 属性已被完全删除。自版本 2.0.0 起,不推荐使用这些 Connect 工作器属性。现在被硬编码为使用 schemas.enable 设置为的 JSON 转换器false。如果你的集群一直在使用不同的内部键或值转换器,你可以按照官网文档中概述的迁移步骤,将你的 Connect 集群安全地升级到 3.0。 基于 Connect 的 MirrorMaker (MM2) 包括对支持的更改IdentityReplicationPolicy,无需重命名 Topic 即可启用复制。DefaultReplicationPolicy默认情况下仍然使用现有的,但可以通过 replication.policy 配置属性启用身份复制 。这对于从旧版 MirrorMaker (MM1) 迁移的用户,或者对于不希望 Topic 重命名的具有简单单向复制拓扑的用例特别有用。请注意IdentityReplicationPolicy与 DefaultReplicationPolicy 不同,无法根据 Topic 名称阻止复制循环,因此在构建复制拓扑时要注意避免循环。
虽然 internal.key.converter 和 internal.value.converter 中 Connect 工作器属性,以及以这些名称为前缀的所有属性都已弃用,但是有时候用户仍会尝试使用这些属性进行调试,在与未弃用的Key 和 Value转化器相关的属性意外混淆后,或者只是对其进行盲目的配置后,进行调试。这些实验的结果可能会产生不好的后果,配置了新内保转换器却无法读取具有较旧内部转换器的内保 Topic 数据,这最多会导致偏移量和连机器配置的丢失。
以下连接属性会将被删除:
Connect 的行为就好像上面没有提供一样。具体来说,对于它的键和值转换器,它将使用开箱即用的 JsonConverter,配置为 schemas.enable 属性值为 false 。
运行未使用JsonConverter 并对 schemas.enable 设置 false 的 Connect 集群用户,可以按照以下步骤将其 Connect 集群升级到 3.0:
停止集群上的所有工作线程
对于每个内部主题(配置、偏移量和状态):
重新配置每个 Connect worker 以使用步骤 2 中新创建的内部主题
启动集群上的所有worker
在本次 Kafka 3.0 版本中新增了以下功能:
之前在核心 Kafka 产品中引入了 Headers,在 Kafka Connect Framework 中公开它们将是有利的。Kafka 的 Header 是带有二进制值的简单名称,而 Connect API 已经有一个非常有用的层来处理不同类型的数据。Connect 的 Header 支持应该使用像 Kafka 这样的字符串名称,但使用与 Connect 记录键和值相同的类型来表示值。这将提供与 Connect 框架的其余部分的一致性,并使连接器和转换能够轻松地访问、修改和创建记录上的 Header。
Kafka 将 Header 定义为具有字符串名称和二进制值,但 Connect 将使用用于记录键和值的相同机制来表示 Header 值。每个 Header 值可能有一个对应的 Schema,允许连接器和转换以一致的方式处理 Header 值、记录键和记录值。Connect 将定义一种 HeaderConverter 机制以类似于Converter框架的方式序列化和反序列化标头值 ,这样现有的 Converter实现也可以实现 HeaderConverter. 由于来自不同供应商的连接器和转换可能被组合到单个管道中,因此不同的连接器和转换可以轻松地将 Header 值从原始形式转换为连接器和/或转换期望的类型,这一点很重要。
注意:
为了简洁和清晰,显示的代码不包括 JavaDoc,但提议的更改确实包括所有公共 API 和方法的 JavaDoc。
org.apache.kafka.connect.Header 将添加一个新接口并用作记录上单个标头的公共 API。该接口为键、值和值的模式定义了简单的 getter。这些是不可变对象,还有一些方法可以创建Header具有不同名称或值的新对象。代码片段如下所示:
package org.apache.kafka.connect.header;
public interface Header {
// Access the key and value
String key(); // never null
Schema schema(); // may be null
Object value(); // may be null
// Methods to create a copy
Header with(Schema schema, Object value);
Header rename(String key);
}
org.apache.kafka.connect.Headers 还将添加一个新接口并用作记录标题有序列表的公共 API。这是在 Kafka 客户端的 org.apache.kafka.common.header.Headers接口之后作为标题的有序列表进行模式化的,其中允许多个具有相同名称的标题。Connect Headers接口定义了Header按顺序和/或按名称访问各个 对象以及获取有关Header对象数量的信息的方法 。它还定义了Header使用各种签名来添加、删除和保留 对象的方法,这些签名将易于连接器和转换使用。由于多个Header对象可以具有相同的名称,因此转换需要一种简单的方法来修改和/或删除现有Header对象, apply(HeaderTransform) 并且apply(String, HeaderTransform) 方法可以轻松使用自定义 lambda 函数来执行此操作。代码片段如下所示:
package org.apache.kafka.connect.header;
public interface Headers extends Iterable
// Information about the Header instances
int size();
boolean isEmpty();
Iterator<Header> allWithName(String key);
Header lastWithName(String key);
// Add Header instances to this object
Headers add(Header header);
Headers add(String key, SchemaAndValue schemaAndValue);
Headers add(String key, Object value, Schema schema);
Headers addString(String key, String value);
Headers addBoolean(String key, boolean value);
Headers addByte(String key, byte value);
Headers addShort(String key, short value);
Headers addInt(String key, int value);
Headers addLong(String key, long value);
Headers addFloat(String key, float value);
Headers addDouble(String key, double value);
Headers addBytes(String key, byte\[\] value);
Headers addList(String key, List<?> value, Schema schema);
Headers addMap(String key, Map<?, ?> value, Schema schema);
Headers addStruct(String key, Struct value);
Headers addDecimal(String key, BigDecimal value);
Headers addDate(String key, java.util.Date value);
Headers addTime(String key, java.util.Date value);
Headers addTimestamp(String key, java.util.Date value);
// Remove and/or retain the latest Header
Headers clear();
Headers remove(String key);
Headers retainLatest(String key);
Headers retainLatest();
// Create a copy of this Headers object
Headers duplicate();
// Apply transformations to named or all Header objects
Headers apply(HeaderTransform transform);
Headers apply(String key, HeaderTransform transform);
interface HeaderTransform {
Header apply(Header header);
}
}
每条 Kafka 消息都包含零个或多个标头名称-值对,因此 Connect 记录类将被修改为具有Headers可以就地修改的非空对象。现有的 ConnectRecord 抽象类是两个基类 SourceRecord和 SinkRecord,并且将被改变为具有新的 headers填充字段 ConnectHeaders对象。所有现有构造函数和方法的签名都将保持不变以保持后向兼容性,但现有构造函数将headers使用ConnectHeaders对象填充新字段。而且, toString(), hashCode()和 equalTo(Object)方法将改为使用新的 headers领域。
一个新的构造函数和几个新方法将被添加到这个现有的类中,代码片段如下所示:
package org.apache.kafka.connect.connector;
public abstract class ConnectRecord
/* The following will be added to this class */
private final Headers headers;
public ConnectRecord(String topic, Integer kafkaPartition,
Schema keySchema, Object key,
Schema valueSchema, Object value,
Long timestamp, Iterable
this(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp);
if (headers == null) {
this.headers = new ConnectHeaders();
} else if (headers instanceof ConnectHeaders) {
this.headers = (ConnectHeaders)headers;
} else {
this.headers = new ConnectHeaders(headers);
}
}
public Headers headers() {
return headers;
}
public abstract R newRecord(String topic, Integer kafkaPartition, Schema keySchema,
Object key, Schema valueSchema, Object value, Long timestamp,
Iterable
}
现有的 SourceRecord类将被修改以添加一个新的构造函数并实现附加 newRecord(…)方法。同样,所有现有构造函数和方法的签名将保持不变以保持向后兼容性。代码片段如下所示:
package org.apache.kafka.connect.source;
public class SourceRecord extends ConnectRecord
/\* The following will be added to this class \*/
public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Integer partition,
Schema keySchema, Object key,
Schema valueSchema, Object value,
Long timestamp, Iterable<Header> headers) {
super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
this.sourcePartition = sourcePartition;
this.sourceOffset = sourceOffset;
}
@Override
public SourceRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
Long timestamp, Iterable<Header> headers) {
return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers);
}
}
同样,SinkRecord 将修改现有 类以添加新的构造函数并实现附加 newRecord(…) 方法。同样,所有现有构造函数和方法的签名将保持不变以保持向后兼容性。代码片段如下所示:
package org.apache.kafka.connect.sink;
public class SinkRecord extends ConnectRecord
/\* The following will be added to this class \*/
public SinkRecord(String topic, int partition,
Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
Long timestamp, TimestampType timestampType, Iterable<Header> headers) {
super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
this.kafkaOffset = kafkaOffset;
this.timestampType = timestampType;
}
@Override
public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
Long timestamp, Iterable<Header> headers) {
return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
}
}
本次更新中添加了一个新 org.apache.kafka.connect.storage.HeaderConverter 接口,该org.apache.kafka.connect.storage.Converter接口在现有接口的基础上进行了模式化, 但具有特定于 Header 的方法名称和签名。代码片段如下所示:
package org.apache.kafka.connect.storage;
public interface HeaderConverter extends Configurable, Closeable {
/\*\*
\* Convert the header name and byte array value into a {@link Header} object.
\* @param topic the name of the topic for the record containing the header
\* @param headerKey the header's key; may not be null
\* @param value the header's raw value; may be null
\* @return the {@link SchemaAndValue}; may not be null
\*/
SchemaAndValue toConnectHeader(String topic, String headerKey, byte\[\] value);
/\*\*
\* Convert the {@link Header}'s {@link Header#valueAsBytes() value} into its byte array representation.
\* @param topic the name of the topic for the record containing the header
\* @param headerKey the header's key; may not be null
\* @param schema the schema for the header's value; may be null
\* @param value the header's value to convert; may be null
\* @return the byte array form of the Header's value; may be null if the value is null
\*/
byte\[\] fromConnectHeader(String topic, String headerKey, Schema schema, Object value);
/\*\*
\* Configuration specification for this set of header converters.
\* @return the configuration specification; may not be null
\*/
ConfigDef config();
}
需要注意的是,不同的是 Converter,新 HeaderConverter接口扩展了 Configurable 现在对于可能具有附加配置属性的 Connect 接口通用的接口。
现有实现 Converter 也可能实现 HeaderConverter,并且ConverterConnect 中的所有三个现有 实现都将相应地更改以通过序列化/反序列化 Header 值来实现这个新接口,类似于它们序列化/反序列化键和值的方式:
HeaderConverter 将添加一个新实现来将所有内置原语、数组、映射和结构与字符串表示形式相互转换。与StringConverter使用 toString()方法的不同 SimpleHeaderConverter,除了不带引号的简单字符串值之外, 使用类似 JSON 的表示形式表示基本类型、数组、映射和结构。这种形式直接对应于许多开发人员认为将值序列化为字符串的方式,并且可以 SimpleHeaderConverter解析这些任何和所有这样的值,并且大部分时间来推断正确的模式。因此,这将用于HeaderConverterConnect 工作程序中使用的默认值 。
下表描述了SimpleHeaderConverter将如何持久化这些值,表格如下:
类型
描述
例子
BOOLEAN
true或者false
BYTE_ARRAY
字节数组的Base64编码字符串
INT8
Java字节的字符串表示形式
INT16
Java Short的字符串表示形式
INT32
Java Int的字符串表示形式
INT64
Java Long的字符串表示形式
FLOAT32
Java 浮点数的字符串表示形式
FLOAT64
Java Double的字符串表示形式
STRING
字符串的UTF-8表示
ARRAY
数组的类似 JSON 的表示形式。数组值可以是任何类型,包括基本类型和非基本类型。
MAP
类似 JSON 的表示形式。尽管大多数正确创建的映射都具有相同类型的键和值,但也支持具有任何键和值的映射。映射值可以是任何类型,包括基本类型和非基本类型。
{ "foo": "value", "bar": "strValue", "baz": "other" }
STRUCT
类似 JSON 的表示形式。Struct 对象可以序列化,但反序列化时将始终解析为映射,因为模式不包含在序列化形式中。
{ "foo": true, "bar": "strValue", "baz": 1234 }
DECIMAL
对应的字符串表示java.math.BigDecimal。
TIME
IOS-8601 时间表示,格式为“HH:mm:ss.SSS'Z'”。
16:31:05.387UTC
DATE
日期的 ISO-8601 表示,格式为“YYYY-MM-DD”。
2021-09-25
TIMESTAMP
时间戳的 ISO-8601 表示,格式为“YYYY-MM-DD'T'HH:mm:ss.SSS'Z'”。
2021-09-25T 16:31:05.387UTC
Connect 工作器需要配置为使用 HeaderConverter 实现,因此header.converter 将定义一个名为的附加工作器配置 ,默认为 SimpleHeaderConverter. 具有相同名称和默认值的类似配置属性将添加到连接器配置中,允许连接器覆盖工作程序的 Header 转换器。请注意,每个连接器任务都有自己的标头转换器实例,就像键和值转换器一样。
每个 Header 都有一个可由接收器连接器和简单消息转换使用的值。但是,标头值的类型首先取决于标头的创建方式以及它们的序列化和反序列化方式。将添加一组新的转换实用程序方法,使 SMT 和接收器连接器可以轻松地将标头值转换为易于使用的类型。这些转换可能需要原始架构和值。与字符串之间的转换使用与上述相同的机制SimpleHeaderConverter。
例如,SMT 或接收器连接器可能期望标头值为 long,并且可以使用这些实用方法来转换任何数值(例如,int、short、String、BigDecimal 等)。或者,接收器连接器可能需要 Timestamp 逻辑数据类型,因此它可以使用该 Values.convertToTimestamp(s,v) 方法从时间戳或日期的任何 ISO-8601 格式字符串表示转换,或表示为 long 或字符串的过去纪元的毫秒数。
这些实用方法可用于 Header 值或键、值或结构、数组和映射中的任何值。代码片段如下所示:
package org.apache.kafka.connect.data;
public class Values {
// All methods return null when value is null, and throw a DataException
// if the value cannot be converted to the desired type.
// If the value is already the desired type, these methods simply return it.
public static Boolean convertToBoolean(Schema schema, Object value) throws DataException {...}
public static Byte convertToByte(Schema schema, Object value) throws DataException {...}
public static Short convertToShort(Schema schema, Object value) throws DataException {...}
public static Integer convertToInteger(Schema schema, Object value) throws DataException {...}
public static Long convertToLong(Schema schema, Object value) throws DataException {...}
public static Float convertToFloat(Schema schema, Object value) throws DataException {...}
public static Double convertToDouble(Schema schema, Object value) throws DataException {...}
public static String convertToString(Schema schema, Object value) {...}
public static java.util.Date convertToTime(Schema schema, Object value) throws DataException {...}
public static java.util.Date convertToDate(Schema schema, Object value) throws DataException {...}
public static java.util.Date convertToTimestamp(Schema schema, Object value) throws DataException {...}
public static BigDecimal convertToDecimal(Schema schema, Object value, int scale) throws DataException {...}
// These only support converting from a compatible string form, which is the same
// format used in the SimpleHeaderConverter described above
public static List<?> convertToList(Object value) {...}
public static Map<?, ?> convertToMap(Object value) {...}
// Only supports returning the value if it already is a Struct.
public static Struct convertToStruct(Object value) {...}
}
在 Kafka 3.0 中优化和调整了以下内容:
在 Kafka 3.0 中修复了如下BUG:
在 Kafka 3.0 中的开发任务如下:
Kafka 3.0 的发布标志着社区对 Kafka 项目迈向了一个新的里程牌。另外,感谢Kafka PMC对Kafka Eagle监控系统的认可,为了维护Apache社区的商标权益,现在对Kafka Eagle正式改名为EFAK(Eagle For Apache Kafka),EFAK会持续更新迭代优化,为大家管理Kafka集群和使用Kafka应用提供便利,欢迎大家使用EFAK,也可以到Github或者EAFK官网上关注 EFAK 的最新动态。
这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。
TRANSLATE with x
English
TRANSLATE with
COPY THE URL BELOW
Back
EMBED THE SNIPPET BELOW IN YOUR SITE
Enable collaborative features and customize widget: Bing Webmaster Portal
Back
手机扫一扫
移动阅读更方便
你可能感兴趣的文章