kafka生产者调优手册
阅读原文时间:2022年04月06日阅读:1

目录

第一章 kafka硬件配置选择

100 万日活,每人每天 100 条日志,每天总共的日志条数是 100 万 * 100 条 = 1 亿条。
1 亿/24 小时/60 分/60 秒 = 1150 条/每秒钟。每条日志大小:0.5k - 2k(取 1k)。
1150 条/每秒钟 * 1k ≈ 1m/s 。
高峰期每秒钟:1150 条 * 20 倍 = 23000 条。每秒多少数据量:20MB/s。


服务器台数= 2 * (生产者峰值生产速率 * 副本 / 100) + 1
= 2 * (20m/s * 2 / 100) + 1
= 2 * 1 +1
= 3 台
建议 3 台服务器。
(20m/s * 2 / 100)往上取整数


kafka 底层主要是顺序写,固态硬盘和机械硬盘的顺序写速度差不多。建议选择普通的机械硬盘。
每天总数据量:1 亿条 * 1k ≈ 100g
100g * 副本 2 * 保存时间 3 天 / 0.7 ≈ 1T
建议三台服务器硬盘总大小,大于等于 1T。


Kafka 内存组成:堆内存 + 页缓存

1.4.1 堆内存配置

Kafka堆内存建议每个节点:10g ~ 15g,在kafka-server-start.sh中修改
export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"

查看kafka进程号:jps

根据kafka进程号,查看kafka的堆内存:jmap -heap 32339

1.4.2 页缓存选择

页缓存:页缓存是Linux 系统服务器的内存。我们只需要保证 1 个 segment(1g)中25%的数据在内存中就好。

每个节点页缓存大小 =(分区数 * 1g * 25%)/ 节点数。例如 10 个分区,页缓存大小
=(10 * 1g * 25%)/ 3 ≈ 1g
建议服务器内存大于等于 11G。


num.io.threads = 16    负责写磁盘的线程数,整个参数值要占总核数的 50%。
num.replica.fetchers = 6 副本拉取线程数,这个参数占总核数的 50%的 1/3。
num.network.threads = 9    数据传输线程数,这个参数占总核数的 50%的 2/3。
建议 32 个cpu core。


网络带宽 = 峰值吞吐量 ≈ 20MB/s    选择千兆网卡即可。
100Mbps 单位是 bit;10M/s 单位是 byte ;
1byte = 8bit,100Mbps/8 = 12.5M/s。
一般百兆的网卡(100Mbps )、千兆的网卡(1000Mbps)、万兆的网卡(10000Mbps)。

第二章 kafka生产者

参数名称

描述

bootstrap.servers

生产者连接集群所需的 broker 地址清单。 例如 hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查

key.serializer 和 value.serializer

找到其他 broker 信息。指定发送消息的 key 和 value 的序列化类型。一定要写 全类名。

buffer.memory

RecordAccumulator 缓冲区总大小,默认 32m。

batch.size

缓冲区一批数据最大值,默认 16k。适当增加该值,可 以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。

linger.ms

如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。

acks

0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader 收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和 all 是等价的。

max.in.flight.requests.per.connection

允许最多没有返回 ack 的次数,默认为 5,开启幂等性 要保证该值是 1-5 的数字。

retries

当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送成功了。

retry.backoff.ms

两次重试之间的时间间隔,默认是 100ms。

enable.idempotence

是否开启幂等性,默认 true,开启幂等性。

compression.type

生产者发送的所有数据的压缩方式。默认是 none,也就 是不压缩。

参数名称

描述

buffer.memory

RecordAccumulator 缓冲区总大小,默认 32m。

batch.size

缓冲区一批数据最大值,默认 16k。适当增加该值,可 以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。

linger.ms

如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。

compression.type

生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4 和 zstd。

参数名称

描述

acks

0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的。

参数名称

描述

enable.idempotence

是否开启幂等性,默认 true,表示开启幂等性。

单分区内,有序(有条件的,不能乱序);多分区,分区与分区间无序;

参数名称

描述

enable.idempotence

是否开启幂等性,默认 true,表示开启幂等性。

max.in.flight.requests.per.connection

允许最多没有返回 ack 的次数,默认为 5,开启幂等性 要保证该值是 1-5 的数字。

第三章 kafka Broker

参数名称

描述

replica.lag.time.max.ms

ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出ISR。 该时间阈值,默认 30s。

