Linux 单机搭建 Kafka 集群

大纲

前言

本文适用于在 Centos/Debian/Ubuntu 等 Linux 发行版系统上,使用单机搭建 Kafka 集群。

Zookeeper 集群搭建

本文的 Kafka 集群搭建依赖于 Zookeeper,因此需要将 Zookeeper 单机集群提前搭建起来。值得一提的是,从 Kafka 2.8.0 版本开始,Kafka 自身实现了 Raft 分布式一致性机制,这意味着 Kafka 集群是可以脱离 ZooKeeper 独立运行的。

集群规划

节点 IP 地址端口版本号
Zookeeper 节点 1127.0.0.121813.4.10
Zookeeper 节点 2127.0.0.121823.4.10
Zookeeper 节点 3127.0.0.121833.4.10

集群搭建

由于篇幅有限,Linux 单机搭建 Zookeeper 集群的内容这里不再累述,详细教程可看 这里

Kafka 集群搭建

集群规划

节点 IP 地址端口版本号
Kafka 节点 1127.0.0.190922.13-3.2.1
Kafka 节点 2127.0.0.190932.13-3.2.1
Kafka 节点 3127.0.0.190942.13-3.2.1

集群搭建

Kafka 下载地址

  • (1) Kafka 的安装包可以从 官网 下载。
  • (2) 以下载得到的压缩文件 kafka_2.13-3.2.1.tgz 为例,2.11 是 Scala 的版本号,3.2.1 是 Kafka 的版本号。
  • (3) 值得一提的是,Kafka 的 Broker 组件是使用 Scala 开发的,而 Producer 组件和 Consumer 组件是使用 Java 开发的。
  • Kafka 安装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 创建安装目录
# mkdir -p /usr/local/kafka-cluster

# 进入安装目录
# cd /usr/local/kafka-cluster

# 下载文件
# wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz

# 解压文件
# tar -xvf kafka_2.13-3.2.1.tgz

# 重命名目录(作为节点一的安装目录)
# mv kafka_2.13-3.2.1 kafka-node1

# 删除文件
# rm -rf kafka_2.13-3.2.1.tgz
  • Kafka 基础配置
1
2
3
4
5
6
7
8
# 进入安装目录
# cd /usr/local/kafka-cluster/kafka-node1

# 数据存储目录
# mkdir datas

# 编辑配置文件(更改或添加以下内容即可)
# vim config/server.properties

最关键的配置内容是 broker.idlog.dirszookeeper.connect,其中的 zookeeper.connect 是 Zookeeper 连接地址,建议使用 /kafka 作为后缀,这样方便日后在 Zookeeper 里统一管理 Kafka 的数据。在开发与测试阶段,其他配置项可以根据业务需求适当调整具体的参数值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# broker 的全局唯一编号,不能重复
broker.id=1
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# 日志(数据)存放的路径
log.dirs=/usr/local/kafka-cluster/kafka-node1/datas
# 每个 topic 创建时的分区数量,默认是 1 个分区
num.partitions=1
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 是否启用自动创建主题的功能,默认是启用,禁用后可以避免因拼写错误或误用主题名而自动创建不必要的主题
auto.create.topics.enable=false
# 每个 topic 在创建时的分区副本数量,默认是 1 个副本
default.replication.factor=3
# 设置内置主题 __consumer_offsets 的副本数量,默认是 1 个副本,用于存储消费者组的偏移量
offsets.topic.replication.factor=3
# 设置事务状态日志主题 __transaction_state 的副本数量,默认是 1 个副本
transaction.state.log.replication.factor=3
# 设置事务状态日志主题的最小同步副本数(ISR),确保至少有多个副本成功同步数据才能提交事务
transaction.state.log.min.isr=2
# 每个 segment 文件保留的最长时间,超时将被删除,默认保留 7 天
log.retention.hours=168
# 每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次数据是否过期
log.retention.check.interval.ms=300000
# 配置连接 Zookeeper 集群地址
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/kafka

提示

上述部分配置默认是不存在于 Kafka 的配置文件 server.properties 中的,因此需要手动添加缺少的配置项,比如 default.replication.factor

  • Kafka 端口配置

单机搭建 Kafka 集群时,为了解决端口冲突的问题,还需要指定 Kafka 监听的端口,必须将下述各个配置文件里的端口都更改掉。

1
2
3
4
# vim config/server.properties

# 默认端口号
listeners=PLAINTEXT://:9092
1
2
3
4
# vim config/connect-standalone.properties

# 单机环境的端口号
bootstrap.servers=localhost:9092
1
2
3
4
# vim config/connect-distributed.properties

# 集群环境的端口号
bootstrap.servers=localhost:9092
1
2
3
4
# vim config/producer.properties

# 发布端的端口号
bootstrap.servers=localhost:9092
1
2
3
4
# vim config/consumer.properties

# 消费端的端口号
bootstrap.servers=localhost:9092
  • Kafka 创建多个节点

拷贝两份上面已经配置好的 Kafka 节点一安装目录,以此作为集群另外两个节点的安装文件,例如 kafka-node2kafka-node3。安装目录拷贝完成后,还需要为另外两个集群节点按照以下步骤更改对应的内容:

