Quickly Installing a Kafka Cluster

I recently needed to set up a Kafka cluster for work. There are plenty of tutorials online, and following them step by step does get the job done, but most of them are rather long-winded. This article is meant to help you get a Kafka cluster installed quickly.

Prepare several servers running CentOS 7 or later. An odd number (3, 5, 7, ...) is recommended, so that zookeeper can always form a majority quorum.
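The nodes must be able to reach each other on ports 2181 (zookeeper clients), 2888 and 3888 (zookeeper replication and leader election), and 9092 (kafka). If firewalld is enabled (the CentOS 7 default), a minimal sketch to open them on each server:

firewall-cmd --permanent --add-port=2181/tcp
firewall-cmd --permanent --add-port=2888/tcp
firewall-cmd --permanent --add-port=3888/tcp
firewall-cmd --permanent --add-port=9092/tcp
firewall-cmd --reload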

This article uses 3 servers as an example, with IPs 192.168.1.1, 192.168.1.2, and 192.168.1.3. Edit the IP addresses in the script below, copy it to all 3 servers, and run it on each one to complete the installation.
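If the servers are reachable over SSH, one way to distribute and run the script (assuming it is saved locally as setup.sh, the name used later in this article, and that root login is available) is:

for ip in 192.168.1.1 192.168.1.2 192.168.1.3; do
    scp setup.sh root@${ip}:/root/
    ssh root@${ip} "bash /root/setup.sh"
done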

#!/bin/bash

# Modify the link if you want to download another version

KAFKA_DOWNLOAD_URL="https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz"

# Please use your own server IPs

SERVERS=("192.168.1.1" "192.168.1.2" "192.168.1.3")

ID=0

MACHINE_IP=$(hostname -i)
echo "Machine IP: ${MACHINE_IP}"

LENGTH=${#SERVERS[@]}

for (( i=0; i<${LENGTH}; i++ )); do
    if [ "${SERVERS[$i]}" = "${MACHINE_IP}" ]; then
        ID=$((i+1))
    fi
done

echo "ID: "${ID}

if [ "${ID}" -eq "0" ]; then
    echo "Machine IP does not match any server in the list"
    exit 1
fi

ZOOKEEPER_CONNECT=$(printf ",%s:2181" "${SERVERS[@]}")
ZOOKEEPER_CONNECT=${ZOOKEEPER_CONNECT:1}
echo "Zookeeper Connect: "${ZOOKEEPER_CONNECT}

echo "---------- Update yum ----------"
yum update -y
yum install -y wget

echo "---------- Install java ----------"
yum -y install java-1.8.0-openjdk
java -version

echo "---------- Create kafka user & group ----------"
groupadd -r kafka
useradd -g kafka -r kafka -s /bin/false

echo "---------- Download kafka ----------"
cd /opt
wget ${KAFKA_DOWNLOAD_URL} -O kafka.tgz
mkdir -p kafka
tar -xzf kafka.tgz -C kafka --strip-components=1
chown -R kafka:kafka /opt/kafka

echo "---------- Install and start zookeeper ----------"
mkdir -p /data/zookeeper
chown -R kafka:kafka /data/zookeeper
echo "${ID}" > /data/zookeeper/myid

# zookeeper config
# https://zookeeper.apache.org/doc/r3.1.2/zookeeperAdmin.html#sc_configuration
cat <<EOF > /opt/kafka/config/zookeeper-cluster.properties
# the directory where the snapshot is stored
dataDir=/data/zookeeper
# the port at which clients will connect
clientPort=2181
# set the number of client connections to unlimited
maxClientCnxns=0
# zookeeper heartbeat interval in milliseconds
tickTime=2000
# number of ticks allowed for initial synchronization
initLimit=10
# number of ticks that may pass between request and acknowledgement before timing out
syncLimit=5
# server IPs and internal ports of the zookeeper ensemble
EOF

for (( i=0; i<${LENGTH}; i++ )); do
    INDEX=$((i+1))
    echo "server.${INDEX}=${SERVERS[$i]}:2888:3888" >> /opt/kafka/config/zookeeper-cluster.properties
done

# zookeeper.service
cat <<EOF > /usr/lib/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server (Kafka)
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper-cluster.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl start zookeeper && systemctl enable zookeeper

echo "---------- Install and start kafka ----------"
mkdir -p /data/kafka
chown -R kafka:kafka /data/kafka

# kafka config
# https://kafka.apache.org/documentation/#configuration
cat <<EOF > /opt/kafka/config/server-cluster.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=${ID}
# Hostname and port the broker will advertise to producers and consumers. If not
# set, it uses the value of "listeners" if configured; otherwise it uses the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://${MACHINE_IP}:9092
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but also result in more files across the brokers.
num.partitions=1
# The replication factor for the internal group metadata topics
# "__consumer_offsets" and "__transaction_state".
# For anything other than development testing, a value greater than 1 (such as 3)
# is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log
# segment is created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see whether they can be
# deleted according to the retention policies
log.retention.check.interval.ms=300000
# Zookeeper connection string (see the zookeeper docs for details): a comma
# separated list of host:port pairs, each corresponding to a zk server,
# e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# An optional chroot string can be appended to specify the root directory
# for all kafka znodes.
zookeeper.connect=${ZOOKEEPER_CONNECT}/kafka
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=60000
EOF

# kafka.service
cat <<EOF > /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service

[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server-cluster.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl start kafka && systemctl enable kafka

Save the script above as setup.sh. Once it has run on every node, the two services are managed through systemd, and the cluster can be exercised with the command line tools that ship with kafka:

# start zookeeper
systemctl start zookeeper

# stop zookeeper
systemctl stop zookeeper

# restart zookeeper
systemctl restart zookeeper

# check zookeeper status and recent logs
systemctl status zookeeper -l

# start kafka
systemctl start kafka

# stop kafka
systemctl stop kafka

# restart kafka
systemctl restart kafka

# check kafka status and recent logs
systemctl status kafka -l

# change to the kafka bin directory
cd /opt/kafka/bin/

# create a topic
./kafka-topics.sh --create --topic test --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092

# describe the topic
./kafka-topics.sh --topic test --describe --bootstrap-server localhost:9092

# start a console producer, then type messages
./kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

# start a console consumer to read the messages
./kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

# delete the topic
./kafka-topics.sh --topic test --delete --bootstrap-server localhost:9092
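The test topic above uses --replication-factor 1, so each partition lives on a single broker. To actually exercise replication across the 3-node cluster, create a topic with a replication factor of 3 instead (the topic name here is just an example):

./kafka-topics.sh --create --topic replicated-test --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092
./kafka-topics.sh --topic replicated-test --describe --bootstrap-server localhost:9092

The --describe output should then show a leader plus a three-entry "Replicas:" list for every partition.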

The rest of the article explains the script step by step.

1. The following lines set the kafka download URL and the server IP list; adjust both to match your environment.

# Modify the link if you want to download another version
KAFKA_DOWNLOAD_URL="https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz"

# Please use your own server IPs

SERVERS=("192.168.1.1" "192.168.1.2" "192.168.1.3")

2. The following code derives the zookeeper id and kafka broker id, and assembles the zookeeper connection string used later in the kafka config. The machine's own IP is matched against the configured IP list: if it equals the first server IP, the ID is 1; the second gives 2; the third gives 3; and so on. If the machine's IP is not found in the list, the script aborts. Note that hostname -i must resolve to the server's LAN IP; if it returns 127.0.0.1 or several addresses, fix /etc/hosts or hard-code MACHINE_IP.

ID=0

MACHINE_IP=$(hostname -i)
echo "Machine IP: ${MACHINE_IP}"

LENGTH=${#SERVERS[@]}

for (( i=0; i<${LENGTH}; i++ )); do
    if [ "${SERVERS[$i]}" = "${MACHINE_IP}" ]; then
        ID=$((i+1))
    fi
done

echo "ID: ${ID}"

if [ "${ID}" -eq "0" ]; then
    echo "Machine IP does not match any server in the list"
    exit 1
fi

ZOOKEEPER_CONNECT=$(printf ",%s:2181" "${SERVERS[@]}")
ZOOKEEPER_CONNECT=${ZOOKEEPER_CONNECT:1}
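printf repeats its format string once per array element, so it produces ",192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181", and the ${ZOOKEEPER_CONNECT:1} substring expansion strips the leading comma. On the host 192.168.1.2, for example, the full script prints:

Machine IP: 192.168.1.2
ID: 2
Zookeeper Connect: 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181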

3. Update the yum package index and install the wget download tool.

yum update -y
yum install -y wget

4. Install Java 8.

yum -y install java-1.8.0-openjdk
java -version

5. Create the kafka user and group.

groupadd -r kafka
useradd -g kafka -r kafka -s /bin/false
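The -r flags create a system account and group, and -s /bin/false prevents interactive logins; the account exists only so that zookeeper and kafka do not run as root. To confirm it was created:

id kafka
# prints something like: uid=996(kafka) gid=994(kafka) groups=994(kafka)  (the numeric ids vary)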

6. Download and extract the kafka binaries.

cd /opt
wget ${KAFKA_DOWNLOAD_URL} -O kafka.tgz
mkdir -p kafka
tar -xzf kafka.tgz -C kafka --strip-components=1
chown -R kafka:kafka /opt/kafka

7. Create the zookeeper data directory and write the zookeeper id into the myid file.

mkdir -p /data/zookeeper
chown -R kafka:kafka /data/zookeeper
echo "${ID}" > /data/zookeeper/myid

8. Generate the zookeeper configuration file. For details on each setting, see: https://zookeeper.apache.org/doc/r3.1.2/zookeeperAdmin.html#sc_configuration

cat <<EOF > /opt/kafka/config/zookeeper-cluster.properties
# the directory where the snapshot is stored
dataDir=/data/zookeeper
# the port at which clients will connect
clientPort=2181
# set the number of client connections to unlimited
maxClientCnxns=0
# zookeeper heartbeat interval in milliseconds
tickTime=2000
# number of ticks allowed for initial synchronization
initLimit=10
# number of ticks that may pass between request and acknowledgement before timing out
syncLimit=5
# server IPs and internal ports of the zookeeper ensemble
EOF

for (( i=0; i<${LENGTH}; i++ )); do
    INDEX=$((i+1))
    echo "server.${INDEX}=${SERVERS[$i]}:2888:3888" >> /opt/kafka/config/zookeeper-cluster.properties
done
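With the example IP list, the loop appends the following three lines, identical on every node; 2888 is the port followers use to connect to the leader, and 3888 is used for leader election:

server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888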

9. Create the zookeeper systemd unit file, then start zookeeper and enable it at boot.

cat <<EOF > /usr/lib/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server (Kafka)
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper-cluster.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl start zookeeper && systemctl enable zookeeper
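Once zookeeper is up on all three servers, quorum can be verified with the four-letter srvr command (whitelisted by default in the zookeeper versions bundled with recent kafka releases; nc can be installed with yum install -y nc). One node should report Mode: leader and the other two Mode: follower:

echo srvr | nc localhost 2181 | grep Mode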

10. Create the kafka data directory.

mkdir -p /data/kafka
chown -R kafka:kafka /data/kafka

11. Generate the kafka configuration file. For details on each setting, see: https://kafka.apache.org/documentation/#configuration

cat <<EOF > /opt/kafka/config/server-cluster.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=${ID}
# Hostname and port the broker will advertise to producers and consumers. If not
# set, it uses the value of "listeners" if configured; otherwise it uses the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://${MACHINE_IP}:9092
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but also result in more files across the brokers.
num.partitions=1
# The replication factor for the internal group metadata topics
# "__consumer_offsets" and "__transaction_state".
# For anything other than development testing, a value greater than 1 (such as 3)
# is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log
# segment is created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see whether they can be
# deleted according to the retention policies
log.retention.check.interval.ms=300000
# Zookeeper connection string (see the zookeeper docs for details): a comma
# separated list of host:port pairs, each corresponding to a zk server,
# e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# An optional chroot string can be appended to specify the root directory
# for all kafka znodes.
zookeeper.connect=${ZOOKEEPER_CONNECT}/kafka
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=60000
EOF
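The config above keeps the internal-topic replication factors at 1, mirroring the upstream sample file. With 3 brokers available, values like the following are safer for anything beyond a quick test (these are suggested overrides, not part of the original script):

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# replication factor applied to automatically created topics
default.replication.factor=3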

12. Create the kafka systemd unit file, then start kafka and enable it at boot.

cat <<EOF > /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service

[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server-cluster.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl start kafka && systemctl enable kafka
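After kafka is up on all three servers, broker registration can be verified from any node with the zookeeper-shell.sh tool that ships with kafka. Because the config uses the /kafka chroot, the live broker ids sit under /kafka/brokers/ids:

/opt/kafka/bin/zookeeper-shell.sh localhost:2181 ls /kafka/brokers/ids
# the last line of output should be: [1, 2, 3]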

Following the steps above, you should have a working kafka cluster in short order. If you run into any problems, feel free to leave a comment on this article.