auto.leader.rebalance.enable

默认是 true。 自动 Leader Partition 平衡。建议 关闭。

leader.imbalance.per.broker.percentage

默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。

leader.imbalance.check.interval.seconds

默认值 300 秒。检查 leader 负载是否平衡的间隔 时间。

log.segment.bytes

Kafka 中 log 日志是分成一块块存储的,此配置 是指 log 日志划分 成块的大小,默认值 1G。

log.index.interval.bytes

默认 4kb,kafka 里面每当写入了 4kb 大小的日志 (.log),然后就往 index 文件里面记录一个索引。

log.retention.hours

Kafka 中数据保存的时间,默认 7 天。

log.retention.minutes

Kafka 中数据保存的时间,分钟级别,默认关闭。

log.retention.ms

Kafka 中数据保存的时间,毫秒级别,默认关闭。

log.retention.check.interval.ms

检查数据是否保存超时的间隔,默认是 5 分钟。

log.retention.bytes

默认等于-1,表示无穷大。超过设置的所有日志 总大小,删除最早的 segment。

log.cleanup.policy

默认是 delete,表示所有数据启用删除策略; 如果设置值为 compact,表示所有数据启用压缩 策略。

num.io.threads

默认是 8。负责写磁盘的线程数。整个参数值要 占总核数的 50%。

num.replica.fetchers

默认是 1。副本拉取线程数,这个参数占总核数 的 50%的 1/3

num.network.threads

默认是 3。数据传输线程数,这个参数占总核数 的 50%的 2/3 。

log.flush.interval.messages

强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改, 交给系统自己管理。

log.flush.interval.ms

每隔多久,刷数据到磁盘,默认是 null。一般不 建议修改,交给系统自己管理。

参数名称

描述

auto.leader.rebalance.enable

默认是 true。自动 Leader Partition 平衡。生产环 境中,leader 重选举的代价比较大,可能会带来性能影响,建议设置为 false 关闭。

leader.imbalance.per.broker.percentage

默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。

leader.imbalance.check.interval.seconds