第一步:更改 server.properties 配置文件里的 broker.idlog.dirs
第二步:更改 Kafka 监听的端口,包括更改上述的 server.propertiesconnect-standalone.propertiesconnect-distributed.propertiesproducer.propertiesconsumer.properties 配置文件

上述两个步骤完成后,节点二和节点三的核心配置如下:

1
2
3
4
5
6
# 节点二的核心配置
broker.id=2
listeners=PLAINTEXT://:9093
log.dirs=/usr/local/kafka-cluster/kafka-node2/datas

bootstrap.servers=localhost:9093
1
2
3
4
5
6
# 节点三的核心配置
broker.id=3
listeners=PLAINTEXT://:9094
log.dirs=/usr/local/kafka-cluster/kafka-node3/datas

bootstrap.servers=localhost:9094

集群管理

  • 集群启动

注意

启动 Kafka 集群之前,必须确保 Zookeeper 集群已经启动成功,这是因为本文搭建的 Kafka 集群依赖于 Zookeeper 集群。

1
2
3
4
5
6
7
8
9
# 后台启动
# /usr/local/kafka-cluster/kafka-node1/bin/kafka-server-start.sh -daemon /usr/local/kafka-cluster/kafka-node1/config/server.properties
# /usr/local/kafka-cluster/kafka-node2/bin/kafka-server-start.sh -daemon /usr/local/kafka-cluster/kafka-node2/config/server.properties
# /usr/local/kafka-cluster/kafka-node3/bin/kafka-server-start.sh -daemon /usr/local/kafka-cluster/kafka-node3/config/server.properties

# 或者前台启动(可直接查看启动时输出的日志信息)
# /usr/local/kafka-cluster/kafka-node1/bin/kafka-server-start.sh /usr/local/kafka-cluster/kafka-node1/config/server.properties
# /usr/local/kafka-cluster/kafka-node2/bin/kafka-server-start.sh /usr/local/kafka-cluster/kafka-node2/config/server.properties
# /usr/local/kafka-cluster/kafka-node3/bin/kafka-server-start.sh /usr/local/kafka-cluster/kafka-node3/config/server.properties
  • 查看状态

集群启动后,可以使用以下命令查看集群的运行状态。如果发现集群启动失败,则可以使用前台的方式再次启动集群,然后根据终端输出的错误日志信息来定位问题。

1
2
3
4
5
6
7
8
9
10
# 查看端口占用情况
# netstat -nplt | grep 9092
# netstat -nplt | grep 9093
# netstat -nplt | grep 9094

# 查看Kafka进程
# ps -aux | grep kafka

# 查看Java进程
# jps -l
  • 集群关闭

注意

关闭 Kafka 集群时,一定要等 Kafka 所有节点进程全部关闭后再关闭 Zookeeper 集群。因为 Zookeeper 集群当中记录着 Kafka 集群的相关信息,Zookeeper 集群一旦先关闭,Kafka 集群就没有办法再获取关闭进程的信息,此时只能手动强制杀死 Kafka 进程。

1
2
3
4
5
# 在单机搭建集群的环境下,任意执行以下一个命令都会关闭所有集群节点(这跟 Shell 脚本的代码有关),如果希望只单独关闭某个节点,建议使用 kill 命令

# /usr/local/kafka-cluster/kafka-node1/bin/kafka-server-stop.sh stop
# /usr/local/kafka-cluster/kafka-node2/bin/kafka-server-stop.sh stop
# /usr/local/kafka-cluster/kafka-node3/bin/kafka-server-stop.sh stop
  • 清空数据

若希望清空 Kafka 集群的数据,则可以按照以下步骤操作(清空数据的操作不可恢复,生产环境下慎用)。

第一步:关闭 Kafka 集群
第二步:连接 Zookeeper 集群,然后删除 /kafka 目录
第三步:删除 Kafka 各个集群节点安装目录下的 datas 目录
第四步:重启 Kafka 集群

集群测试

  • 进入任意节点的安装目录下的 bin 目录
1
2
# 进入安装目录
# cd /usr/local/kafka-cluster/kafka-node1/bin
  • 创建主题
1
2
# 创建主题
# ./kafka-topics.sh --bootstrap-server 192.168.1.1:9092 --create --partitions 1 --replication-factor 3 --topic test
  • 查看主题列表
1
2
# 查看主题列表
# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
  • 查看主题详细信息
1
2
# 查看主题详细信息
# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic test --describe
  • 启动控制台消费者
1
2
# 启动消费者
# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
  • 启动控制台生产者
1
2
# 启动生产者
# ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
  • 在生产者的控制台手动输入 hello kafka,消费者就可以消费到生产者的消息,并在控制台输出 hello kafka,这表示消费者成功消费了生产者发送的消息!

Kafka 集群调优

调整 Kafka 的堆内存大小

若希望调整 Kafka 的堆内存大小,可以找到各个 Kafka 集群节点的服务启动脚本文件 bin/kafka-server-start.sh,然后在脚本文件中修改如下参数值:

1
2
3
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
fi

参考博客