kafka-伪集群搭建
阅读原文时间:2023年07月11日阅读:2

一、简介

Apache Kafka是一个快速、可扩展的、高吞吐的、可容错的分布式“发布-订阅”消息系统,使用Scala与Java语言编写,能够将消息从一个端点传递到另一个端点,较之传统的消息中间件(例如ActiveMQ、RabbitMQ),Kafka具有高吞吐量、内置分区、支持消息副本和高容错的特性,非常适合大规模消息处理应用程序

1、Kafka架构

2、kafka高吞吐率实现

Kafka与其它MQ相比,其最大的特点就是高吞吐率。为了增加存储能力,Kafka将所有的消息都写入到了低速大容的硬盘。按理说,这将导致性能损失,但实际上,kafka仍可保持超高的吞吐率,性能并未受到影响。其主要采用了如下的方式实现了高吞吐率。

顺序读写:Kafka将消息写入到了分区partition中,而分区中消息是顺序读写的。顺序读写要远快于随机读写。

零拷贝:生产者、消费者对于kafka中消息的操作是采用零拷贝实现的。

批量发送:Kafka允许使用批量消息发送模式。

消息压缩:Kafka支持对消息集合进行压缩。

官网 http://kafka.apache.org/

1、选择 Download  ->   Scala 2.11  - kafka_2.11-2.2.1.tgz (ascsha512)  

2、鼠标右键复制链接地址

3、执行命令

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz

4、结果

 

1、执行解压命令

tar -zxvf kafka_2.12-2.6.0.tgz -C /usr/local/

cd /usr/local

mv kafka_2.12-2.6.0 kafka

2、复制配置修改server1.properties配置

cd /usr/local/kafka/config

cp server.properties server1.properties

vim server1.properties

 修改的地方 

broker.id=1

listeners=PLAINTEXT://:9091

log.dirs=/usr/local/data/kafka-logs-1

default.replication.factor=2 #新增

 复制1的配置 到2 3 

cp server1.properties server2.properties

cp server1.properties server3.properties

修改broker.id 为 2 3  

log.dirs路径 

/usr/local/data/kafka-logs-3

/usr/local/data/kafka-logs-2

mkdir -p /usr/local/data/kafka-logs-3

mkdir -p /usr/local/data/kafka-logs-2

mkdir -p /usr/local/data/kafka-logs-1

3. 启动集群及测试

/usr/local/kafka/bin/kafka-server-start.sh -daemon ./config/server1.properties

/usr/local/kafka/bin/kafka-server-start.sh -daemon ./config/server2.properties

/usr/local/kafka/bin/kafka-server-start.sh -daemon ./config/server3.properties

注:如果单机伪集群内存不够,可以修改启动脚本,将红框内的内存改小些,默认为1G

vim ./bin/kafka-server-start.sh

如果不熟悉kafka集群,则可以在启动时不加 -daemon参数,观察kafka启动过程 

如果在启动中报错,kafka会在安装目录下生成一个错误log,可以通过查看该文件排错直至启动 

连接zookeeper测试

/usr/local/zookeeper/bin/zkCli.sh -server 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184

[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 0] ls /brokers/ids

[1, 2, 3]

[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 1]

通过kafka消息发送接收验证 

如果是伪集群,则打开多个liunx会话窗口

#在窗口1创建一个producer,topic为test,broker-list为zookeeper集群ip+端口

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093 --topic test

#在窗口2创建一个consumer,topic为test,bootstrap-server为zookeeper集群ip+端口

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093 --topic test --from-beginning

在窗口1中输入helloword,如果在窗口2中接收到则集群搭建成功 

1

# ----------------------系统相关----------------------

2

# broker的全局唯一编号,不能重复,和zookeeper的myid是一个意思

3

broker.id=0

4

5

# broker监听IP和端口也可以是域名

6

listeners=PLAINTEXT://172.17.56.175:9092

7

8

# 用于接收请求的线程数量

9

num.network.threads=3

10

11

# 用于处理请求的线程数量,包括磁盘IO请求,这个数量和log.dirs配置的目录数量有关,这里的数量不能小于log.dirs的数量,

12

# 虽然log.dirs是配置日志存放路径,但是它可以配置多个目录后面用逗号分隔

13

num.io.threads=8

14

15

# 发送缓冲区大小,也就是说发送消息先发送到缓冲区,当缓冲区满了之后一起发送出去

16

socket.send.buffer.bytes=102400

17

18

# 接收缓冲区大小,同理接收到缓冲区,当到达这个数量时就同步到磁盘

19

socket.receive.buffer.bytes=102400

20

21

# 向kafka套接字请求最大字节数量,防止服务器OOM,也就是OutOfMemery,这个数量不要超过JAVA的堆栈大小,

22

socket.request.max.bytes=104857600

23

24

# 日志路径也就是分区日志存放的地方,你所建立的topic的分区就在这里面,但是它可以配置多个目录后面用逗号分隔

25

log.dirs=/tmp/kafka-logs

26

27

# 消息体(也就是往Kafka发送的单条消息)最大大小,单位是字节,必须小于socket.request.max.bytes值

