我是靠谱客的博主 孤独世界,最近开发中收集的这篇文章主要介绍【kafka集群搭建+监控+启动守护线程 超详细】kafka,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

kafka

1.下载

链接:https://pan.baidu.com/s/1XVFekzTc9W4nUkiWQnIi4A
提取码:roik

2.安装

2.1解压

tar -zxvf kafka_2.12-2.0.0.tgz 
mv kafka_2.12-2.0.0 kafka

2.2 配置环境变量

vi /etc/profile

内容如下:

export KAFKA_HOME=/usr/local/software/kafka
export PATH=$PATH:$KAFKA_HOME/bin

配置文件生效:

source /etc/profile

2.3 修改配置文件

  1. 创建日志目录

    cd /usr/local/software/kafka
    mkdir kafka-logs
    chmod 777 kafka-logs
    
  2. .进入/usr/local/software/kafka/config目录,修改配置文件server.properties

    vi server.properties
    
  3. 内容如下:

    broker.id=1 # 唯一ID同一集群下broker.id不能重复
    listeners=PLAINTEXT://10.202.80.196:9092 #ލ监听地址
    log.dirs=/usr/local/software/kafka/kafka-logs # 日志目录
    log.retention.hours=168 # kafka数据保留时间单位为hour 默认168小时 即7天
    log.retention.bytes=1073741824 #kafka数据量最大值,超出范围自动清理,和log.retention.hours配置使用,注意最大值设定不能超过磁盘大小
    zookeeper.connect=10.202.80.196:2181 #zookeeper连接的ip以及port,多个以逗号分割
    
  4. 启动kafka

    zkServer.sh start #先开启zookeeper服务
    kafka-server-start.sh config/server.properties
    

    查看进程:

    image-20211201164825645
  5. 关闭kafka

    kafka-server-stop.sh
    

3.命令

  • 创建主题

    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test01
    #主题名 test01 3个分区
    
  • 列出所有主题:

    kafka-topics.sh --list --zookeeper localhost:2181 
    
  • 查看topic的相关信息

    kafka-topics.sh --describe --zookeeper localhost:2181 --topic test01
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-s8585Ggd-1639724429219)(C:UsersZYZAppDataRoamingTyporatypora-user-imagesimage-20211201171237876.png)]

  • 简单的发布订阅

    1. 要先开启zookeeper服务和kafka
    kafka-console-producer.sh --broker-list 10.202.80.196:9092 --topic test01  #消息发布 生产者
    kafka-console-consumer.sh --bootstrap-server 10.202.80.196:9092 --topic test01 #监控消息 消费者
    
    

    两个终端测试

注意:测试完后一定要记得关闭kafka和zookeeper 不然会出问题!!

如果zookeeper服务起不来 ,进到/data/目录下 把zookeeper_servevr.pid的文件删除,在启动

回顾以及更正之前学习的内容,学习了DAG有向无环图以及了解了Srorm并行度的一些基本概念

4.集群搭建

4.1基本部署

  1. 进入/usr/local/software/kafka/config

    修改配置文件

    vi server.properties
    

    内容如下:

    broker.id=0
    port=9092
    host.name=10.202.80.109
    zookeeper.connect=10.202.80.109:2181,10.202.80.110:2181,10.202.80.196:2181
    
  2. 分发下去:

    scp -r kafka/ root@10.202.80.110:/usr/local/software/kafka
    
    scp -r kafka/ root@10.202.80.196:/usr/local/software/kafka
    

    配置文件中需要修改以下内容:

    broker.id=1
    host.name=10.202.80.110
    
    broker.id=2
    host.name=10.202.80.196
    
  3. 启动kafka

    bin/kafka-server-start.sh config/server.properties 
    

    **注意:**三台都要启动!!!

