我是靠谱客的博主 虚心含羞草,这篇文章主要介绍搭建网联云平台(二)——云服务器伪分布式kafka集群,现在分享给大家,希望可以做个参考。

伪分布式搭建方法

Hosts文件

由于只有一个服务器,为了为以后多个服务器做kafka集群打基础,先在但服务器上做了伪分布式,发现kafka配置文件中有许多坑,记录一下。

首先是对服务器上的/etc/hosts文件的配置,其实是ip地址与hostname的一些映射关系,默认一般是有:

复制代码
1
2
3
4
5
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost

理论上可以正常运行,集群的时候有需要的话要添加上其他几台服务器的 IP host#number,如123.123.123.123 kafka#1
如果在运行kafka有报错java.net.UnknownHostException,加入本机的IP加hostname

kafka集群配置文件

Apache官网
https://kafka.apache.org/downloads
下载kafka后直接在linux上解压即可用,打开config下的service.properties进行配置,建议先配置kafka中zookeeper相关:

复制代码
1
2
zookeeper.connect=ip:2181

ip处改为你的云服务器ip地址,将程序放在同一文档下,复制两份后继续配置
在这里插入图片描述
需要配置的项包括:

复制代码
1
2
3
4
broker.id=0#1#2 listeners=PLAINTEXT://:9092#9093#9094 advertised.listeners=PLAINTEXT://123.1.1.1:9092#9093#9094

注释后面的指集群其他两个文件的配置内容,需要注意的是listeners与advertised.listeners的配置方式不同,即便kafka的注释中提示可以写listeners = PLAINTEXT://your.host.name:9092,但是不能写成ip地址加端口的形式,因为要配置advertised.listeners端口对外开放,写ip会报错端口不能bind。
listeners和advertised.listeners一定都要写,自己测试的时候一直报错UnknownHostException和无法建立连接,就是这两个的配置有问题。

启动kafka伪集群

参考命令如下
启动zookeeper

复制代码
1
2
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

进入各个kafka#N中启动各个节点

复制代码
1
2
bin/kafka-server-start.sh -daemon config/server.properties

-daemon表示以守护线程后台运行,如果运行出错不会显示出来,所以要用jps查看是否启动成功,成功的话会有三个kakfa
成功的话会有三个kakfa
或者监视云服务器的内存占用是否增加,kafka默认在JVM中占用1G的内存,这也会导致部署伪分布式内存不足,可以编辑bin下的kafka-server-start.sh文件改为256M:

复制代码
1
2
3
4
5
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx256m -Xms256m" #"-Xmx1G -Xms1G" fi

关闭服务:

复制代码
1
2
bin/kafka-server-stop.sh

创建一个具有两个分区partition的主题test

复制代码
1
2
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test

成功的话会发现两个partition分布在不同的broker上,用java做远程连接测试,指定向broker0(存有partition-0)发送消息但指定保存到partition-1上(位于broker1内),发现消息可以正确抵达并存储,说明伪分布式集群成功

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.example.demo; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaTemplate; @SpringBootTest class DemoApplicationTests { @Autowired private KafkaTemplate tmp; @Test void contextLoads() { tmp.send("test",1,"key","data"); } }

最后

以上就是虚心含羞草最近收集整理的关于搭建网联云平台(二)——云服务器伪分布式kafka集群的全部内容,更多相关搭建网联云平台(二)——云服务器伪分布式kafka集群内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(83)

评论列表共有 0 条评论

立即
投稿
返回
顶部