概述
文章目录
- Docker安装Kafka
- 下载镜像
- 安装Zookeeper
- 安装Kafka
- 测试
- Linux安装Kafka
- 安装Zookeeper
- 安装Kafka
- 测试
- 集群安装
- 命令行操作
- Kafka命令行操作
- 生产消费者命令行操作
- 监控平台搭建
- 安装mysql
- 安装监控
- JAVA_HOME 配置
- 解压缩及配置
- 添加环境变量
- 启动测试
- 集成SpringBoot
- pom依赖
- 生产者
- 消费者
- 测试
- 请求
- 控制台输出
- 问题
课程资料
链接:https://pan.baidu.com/s/14ziQH62MeYmM8N6JsH5RcA
提取码:yyds
Docker安装Kafka
下载镜像
docker load -i kafka-zookeeper.tar #解压
docker network create kafka-zk #创建网络!
安装Zookeeper
docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime --network=kafka-zk zookeeper:v1.0
docker logs zookeeper #查看zookeeper日志
docker exec -it zookeeper /bin/bash #进入容器
cd bin
./zkCli.sh #连接zk
安装Kafka
docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.111.101:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime --network=kafka-zk kafka:v1.0
参数说明:
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=172.17.0.2:2181/kafka
在根目录下新建一个kafka
目录,来管理kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.111.101:9092
改成本地的ip,让外界可以访问!
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
注意:
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
当前主机IP或地址!!!!(重点:如果是服务器部署则配服务器IP或域名
否则客户端监听消息会报地址错误)
测试
docker exec -it kafka /bin/bash #进入容器
cd /opt/kafka/bin
#--bootstrap-server 127.0.0.1:9092与上面的KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092息息相关!!!
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic first # 创建topic
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic first
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first # 生产者
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first # 消费者监听
Linux安装Kafka
安装Zookeeper
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/ # 解压到指定目录
cd /opt/module/
mv apache-zookeeper-3.5.7-bin/ zookeeper
cd conf/
cp zoo_sample.cfg zoo.cfg # 必须要有一份zoo.cfg!!!
mkdir /opt/module/zookeeper/data #存放数据的目录
vim zoo.cfg
#修改如下内容:
dataDir=/opt/module/zookeeper/data
./zkServer.sh --config ../conf start #启动
./zkCli.sh #连接zookeeper服务端
安装Kafka
tar -zxvf kafka_2.12-3.3.1.tgz -C /opt/module/
cd /opt/module/
mv kafka_2.12-3.3.1/ kafka # 更名为kafka
cd /opt/module/kafka/config #进目录
vim server.properties # 进入到/opt/module/kafka 目录,修改配置文件
测试
./kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic first #创建主题
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list #查看所有主题
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic first
./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic=first # 生产者
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic=first #消费者
集群安装
参照
普通安装
,注意修改broker_id
,保证zookeeper的地址相同即可!!
命令行操作
Kafka命令行操作
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list #查看所有的topic
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic second #创建topic
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic second #查看主题详情
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic second #删除主题
#注意 设置分区,相当于数据分片,把大的拆成小的------replication副本数必须大于可用的broker!!!
注意:
- 分区数
只能增加,不能减少!
- 必须在
创建topic时指定副本数!
生产消费者命令行操作
生产者
./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic=first # 生产者
消费者
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic=first #消费者
监控平台搭建
监控工具
需要MySQL作为持久化手段
安装mysql
my.cnf
文件
[client]
default_character_set=utf8mb4
[mysqld]
collation_server = utf8mb4_general_ci
character_set_server = utf8mb4
创建容器
# my.cnf配置
mkdir -p /app/mysql/conf
cd /app/mysql/conf
vim my.cnf #主机的my.cnf
docker run -d -p 3306:3306 --privileged=true -v /app/mysql/log:/var/log/mysql -v /app/mysql/data:/var/lib/mysql -v /app/mysql/conf:/etc/mysql/conf.d -e MYSQL_ROOT_PASSWORD=123456 --name=mysql mysql:8.0
#查看是否启动
docker ps
记得重启加载配置!
docker restart mysql # 重启容器加载配置!
docker exec -it mysql /bin/bash
mysql -uroot -p123456
create database ke; #用于存储监控数据!
安装监控
JAVA_HOME 配置
vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
然后,我们使用
source /etc/profile
使配置立即生效。
解压缩及配置
tar -zxvf efak-web-3.0.1-bin.tar.gz
mv efak-web-3.0.1 monitor
cd monitor/
cd conf/
vi system-config.properties
system-config.properties
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
############Zookeeper的集群配置!!!##############
efak.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=127.0.0.1:2181/kafka
# 多个以逗号隔开 hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka
# 注意要设置/kafka,为kafka的配置管理目录!!!
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
######################################
# broker size online list
######################################
cluster1.efak.broker.size=20
######################################
# zk client thread limit
######################################
kafka.zk.limit.size=16
######################################
# 访问的端口号
######################################
efak.webui.port=8048
######################################
# EFAK enable distributed
######################################
efak.distributed.enable=false
efak.cluster.mode.status=master
efak.worknode.master.host=localhost
efak.worknode.port=8085
######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456
######################################
# kafka offset storage
######################################
cluster1.efak.offset.storage=kafka
cluster2.efak.offset.storage=zk
######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15
######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin
######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=
######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=
######################################
# kafka sqlite jdbc driver address
######################################
#efak.driver=org.sqlite.JDBC
#efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#efak.username=root
#efak.password=www.kafka-eagle.org
######################################
# kafka mysql jdbc driver address
########默认以mysql作为存储#########
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
添加环境变量
sudo vim /etc/profile.d/my_env.sh
# kafkaEFAK
export KE_HOME=/app/monitor
export PATH=$PATH:$KE_HOME/bin
source /etc/profile
立即生效!!!
启动测试
启动前应
先启动Zk、Kafka!!!
ke.sh start
# ke.sh cluster restart 分布式启动!
# ke.sh stop #停止
# ke.sh restart #重启
启动
报错
,排查是否已经启动过,kill掉相关进程!!!
集成SpringBoot
pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
生产者
yaml
# 应用名称
spring.application.name=Producer
server.port=8081
# =========生产者配置开始=========
# 指定 kafka 的地址
spring.kafka.bootstrap-servers=192.168.111.101:9092
# 指定 key 和 value 的序列化器!!
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class Producer {
// Kafka 模板用来向 kafka 发送数据
@Autowired
KafkaTemplate<String, String> kafka;
@RequestMapping("/hello")
public String data(String msg) {
kafka.send("first", msg);
return "ok";
}
}
消费者
yaml
# 应用名称
spring.application.name=Consumer
# 指定 kafka 的地址
spring.kafka.bootstrap-servers=192.168.111.101:9092
#指定 key 和 value 的反序列化器!!
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的 group_id
spring.kafka.consumer.group-id=consumer1
代码
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class Consumer {
// 指定要监听的 topic
@KafkaListener(topics = "first")
public void consumeTopic(String msg) { // 参数 : 收到的 value
System.out.println(" 收到的信息: " + msg);
}
}
测试
启动
Producer和Consumer
服务进行测试
请求
控制台输出
问题
最后
以上就是含糊小笼包为你收集整理的Kafka基础篇Docker安装KafkaLinux安装Kafka集群安装命令行操作监控平台搭建集成SpringBoot问题的全部内容,希望文章能够帮你解决Kafka基础篇Docker安装KafkaLinux安装Kafka集群安装命令行操作监控平台搭建集成SpringBoot问题所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复