28

message.max.bytes =5000000

29

30

# 自动平衡由于某个broker故障会导致Leader副本迁移到别的broker,当之前的broker恢复后也不会迁移回来,有时候我们需要

31

# 手动进行平衡避免同一个主题不同分区的Leader副本在同一台broker上,下面这个参数就是开启自动平衡功能

32

auto.leader.rebalance.enable=true

33

34

#设置了上面的自动平衡,当故障转移后,隔300秒(默认)触发一个定时任务进行平衡操作,而只有代理的不均衡率为10%以上才会执行

35

leader.imbalance.check.interval.seconds=300

36

37

# 设置代理的不均衡率,默认是10%

38

leader.imbalance.per.broker.percentage=10

39

40

# ---------------分区相关-------------------------

41

42

# 默认分区数量,当建立Topic时不指定分区数量,默认就1

43

num.partitions=1

44

45

# 是否允许自动创建topic ,若是false,就需要通过命令创建topic

46

auto.create.topics.enable =true

47

48

# 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数

49

default.replication.factor =2

50

51

# ---------------日志相关-------------------------

52

53

# segment文件默认会被保留7天的时间,超时的话就会被清理,那么清理这件事情就需要有一些线程来做。

54

# 这里就是用来设置恢复和清理data下数据的线程数量

55

num.recovery.threads.per.data.dir=1

56

57

# 日志文件中每个segment的大小,默认为1G。topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,当超过这个大小会建立一个新日志文件

58

# 这个参数会被topic创建时的指定参数覆盖,如果你创建Topic的时候指定了这个参数,那么你以你指定的为准。

59

log.segment.bytes=1073741824

60

61

# 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据

62

# log.retention.bytes和log.retention.minutes|hours任意一个达到要求,都会执行删除

63

# 如果你创建Topic的时候指定了这个参数,那么你以你指定的为准

64

log.retention.hours|minutes=168

65

66

# 这个参数会在日志segment没有达到log.segment.bytes设置的大小默认1G的时候,也会强制新建一个segment会被

67

# topic创建时的指定参数覆盖

68

log.roll.hours=168

69

70

# 上面的参数设置了每一个segment文件的大小是1G,那么就需要有一个东西去定期检查segment文件有没有达到1G,多长时间去检查一次,

71

# 就需要设置一个周期性检查文件大小的时间(单位是毫秒)。

72

log.retention.check.interval.ms=300000

73

74

# 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,

75

# 如果你创建Topic的时候指定了这个参数,那么你以你指定的为准

76

log.cleanup.policy = delete

77

78

# 是否启用日志清理功能,默认是启用的且清理策略为compact,也就是压缩。

79

log.cleaner.enable=false

80

81

# 日志清理时所使用的缓存空间大小

82

log.cleaner.dedupe.buffer.size=134217728

83

84

# log文件"sync"到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段

85

# 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.

86

# 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)

87

# 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.

88

# 物理server故障,将会导致没有fsync的消息丢失.

89

log.flush.interval.messages=9223372036854775807

90

91

# 检查是否需要固化到硬盘的时间间隔

92

log.flush.scheduler.interval.ms =3000

93

94

# 仅仅通过interval来控制消息的磁盘写入时机,是不足的.

95

# 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔

96

# 达到阀值,也将触发.

97

log.flush.interval.ms = None

98

99

# --------------------------复制(Leader、replicas) 相关-------------------

100

# partition leader与replicas之间通讯时,socket的超时时间

101

controller.socket.timeout.ms =30000

102

103

# replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),

104

# 并认为它是死的,不会再加入管理中

105

replica.lag.time.max.ms =10000

106

107

# follower与leader之间的socket超时时间

108

replica.socket.timeout.ms=300000

109

110

# leader复制时候的socket缓存大小

111

replica.socket.receive.buffer.bytes=65536

112

113

# replicas每次获取数据的最大大小

114

replica.fetch.max.bytes =1048576

115

116

# replicas同leader之间通信的最大等待时间,失败了会重试

117

replica.fetch.wait.max.ms =500

118

119

# fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件

120

replica.fetch.min.bytes =1

121

122

# leader 进行复制的线程数,增大这个数值会增加follower的IO

123

num.replica.fetchers=1

124

125

# 最小副本数量

126

min.insync.replicas = 2

热门专题

Android stuio push obb 后

Linux系统被入侵后的排查过程

devpress 统一设置字体除了按钮控件

红米 10x 编译Lineage OS

ora-01652无法通过128表空间扩展

vs 编写com组件

C# Stack 最多能容纳多少个对象

win10睡眠倒计时

gephi 导入数据

第一个方格放一粒小麦Python

python 使用rs485通过二进制码流通讯

C# Combox模糊搜索

win7uefi版本下载

利用原生类绕过wakeup

cmd怎么快速找到空文件夹

cocos creator 调用别的场景里面的方法

Visual C 6.0界面

java获取当前时间并格式化

pytorch 翻译

doxyfile生成的文档不完整

Home

Powered By WordPress