默认值 300 秒。检查 leader 负载是否平衡的间隔 时间。

  如果 broker 端配置参数auto.create.topics.enable 设置为 true(默认值是 true),那么当生产者向一个未创建的主题发送消息时,会自动创建一个分区数为 num.partitions(默认值为1)、副本因子为 default.replication.factor(默认值为 1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。
  所以生产环境应该将参数auto.create.topics.enable设置为flase

第四章 kafka消费者

参数名称

描述

bootstrap.servers

向 Kafka 集群建立初始连接用到的 host/port 列表。

key.deserializer 和 value.deserializer

指定接收消息的 key 和 value 的反序列化类型。一定要写全 类名。

group.id

标记消费者所属的消费者组。

enable.auto.commit

默认值为 true,消费者会自动周期性地向服务器提交偏移量。

auto.commit.interval.ms

如果设置了 enable.auto.commit 的值为 true, 则该值定义了 消费者偏移量向Kafka 提交的频率,默认 5s。

auto.offset.reset

当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。

offsets.topic.num.partitions

consumer_offsets 的分区数,默认是 50 个分区。不建议修 改。

heartbeat.interval.ms

Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的 1/3。不建议修改。

session.timeout.ms

Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超 过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms

消费者处理消息的最大时长,默认是 5 分钟。超过该值,该 消费者被移除,消费者组执行再平衡。

fetch.min.bytes

默认 1 个字节。消费者获取服务器端一批消息最小的字节数。

fetch.max.wait.ms

默认 500ms。如果没有从服务器端获取到一批数据的最小字 节数。该时间到,仍然会返回数据。

fetch.max.bytes

默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值 (50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes ( broker config)or max.message.bytes (topic config)影响。

max.poll.records

一次 poll 拉取数据返回消息的最大条数,默认是 500 条。

增加分区数;
./kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3

参数名称

描述

fetch.max.bytes

默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值 (50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config) or max.message.bytes (topic config)影响。

max.poll.records

一次 poll 拉取数据返回消息的最大条数,默认是 500 条

第五章 kafka总体

(1)    buffer.memory:发送消息的缓冲区大小,默认值是 32m,可以增加到 64m。
(2)    batch.size:默认是 16k。如果 batch 设置太小,会导致频繁网络请求,吞吐量下降;如果 batch 太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
(3)    linger.ms,这个值默认是 0,意思就是消息必须立即被发送。一般设置一个 5-100 毫秒。如果 linger.ms 设置的太小,会导致频繁网络请求,吞吐量下降;如果 linger.ms 太长, 会导致一条消息需要等待很久才能被发送出去,增加网络延时。
(4)    compression.type:默认是 none,不压缩,但是也可以使用 lz4 压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大 producer 端的CPU 开销。 2)增加分区


(1)    调整 fetch.max.bytes 大小,默认是 50m。
(2)    调整 max.poll.records 大小,默认是 500 条


acks 设置为-1 (acks=-1),保证数据不丢失
enable.idempotence=true,开启幂等性,保证数据不重复
ISR中最小副本数大于等于2


(1)    创建一个只有 1 个分区的 topic。
(2)    测试这个 topic 的 producer 吞吐量和 consumer 吞吐量。
(3)    假设他们的值分别是 Tp 和 Tc,单位可以是 MB/s。
(4)    然后假设总的目标吞吐量是 Tt,那么分区数 = Tt / min(Tp,Tc)。
例如:producer 吞吐量 = 20m/s;consumer 吞吐量 = 50m/s,期望吞吐量 100m/s; 分区数 = 100 / 20 = 5 分区
分区数一般设置为:3-10 个
分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区个数。


在生产环境中,如果某个 Kafka 节点挂掉。正常处理办法:
(1)    先尝试重新启动一下,如果能启动正常,那直接解决。
(2)    如果重启不行,考虑增加内存、增加 CPU、网络带宽。
(3)    如果将 kafka 整个节点误删除,如果副本数大于等于 2,可以按照服役新节点的方式重新服役一个新节点,并执行负载均衡。

5.6.1 kafka 压测

用 Kafka 官方自带的脚本,对Kafka 进行压测。
生产者压测:kafka-producer-perf-test.sh
消费者压测:kafka-consumer-perf-test.sh

5.6.2 kafka producer压测

(1)    创建一个 test topic,设置为 3 个分区 3 个副本
./kafka-topics.sh --bootstrap- server hadoop102:9092 --create --replication-factor 3 -- partitions 3 --topic test
(2)    在/opt/module/kafka/bin 目录下面有这两个文件。我们来测试一下
./kafka-producer-perf-test.sh -- topic test --record-size 1024 --num-records 1000000 --throughput 10000    --producer-props
bootstrap.servers=192.168.0.215:9092,192.168.0.216:9092,192.168.0.217:9092 batch.size=16384 linger.ms=0


//参数说明
record-size 是一条信息有多大,单位是字节,本次测试设置为 1k。
num-records 是总共发送多少条信息,本次测试设置为 100 万条。
throughput 是每秒多少条信息,设成-1,表示不限流,尽可能快的生产数据,可测出生产者最大吞吐量。本次实验设置为每秒钟 1 万条。
producer-props 后面可以配置生产者相关参数,batch.size 配置为 16k。
batch.size 一批多大,1K=1024
linger.ms 等待时间默认是0ms,设置为50ms
compression.type 压缩方式,默认none,可设置gzip,snappy,lz4
buffer.memory 缓存大小,默认32m。设置64,根据实际调整

5.6.3 kafka consumer压测

(1)    修改/opt/module/kafka/config/consumer.properties 文件中的一次拉取条数为 500
max.poll.records=500
//参数说明
--bootstrap-server 指定Kafka 集群地址
--topic 指定 topic 的名称
--messages 总共要消费的消息个数。本次实验 100 万条。

(2)    消费 100 万条日志进行压测
./kafka-consumer-perf-test.sh -- bootstrap-server 192.168.0.215:9092,192.168.0.216:9092,192.168.0.217:9092 -
-topic    test    --messages  1000000 --consumer.config config/consumer.properties
//参数说明
--bootstrap-server 指定Kafka 集群地址
--topic 指定 topic 的名称
--messages 总共要消费的消息个数。本次实验 100 万条。

(3)    调整 fetch.max.bytes 大小为 100m
  ①修改/opt/module/kafka/config/consumer.properties 文件中的拉取一批数据大小 100m
fetch.max.bytes=104857600
  ②再次执行
./kafka-consumer-perf-test.sh -- broker-list 192.168.0.215:9092,192.168.0.216:9092,192.168.0.217:9092 -- topic test --messages 1000000 --consumer.config config/consumer.properties