概述
kafka
1.下载
链接:https://pan.baidu.com/s/1XVFekzTc9W4nUkiWQnIi4A
提取码:roik
2.安装
2.1解压
tar -zxvf kafka_2.12-2.0.0.tgz
mv kafka_2.12-2.0.0 kafka
2.2 配置环境变量
vi /etc/profile
内容如下:
export KAFKA_HOME=/usr/local/software/kafka
export PATH=$PATH:$KAFKA_HOME/bin
配置文件生效:
source /etc/profile
2.3 修改配置文件
-
创建日志目录
cd /usr/local/software/kafka mkdir kafka-logs chmod 777 kafka-logs
-
.进入
/usr/local/software/kafka/config
目录,修改配置文件server.properties
vi server.properties
-
内容如下:
broker.id=1 # 唯一ID同一集群下broker.id不能重复 listeners=PLAINTEXT://10.202.80.196:9092 #ލ监听地址 log.dirs=/usr/local/software/kafka/kafka-logs # 日志目录 log.retention.hours=168 # kafka数据保留时间单位为hour 默认168小时 即7天 log.retention.bytes=1073741824 #kafka数据量最大值,超出范围自动清理,和log.retention.hours配置使用,注意最大值设定不能超过磁盘大小 zookeeper.connect=10.202.80.196:2181 #zookeeper连接的ip以及port,多个以逗号分割
-
启动kafka
zkServer.sh start #先开启zookeeper服务 kafka-server-start.sh config/server.properties
查看进程:
-
关闭kafka
kafka-server-stop.sh
3.命令
-
创建主题
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test01 #主题名 test01 3个分区
-
列出所有主题:
kafka-topics.sh --list --zookeeper localhost:2181
-
查看topic的相关信息
kafka-topics.sh --describe --zookeeper localhost:2181 --topic test01
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-s8585Ggd-1639724429219)(C:UsersZYZAppDataRoamingTyporatypora-user-imagesimage-20211201171237876.png)]
-
简单的发布订阅
- 要先开启zookeeper服务和kafka
kafka-console-producer.sh --broker-list 10.202.80.196:9092 --topic test01 #消息发布 生产者 kafka-console-consumer.sh --bootstrap-server 10.202.80.196:9092 --topic test01 #监控消息 消费者
两个终端测试
注意:测试完后一定要记得关闭kafka和zookeeper 不然会出问题!!
如果zookeeper服务起不来 ,进到/data/
目录下 把zookeeper_servevr.pid的文件删除,在启动
回顾以及更正之前学习的内容,学习了DAG有向无环图以及了解了Srorm并行度的一些基本概念
4.集群搭建
4.1基本部署
-
进入
/usr/local/software/kafka/config
修改配置文件
vi server.properties
内容如下:
broker.id=0 port=9092 host.name=10.202.80.109 zookeeper.connect=10.202.80.109:2181,10.202.80.110:2181,10.202.80.196:2181
-
分发下去:
scp -r kafka/ root@10.202.80.110:/usr/local/software/kafka scp -r kafka/ root@10.202.80.196:/usr/local/software/kafka
配置文件中需要修改以下内容:
broker.id=1 host.name=10.202.80.110 broker.id=2 host.name=10.202.80.196
-
启动kafka
bin/kafka-server-start.sh config/server.properties
**注意:**三台都要启动!!!
4.2 遇到的问题
-
描述:只有broker.id=1的那台机器可以正常启动
解决方法:修改
broker.id
的值,保持和server.properties
文件中设置的一致;(不一致的都要修改,即启动不了的就去查看是否不一致)-
[root@localhost kafka]# cd kafka-logs/ [root@localhost kafka-logs]# vi meta.properties
-
-
启动成功后,检查:
jps 20348 Jps 4233 QuorumPeerMain 18991 Kafk
-
描述:
kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING at kafka.zookeeper.ZooKeeperClient. a n o n f u n anonfun anonfunwaitUntilConnected 3 ( Z o o K e e p e r C l i e n t . s c a l a : 230 ) a t s c a l a . r u n t i m e . j a v a 8. J F u n c t i o n 0 3(ZooKeeperClient.scala:230) at scala.runtime.java8.JFunction0 3(ZooKeeperClient.scala:230)atscala.runtime.java8.JFunction0mcV$sp.apply(JFunc
查看防火墙是否关闭
4.3 查看zk
-
创建topic
bin/kafka-topics.sh --list --zookeeper localhost:2181 #列出所有主题 __consumer_offsets topic1 liechu #创建主题 bin/kafka-topics.sh --create --zookeeper 10.202.80.109:2181 --replication-facto r 1 --partitions 3 --topic test0 Created topic "test0". #解释 --replication-factor 2 #复制两份 --partitions 3 #创建3个分区 --topic #主题为test0
查看topic:
-
登录zk查看zk目录情况:
[root@localhost software]# cd zookeeper/ [root@localhost zookeeper]# bin/zkCli.sh -server 10.202.80.109:2181 #默认是不用加’-server‘参数的因为我们修改了他的端口 [zk: 10.202.80.109:2181(CONNECTED) 0] ls / [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper] #上面的显示结果中:只有zookeeper是,zookeeper原生的,其他都是Kafka创建的 [zk: 10.202.80.109:2181(CONNECTED) 1] get /brokers/ids/0 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.202.80.109:9092"],"jmx_port":-1,"host":"10.202.80.109","timestamp":"1638424958139","port":9092,"version":4} [zk: 10.202.80.109:2181(CONNECTED) 2]
5 监控搭建
5.1安装
-
解压软件
unzip kafka-manager-1.0-SNAPSHOT.zip
-
修改配置文件
vi conf/application.conf
内容如下:
kafka-manager.zkhosts="10.202.80.109:2181,10.202.80.110:2181,10.202.80.196:2181"
-
启动:
bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8007 & #指定端口
访问:http://10.202.80.109:8007/
5.2 使用
-
添加集群
-
设置信息
-
查看集群信息
-
查看主题信息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3gCSdxbU-1639724429220)(C:UsersZYZAppDataRoamingTyporatypora-user-imagesimage-20211202163516026.png)]
-
broker信息
5.3 关闭
由于没有特定的关闭命令,所以直接杀死进程即可
kill -9 50718
参考资料:
https://blog.csdn.net/u011089412/article/details/87895652
6 守护线程
6.1简述
Linux中"守护进程"(daemon)就是一直在后台运行的进程(daemon),不会因为会话关闭而停止
启动kafka的守护线程:
bin/kafka-server-start.sh config/server.properties &
disown
- 只要在命令的尾部加上符号
&
,启动的进程就会成为"后台任务"。如果要让正在运行的"前台任务"变为"后台任务",可以先按ctrl + z
,然后执行bg
命令(让最近一个暂停的"后台任务"继续执行)
6.3 SIGHUP
Linux系统是这样设计的:
- 用户准备退出 session
- 系统向该 session 发出
SIGHUP
信号- session 将
SIGHUP
信号发给所有子进程- 子进程收到
SIGHUP
信号后,自动退出
这由 Shell 的huponexit
参数决定的。
shopt | grep huponexit
执行上面的命令,就会看到huponexit
参数的值:
大多数Linux系统,这个参数默认关闭(off
)。因此,session 退出的时候,不会把SIGHUP
信号发给"后台任务"。所以,一般来说,"后台任务"不会随着 session 一起退出。
6.3 disown
通过"后台任务"启动"守护进程"并不保险,因为有的系统的huponexit
参数可能是打开的(on
)。
更保险的方法是使用disown
命令。它可以将指定任务从"后台任务"列表(jobs
命令的返回结果)之中移除。一个"后台任务"只要不在这个列表之中,session 就肯定不会向它发出SIGHUP
信号。
最后
以上就是孤独世界为你收集整理的【kafka集群搭建+监控+启动守护线程 超详细】kafka的全部内容,希望文章能够帮你解决【kafka集群搭建+监控+启动守护线程 超详细】kafka所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复