☞ 以下介绍消息队列在实际应用常用的使用场景。异步处理、应用解耦、流量削锋和消息通讯四个场景。
【1】异步处理:场景说明:用户注册后,需要发注册邮件和注册短信。
引入消息队列后架构如下:用户的响应时间=注册信息写入数据库的时间,例如50毫秒。发注册邮箱、发注册短信写入消息队列后,直接返回客户端,因写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。按照传统的做法①、串行方式,将注册信息写入数据库成功后,发注册邮件,再发送注册短信,以上三个成功后,返回客户端。可能需要150毫秒,这样使用消息队列提高了3倍。②、并行方式,将注册信息写入数据库成功后,发送注册邮件,同时发送注册短信。也可能需要100毫秒,这样使用消息队列提高了2倍。
【2】应用解耦:场景说明:用户下单后,订单系统需要通知库存系统。如下图:
传统模式的缺点:①、库存系统无法访问时,则订单减库存业务将会失败,从而导致订单失败;②、订单系统与库存系统耦合;
引入消息队列:①、用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。②、库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。
☛ 当库存系统不能正常使用时,也不会影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的解耦。
【3】流量削锋:场景说明:秒杀或团抢活动中使用广泛。秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。一般需要在应用前端加入消息队列。
用户请求:服务器接受后,首先写入消息队列。当消息队列长度超出最大数量,则直接抛弃用户请求或跳转至错误页面。
秒杀业务处理:根据消息队列中的请求信息,再做后续处理。
▁▂▃ 这样可以有效的控制活动人数和有效缓解短时间内的高流量冲击,防止压垮应用系统。
【4】日志处理:指将消息队列用在日志处理中,比如 Kafka 的应用,解决大量日志传输的问题。
▷ 日志采集客户端:负责日志数据采集,定时写入 Kafka队列。
▷ kafka消息队列:负责日志数据的接收,存储和转发。
▷ 日志处理应用:订阅并消费 kafka 队列中的日志数据。
【5】消息通信:消息队列一般都内置了高效的通信机制,因此也可以用纯消息通信。比如实现点对点消息队列,或者聊天室。
①、点对点通讯:客户端A和客户端B使用同一队列,进行消息通讯。
②、聊天室通讯(发布订阅模式):客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。
【1】发送端 MQ-Product (消息生产者)将消息发送给 MQ-server;
【2】MQ-server 将消息落地,持久化到数据库等;
【3】MQ-server 回 ACK 给 MQ-Producer;
【4】MQ-server 将消息发送给消息接收端 MQ-Consumer (消息消费者);
【5】MQ-Consumer 消费接收到消息后发送 ACK 给 MQ-server;
【5】MQ-server 将落地消息删除;
为了保证消息必达,MQ使用了消息超时、重传、确认机制。使得消息可能被重复发送,当消息生产者收不到 MQ-server 的ACK,重复向 MQ-server发送消息。MQ-server 收不到消息消费者的 ACK,重复向消息消费者发消息。
【消息重发】:【1】如果消息接收者在处理消息过程中没有对MOM(消息中间键)进行应答,则消息将由 MOM重发。
【2】如果队列中设置了预读参数(consumer.perfetchSize),如果消息接收者在处理第一条消息时(没有向MOM进行确认)就宕机了,则预读数量的所有消息将被重发。
【3】如果 Session 是事务的,则只要消息接收者有一条消息没有确认,或消息发送期间 MOM 或客户端某一方突然宕机了,则该事务范围中的所有消息 MOM 都将重发。
▷ ActiveMQ 消息服务器怎么知道客户端到底是消息正在处理中还是已处理完成没应答MOM或者宕机等等情况?其实是所有的客户端机器,都运行着一套客户端的 ActiveMQ 环境,该环境缓存发来的消息,维持着和 ActiveMQ服务器的消息通讯,负责失效转移(fail-over)等,所有的判断和处理都是由这套客户端环境来完成的。
【补发策略】:前提,Broker 根据自己的规则,通过 BrokerInfo 命令包和客户端建立连接,向客户端传送缺省发送策略(发送:同步和异步,策略:持久化消息和非持久化消息)。但是客户端可以使用 ActiveMQConnect.getRedeliveryPolicy() 方法覆盖该策略设置。
1 RedeliveryPolicy policy = connection.getRedeliveryPolicy();
2 policy.setInitialRedeliveryDelay(500);
3 policy.setBackOffMultiplier(2);
4 policy.setUseExponentialBackOff(true);
5 policy.setMaximumRedeliveries(2);
★ 一旦消息重发尝试超过重发策略中配置的 maximumRedeliveries(默认=6)会给 Broker 发送一个“Poison ack”通知它,这个消息被认为是 a poison pill,接着 Broker会将这个消息发送给 DLQ(Dead Letter Queue),以便后续处理。
【策略】:【1】 缺省死信队列(Dead Letter Queue)叫做Active.DLQ;所有的未送达消息将发送到这个队列,导致非常难于管理。此时就可以通过设置 activemq.xml 文件中的 destination policy map 的 “individualDeadLetterStrategy” 属性来修改。
1
2
18 …
19
【2】自动丢弃过期消息(Expired Messages):一些应用可能只是简单的丢弃过期消息,而不是将它们放到 DLQ。在dead letter strategy死信策略上配置 processExpired 属性为 false,可以实现这个功能。
1
2
17 …
18
【3】将非持久信息(non-persistent messages)放入死信队列 ActiveMQ 缺省不会将未发送到的非持久信息放入死信队列。如果一个应用程序并不想将消息 message 设置为持久的,那么记录下来的那些未发送到的消息对它来说往往也就没有价值。不过如果想实现这个功能,可以在 dead-letter-strategy 死信策略上设置 processNonPersistent="true"。
1
2
18 …
19
对于非幂等性的服务而言,如果重复发送消息就会产生严重的问题。譬如:银行取钱,上游支付系统负责给用户扣款,下游系统负责给用户发钱,通过 MQ异步通知。不管是上游的 ACK丢失,导致 MQ收到重复的消息,还是下半场 ACK丢失,导致系统收到重复的出钱通知,都可能出现,上游扣了一次钱,下游发了多次钱。消息队列的异步操作,通常用于幂等性的服务,非幂等性的服务是不适用中间件进行通信的。更多的是建立长连接 Socket 进行通信的。或者通过如下方式改造。
对于每条消息,MQ内部生成一个全局唯一、与业务无关的消息ID:inner-msg-id。当 MQ-server 接收到消息时,先根据 inner-msg-id 判断消息是否重复发送,再决定是否将消息落地到 DB中。这样,有了这个 inner-msg-id 作为去重的依据就能保证一条消息只能一次落地到 DB。
【1】对于非幂等性业务且要求实现幂等性业务:生成一个唯一ID标记每一条消息,将消息处理成功和去重日志通过事物的形式写入去重表。
【2】对于非幂等性业务可不实现幂等性的业务:权衡去重所花的代价决定是否需要实现幂等性,如:购物会员卡成功,向用户发送通知短信,发送一次或者多次影响不大。不做幂等性可以省掉写去重日志的操作。
【Active 中有两种方式保证消息消费的顺序性】:【1】通过高级特性 consumer 独有的消费者(exclusive consumer)。如果一个 queue 设置为 exclusive,broker 会挑选一个 consumer,并且将所有的消息都发给这个 consumer。如果这个 consumer挂了,broker 会自动挑选另外一个 consumer。
1 queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
2 consumer = session.createConsumer(queue);
【2】利用 Activemq 的高级特性:MessageGroups。Message Groups 特性是一种负载均衡的机制。在一个消息被分发到 consumer之前,broker 首先检查消息 JMSXGroupID属性。如果存在,那么 broker会检查是否有某个 consumer拥有这个message group。如果没有,那么 broker会选择一个 consumer,并将它关联到这个 message group。此后,这个 consumer会接收这个 message group的所有消息,直到:
①、Consumer 被关闭。
②、Message group 被关闭,通过发送一个消息,并设置这个消息的 JMSXGroupSeq 为 -1。
消费者实际上根据两个维度排序了,一个是消费者的 Priority,即消费者的优先级。还有一个是消费者的指定的消息组的个数 AssignedGroupCount。这个顺序直接影响到下一条消息是谁来接收。
1 protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {
2 boolean result = true;
3 // 保持消息组在一起。
4 String groupId = node.getGroupID();
5 int sequence = node.getGroupSequence();
6 if (groupId != null) {
7 // 先查找该queue存储的一个groupId,和consumerId的一个map
8 MessageGroupMap messageGroupOwners = getMessageGroupOwners();
9 // 如果是该组的第一条消息。则指定该consumer消费该消息组
10 if (sequence == 1) {
11 assignGroup(subscription, messageGroupOwners, node, groupId);
12 } else {
13 // 确保前一个所有者仍然有效,否则就生成新的主人。
14 ConsumerId groupOwner;
15 groupOwner = messageGroupOwners.get(groupId);
16 if (groupOwner == null) {
17 assignGroup(subscription, messageGroupOwners, node, groupId);
18 } else {
19 if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
20 // 一个组中的 sequence < 1 表示改组消息已经消费完了
21 if (sequence < 0) {
22 messageGroupOwners.removeGroup(groupId);
23 subscription.getConsumerInfo().decrementAssignedGroupCount(destination);
24 }
25 } else {
26 // 说明该消费者不能消费该消息组
27 result = false;
28 }
29 }
30 }
31 }
32 return result;
33 }
【RabbitMQ 保证消息队列的顺序性】:造成顺序错乱的场景:RabbitMQ 中有一个 Queue,多个 Consumer。生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1、data2、data3,放入RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,可能消费者2先执行完操作,把 data2 存入数据库,然后是 data1、data3。导致顺序错乱。
【解决方案】:RabbitMQ 将上面的一个 Queue 拆分为三个 Queue,每个 Queue 对应一个 Consumer,就是多一些 Queue 而已,确实是麻烦点;然后这个 Consumer内部用内存队列做排队,然后分发给底层不同的 worker来处理。 如下,将消息放入一个队列,由一个消费者消费即可保证顺序。
【Kafka 保证消息队列的顺序性】: 建了一个 Topic,有三个 Partition。生产者在写的时候,其实可以指定一个 key,比如说指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 Partition中去,而且这个 Partition中的数据一定是有顺序的。消费者从 Partition中取出来数据的时候,也一定是有顺序的。接着,消费者里可能会搞多个线程来并发处理消息。因为如果消费者用单线程时,处理比较耗时。而多线程并发处理时,顺序可能就乱序。
【解决方案】:①、一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
②、写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。
【1】Kafka 是 LinkedIn 开发的一个高性能、分布式的消息系统,广泛用于日志收集、流式数据处理、在线和离线消息分发等场景。虽然不是作为传统的 MQ来设计,但在大部分情况下,Kafka 也可以代替原有 ActiveMQ 等传统的消息系统。
【2】Kafka 将消息流按 Topic组织,保存消息的服务器称为 Broker,消费者可以订阅一个或者多个 Topic。为了均衡负载,一个Topic 的消息又可以划分到多个分区(Partition),分区越多,Kafka 并行能力和吞吐量越高。
【3】Kafka 集群需要 Zookeeper支持来实现集群,Kafka 发行包中已经包含了 Zookeeper,部署的时候可以在一台服务器上同时启动一个 Zookeeper Server 和 一个 Kafka Server,也可以使用已有的其他 Zookeeper 集群。
【4】和传统的 MQ 不同,消费者需要自己保留一个 offset,从 Kafka 获取消息时,只拉取当前 offset 以后的消息。Kafka 的scala/java 版的 Client 已经实现了这部分的逻辑,将 offset 保存到 zookeeper上。每个消费者可以选择一个 id,同样 id 的消费者对于同一条消息只会收到一次。一个 Topic 的消费者如果都使用相同的id,就是传统的 Queue;如果每个消费者都使用不同的id,就是传统的 pub-sub。Kafka后来把对 offset的保存放在了一个 topic里面。
如果在 MQ 的场景下,将 Kafka 和 ActiveMQ 相比,Kafka 的优点:
【1】分布式、高可扩展:Kafka 集群可以透明的扩展,增加新的服务器进集群。
【2】高性能:Kafka 的性能大大超过传统的 ActiveMQ、RabbitMQ 等 MQ 实现,尤其是 Kafka 还支持 batch 操作。
【3】容错:Kafka 每个 Partition 的数据都会复制到几台服务器上。当某个 Broker 故障失效时,ZooKeeper 服务将通知生产者和消费者,生产者和消费者转而使用其它 Broker。
【4】高吞吐:在一台普通的服务器上既可以达到 10W/s 的吞吐速率。
【5】完全的分布式系统:Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡。
【6】快速持久化:可以在 O(1) 的系统开销下进行消息持久化。
【7】游标位置:ActiveMQ 游标由 AMQ来管理,无法读取历史数据。Kafka 客户端自己管理游标,可以重读数据。
Kafka 的缺点:
【1】重复消息:Kafka 默认提供的是at least once语义,只保证每个消息至少会送达一次,虽然几率很小,但一条消息有可能会被送达多次。
【2】消息乱序:虽然一个 Partition 内部的消息是保证有序的,但是如果一个Topic 有多个Partition,Partition 之间的消息送达不保证有序。
【3】复杂性:Kafka 需要 zookeeper 集群的支持,Topic 通常需要人工来创建,部署和维护较一般消息队列成本更高。
MQ 是非线程安全的
Kafka 架构:【1】Producers(生产者):生产者是发送一个或多个主题 Topic 的发布者。生产者向 Kafka 代理发送数据。每当生产者将消息发布给代理时,代理只需要将消息附加到最后一个段文件。实际上,该消息将被附加到分区。生产者也可以向指定的分区发送消息。
【2】Brokers:代理(经纪人)负责维护发布数据的简单系统。
【3】Topic:主题属于特定类别的信息流称为主题。数组存储在主题中。Topic 相当于 Queue。主题被拆分成分区。分区被实现为具有大小相等的一组分段文件。
【4】Partition(分区):每个 Partition 内部消息有序,其中每个消息都有一个 offset 序号。一个 Partition 值对应一个 Broker,一个 Broker 可以管理多个 Partition。
【5】Segment:Partition 物理上由多个 Segment组成。每个 Partion 目录相当于一个巨型文件被平均分配到多个大小相等 segment段数据文件中。但每个段 segment file消息数量不一定相等
【6】Partition offset(分区偏移):每个 Partition 都由一系列有序的、不可变的消息组成,这些消息被连续的追加到 Partition中。Partition 中的每个消息都有一个连续的序列号叫做 offset,用于 Partition唯一标识一条消息。
【7】Replicas of partition(分区备份):副本只是一个分区备份:不读取和写入数据,主要用于防止数据丢失。
【8】Kafka Cluster(Kafka 集群):Kafka 有多个代理被称为 Kafka集群。可以扩展 Kafka集群,无需停机。这些集群用于管理消息数据的持久性和复制。
【9】Consumers(消费者):Consumers 从 MQ读取数据。 消费者订阅一个或多个主题,并通过从代理中提取数据来使用已发布的消息。Consumer 自己维护消费到哪个 offset。
【每个Consumer 都有对应的 group】:【1】group 内是 queue 消费模型:各个 Consumer 消费不同的 Partition,因此一个消息在 group 内只消费一次。
【2】group 间是 publish-subscribe 消费模型:各个 group 各自独立消费,互不影响,因此一个消息被每个 group 消费一次。
Producer 数据丢失的原因:【1】使用同步模式的时候,有 3种状态保证消息被安全生产,当配置 ack=1时(只保证写入 Leader成功)的话,如果刚好 Leader partition 挂了,数据就会丢失。
ack 机制:broker 表示发来的数据已确认接收无误,表示数据已经保存到磁盘。
0:不等待 broker 返回确认消息
1:等待 topic 中某个 partition leader 保存成功的状态反馈
-1/all:等待 topic 中某个 partition 所有副本都保存成功的状态反馈
【2】使用异步模式时,当缓冲区满了,如果配置=0(还没有收到确认的数据,数据就立即被丢弃掉)。
解决办法:只要能避免以上两种情况就可以保证消息不会被丢失。如下:
【1】当同步模式时,确认机制设置为-1,就是让消息写入 Leader 和所有副本。
【2】当异步模式时,消息发出,还没收到确认的时候,缓冲区也满了。在配置文件中设置成不限制阻塞超时的时间,也就是说让生产者一直阻塞,这样就能保证数据不会丢失。
1 producer.type = async
2 request.required.acks=1
3 queue.buffering.max.ms=5000 #异步发送的时候 发送时间间隔 单位是毫秒
4 queue.buffering.max.messages=10000
5 queue.enqueue.timeout.ms = -1
6 batch.num.messages=200 #异步发送 每次批量发送的条目
Kafka弄丢了数据:Kafka 的某个 Broker宕机了,然后重新选举 Broker上的 Partition 的 Leader时。如果此时 Follower还没来得及同步数据,Leader就挂了,然后某个 Follower成为了 Leader,他就少了一部分数据。
解决办法:一般要求设置 4个参数来保证消息不丢失:
【1】给 Topic设置 replication.factor 参数:这个值必须大于1,表示要求每个 Partition必须至少有2个副本。
【2】在 Kafka服务端设置 min.isync.replicas参数:这个值必须大于1,表示要求一个 Leader至少感知到有至少一个 Follower在跟自己保持联系正常同步数据,这样才能保证 Leader挂了之后还有一个 Follower。
【3】在生产者端设置 acks= -1:要求每条数据,必须是写入所有 Replica 副本之后,才能认为是写入成功了。
【4】在生产者端设置 retries=MAX(很大的一个值,表示无限重试):表示消息一旦写入事变,就无限重试。
Consumer 数据丢失的原因:当你消费到了这个消息,然后消费者那边自动提交了offset,让 kafka 以为你已经消费好了这个消息,其实你刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢了。
解决办法:【1】Kafka 会自动提交 offset,使用 Kafka高级API,如果将自动提交 offset 改为手动提交(当数据入库之后进行偏移量的更新),就可以保证数据不会丢。但是可能导致重复消费,比如你刚处理完,还没有提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。
【镜像模式】:队列的数据都镜像了一份到所有的节点上。这样任何一个节点失效,不会影响到整个集群的使用。在实现上 mirror queue 内部有一套选举算法,会选出一个 master 和若干的 slaver。master 和 slaver 通过相互之间不断的发送心跳来检查是否连接断开。可以通过指定 net_ticktime 来控制心跳检查频率。注意一个单位时间 net_ticktime 实际上做了4次交互,故当超过net_ticktime (± 25%) 秒没有响应的话则认为节点挂掉。另外注意修改 net_ticktime 时需要所有节点都一致。配置举例:
1 {rabbit, [{tcp_listeners, [5672]}]},
2 {kernel, [{net_ticktime, 120}]}
【Consumer】:任意连接一个节点,若连上的不是 Master,请求会转发给 Master,为了保证消息的可靠性,Consumer 回复 Ack给 Master后,Master 删除消息并广播所有的 Slaver去删除;
【Publisher】:任意连接一个节点,若连上的不是 Master,则转发给 Master,由 Master存储并转发给其他的 Slaver存储;
【如果 Slaver 挂掉】:则集群的节点状态没有任何变化。只要 Client没有连到这个节点上,也不会给 Client发送失败的通知。在检测到 Slaver挂掉的期间 Publish消息会有延迟。如果配置了高可用策略是自动同步,当 Slaver起来后,队列中有大量的消息需要同步,将会整个集群阻塞长时间的不能读写直到同步结束;
【RabbitMQ 实现了一种镜像队列(mirrored queue)的算法提供HA】:创建队列时可以通过传入“x-ha-policy”参数设置队列为镜像队列,镜像队列会存储在多个 Rabbit MQ 节点上,并配置成一主多从的结构,可以通过“x-ha-policy-params”参数来具体指定master 节点和 slave节点的列表。所有发送到镜像队列上的操作,比如消息的发送和删除,都会先在 master节点上执行,再通过一种叫 GM(Guaranteed Multicast)的原子广播(atomic broadcast)算法同步到各 slave节点。GM算法通过两阶段的提交,可以保证 master节点发送到所有 slave节点上的消息要么全部执行成功,要么全部失败;通过环形的消息发送顺序,即 master节点发送消息给一个 slave节点,这个 slave节点依次发送给下一个 slave节点,最终消息回到 master节点,保证了主从节点上的负载差别不大。通过传入“x-ha-policy”参数设置队列为镜像队列(mirrored queue):定义一个policy:以“ha.”开头的队列都被镜像到集群中的所有节点上:rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'。定义一个policy:以“cinder”开头的队列被镜像到集群中的任意两个节点上,并且自动同步:rabbitmqctl set_policy ha-cinder-two "^cinder"或者设置'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}';
all:队列将 mirrored 到所有集群中的节点中,当新节点添加进来时也会 mirrored 到新的节点;
exactly(需指定count):如果节点数小于 count 数,则队列将 mirrored 到所有的节点。如果节点数大于 count,新的节点将不再创建队列的 mirror(即使原来已创建 mirror 的节点挂掉也不会创建);
nodes:对指定的节点进行 mirror。如果没有一个指定的节点在运行中,那么只有 client 连接的那个节点才会声明 queue(这里有个迁移策略:假如 queue是在[A,B]上且A为 master,若给定的新的策略为nodes[C,D],那么为了防止数据丢失,在迁移中会同时存在[A,C,D]直到C,D已经同步好以后,A才会关闭);
【1】顺序读写磁盘,充分利用了操作系统的预读机制。
【2】Linux 中使用 sendfile 命令,减少一次数据拷贝(零拷贝):
①、把数据从硬盘读取到内核中的页缓存。
②、把数据从内核中读取到用户空间(sendfile 命令跳过此步骤)。
③、把用户空间的数据写到 socket 缓存区中。
④、操作系统将数据从 socket 缓冲区中复制到网卡缓冲区,以便将数据经网络发出。
【3】生产者缓存消息批量发送,消费者批量从 broker 获取消息,减少 IO 次数,充分利用磁盘顺序读写的性能。
【4】通常情况下 Kafka 的瓶颈不是 CPU或者磁盘,而是网络宽带,所以生产者可以对数据进行压缩。
详细博文链接
与 RabbitMQ 的区别:ღ RabbitMQ 用在对可靠性要求比较高的消息传递上。kafka:用于处理活跃的流式数据,大数据量的数据处理上。
【1】在架构模型方面:RabbitMQ 遵循 AMQP 协议,RabbitMQ 的 Broker由 Exchange、Binding、Queue 组成,其中 Exchange 和 Binding 组成了消息的路由键;Producer 通过连接 Channel 和 Server 进行通信,Consumer 从 Queue 获取消息进行消费(长连接,queue 有消息会推送到 consumer端,consumer 循环从输入流读取数据)。rabbitMQ 以 Broker为中心;有消息的确认机制。
kafka 遵从一般的MQ结构,Producer,Broker,Consumer,以 Consumer为中心,消费信息保存的客户端 Consumer上,Consumer根据消费的点,从 Broker上批量 pull数据,无消息确认机制。
【2】在吞吐量方面:RabbitMQ在吞吐量方面稍逊于Kafka,他们的出发点不一样,RabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的操作;基于存储的可靠性的要求存储可以采用内存或者硬盘。
kafka具有高的吞吐量,内部采用消息的批量处理,zero-copy(sendfile 函数) 机制,数据的存储和获取是本地磁盘顺序批量操作,具有O(1)的复杂度,消息处理的效率很高。
【3】在可用性方面:RabbitMQ 支持 mirror 的 queue,主 queue失效,mirror queue接管。
Kafka 的 Broker支持主备模式。
【4】在集群负载均衡方面:RabbitMQ 的负载均衡需要单独的 loadbalancer 进行支持。
Kafka 采用 Zookeeper对集群中的 Broker、Consumer进行管理,可以注册 Topic 到Zookeeper上;通过 Zookeeper的协调机制,Producer 保存对应 Topic的 Broker信息,可以随机或者轮询发送到 Broker上;并且 Producer可以基于语义指定分片,消息发送到 Broker的某分片上。
与 ActiveMQ 的区别:ActiveMQ 和 Kafka,前者完全实现了 JMS 的规范,后者并没有纠结于JMS规范,设计了另一套吞吐非常高的分布式发布-订阅消息系统,非常流行。目前归属于 Apache 定级项目。它只用文件系统来管理消息的生命周期。接下来我们结合三个点(消息安全性,服务器的稳定容错性以及吞吐量)来分别谈谈这两个消息中间件。
【1】消息的安全性:Kafka 集群中的 Leader 负责某一 Topic 的某一 Partition 的消息的读写,理论上 Consumer 和 Producer 只与该 Leader 节点打交道,一个集群里的某一 Broker 即是 Leader 的同时也可以担当某一 Partition 的 Follower,即 Replica。Kafka 分配 Replica 的算法如下:
(1)将所有 Broker(假设共n个Broker)和待分配的 Partition排序。
(2)将第i个 Partition分配到第(i mod n)个 Broker上。
(3)将第i个 Partition的第j个 Replica分配到第((i + j) mod n)个 Broker上。
同时,Kafka 与 Replica 既非同步也不是严格意义上的异步。一个典型的 Kafka 发送-消费消息的过程如下:首先 Producer消息发送给某 Topic 的某 Partition 的 Leader,Leader 先是将消息写入本地 Log,同时 follower(如果落后过多将会被踢出 Replica列表)从Leader上 pull 消息,并且在未写入 log 的同时即向 Leader 发送 ACK 的反馈,所以对于某一条已经算作 commit 的消息来讲,在某一时刻,其存在于 Leader的 log中,以及 Replica的内存中。这可以算作一个危险的情况(听起来吓人),因为如果此时集群挂了这条消息就算丢失了,但结合 producer的属性(request.required.acks=-1,当所有follower都收到消息后返回ack)可以保证在绝大多数情况下消息的安全性。当消息算作 commit的时候才会暴露给 consumer,并保证 at-least-once的投递原则。
【2】服务的稳定容错性:前面提到过,Kafka天然支持HA,整个 leader/follower 机制通过 zookeeper调度,它在所有 Broker中选出一个 controller,所有 Partition的 Leader选举都由 controller决定,同时 controller也负责增删 Topic以及 Replica的重新分配。如果Leader挂了,集群将在ISR(in-sync replicas)中选出新的Leader,选举基本原则是:新的 Leader必须拥有原来的 Leader commit 过的所有消息。假如所有的 follower都挂了,Kafka会选择第一个“活”过来的 Replica(不一定是ISR中的)作为 Leader,因为如果此时等待 ISR中的 Replica是有风险的,假如所有的ISR都无法“活”,那此 Partition将会变成不可用。
【3】吞吐量:Leader 节点负责某一 Topic(可以分成多个 Partition)的某一 Partition的消息的读写,任何发布到此 Partition的消息都会被直接追加到 log文件的尾部,因为每条消息都被 append 到该 Partition中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka高吞吐率的一个很重要的保证),同时通过合理的 Partition,消息可以均匀的分布在不同的 Partition里面。 Kafka基于时间或者 Partition的大小来删除消息,同时 Broker是无状态的,Consumer的消费状态(offset)是由Consumer 自己控制的(每一个 Consumer实例只会消费某一个或多个特定 Partition的数据,而某个 Partition的数据只会被某一个特定的 Consumer实例所消费),也不需要 Broker通过锁机制去控制消息的消费,所以吞吐量惊人,这也是 Kafka吸引人的地方。最后说下由于 zookeeper 引起的脑裂(Split Brain)问题:脑裂问题就是产生了两个 Leader,导致集群行为不一致了。1个集群如果发生了网络故障,很可能出现1个集群分成了两部分,而这两个部分都不知道对方是否存活,不知道到底是网络问题还是直接机器down了,所以这两部分都要选举1个Leader,而一旦两部分都选出了Leader, 并且网络又恢复了,那么就会出现两个 Brain的情况,整个集群的行为不一致了。解决:只有集群中超过半数节点投票才能选举出 Leader。ZooKeeper默认采用了这种方式。
Kafka 的设计目标:kafka在 设计之初就需要考虑以下5个方面的问题
【1】以时间复杂度为O(1)的方式提供消息持久化能力,即使对 TB级以上数据也能保证常数时间复杂度的访问性能。
【2】高吞吐率,即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。
【3】支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。
【4】同时支持离线数据处理和实时数据处理。
【5】Scale out:支持在线水平扩展。
所以,不像 AMQ,Kafka 从设计开始极为高可用为目的,天然 HA。Broker 支持集群,消息亦支持负载均衡,还有副本机制。同样,Kafka 也是使用 Zookeeper 管理集群节点信息,包括 Consumer 的消费信息也是保存在 zk 中,下面我们分话题来谈:
和传统的MQ不同:消费者需要自己保留一个offset,从kafka 获取消息时,只拉取当前offset 以后的消息。将 offset保存到 zookeeper上。每个消费者可以选择一个id,同样id 的消费者对于同一条消息只会收到一次。一个Topic 的消费者如果都使用相同的id,就是传统的 Queue;如果每个消费者都使用不同的id, 就是传统的pub-sub。
kafka 主从同步怎么实现:Kafka 的主从同步,主要是针对它的 Broker来说。在 Kafka 的 Broker 中,同一个 Topic 可以被分配成多个 Partition,每个 Partition的可以有一个或者多个 replicas(备份),即会有一个 Leader 以及 0到多个 Follower,在Consumer 读取数据的时候,只会从 Leader上读取数据,Follower只是在 Leader宕机的时候来替代 Leader,主从同步有两种方式:同步复制和异步复制,Kafka采用的是中间策略 ISR(In Sync Replicas)。
Kafka 的 ISR策略:有数据写 Leader的时候,Leader会查看 Follower组成的 ISR列表,并且符合以下两点才算是属于 ISR列表:【1】Broker 可以维护和 zookeeper的连接,zookeeper通过心跳机制检查每个节点的连接。【2】如果节点是个 Follow它必须能及时同步 Leader的写操作,不能延时太久。当有写消息的时候,我们可以根据配置做如下配置:request.required.acks 参数的设置来进行调整:
☞ 0 ,相当于异步发送,消息发送完毕即 offset增加,继续生产;相当于At most once;
☞ 1,Leader 收到Leader Replica 对一个消息的接收 ack才增加 offset,然后继续生产;
☞ -1,Leader 收到所有 Replica 对一个消息的接收 ack才增加 offset,然后继续生产;
【1】延迟处理:可以通过设置延迟级别,控制消息延迟的时间。
【2】设置过期时间:
1
1)Message 过期则客户端不能接收;
2)ttlCeiling:表示过期时间上限(程序写的过期时间不能超过此时间);
3)zeroExpirationOverride:表示过期时间(给未分配过期时间的消息分配过期时间);
【3】过期消息处理办法:消息过期后会进入死信队列,如不想抛弃死信队列,默认进入 ACTIVEMQ.DLQ队列,且不会自动清除;对于过期的消息进入死信队列还有一些可选的策略:放入各自的死信通道、保存在一个共享的队列(默认),且可以设置是否将过期消息放入队列的开关以及死信队列消息过期时间。
1)直接抛弃死信队列:AcitveMQ提供了一个便捷的插件:DiscardingDLQBrokerPlugin,来抛弃DeadLetter。如果开发者不需要关心DeadLetter,可以使用此策略。
1
2)定时抛弃死信队列:默认情况下,ActiveMQ永远不会过期发送到 DLQ的消息。但是,从 ActiveMQ5.12开始,deadLetterStrategy 支持 expiration属性,其值以毫秒为单位。
1
2 …
3
6 …
7
3)慢消费者策略设置:Broker将会启动一个后台线程用来检测所有的慢速消费者,并定期关闭它们。中断慢速消费者,慢速消费将会被关闭。abortConnection是否关闭连接;如果慢速消费者最后一个ACK距离现在的时间间隔超过阀 maxTimeSinceLastAck,则中断慢速消费者。
1
2 …
3
6 …
7
RabbitMQ 遵循了 AMQP 规范,用消息确认机制来保证:只要消息发送,就能确保被消费者消费,来做到了消息最终一致性。
Rabbitmq 的整个发送过程如下:【1】生产者发送消息到消息服务。
【2】如果消息落地持久化完成,则返回一个标志给生产者。生产者拿到这个确认后,才能放心的说消息终于成功发到消息服务了。否则进入异常处理流程。
【3】消息服务将消息发送给消费者。
【4】消费者接受并处理消息,如果处理成功则手动确认。当消息服务拿到这个确认后,才放心的说终于消费完成了。否则重发,或者进入异常处理。
【问题】:两台设备上只有一个上存在 logs;
【基本情况】:一个 Topic 配置了四个 Partition,一个 Consumer Group 消费此Topic,但使用两台服务器,分别创建 Consumer 实例。都运行日志收集程序。
【问题】:Consumer Group 是将消费到的日志写入服务器磁盘文件中。有两台服务器都在运行此日志收集程序,每个服务器上的程序都创建了一个 Group 的 Consumer实例,此 Consumer实例会分配到两个 Partition进行处理,因此每个服务器都只存储了一部分日志文件。但是在测试时发现,所有日志都写入了 ServerA,ServerB上没有日志,即便使用测试工具发送了大量数据,ServerB仍然没有日志。
【原因】:查看 log发现,ServerA 上的 Consumer实例分配的 Partition 为 Partition_0 / Partition_1,serverB 上的 Consumer实例分配的 Partition 为partition_3 / Partition_4,两个 Server上的 Consumer实例都被分配了Partition,Partition分配正常,消费应该没有问题。ServerB 上没有日志数据,说明没有数据供其消费,也就是说,所有数据都被 Producer发送到了 Partition_1 或Partition_2 上,这是生产的问题,应该是与生产者的分区路由有关,因此有必要了解下生产者的分区路由策略。Kafka 中的每个Topic 分配了4个 Partition,生产者(Producer)在将消息记录(ProducerRecord)发送到某个 Topic时是要选择对应的 Partition的,选择 Partition的策略如下:
【1】消息中指定Partition:判断 Partition字段是否有值,有值就直接将该消息发送到指定的 Partition就行;
【2】如果没有指定分区(Partition),则使用分区器进行分区路由,首先判断消息中是否指定了key;
【3】如果指定了key,则使用该 key进行 hash操作,并转为正数,然后将其对 Topic相应的分区数进行取余操作,得到一个分区;
【4】如果没有指定key,则在一个随机数上以自增的方式产生一个数(第一次时生成随机数,之后在其基础上进行自增),转为正数之后对分区数量进行取余操作,得到一个分区。
由于在程序中 Producer发送记录的时候指定了固定的 key,根据这个 key进行分区路由总是会选择同一个分区,所有日志都被发送给了同一个分区,因此只有关联这个分区的 Consumer实例才能消费,只有此 Consumer实例所在的 Server上才有日志。
【1】ISR:In-Sync Replicas 副本同步队列(包括 leader 副本在内);
【2】AR:Assigned Replicas 所有副本;
ISR是由 Leader维护,Follower 从 Leader同步数据有一些延迟(包括延迟时间 replica.lag.time.max.ms[默认值为10000] 和延迟条数replica.lag.max.messages[默认值为4000] 两个维度, 当前最新的版本0.10.x中只支持 replica.lag.time.max.ms这个维度),任意一个超过阈值都会把 Follower剔除出 ISR,存入OSR(Outof-Sync Replicas)列表,新加入的 Follower 也会先存放在OSR中。AR=ISR+OSR。
Kafka 源码注释中说明了一般有这几种情况会导致副本失效:
【1】Follower 副本进程卡住,在一段时间内根本没有向 Leader 副本发起同步请求,比如频繁的 Full GC。
【2】Follower 副本进程同步过慢,在一段时间内都无法追赶上 Leader 副本,比如 I/O 开销过大。
【3】如果通过工具增加了副本因子,那么新增加的副本在赶上 Leader 副本之前也都是处于失效状态的。
【4】如果一个 Follower 副本由于某些原因(比如宕机)而下线,之后又上线,在追赶上 Leader 副本之前也处于失效状态。
我们用 UnderReplicatedPartitions代表 Leader副本在当前 Broker上且具有失效副本的分区的个数。
【1】如果集群中有多个 Broker的 UnderReplicatedPartitions保持一个大于0的稳定值时,一般暗示着集群中有 Broker已经处于下线状态。这种情况下,这个 Broker中的分区个数与集群中的所有 UnderReplicatedPartitions(处于下线的 Broker是不会上报任何指标值的)之和是相等的。通常这类问题是由于机器硬件原因引起的,但也有可能是由于操作系统或者 JVM引起的 。
【2】如果集群中存在 Broker的 UnderReplicatedPartitions频繁变动,或者处于一个稳定的大于0的值(这里特指没有 Broker下线的情况)时,一般暗示着集群出现了性能问题,通常这类问题很难诊断,不过我们可以一步一步的将问题的范围缩小,比如先尝试确定这个性能问题是否只存在于集群的某个 Broker中,还是整个集群之上。如果确定集群中所有的 under-replicated分区都是在单个 Broker上,那么可以看出这个 Broker出现了问题,进而可以针对这单一的 Broker做专项调查,比如:操作系统、GC、网络状态或者磁盘状态(比如:iowait、ioutil等指标)。
在 Kafka 中这种功能完全可以支持,同时主写从读可以让从节点去分担主节点的负载压力,预防主节点负载过重而从节点却空闲的情况发生。但是主写从读也有 2 个很明显的缺点:
【1】数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
【2】延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
【Kafka 架构导致我们没有必要使用主从分离】在 Kafka 中 这种负载均衡是在主写主读的架构上实现的。我们来看 一下 Kafka 的生产消费模型,如下图所示。
在 Kafka 集群中有 3 个分区,每个分区有 3 个副本,正好均匀地分布在 3个 broker 上,灰色阴影的代表 Leader 副本,非灰色阴影的代表 Follower 副本,虚线表示 Follower 副本从 Leader 副本上拉取消息。当生产者写入消息的时候都写入 Leader 副本,对于图中的情形,每个 Broker 都有消息从生产者流入。当消费者读取消息的时候也是从 Leader 副本中读取 的,对于图中的情形,每个 Broker 都有消息流出到消费者。从而将压力分配到每个服务器上,从而实现了负载均衡功能。
【1】Broker 注册:Broker 是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的 Broker管理起来,此时就使用到了 Zookeeper。在 Zookeeper上会有一个专门用来进行 Broker服务器列表记录的节点:/brokers/ids 每个Broker在启动时,都会到 Zookeeper上进行注册,即到 /brokers/ids下创建属于自己的节点,如/brokers/ids/[0…N]。Kafka 使用了全局唯一的数字来指代每个 Broker服务器,不同的 Broker必须使用不同的 Broker ID进行注册,创建完节点后,每个 Broker就会将自己的 IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦 Broker宕机,则对应的临时节点也会被自动删除。
【2】Topic 注册:在 Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个 Broker上,这些分区信息及与Broker的对应关系也都是由 Zookeeper在维护,由专门的节点来记录,如:/borkers/topics Kafka 中每个 Topic都会以 /brokers/topics/[topic] 的形式被记录,如 /brokers/topics/login 和 /brokers/topics/search 等。Broker服务器启动后,会到对应 Topic节点(/brokers/topics)上注册自己的 Broker ID并写入针对该 Topic的分区总数,如 /brokers/topics/login/3->2,这个节点表示Broker ID为3的一个 Broker服务器,对于"login" 这个 Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。
【3】生产者负载均衡:由于同一个 Topic消息会被分区并将其分布在多个 Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka 支持传统的四层负载均衡,也支持 Zookeeper方式实现负载均衡。
■ 四层负载均衡,根据生产者的 IP地址和端口来为其确定一个相关联的 Broker。通常,一个生产者只会对应单个 Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的 TCP连接,只需要和 Broker维护单个 TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个 Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的 Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到 Broker的新增和删除。
■ 使用 Zookeeper进行负载均衡,由于每个Broker启动时,都会完成 Broker注册过程,生产者会通过该节点的变化来动态地感知到 Broker服务器列表的变更,这样就可以实现动态的负载均衡机制。
【4】消费者负载均衡:与生产者类似,Kafka 中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的 Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
【5】分区与消费者的关系:消费组 (Consumer Group)下有多个 Consumer(消费者)。对于每个消费者组 (Consumer Group),Kafka 都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。订阅的 Topic下的每个分区只能分配给某个 group 下的一个 Consumer(当然该分区还可以被分配给其他 group)。同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。在 Kafka中,规定了每个消息分区只能被同组的一个消费者进行消费,因此,需要在 Zk 上记录消息分区与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] 其中,[broker_id-partition_id]就是一个消息分区的标识,节点内容就是该消息分区上消费者的 Consumer ID。
【6】消息消费进度Offset 记录:在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度 Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset 在 Zookeeper中由一个专门节点进行记录,其节点路径为:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] 节点内容就是 Offset的值。
【7】消费者注册:消费者服务器在初始化启动时加入消费者分组的步骤如下:注册到消费者分组。每个消费者服务器启动时,都会到 Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的 Topic信息写入该临时节点。对消费者分组中的消费者的变化注册监听。每个消费者都需要关注所属消费者分组中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的 Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。对Broker服务器变化注册监听。消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现 Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。进行消费者负载均衡。为了让同一个Topic下不同分区的消息尽量均衡地被多个消费者消费而进行消费者与消息分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或 Broker服务器发生变更,会发出消费者负载均衡。
【ZK 的详细存储结构图】:
早期版本的 kafka 用 zk 做 meta 信息存储,consumer 的消费状态,group 的管理以及 offset 的值。考虑到 zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中确实逐渐弱化了 zookeeper的作用。新的 consumer使用了 kafka内部的 group coordination 协议,也减少了对 zookeeper的依赖。
Kafka 使用 ISR的方式很好的均衡了确保数据不丢失以及吞吐率。Follower 可以批量的从 Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了 Follower与 Leader的消息量差。所有的 Follower 都复制 Leader 的日志,日志中的消息和顺序都和 Leader 中的一致。Follower 像普通的 Consumer 那样从 Leader 那里拉取消息并保存在自己的日志文件中。ISR 中有f+1个节点,就可以允许在f个节点 Down掉的情况下不会丢失消息并正常提供服。ISR 的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR。因此如果 Leader宕了,直接从 ISR中选择一个 Follower就行。只有当消息被所有的副本加入到日志中时,才算是“committed”,只有 committed的消息才会发送给 consumer,这样就不用担心 Leader Down掉了消息会丢失。Kafka 选择一个节点作为“controller”,当发现有Leader 节点 Down掉的时候它负责在LSR 分区的所有节点中选择新的 Leader,这使得 Kafka可以批量的高效的管理所有分区节点的主从关系。如果 controller down掉了,活着的节点中的一个会被切换为新的 controller。
Leader 维护一个与其基本保持同步的 Replica列表,该列表称为 ISR(in-sync Replica),每个 Partition都会有一个 ISR,而且是由Leader动态维护 ,如果 Follower 比 Leader落后太多消息数量【replica.lag.max.messages】,或者超过一定时间未发起数据复制请求【replica.lag.time.max.ms】,则 Leader将其从 ISR中移除 。
Kafka 的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka 的特性之一就是高吞吐率。Kafka 之所以能这么快,是因为顺序写磁盘、大量使用内存页 、零拷贝技术的使用
【数据写入】:Kafka 会把收到的消息都写入到硬盘中,不会丢失数据。为了优化写入速度 Kafka 采用了两个技术, 顺序写入和 MMFile(Memory Mapped File)
【原因一:顺序写入】:磁盘读写的快慢取决于你怎么使用它,也就是顺序读写或者随机读写。在顺序读写的情况下,磁盘的顺序读写速度和内存持平。因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机 I/O,最喜欢顺序 I/O。为了提高读写硬盘的速度,Kafka 就是使用顺序 I/O。如果在内存做这些操作的时候,一个是 Java 对象的内存开销很大,另一个是随着堆内存数据的增多,Java 的 GC 时间会变得很长。
【使用磁盘操作有以下几个好处】:①、磁盘顺序读写速度超过内存随机读写。②、JVM 的 GC 效率低,内存占用大。使用磁盘可以避免这一问题。③、系统冷启动后,磁盘缓存依然可用。下图就展示了 Kafka 是如何写入数据的, 每一个 Partition 其实都是一个文件 ,收到消息后 Kafka 会把数据插入到文件末尾(虚框部分):
该方法的缺陷:没有办法删除数据 ,所以 Kafka 是不会删除数据的,它会把所有的数据都保留下来,每个消费者(Consumer)对每个 Topic 都有一个 Offset 用来表示读取到了第几条数据 。
如果不删除硬盘肯定会被撑满,所以 Kakfa 提供了两种策略来删除数据:
【1】基于时间;
【2】基于 Partition 文件大小;
【原因二:Memory Mapped Files】:即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以 Kafka 的数据并不是实时的写入硬盘 ,它充分利用了现代操作系统分页存储来利用内存提高 I/O 效率。Memory Mapped Files(后面简称 mmap)也被翻译成内存映射文件 ,在 64 位操作系统中一般可以表示 20G 的数据文件,它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。通过 mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小,有虚拟内存为我们兜底。使用这种方式可以获取很大的 I/O 提升,省去了用户空间到内核空间复制的开销。(调用文件的 Read 会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中)但也有一个很明显的缺陷:不可靠,写到 mmap 中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用 Flush 的时候才把数据真正的写到硬盘。Kafka 提供了一个参数 producer.type 来控制是不是主动 Flush:如果 Kafka 写入到 mmap 之后就立即 Flush,然后再返回 Producer 叫同步 (Sync)。如果 Kafka 写入 mmap 之后立即返回 Producer 不调用 Flush 叫异步 (Async)。
【原因三:Zero Copy】:传统模式下,当需要对一个文件进行传输的时候,其具体流程细节如下:调用 Read 函数,文件数据被 Copy 到内核缓冲区。Read 函数返回,文件数据从内核缓冲区 Copy 到用户缓冲区。Write 函数调用,将文件数据从用户缓冲区 Copy 到内核与 Socket 相关的缓冲区。数据从 Socket 缓冲区 Copy 到相关协议引擎。以上细节是传统 Read/Write 方式进行网络文件传输的方式,我们可以看到,在这个过程当中,文件数据实际上是经过了四次 Copy 操作:硬盘—>内核 buf—>用户 buf—>Socket 相关缓冲区—>协议引擎。而 Sendfile 系统调用则提供了一种减少以上多次 Copy,提升文件传输性能的方法。在内核版本 2.1 中,引入了 Sendfile 系统调用,以简化网络上和两个本地文件之间的数据传输。Sendfile 的引入不仅减少了数据复制,还减少了上下文切换。sendfile(socket, file, len);
【运行流程如下】:【1】Sendfile 系统调用,文件数据被 Copy 至内核缓冲区。【2】再从内核缓冲区 Copy 至内核中 Socket 相关的缓冲区。【3】最后再 Socket 相关的缓冲区 Copy 到协议引擎。
相较传统 Read/Write 方式,2.1 版本内核引进的 Sendfile 已经减少了内核缓冲区到 User 缓冲区,再由 User 缓冲区到 Socket 相关缓冲区的文件 Copy。而在内核版本 2.4 之后,文件描述符结果被改变,Sendfile 实现了更简单的方式,再次减少了一次 Copy 操作。在 Apache、Nginx、Lighttpd 等 Web 服务器当中,都有一项 Sendfile 相关的配置,使用 Sendfile 可以大幅提升文件传输性能。Kafka 把所有的消息都存放在一个一个的文件中,当消费者需要数据的时候 Kafka 直接把文件发送给消费者,配合 mmap 作为文件读写方式,直接把它传给 Sendfile。
【原因四:批量压缩】:在很多情况下,系统的瓶颈不是 CPU 或磁盘,而是网络 IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的 CPU 资源,不过对于 Kafka 而言,网络 IO 更应该考虑:
■ 因为每个消息都压缩,但是压缩率相对很低,所以 Kafka 使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩。
■ Kafka 允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩。
■ Kafka 支持多种压缩协议,包括 Gzip 和 Snappy 压缩协议。
【总结】:Kafka 速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络 IO 损耗,通过 mmap 提高 I/O 速度。写入数据的时候由于单个 Partion 是末尾添加,所以速度最优;读取数据的时候配合 Sendfile 直接暴力输出。
更多相关知识链接
消息审计是指在消息生产、存储和消费的整个过程之间对消息个数及延迟的审计,以此来检测是否有数据丢失、是否有数据重复、端到端的延迟是多少等。
目前与消息审计有关的产品也有多个,比如 Chaperone(Uber)、Confluent Control Center、Kafka Monitor(LinkedIn),它们主要通过在消息体(value 字段)或在消息头(headers 字段)中内嵌消息对应的时间戳 timestamp 或全局的唯一标识ID(或者是两者兼备)来实现消息的审计功能。
内嵌 timestamp 的方式主要是设置一个审计的时间间隔 time_bucket_interval(可以自定义设置几秒或几分钟),根据这个 time_bucket_interval 和消息所属的 timestamp 来计算相应的时间桶(time_bucket)。
内嵌 ID 的方式就更加容易理解了,对于每一条消息都会被分配一个全局唯一标识 ID。如果主题和相应的分区固定,则可以为每个分区设置一个全局的 ID。当有消息发送时,首先获取对应的 ID,然后内嵌到消息中,最后才将它发送到 broker 中。消费者进行消费审计时,可以判断出哪条消息丢失、哪条消息重复。
消息轨迹指的是一条消息从生产者发出,经由 broker 存储,再到消费者消费的整个过程中,各个相关节点的状态、时间、地点等数据汇聚而成的完整链路信息。生产者、broker、消费者这3个角色在处理消息的过程中都会在链路中增加相应的信息,将这些信息汇聚、处理之后就可以查询任意消息的状态,进而为生产环境中的故障排除提供强有力的数据支持。
对消息轨迹而言,最常见的实现方式是封装客户端,在保证正常生产消费的同时添加相应的轨迹信息埋点逻辑。无论生产,还是消费,在执行之后都会有相应的轨迹信息,我们需要将这些信息保存起来。我们同样可以将轨迹信息保存到 Kafka 的某个主题中,比如下图中的主题 trace_topic。
生产者在将消息正常发送到用户主题 real_topic 之后(或者消费者在拉取到消息消费之后)会将轨迹信息发送到主题 trace_topic 中。
如果消费者客户端的 isolation.level 参数配置为“read_uncommitted”(默认),它对应的 Lag 等于HW – ConsumerOffset 的值,其中 ConsumerOffset 表示当前的消费位移。
如果这个参数配置为“read_committed”,那么就要引入 LSO 来进行计算了。LSO 是 LastStableOffset 的缩写,它对应的 Lag 等于 LSO – ConsumerOffset 的值。
【1】首先通过 DescribeGroupsRequest 请求获取当前消费组的元数据信息,当然在这之前还会通过 FindCoordinatorRequest 请求查找消费组对应的 GroupCoordinator。
【2】接着通过 OffsetFetchRequest 请求获取消费位移 ConsumerOffset。
【3】然后通过 KafkaConsumer 的 endOffsets(Collection partitions)方法(对应于 ListOffsetRequest 请求)获取 HW(LSO)的值。
【4】最后通过 HW 与 ConsumerOffset 相减得到分区的 Lag,要获得主题的总体 Lag 只需对旗下的各个分区累加即可。
JMX的全称为Java Management Extensions。可以管理、监控正在运行中的Java程序。常用于管理线程,内存,日志Level,服务重启,系统环境等。
比较重要的 Broker 端 JMX 指标:
【1】BytesIn/BytesOut:即 Broker 端每秒入站和出站字节数。你要确保这组值不要接近你的网络带宽,否则这通常都表示网卡已被“打满”,很容易出现网络丢包的情形。
【2】NetworkProcessorAvgIdlePercent:即网络线程池线程平均的空闲比例。通常来说,你应该确保这个 JMX 值长期大于 30%。如果小于这个值,就表明你的网络线程池非常繁忙,你需要通过增加网络线程数或将负载转移给其他服务器的方式,来给该 Broker 减负。
【3】RequestHandlerAvgIdlePercent:即 I/O 线程池线程平均的空闲比例。同样地,如果该值长期小于 30%,你需要调整 I/O 线程池的数量,或者减少 Broker 端的负载。
【4】UnderReplicatedPartitions:即未充分备份的分区数。所谓未充分备份,是指并非所有的 Follower 副本都和 Leader 副本保持同步。一旦出现了这种情况,通常都表明该分区有可能会出现数据丢失。因此,这是一个非常重要的 JMX 指标。【5】【5】【5】ISRShrink/ISRExpand:即 ISR 收缩和扩容的频次指标。如果你的环境中出现 ISR 中副本频繁进出的情形,那么这组值一定是很高的。这时,你要诊断下副本频繁进出 ISR 的原因,并采取适当的措施。
【6】ActiveControllerCount:即当前处于激活状态的控制器的数量。正常情况下,Controller 所在 Broker 上的这个 JMX 指标值应该是 1,其他 Broker 上的这个值是 0。如果你发现存在多台 Broker 上该值都是 1 的情况,一定要赶快处理,处理方式主要是查看网络连通性。这种情况通常表明集群出现了脑裂。脑裂问题是非常严重的分布式故障,Kafka 目前依托 ZooKeeper 来防止脑裂。但一旦出现脑裂,Kafka 是无法保证正常工作的。
【1】消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
【2】存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
【3】流式处理平台: Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
HW(High Watermark):高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。
LSO(LogStartOffset):一般情况下,日志文件的起始偏移量 logStartOffset 等于第一个日志分段的 baseOffset,但这并不是绝对的,logStartOffset 的值可以通过 DeleteRecordsRequest 请求(比如使用 KafkaAdminClient 的 deleteRecords()方法、使用 kafka-delete-records.sh 脚本、日志的清理和截断等操作进行修改。
如上图所示,它代表一个日志文件,这个日志文件中有9条消息,第一条消息的 offset(LogStartOffset)为0,最后一条消息的 offset 为8,offset 为9的消息用虚线框表示,代表下一条待写入的消息。日志文件的 HW 为6,表示消费者只能拉取到 offset 在0至5之间的消息,而 offset 为6的消息对消费者而言是不可见的。
LEO(Log End Offset):标识当前日志文件中下一条待写入消息的 offset,上图中 offset 为9的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加1。分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。
LW(Low Watermark):低水位,代表 AR 集合中最小的 logStartOffset 值。副本的拉取请求(FetchRequest,它有可能触发新建日志分段而旧的被清理,进而导致 logStartOffset 的增加)和删除消息请求(DeleteRecordRequest)都有可能促使 LW 的增长。
HW与LEO之间的关系链接
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。链接
在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。当前消费者需要提交的消费位移是offset+1
【1】Rebalance:一个 consumer正在消费一个分区的一条消息,还没有消费完,发生了 rebalance(加入了一个consumer),从而导致这条消息没有消费成功,rebalance后,另一个consumer又把这条消息消费一遍。
【2】消费者端手动提交:如果先消费消息,再更新offset位置,导致消息重复消费。
【3】消费者端自动提交:设置 offset为自动提交,关闭 kafka时,如果在 close之前,调用 consumer.unsubscribe() 则有可能部分 offset没提交,下次重启会重复消费。
【4】生产者端:生产者因为业务问题导致的宕机,在重启之后可能数据会重发。
【1】自动提交:设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。
【2】生产者发送消息:发送消息设置的是fire-and-forget(发后即忘 ack=0),它只管往 Kafka 中发送消息而并不关心消息是否正确到达。不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。
【3】消费者端:先提交位移,但是消息还没消费完就宕机了,造成了消息没有被消费。自动位移提交同理。
【4】acks没有设置为all:如果在 broker还没把消息同步到其他 broker的时候宕机了,那么消息将会丢失。
【1】线程封闭,即为每个线程实例化一个 KafkaConsumer 对象
一个线程对应一个 KafkaConsumer 实例,我们可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。
【2】消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:
两个方案对比:
在执行完脚本之后,Kafka 会在 log.dir 或 log.dirs 参数所配置的目录下创建相应的主题分区,默认情况下这个目录为 /tmp/kafka-logs/。 在 ZooKeeper 的/brokers/topics/目录下创建一个同名的实节点,该节点中记录了该主题的分区副本分配方案。示例如下:
[zk: localhost:2181/kafka(CONNECTED) 2] get /brokers/topics/topic-create
{"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}
可以增加,使用 kafka-topics 脚本,结合 –alter 参数来增加某个主题的分区数,命令如下:
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic
当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。最后,Rebalance 实在是太慢了。
不支持,因为删除的分区中的消息不好处理。如果直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于 Spark、Flink 这类需要消息时间戳(事件时间)的组件将会受到影响;如果分散插入现有的分区,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题,以及分区和副本的状态机切换问题都是不得不面对的。
在 Kafka 中,性能与分区数有着必然的关系,在设定分区数时一般也需要考虑性能的因素。对不同的硬件而言,其对应的性能也会不太一样。可以使用Kafka 本身提供的用于生产者性能测试的 kafka-producer- perf-test.sh 和用于消费者性能测试的 kafka-consumer-perf-test.sh来进行测试。
增加合适的分区数可以在一定程度上提升整体吞吐量,但超过对应的阈值之后吞吐量不升反降。如果应用对吞吐量有一定程度上的要求,则建议在投入生产环境之前对同款硬件资源做一个完备的吞吐量相关的测试,以找到合适的分区数阈值区间。
分区数的多少还会影响系统的可用性。如果分区数非常多,如果集群中的某个 broker 节点宕机,那么就会有大量的分区需要同时进行 leader 角色切换,这个切换的过程会耗费一笔可观的时间,并且在这个时间窗口内这些分区也会变得不可用。
分区数越多也会让 Kafka 的正常启动和关闭的耗时变得越长,与此同时,主题的分区数越多不仅会增加日志清理的耗时,而且在被删除时也会耗费更多的时间。
__consumer_offsets:作用是保存 Kafka 消费者的位移信息
__transaction_state:用来存储事务日志消息
所谓的优先副本是指在AR集合列表中的第一个副本。理想情况下,优先副本就是该分区的 Leader 副本,所以也可以称之为 preferred leader。Kafka 要确保所有主题的优先副本在 Kafka 集群中均匀分布,这样就保证了所有分区的 Leader 均衡分布。以此来促进集群的负载均衡,这一行为也可以称为“分区平衡”。
Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件。
Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为后缀的事务索引文件)
每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率:
【1】偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置。
【2】时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。
Kafka是通过seek() 方法来指定消费的,在执行 seek() 方法之前要去执行一次 poll()方法,等到分配到分区之后会去对应的分区的指定位置开始消费,如果指定的位置发生了越界,那么会根据auto.offset.reset 参数设置的情况进行消费。
Kafka提供了一个 offsetsForTimes() 方法,通过 timestamp 来查询与此对应的分区位置。offsetsForTimes() 方法的参数 timestampsToSearch 是一个 Map 类型,key 为待查询的分区,而 value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于 OffsetAndTimestamp 中的 offset 和 timestamp 字段。
日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。我们可以通过 broker 端参数 log.cleanup.policy 来设置日志清理策略,此参数的默认值为 “delete”,即采用日志删除的清理策略。
【1】基于时间:日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retentionMs)来寻找可删除的日志分段文件集合(deletableSegments)retentionMs 可以通过 broker 端参数 log.retention.hours、log.retention.minutes 和 log.retention.ms 来配置,其中 log.retention.ms 的优先级最高,log.retention.minutes 次之,log.retention.hours 最低。默认情况下只配置了 log.retention.hours 参数,其值为168,故默认情况下日志分段文件的保留时间为7天。
删除日志分段时,首先会从 Log 对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。然后将日志分段所对应的所有文件添加上“.deleted”的后缀(当然也包括对应的索引文件)。最后交由一个以“delete-file”命名的延迟任务来删除这些以“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过 file.delete.delay.ms 参数来调配,此参数的默认值为60000,即1分钟。
【2】基于日志大小:日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments)。retentionSize 可以通过 broker 端参数 log.retention.bytes 来配置,默认值为-1,表示无穷大。注意 log.retention.bytes 配置的是 Log 中所有日志文件的总大小,而不是单个日志分段(确切地说应该为 .log 日志文件)的大小。单个日志分段的大小由 broker 端参数 log.segment.bytes 来限制,默认值为1073741824,即 1GB。这个删除操作和基于时间的保留策略的删除操作相同。
【3】基于日志起始偏移量:基于日志起始偏移量的保留策略的判断依据是某日志分段的下一个日志分段的起始偏移量 baseOffset 是否小于等于 logStartOffset,若是,则可以删除此日志分段。
如上图所示,假设 logStartOffset 等于25,日志分段1的起始偏移量为0,日志分段2的起始偏移量为11,日志分段3的起始偏移量为23,通过如下动作收集可删除的日志分段的文件集合 deletableSegments:
从头开始遍历每个日志分段,日志分段1的下一个日志分段的起始偏移量为11,小于 logStartOffset 的大小,将日志分段1加入 deletableSegments。日志分段2的下一个日志偏移量的起始偏移量为23,也小于 logStartOffset 的大小,将日志分段2加入 deletableSegments。日志分段3的下一个日志偏移量在 logStartOffset 的右侧,故从日志分段3开始的所有日志分段都不会加入 deletableSegments。收集完可删除的日志分段的文件集合之后的删除操作同基于日志大小的保留策略和基于时间的保留策略相同
日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。
如果要采用日志压缩的清理策略,就需要将 log.cleanup.policy 设置为“compact”,并且还需要将 log.cleaner.enable (默认值为 true)设定为 true。
如下图所示,Log Compaction 对于有相同 key 的不同 value 值,只保留最后一个版本。如果应用只关心 key 对应的最新 value 值,则可以开启 Kafka 的日志清理功能,Kafka 会定期将相同 key 的消息进行合并,只保留最新的 value 值。
Kafka 中有多种延时操作,比如延时生产,还有延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。延时操作创建之后会被加入延时操作管理器(DelayedOperationPurgatory)来做专门的处理。延时操作有可能会超时,每个延时操作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮[链接](TimingWheel)实现的。
在 Kafka 集群中会有一个或多个 Broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 Leader 副本出现故障时,由控制器负责为该分区选举新的 Leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有 Broker更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 Topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka事务链接,为了实现生产者的幂等性,Kafka 为此引入了 producer id(以下简称 PID)和序列号(sequence number)这两个概念。每个新的生产者实例在初始化的时候都会被分配一个 PID,这个 PID 对用户而言是完全透明的。对于每个 PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将
Broker 端会在内存中为每一对
就目前而言,一共有如下几种情形会触发再均衡的操作:
【1】有新的消费者加入消费组;
【2】有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向 GroupCoordinator 发送心跳等情况时,GroupCoordinator 会认为消费者已经下线;
【3】有消费者主动退出消费组(发送 LeaveGroupRequest 请求)。比如客户端调用了 方法取消对某些主题的订阅;
【4】消费组所对应的 GroupCoorinator 节点发生了变更;
【5】消费组内所订阅的任一主题或者主题的分区数量发生变化;
GroupCoordinator 是 Kafka 服务端中用于管理消费组的组件。而消费者客户端中的 ConsumerCoordinator 组件负责与 GroupCoordinator 进行交互。
第一阶段(FIND_COORDINATOR):消费者需要确定它所属的消费组对应的 GroupCoordinator 所在的 Broker,并创建与该 Broker 相互通信的网络连接。如果消费者已经保存了与消费组对应的 GroupCoordinator 节点的信息,并且与它之间的网络连接是正常的,那么就可以进入第二阶段。否则,就需要向集群中的某个节点发送 FindCoordinatorRequest 请求来查找对应的 GroupCoordinator,这里的“某个节点”并非是集群中的任意节点,而是负载最小的节点。
第二阶段(JOIN_GROUP):在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。
【选举消费组的Leader】:如果消费组内还没有 Leader,那么第一个加入消费组的消费者即为消费组的 Leader。如果某一时刻 Leader 消费者由于某些原因退出了消费组,那么会重新选举一个新的 Leader;
【选举分区分配策略】:【1】收集各个消费者支持的所有分配策略,组成候选集 candidates;
【2】每个消费者从候选集 candidates 中找出第一个自身支持的策略,为这个策略投上一票。
【3】计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
第三阶段(SYNC_GROUP):Leader 消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者,通过 GroupCoordinator 这个“中间人”来负责转发同步分配方案的。
第四阶段(HEARTBEAT):进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交到了 GroupCoordinator,并且 GroupCoordinator 将其保存到了 Kafka 内部的 __consumer_offsets 主题中,此时消费者可以通过 OffsetFetchRequest 请求获取上次提交的消费位移并从此处继续消费。
消费者通过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区中的消息。心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停止发送心跳的时间足够长,则整个会话就被判定为过期,GroupCoordinator 也会认为这个消费者已经死亡,就会触发一次再均衡行为。
【1】提高了 Partition的数量,从而提高了consumer的并行能力,从而提高数据的消费能力;
【2】对于单 Partition的消费线程,增加了一个固定长度的阻塞队列和工作线程池进一步提高并行消费的能力;
【3】增多consumer group 中 consumer的数量,消费者数=分区数;
【4】如果是下游的数据处理不及时:提高每批次拉取的数量。(拉取数据/处理时间<生产速度)
【1】生产者与Broker 之间可以通过开启 Broker 的幂等性 idempotence = true 实现 Exactly-Once[链接] ,解决消息重发问题;
【2】消费者与Broker 之间需要消费者通过唯一标识进行幂等性处理,解决消息重发问题;
【1】生产者与 Broker 之间的消息一致性设置:Broker 需要将 idempotence = true开启幂等性, 同时 ack = -1将数据写入 ISR中,保证Broker宕机后,数据不会丢失。同时,Producer 也维护了 PID(自增)与分区之间的关系,当旧 Producer发送消息时,Broker 也会进行丢弃,来保证消息一致性。同时将 retry > 1 ,保证消费发送失败之后,消息能够重新发送,保证消息不丢失 。如果想保证多个消息发送消息的一致性需要加入 kafka 事务[链接]。
【2】Broker 集群消息一致性:Broker 读写都是在 Leader 上进行,所以不存在读写消息差异问题。但是为了防止 Leader 挂掉之后的消息一致性,就需要设置 ISR的规则(消息)包括延迟时间 replica.lag.time.max.ms [默认值为10000] 和延迟条数 replica.lag.max.messages[默认值为4000]
【3】消费者与 Broker之间的消息一致性设置:首先关闭自动提交 offset,防止消息丢失。业务逻辑处理完成后进行手动提交。根据唯一主键保证消息的幂等性。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章