提起事务,我们第一印象可能就是ACID,需要满足原子性、一致性、事务隔离级别等概念,那kafka的事务能做到什么程度呢?我们首先看一下如何使用事务
Producer端代码如下
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();
ProducerRecord<String, String> kafkaMsg1 = new ProducerRecord<>(TOPIC1, "msg val");
producer.send(kafkaMsg1);
ProducerRecord<String, String> kafkaMsg2 = new ProducerRecord<>(TOPIC2, "msg val");
producer.send(kafkaMsg2);
producer.commitTransaction();
Consumer端不需要做特殊处理,跟消费普通消息一样
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
}
}
那需要如何配置呢?
Producer
Consumer
transactional.id
事务ID,类型为String字符串,默认为空,客户端自定义,例如"order_bus"
isolation.level
事务隔离级别,默认为空,开启事务的话,需要将其设置为"read_committed"
enable.idempotence
消息幂等开关,true/false,默认为false,当配置了transactional.id,此项一定要设置为true,否则会抛出客户端配置异常
transaction.timeout.ms
事务超时时间,默认为10秒,最长为15分钟
当enable.idempotence
设置为true时,kafka会检查如下一些级联配置
配置项
内容要求
说明
acks
要求此配置项必须设置为all
响应必须要设置为all,也就是leader存储消息,并且所有follower也存储了消息后再返回,保证消息的可靠性
retries
> 0
因为幂等特性保证了数据不会重复,在需要强可靠性的前提下,需要用户设置的重试次数 > 0
max.in.flight.requests.per.connection
<= 5
此项配置是表明在producer还未收到broker应答的最大消息批次数量。该值设置的越大,标识可允许的吞吐越高,同时也越容易造成消息乱序
相关配置约束: org.apache.kafka.clients.producer.ProducerConfig#postProcessAndValidateIdempotenceConfigs()
由此,可以出一张事务的概览图
一个简单的事务可能就是这样:
假设有2个消费端此时正在消费这两个topic对应的分区,在事务提交前,所有的事务消息对两个consumer均不可见,事务一旦提交,在同一时刻,consumer1可以看到a、b消息,consumer2可看到c消息(这里首先作个申明,显而易见,kafka实现的是分布式事务,既然是分布式事务就脱离不了CAP定理,而kafka的事务也只是做到了最终一致性,后文还会详细展开)
那么整个事务是如何实现的呢?
如上图所示,整个事务流程分一下几个步骤:
initTransactions
beginTransaction
producer.send
commitTransaction
abortTransaction
当Producer发送N多条事务的话
而这里面很多步骤都是需要多个角色参与的,例如“事务初始化”,就需要Producer及Broker协同实现
事务初始化由Producer端触发,代码为
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
事务初始化经历了两个阶段:
两者是递进关系,步骤2是严格依赖步骤1的,下面的流程图标注了它们的调用关系
参与方:Producer、Broker
什么是TransactionCoordinator?
TransactionCoordinator与GroupCoordinator类似,其本质也是一个后端的broker,只是这个broker起到了针对当前事物的协调作用,所有事务操作都需要直接发送给这个指定的broker
刚开始的时候,Producer并不知道哪个broker是TransactionCoordinator,那么目标broker是如何选择出来的呢?
Producer虽然不知道Coordinato的地址,但是他有所有broker的链接串,因此初始化时,整体步骤如下:
__transaction_state
的默认分区数,该topic是kafka实现事务的关键,后文还会多次提及)获取对应的Partition,该Partition从属的Broker,即为TransactionCoordinator获取Partition代码如下: kafka.coordinator.transaction.TransactionStateManager#partitionFor()
def partitionFor(transactionalId: String): Int = Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount
参与方:Producer、Coordinator
获取TransactionCoordinator后,便需要向其发送请求获取ProducerId及Epoch,对应的API为ApiKeys.INIT_PRODUCER_ID。可以认为ProducerId+Epoch是对事物型Producer的唯一标识,后续向broker发起的请求,也都需要携带这两个关键参数。这两个参数含义如下
参数
类型
含义
ProducerId
Long
从0开始,对应Producer端配置的TransactionId,他们存在映射关系,可以通过TransactionId来查询ProducerId;映射关系存储在kafka内部topic __transaction_state
中
Epoch
Short
从0开始,Producer每次重启,此项值都会+1;当超过short最大值后,ProducerId+1
比如当前的ProducerId为2000,Epoch为10,Producer重启后,ProducerId为2000不变,Epoch变为11;如果此时Broker端再次收到epoch为10的数据,那么将会认为是过期数据不予处理
由此可见ProducerId与Epoch是持久化在Broker端的,主要目的就是为了应对Coordinator宕机;接下来就要引出非常重要的一个kafka内部compact topic:__transaction_state
__transaction_state
是一个compact topic,即最新key对应的value内容会将旧值覆盖,可以简单将其看做一个KV存储
Key
Value
TransactionId
producerId
8
从0开始,依次递增
epoch
2
从0开始,依次递增
transactionTimeoutMs
4
事务超时时间,默认10秒,最大15分钟
transactionStatus
1
事务状态(
0-Empty 事务刚开始时init是这个状态
1-Ongoing
2-PrepareCommit
3-PrepareAbort
4-CompleteCommit
5-CompleteAbort
6-Dead
7-PrepareEpochFence
)
topicTotalNum
4
当前事务关联的所有topic总和
topicNameLen
2
topic长度
topicName
X
topic内容
partitionNum
4
partition的个数
partitionIds
X
例如有n个partition,X = n * 4,每个partition占用4 byte
transactionLastUpdateTimestampMs
8
最近一次事务操作的更新时间戳
transactionStartTimestampMs
8
事务启动的时间戳
这个Topic的可以让broker随时查看事务的当前状态,以及是否超时
相关代码 scala/kafka/coordinator/transaction/TransactionLog.scala#valueToBytes()
此步骤会让Broker向__transaction_state
中写入一条数据(由于当前Coordinator是通过分区数取模得到的,因此向topic写入数据是直接写入本地盘的,没有网络开销),事务状态为Empty
,同时向Producer返回ProducerId+Epoch。当前步骤在Broker端还有很多事务状态异常的判断,此处不再展开
参与方:Producer
代码示例
producer.beginTransaction();
注:此步骤Producer不会向Broker发送请求,只是将本地的事务状态修改为 State._IN_TRANSACTION_
Broker也并没有独立的步骤来处理事务启动,Broker在收到第一条消息时,才认为事物启动;那么Kafka为何要设计这样一个看起来很鸡肋的功能呢?直接发送消息不行么
一个正常的事务流程是这样的:
因为事务消息可能是发送多次的,每次通过producer.beginTransaction()
开启事务,可以使得代码更清晰,也更容易理解;因此多次发送的顺序会是这样
参与方:Producer、Broker
事务消息的发送是非常非常重要的环节,不论是Producer端还是Broker端,针对事务都做了大量的工作,不过在阐述核心功能前,还是需要对一些基础知识进行铺垫
与RocketMQ不同,kafka消息协议的组装是在Producer端完成的,kafka消息协议经历了3个版本(v0、v1、v2)的迭代,我们看一下现存3个版本的协议对比
然而V2版本做了相当大的改动,甚至可以说是“面目全非”
V2版本引入了Record Batch的概念,同时也引入了可变长存储类型(本文不再展开),同一个Producer的消息会按照一定的策略归并入同一个Record Batch中;如果两个Producer,一个开启事务,一个关闭事务,分别向同一个Topic的同一个Partititon发送消息,那么存在在Broker端的消息会长什么样呢?
可见,同一个Record Batch中的Producer id、epoch、消息类型等都是一样的,所以不存在同一个Batch中,既有事务消息,又有非事务消息;换言之,某个Batch,要么是事务类型的,要么是非事务类型的,这点相当重要,在Consumer端消费消息时,还要依赖这个特性。因此在Producer端,即便是同一个进程内的2个producer实例,向同一个Topic的同一个Partition,一个发送事务消息,一个发送普通消息,两者间隔发送,这时会发现Record Batch的数量与消息的数量相同,即一个Record Batch中只会存放一条消息
众所周知,kafka是有消息超时重试机制的,既然存在重试,那么就有可能存在消息重复
注:上述整个过程,Client的业务方并不知晓,重试逻辑由Producer内部控制,给业务方的感观便是消息发送了一份,却收到了两份数据
kafka要实现事务语义的话,消息重复肯定是接受不了的,因此保证消息幂等也就成了事务的前置条件。如何实现幂等呢,比较直观的思路便是给消息编号,这样Broker就可以判重了,事实上kafka也是这样做的;在Producer启动时,会进行初始化动作,此时会拿到(ProduceId+Epoch),然后在每条消息上添加Sequence字段(从0开始),之后的请求都会携带Sequence属性
如果存在重复的RecordBatch(通过produceId+epoch+sequence),那么Broker会直接返回重复记录,client收到后丢弃重复数据
scala/kafka/log/ProducerStateManager.scala#findDuplicateBatch()
如果Broker收到的RecordBatch与预期不匹配,例如比预期Sequence小或者大,都会抛出OutOfOrderSequenceException
异常
比预期Sequence小:这种请求就是典型的重复发送,直接拒绝掉并扔出异常
比预期Sequence大:因为设置了幂等参数后,max.in.flight.requests.per.connection
参数的设定最大值即为5,即Producer可能同时发送了5个未ack的请求,Sequence较大的请求先来到了,依旧扔出上述异常
处理重复数据的关键代码如下 kafka.log.ProducerStateEntry#findDuplicateBatch()
def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
if (batch.producerEpoch != producerEpoch)
None
else
batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
}
// Return the batch metadata of the cached batch having the exact sequence range, if any.
def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = {
val duplicate = batchMetadata.filter { metadata =>
firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq
}
duplicate.headOption
}
处理Sequence过大或过小代码如下 kafka.log.ProducerAppendInfo#checkSequence()
private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: Long): Unit = {
if (producerEpoch != updatedEntry.producerEpoch) {
......
} else {
......
// If there is no current producer epoch (possibly because all producer records have been deleted due to
// retention or the DeleteRecords API) accept writes with any sequence number
if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) {
throw new OutOfOrderSequenceException(s"Out of order sequence number for producer $producerId at " +
s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " +
s"$currentLastSeq (current end sequence number)")
}
}
}
private def inSequence(lastSeq: Int, nextSeq: Int): Boolean = {
nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)
}
然而单纯依靠消息幂等,真正能够实现消息不重复、消息全局幂等吗?答案是否定的,假定这样的一个前置条件: “Produer发送了一条幂等消息,在收到ACK前重启了”
因此消息幂等能只够保证在单会话(session)、单partition的场景下能保证消息幂等
参与方:Producer、Broker
Producer端在发送消息阶段,Producer与Broker的交互分两部分:
向当前事物的Coordinator发送添加Partiton的请求
向对应的分区发送消息
也是事务消息比较影响性能的一个点,在每次真正发送Record Batch消息之前,都会向Coordinator同步发送Partition,之后才会真正发送消息。而这样做的好处也显而易见,当Producer挂掉后,Broker是存储了当前事物全量Partition列表的,这样不论是事务提交还是回滚,亦或是事务超时取消,Coordinator都拥有绝对的主动权
贴少量关键源码(本人不太喜欢大篇幅粘贴源码,这样会破会行文的连贯性,相信读者也不会通过此文去翻看源码。不过在不影响阅读的前提下,本文还是会黏贴一些关键代码)
这里是消息确定了最终Partition后,向transactionManager注册
org/apache/kafka/clients/producer/KafkaProducer.java#doSend()
// Add the partition to the transaction (if in progress) after it has been successfully
// appended to the accumulator. We cannot do it before because the partition may be
// unknown or the initially selected partition may be changed when the batch is closed
// (as indicated by abortForNewBatch
). Note that the Sender
will refuse to dequeue
// batches from the accumulator until they have been added to the transaction.
if (transactionManager != null) {
transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
}
Sender线程构建add partition请求
org/apache/kafka/clients/producer/internals/Sender.java#maybeSendAndPollTransactionalRequest()
TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequest(accumulator.hasIncomplete());
if (nextRequestHandler == null)
return false;
在消息发送阶段,Coordinator的参与主要是记录当前事务消息所在的Parition信息,即更新topic __transaction_state
的状态,正如前文所述,__transaction_state
为compact类型,以下属性将会被更新
topicTotalNum
4
当前事务关联的所有topic总和
topicNameLen
2
topic长度
topicName
X
topic内容
partitionNum
4
partition的个数
partitionIds
X
例如有n个partition,X = n * 4,每个partition占用4 byte
transactionLastUpdateTimestampMs
8
最近一次事务操作的更新时间戳
题外话:如果Coordinator记录了某个Partition参与了事务,但却没有向该Partition发送事务消息,这样会有影响吗?
__transaction_state
中虽然记录了某个Partition参与了事务,但在事务提交阶段,只会向该Partition发送marker类型的控制消息,Consumer在收到controller类型的消息后会自动过滤,另外也不会影响当前Partition的LSO向前推进消息发送时,Broker做的很重要的一个工作是维护 LSO(log stable offset),一个Partition中可能存了多个事务消息,也有可能存储了很多非事务的普通消息,而LSO为第一个正在进行中(已经commit/abort的事务不算)的事务消息的offset
如上图:
因此LSO的位置就在第一个正在进行中的事务的首消息的offset。消息不断写入,Broker需要实时维护LSO的位置,而在LSO以下的位置的消息是不可以被标记为READ_COMMITED
的consumer消费的。
这里稍微引申一下Consumer端的逻辑,LSO标记之前的消息都可以被consumer看到,那么如上图,LSO之前有3条消息,2个a(事务取消),1个b(事务提交),consumer读到这3条消息后怎么处理呢?无非就是以下两种处理逻辑:
暂存在consumer端,直至读取到事务最终状态,再来判断是吐给业务端(事务成功),还是消息扔掉(事务取消)
实时判断当前消息是该成功消费还是被扔掉
具体采用哪种策略,我们在消息消费的章节再来展开
参与方:Producer、Broker
事务提交时Producer端触发的,代码如下
producer.commitTransaction();
事务提交对应的API为ApiKeys.END_TXN,Producer向Broker请求的入参为
transactionalId
事务id,即客户自定义的字符串producerId
producer id,由coordinator生成,递增epoch
由coordinator生成committed
true:commit false:abort可以看到,在事务提交阶段,Producer只是触发了提交动作,并携带了事务所需的参数,所做的操作相当有限,重头还是在Coordinator端
注:这里的提交动作是直接提交给Coordinator的,就跟事务初始化阶段,获取Producer id一样
在内部Topic __transaction_state
中存储了当前事物所关联的所有Partition信息,因此在提交阶段,就是向这些Partition发送control marker信息,用来标记当前事物的结束。而事务消息的标志正如前文消息协议所述,在attribute字段的第5个bit
attribute字段:
control
如前文所说,LSO以下的消息是不会被消费到,这样控制了事务消息的可见性,想控制这点,难度应该不大;但事务提交后,所有当前事物的消息均可见了,那事务提交时,具体发生了什么,是如何控制可能分布在多台broker上的消息同时可见呢?
上图以3个Broker组成的事务举例:
__transaction_state
追加一条消息)看起来是两阶段提交,且一切正常,但却有一些疑问:
问题1: 3.1向__transaction_state
写完事务状态后,便给Producer回应说事务提交成功,假如说3.2执行过程中被hang住了,在Producer看来,既然事务已经提交成功,为什么还是读不到对应消息呢?
的确是这样,这里成功指的是Coordinator收到了消息,并且成功修改了事务状态。因此返回成功的语义指的是一阶段提交成功,因为后续向各个Partition发送写marker的会无限重试,直至成功
问题2: 3.2中向多个Broker发送marker消息,如果Broker1、Broker2均写入成功了,但是Broker3因为网络抖动,Coordinator还在重试,那么此时Broker1、Broker2上的消息对Consumer来说已经可见了,但是Broker3上的消息还是看不到,这不就不符合事务语义了吗?
事实确实如此,所以kafka的事务不能保证强一致性,并不是说kafka做的不够完美,而是这种分布式事务统一存在类似的问题,CAP铁律限制,这里只能做到最终一致性了。不过对于常规的场景这里已经够用了,Coordinator会不遗余力的重试,直至成功
kafka.coordinator.transaction.TransactionCoordinator#endTransaction()
这里是当__transaction_state
状态改为PrepareCommit后,就向Producer返回成功
case Right((txnMetadata, newPreSendMetadata)) =>
// we can respond to the client immediately and continue to write the txn markers if
// the log append was successful
responseCallback(Errors.NONE)
txnMarkerChannelManager.addTxnMarkersToSend(coordinatorEpoch, txnMarkerResult, txnMetadata, newPreSendMetadata)
参与方:Producer、Broker
事务取消如果是Producer端触发的,代码如下
producer.abortTransaction();
事务提交对应的API为ApiKeys.END_TXN(与事务提交是同一个API,不过参数不一样),Producer向Broker请求的入参为
transactionalId
事务id,即客户自定义的字符串producerId
producer id,由coordinator生成,递增epoch
由coordinator生成committed
false:abort事务取消除了由Producer触发外,还有可能由Coordinator触发,例如“事务超时”,Coordinator有个定时器,定时扫描那些已经超时的事务
kafka.coordinator.transaction.TransactionCoordinator#startup()
def startup(retrieveTransactionTopicPartitionCount: () => Int, enableTransactionalIdExpiration: Boolean = true): Unit = {
info("Starting up.")
scheduler.startup()
scheduler.schedule("transaction-abort",
() => abortTimedOutTransactions(onEndTransactionComplete),
txnConfig.abortTimedOutTransactionsIntervalMs,
txnConfig.abortTimedOutTransactionsIntervalMs
)
txnManager.startup(retrieveTransactionTopicPartitionCount, enableTransactionalIdExpiration)
txnMarkerChannelManager.start()
isActive.set(true)
info("Startup complete.")
}
其实事务取消的流程在Coordinator端,跟事务提交大同小异,不过事务取消会向.txnindex
文件写入数据,也就是.txnindex
文件存储了所有已取消的事务详情。对应源码文件为 kafka.log.AbortedTxn.scala
,.txnindex
文件存储协议如下
currentVersion
当前文件版本号,目前为0producerId
producerIdfirstOffset
当前事务的开始offsetlastOffset
当前事务的结束offsetlastStableOffset
存储时的LSO存储详情中,不需要记录epoch、sequence等信息,因为这个文件的目的是配合Consumer进行消息过滤的,有了事务的起止offset已经足够
firstOffset 与 lastOffset 可能跨度很长,之间如果有多个事务如何区分呢?
其实首先明确一点,同一个ProducerId在同一个时间段,只会存在一个事物,例如某条记录是这样存储:(producerId:1000, firstOffset:20, lastOffset:80) ,也就是offset在20与80之间,producerId为1000的记录只会存在一条,当然也有可能出现如下记录
但是producerId一定不是1000了,这点很关键,因为在事务消息消费时,还要依赖这个
append“事务取消记录”入口 kafka.log.LogSegment#updateTxnIndex()
参与方:Consumer、Broker
前文所有的工作,其实都体现在事务消费上,消费事务消息,也是kafka非常重要的课题
当consumer的事务隔离级别(isolation.level
)设置为 read_committed 后,便只能拉取LSO以下的记录,且返回的信息中还会携带已取消的事务
kafka.log.UnifiedLog#read
def read(startOffset: Long,
maxLength: Int,
isolation: FetchIsolation,
minOneMessage: Boolean): FetchDataInfo = {
checkLogStartOffset(startOffset)
val maxOffsetMetadata = isolation match {
case FetchLogEnd => localLog.logEndOffsetMetadata
case FetchHighWatermark => fetchHighWatermarkMetadata
case FetchTxnCommitted => fetchLastStableOffsetMetadata
}
localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchTxnCommitted)
}
正如前文所说,LSO之前的记录,均是已提交或已取消的事务;因此在一个事物未完成之前,是永远都不会被consumer拉取到的。此时还要引出前文提出的问题,即consumer消息策略
High Water Mark
,consumer不断拉取消息,不论是已经完结的事务消息还是未完结,亦或是普通消息,统一进行拉取;然后在consumer端进行过滤,发现某事物消息未完结,那么暂存在consumer,等收到control mark消息后,再判断将所有消息返回给业务方,或是丢弃Last Stable Offset
,consumer只返回最后一个已完结事务之前的消息,consumer拉取消息后,即便是事务marker还未拉取,也可以判断是提交还是丢弃其实很明显,现在kafka最新版本采用的是策略二,不过我们还是有必要比较一下两者优缺点
策略一
策略二
优点
性能相对较高,比如LSO之后存在一些已提交的事务消息,或者普通消息,能够及时消费到
不会造成consume端OOM;只消费LSO以下的消息,因此在拿到消息后便可以判断是commit还是abort
consumer退出或重启,走常规应对即可,降低位点管理的复杂度
缺点
如果事务跨度过长,容易造成consumer端的消息积压,从而OOM
consumer退出或重启,对于已积累但未吐出的消息很难处理,需要使用复杂的逻辑来管理位点
性能较低,由于consumer只能看到LSO以下的消息,故一些非事务消息(或已完结的事务消息,但在LSO之上)不能及时消费。
综合考虑后,kafka还是选择了可控性较强,且没有致命bug的策略二,虽然有一些性能损失,但换来的是整个集群的稳定性
当consumer设置了read_committed消费消息时,除了返回常规的RecordBatch集合外,还会返回拉取区间已取消的事务列表。假定consumer收到了一段数据:
其中白色的为非事务消息,即普通消息,彩色的为事务消息,相同颜色的消息为同一事务。下面表格中,abortTxns的格式为 (producerId, startOffset, endOffset)
abortTxns
有效消息
无效消息
说明
empty
100-115
无
当取消事务列表为空时,说明当前读取到事务消息均为提交成功的事务消息
[(10, 101, 115)]
100,
103-114
101,102,103
abort列表表明producerId为10的事务已经取消,因此扫描整个列表,发现符合abort条件的记录是101、102、115
[(11, 110, 112)]
100-109,
111,
113-115
110, 112
虽然103、106的producerId也是11,但是offset range并不匹配;虽然111的offset range匹配,但是其producerId不匹配
[(10, 101, 115),
(11, 103, 106),
(12, 104, 111)]
100,105,109,110,112,113,114
101-104,
106-108,
111, 115
不再赘述,无效消息通过producerId+offset range统一来确定
注:consumer在读取以上信息的时候,可能并内有读取到control marker信息,但是已经能够确定目标消息是事务完结状态,且已经知道事务是commit或abort了,因此可以直接处理;而control消息是由coordinator发送给各个partition的,属于内部消息,consumer对于control消息是会自动过滤掉的
org.apache.kafka.clients.consumer.internals.Fetcher.CompletedFetch#nextFetchedRecord()
// control records are not returned to the user
if (!currentBatch.isControlBatch()) {
return record;
} else {
// Increment the next fetch offset when we skip a control batch.
nextFetchOffset = record.offset() + 1;
}
既然kafka已经实现了事务,那么我们的业务系统中是否可以直接依赖这一特性?
假如这样使用kafka:
如果业务方将1、2整体当做是一个事务的话,那么理解就有偏差了,因为这个过程当中还缺少提交位点的步骤,假如步骤2已经执行完毕,但还未提交位点,consumer发生了重启了,那么这条消息还会被再次消费,因此kafka所说的事务支持,指的是读取、写入都在kafka集群上
消息的消费可以分为三种类型
At Least Once(至少一次)
也就是某条消息,至少会被消费一次,潜台词就是消息可能会被消费多次,也就是重复消费;kafka默认的消费类型,实现它的原理很简单,就是在业务方将消息消费掉后,再提交其对应的位点,业务方只要做好消息去重,运行起来还是很严谨的
At Most Once (至多一次)
与至少一次相对,不存在重复消费的情况,某条消息最多被消费一次,潜台词就是可能会丢消息;实现原理还是控制位点,在消费某条消息之前,先提交其位点,再消费,如果提交了位点,consumer重启了,重启后从最新位点开始消费数据,也就是之前的数据丢失了,并没有真正消费
Exactly Once(精确一次)
不论是“至少一次”还是“至多一次”都不如精确一次来的生猛,有文章说kafka事务实现了精确一次,但这样评论是不够严谨的,如果业务方将一次「拉取消息+业务处理」当做一次处理的话,那即便是开启了事务也不能保证精确一次;这里的精确一次指的读取、写入都是操作的kafka集群,而不能引入业务处理
关于Exactly Once,这里引用一下官方对其描述,Exactly-once Semantics in Apache Kafka
简单概括一下就是 1、幂等型的Producer,在单分区的前提下支持精准一次、有序的消息投递;2、事务,跨多分区的原子写入 3、Stream任务,类型为read-process-write形式的,可做到精确一次
举Stream中的例子:从1个Topic中读取数据,经过业务方的加工后,写入另外Topic中
producer.initTransactions();
producer.beginTransaction();
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : consumerRecords.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-sink", record.key(), record.value());
producer.send(producerRecord);
}
long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsets.put(partition, new OffsetAndMetadata(lastConsumedOffset + 1));
}
producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("groupId"));
producer.commitTransaction();
可以简单认为,将一次数据读取,转换为了数据写入,并统一归并至当前事务中;关键代码为
producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("groupId"));
这个请求对应的API是ApiKeys.ADD_OFFSETS_TO_TXN,参数列表为
核心思想就是算出groupId在__consumer_offsets
中对应的partition,然后将该partition加入事务中,在事务提交/取消时,再统一操作,这样便实现了读与写的原子性。
不过这样做的前提是consumer需要将enable.auto.commit
参数设置为false,并使用producer.sendOffsetsToTransaction()
来提交offset
事务总共有8种状态
state
desc
0-Empty
Transaction has not existed yet
1-Ongoing
Transaction has started and ongoing
2-PrepareCommit
Group is preparing to commit
3-PrepareAbort
Group is preparing to abort
4-CompleteCommit
Group has completed commit
Will soon be removed from the ongoing transaction cache
5-CompleteAbort
Group has completed abort
Will soon be removed from the ongoing transaction cache
6-Dead
TransactionalId has expired and is about to be removed from the transaction cache
7-PrepareEpochFence
We are in the middle of bumping the epoch and fencing out older producers.
最常见的状态流转
总结一下kafka事务相关的一些topic及文件。topic只有一个,是专门为事务特性服务的,而文件有两个,这里的文件指的是所有参与事务的topic下文件
Topic
__transaction_state
内部compact topic,主要是将事务状态持久化,避免Transactional Coordinator重启或切换后事务状态丢失
文件
.txnindex
存放已经取消事务的记录,请问已经提到过,如果当前logSegment没有取消的事务,那么这个文件也不会存在
.snapshot
正如其名,因为Broker端要存放每个ProducerId与Sequence的映射关系,目的是sequence num的验重
.snapshot
跟其他索引文件不同,其他索引文件都是随着记录的增加,动态append到文件中的;而.snapshot
文件则是在logSegment roll时,也就是切换下一个log文件时,将当前缓存中的所有producerId及Sequence的映射关系存储下来。一旦发生Broker宕机,重启后只需要将最近一个.snapshot
读取出来,并通过log文件将后续的数据补充进来,这样缓存中就可以存储当前分区的全量索引
field
desc
Version
Version of the snapshot file
Crc
CRC of the snapshot data
Number
The entries in the producer table
ProducerId
The producer ID
ProducerEpoch
Current epoch of the producer
LastSequence
Last written sequence of the producer
LastOffset
Last written offset of the producer
OffsetDelta
The difference of the last sequence and first sequence in the last written batch
Timestamp
Max timestamp from the last written entry
CoordinatorEpoch
The epoch of the last transaction coordinator to send an end transaction marker
CurrentTxnFirstOffset
The first offset of the on-going transaction (-1 if there is none)
API KEY
描述
ApiKeys.FIND_COORDINATOR
寻找transaction coordinator
ApiKeys.INIT_PRODUCER_ID
初始化producerId及epoch
ApiKeys.ADD_PARTITIONS_TO_TXN
将某个partition添加进入事务
ApiKeys.PRODUCE
发送消息
ApiKeys.END_TXN
事务结束,包括事务提交跟事务取消
ApiKeys.FETCH
拉取消息
ApiKeys.ADD_OFFSETS_TO_TXN
read-process-write模式时使用,用于将一次读操作转换为写行为
注:本文所有代码截取均基于开源v3.3.1版本
kafka topic 中的文件 kafka.log.UnifiedLog#1767
object UnifiedLog extends Logging {
val LogFileSuffix = LocalLog.LogFileSuffix
val IndexFileSuffix = LocalLog.IndexFileSuffix
val TimeIndexFileSuffix = LocalLog.TimeIndexFileSuffix
val ProducerSnapshotFileSuffix = ".snapshot"
val TxnIndexFileSuffix = LocalLog.TxnIndexFileSuffix
val DeletedFileSuffix = LocalLog.DeletedFileSuffix
val CleanedFileSuffix = LocalLog.CleanedFileSuffix
val SwapFileSuffix = LocalLog.SwapFileSuffix
val DeleteDirSuffix = LocalLog.DeleteDirSuffix
val FutureDirSuffix = LocalLog.FutureDirSuffix
根据TransactionId计算partition kafka.coordinator.transaction.TransactionStateManager#partitionFor
def partitionFor(transactionalId: String): Int = Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount
生成ProducerId kafka.coordinator.transaction.ZkProducerIdManager#generateProducerId
def generateProducerId(): Long = {
this synchronized {
// grab a new block of producerIds if this block has been exhausted
if (nextProducerId > currentProducerIdBlock.lastProducerId) {
allocateNewProducerIdBlock()
nextProducerId = currentProducerIdBlock.firstProducerId
}
nextProducerId += 1
nextProducerId - 1
}
}
过滤control消息 org.apache.kafka.clients.consumer.internals.Fetcher.CompletedFetch#nextFetchedRecord
if (record.offset() >= nextFetchOffset) {
// we only do validation when the message should not be skipped.
maybeEnsureValid(record);
// control records are not returned to the user
if (!currentBatch.isControlBatch()) {
return record;
} else {
// Increment the next fetch offset when we skip a control batch.
nextFetchOffset = record.offset() + 1;
}
}
参考:
https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/
https://www.slideshare.net/ConfluentInc/exactlyonce-semantics-in-apache-kafka
https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit
手机扫一扫
移动阅读更方便
你可能感兴趣的文章