工程师计划 大数据 kafka集群部署

kafka集群部署

kafka(9092)

1.上传文件

tar -xvf kafka_2.11-1.0.2.tgz -C /opt
cd /opt
mv kafka_2.11-1.0.2 kafka

2.创建logs文件

cd /opt/kafka
mkdir logs

3.修改配置文件

vim /opt/kafka/config/server.properties
...
#broker的全局唯一编号,各个节点的不能重复
broker.id=1
#删除topic功能使能
delete.topic.enable=true
#监听地址,集群中各个节点自己的IP地址
listeners=PLAINTEXT://192.168.94.10:9092
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=192.168.94.10:2181,192.168.94.11:2181,192.168.94.12:2181

4.配置环境变量

vi /etc/profile
...
#KAFKA_HOME
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
:wq
source /etc/profile
#分发文件 分发后需要执行source命令生效
scp -r /etc/profile 192.168.94.11:/etc
scp -r /etc/profile 192.168.94.12:/etc

5.把修改好的安装包分发到各个节点

scp -r /opt/kafka root@192.168.94.11:/opt
scp -r /opt/kafka root@192.168.94.12:/opt

6.修改各个节点的配置文件

#节点11
vim /opt/kafka/config/server.properties
#broker的全局唯一编号,各个节点的不能重复
broker.id=2
#删除topic功能使能
delete.topic.enable=true
#监听地址,集群中各个节点自己的IP地址
listeners=PLAINTEXT://192.168.94.11:9092

#节点12
vim /opt/kafka/config/server.properties
#broker的全局唯一编号,各个节点的不能重复
broker.id=3
#删除topic功能使能
delete.topic.enable=true
#监听地址,集群中各个节点自己的IP地址
listeners=PLAINTEXT://192.168.94.12:9092

7.编写批量启动脚本

vim /opt/kafka-all.sh
#!/bin/bash
case $1 in
"start"){
for i in 192.168.94.10 192.168.94.11 192.168.94.12
do
echo --------------------------kafka $i 启动 -------------------
ssh $i "source /etc/profile && /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties"
if [ $? -eq 0 ]; then
    echo -e "\033[32m 启动成功 \033[0m"
else
    echo -e "\033[31m 启动失败 \033[0m"
fi
done
}
;;
"stop"){
for i in 192.168.94.10 192.168.94.11 192.168.94.12
do
echo --------------------------kafka $i 关闭 -------------------
ssh $i "/opt/kafka/bin/kafka-server-stop.sh"
if [ $? -eq 0 ]; then
    echo -e "\033[32m 关闭成功 \033[0m"
else
    echo -e "\033[31m 关闭失败 \033[0m"
fi
done
}
;;
""){
echo "参数选项 start|stop"
}
;;
esac
:wq
#给文件执行权限
chmod +x kafka-all.sh
#启动命令
/opt/kafka-all.sh start
#停止命令
/opt/kafka-all.sh stop

8.kafka命令

#创建topic
kafka-topics.sh --zookeeper 192.168.94.10:2181 --create --replication-factor 1 --partitions 1 --topic first
选项说明:
--topic 定义topic名
--replication-factor  定义副本数
--partitions  定义分区数
#查看topic
kafka-topics.sh --zookeeper 192.168.94.10:2181 --list
#发送消息
kafka-console-producer.sh --broker-list 192.168.94.10:9092 --topic first
#消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.94.11:9092 --from-beginning --topic first
kafka集群部署
© 2021 京ICP备2021027871号-1
联系我们

联系我们

18513870113

在线咨询: QQ交谈

邮箱: 1140136143@qq.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

返回顶部