4.2 遇到的问题

  1. 描述:只有broker.id=1的那台机器可以正常启动

    解决方法:修改broker.id的值,保持和server.properties文件中设置的一致;(不一致的都要修改,即启动不了的就去查看是否不一致)

    • [root@localhost kafka]# cd kafka-logs/
      [root@localhost kafka-logs]# vi meta.properties
      
      image-20211202140152173
  2. 启动成功后,检查:

    jps
    20348 Jps
    4233 QuorumPeerMain
    18991 Kafk
    
  3. 描述

    kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING at kafka.zookeeper.ZooKeeperClient. a n o n f u n anonfun anonfunwaitUntilConnected 3 ( Z o o K e e p e r C l i e n t . s c a l a : 230 ) a t s c a l a . r u n t i m e . j a v a 8. J F u n c t i o n 0 3(ZooKeeperClient.scala:230) at scala.runtime.java8.JFunction0 3(ZooKeeperClient.scala:230)atscala.runtime.java8.JFunction0mcV$sp.apply(JFunc

    查看防火墙是否关闭

4.3 查看zk

  1. 创建topic

    bin/kafka-topics.sh --list --zookeeper localhost:2181 #列出所有主题
    __consumer_offsets
    topic1
    liechu
    #创建主题
     bin/kafka-topics.sh --create --zookeeper 10.202.80.109:2181 --replication-facto
    r 1 --partitions 3 --topic test0
    Created topic "test0".
    #解释
    --replication-factor 2   #复制两份
    --partitions 3 #创建3个分区
    --topic #主题为test0
    

    查看topic:

    image-20211202141148521
  2. 登录zk查看zk目录情况:

    [root@localhost software]# cd zookeeper/
    [root@localhost zookeeper]# bin/zkCli.sh -server 10.202.80.109:2181
    #默认是不用加’-server‘参数的因为我们修改了他的端口
    [zk: 10.202.80.109:2181(CONNECTED) 0] ls /
    [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
    #上面的显示结果中:只有zookeeper是,zookeeper原生的,其他都是Kafka创建的
    [zk: 10.202.80.109:2181(CONNECTED) 1] get /brokers/ids/0
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.202.80.109:9092"],"jmx_port":-1,"host":"10.202.80.109","timestamp":"1638424958139","port":9092,"version":4}
    [zk: 10.202.80.109:2181(CONNECTED) 2] 
    
    

5 监控搭建

5.1安装

  1. 解压软件

    unzip kafka-manager-1.0-SNAPSHOT.zip
    
  2. 修改配置文件

    vi conf/application.conf
    

    内容如下:

    kafka-manager.zkhosts="10.202.80.109:2181,10.202.80.110:2181,10.202.80.196:2181"
    
  3. 启动:

    bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8007 &
    #指定端口
    

    访问:http://10.202.80.109:8007/

    image-20211202162750311

5.2 使用

  1. 添加集群

    image-20211202162915313
  2. 设置信息

    image-20211202163051752
  3. 查看集群信息

    image-20211202163146389 image-20211202163255758
  4. 查看主题信息

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3gCSdxbU-1639724429220)(C:UsersZYZAppDataRoamingTyporatypora-user-imagesimage-20211202163516026.png)]

  5. broker信息

    image-20211202163611168

5.3 关闭

由于没有特定的关闭命令,所以直接杀死进程即可

image-20211202163920583
kill -9 50718

参考资料:

https://blog.csdn.net/u011089412/article/details/87895652

6 守护线程

6.1简述

Linux中"守护进程"(daemon)就是一直在后台运行的进程(daemon),不会因为会话关闭而停止

启动kafka的守护线程:

bin/kafka-server-start.sh config/server.properties &
disown
  • 只要在命令的尾部加上符号&,启动的进程就会成为"后台任务"。如果要让正在运行的"前台任务"变为"后台任务",可以先按ctrl + z,然后执行bg命令(让最近一个暂停的"后台任务"继续执行)

6.3 SIGHUP

Linux系统是这样设计的:

  1. 用户准备退出 session
  2. 系统向该 session 发出SIGHUP信号
  3. session 将SIGHUP信号发给所有子进程
  4. 子进程收到SIGHUP信号后,自动退出

这由 Shell 的huponexit参数决定的。

shopt | grep huponexit

执行上面的命令,就会看到huponexit参数的值:

img

大多数Linux系统,这个参数默认关闭(off)。因此,session 退出的时候,不会把SIGHUP信号发给"后台任务"。所以,一般来说,"后台任务"不会随着 session 一起退出。

6.3 disown

通过"后台任务"启动"守护进程"并不保险,因为有的系统的huponexit参数可能是打开的(on)。

更保险的方法是使用disown命令。它可以将指定任务从"后台任务"列表(jobs命令的返回结果)之中移除。一个"后台任务"只要不在这个列表之中,session 就肯定不会向它发出SIGHUP信号。

最后

以上就是孤独世界为你收集整理的【kafka集群搭建+监控+启动守护线程 超详细】kafka的全部内容,希望文章能够帮你解决【kafka集群搭建+监控+启动守护线程 超详细】kafka所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(37)

评论列表共有 0 条评论

立即
投稿
返回
顶部