大纲
前言
本文适用于在 Centos/Debian/Ubuntu 等 Linux 发行版系统上,使用单机搭建 Kafka 集群。
Zookeeper 集群搭建
本文的 Kafka 集群搭建依赖于 Zookeeper,因此需要将 Zookeeper 单机集群提前搭建起来。值得一提的是,从 Kafka 2.8.0
版本开始,Kafka 自身实现了 Raft
分布式一致性机制,这意味着 Kafka 集群是可以脱离 ZooKeeper 独立运行的。
集群规划
节点 | IP 地址 | 端口 | 版本号 |
---|
Zookeeper 节点 1 | 127.0.0.1 | 2181 | 3.4.10 |
Zookeeper 节点 2 | 127.0.0.1 | 2182 | 3.4.10 |
Zookeeper 节点 3 | 127.0.0.1 | 2183 | 3.4.10 |
集群搭建
由于篇幅有限,Linux 单机搭建 Zookeeper 集群的内容这里不再累述,详细教程可看 这里。
Kafka 集群搭建
集群规划
节点 | IP 地址 | 端口 | 版本号 |
---|
Kafka 节点 1 | 127.0.0.1 | 9092 | 2.13-3.2.1 |
Kafka 节点 2 | 127.0.0.1 | 9093 | 2.13-3.2.1 |
Kafka 节点 3 | 127.0.0.1 | 9094 | 2.13-3.2.1 |
集群搭建
Kafka 下载地址
- (1) Kafka 的安装包可以从 官网 下载。
- (2) 以下载得到的压缩文件
kafka_2.13-3.2.1.tgz
为例,2.11
是 Scala 的版本号,3.2.1
是 Kafka 的版本号。 - (3) 值得一提的是,Kafka 的 Broker 组件是使用 Scala 开发的,而 Producer 组件和 Consumer 组件是使用 Java 开发的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
|
最关键的配置内容是 broker.id
、log.dirs
、zookeeper.connect
,其中的 zookeeper.connect
是 Zookeeper 连接地址,建议使用 /kafka
作为后缀,这样方便日后在 Zookeeper 里统一管理 Kafka 的数据。在开发与测试阶段,其他配置项可以根据业务需求适当调整具体的参数值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka-cluster/kafka-node1/datas
num.partitions=1
num.recovery.threads.per.data.dir=1
auto.create.topics.enable=false
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/kafka
|
提示
上述部分配置默认是不存在于 Kafka 的配置文件 server.properties
中的,因此需要手动添加缺少的配置项,比如 default.replication.factor
。
单机搭建 Kafka 集群时,为了解决端口冲突的问题,还需要指定 Kafka 监听的端口,必须将下述各个配置文件里的端口都更改掉。
1 2 3 4
|
listeners=PLAINTEXT://:9092
|
1 2 3 4
|
bootstrap.servers=localhost:9092
|
1 2 3 4
|
bootstrap.servers=localhost:9092
|
1 2 3 4
|
bootstrap.servers=localhost:9092
|
1 2 3 4
|
bootstrap.servers=localhost:9092
|
拷贝两份上面已经配置好的 Kafka 节点一安装目录,以此作为集群另外两个节点的安装文件,例如 kafka-node2
、kafka-node3
。安装目录拷贝完成后,还需要为另外两个集群节点按照以下步骤更改对应的内容:
第一步:更改 server.properties
配置文件里的 broker.id
、log.dirs
第二步:更改 Kafka 监听的端口,包括更改上述的 server.properties
、connect-standalone.properties
、connect-distributed.properties
、producer.properties
、consumer.properties
配置文件
上述两个步骤完成后,节点二和节点三的核心配置如下:
1 2 3 4 5 6
| # 节点二的核心配置 broker.id=2 listeners=PLAINTEXT://:9093 log.dirs=/usr/local/kafka-cluster/kafka-node2/datas
bootstrap.servers=localhost:9093
|
1 2 3 4 5 6
| # 节点三的核心配置 broker.id=3 listeners=PLAINTEXT://:9094 log.dirs=/usr/local/kafka-cluster/kafka-node3/datas
bootstrap.servers=localhost:9094
|
集群管理
注意
启动 Kafka 集群之前,必须确保 Zookeeper 集群已经启动成功,这是因为本文搭建的 Kafka 集群依赖于 Zookeeper 集群。
集群启动后,可以使用以下命令查看集群的运行状态。如果发现集群启动失败,则可以使用前台的方式再次启动集群,然后根据终端输出的错误日志信息来定位问题。
注意
关闭 Kafka 集群时,一定要等 Kafka 所有节点进程全部关闭后再关闭 Zookeeper 集群。因为 Zookeeper 集群当中记录着 Kafka 集群的相关信息,Zookeeper 集群一旦先关闭,Kafka 集群就没有办法再获取关闭进程的信息,此时只能手动强制杀死 Kafka 进程。
若希望清空 Kafka 集群的数据,则可以按照以下步骤操作(清空数据的操作不可恢复,生产环境下慎用)。
第一步:关闭 Kafka 集群
第二步:连接 Zookeeper 集群,然后删除 /kafka
目录
第三步:删除 Kafka 各个集群节点安装目录下的 datas
目录
第四步:重启 Kafka 集群
集群测试
- 在生产者的控制台手动输入
hello kafka
,消费者就可以消费到生产者的消息,并在控制台输出 hello kafka
,这表示消费者成功消费了生产者发送的消息!
Kafka 集群调优
调整 Kafka 的堆内存大小
若希望调整 Kafka 的堆内存大小,可以找到各个 Kafka 集群节点的服务启动脚本文件 bin/kafka-server-start.sh
,然后在脚本文件中修改如下参数值:
1 2 3
| if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G" fi
|
参考博客