我是靠谱客的博主 含糊小笼包,最近开发中收集的这篇文章主要介绍Kafka基础篇Docker安装KafkaLinux安装Kafka集群安装命令行操作监控平台搭建集成SpringBoot问题,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

  • Docker安装Kafka
    • 下载镜像
    • 安装Zookeeper
    • 安装Kafka
    • 测试
  • Linux安装Kafka
    • 安装Zookeeper
    • 安装Kafka
    • 测试
  • 集群安装
  • 命令行操作
    • Kafka命令行操作
    • 生产消费者命令行操作
  • 监控平台搭建
    • 安装mysql
    • 安装监控
      • JAVA_HOME 配置
      • 解压缩及配置
      • 添加环境变量
    • 启动测试
  • 集成SpringBoot
    • pom依赖
    • 生产者
    • 消费者
    • 测试
      • 请求
      • 控制台输出
  • 问题

课程资料
链接:https://pan.baidu.com/s/14ziQH62MeYmM8N6JsH5RcA
提取码:yyds

Docker安装Kafka

下载镜像

docker load -i kafka-zookeeper.tar #解压
docker network create kafka-zk  #创建网络!

安装Zookeeper

docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2  --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime  --network=kafka-zk zookeeper:v1.0

docker logs zookeeper #查看zookeeper日志
docker exec -it zookeeper /bin/bash #进入容器
cd bin
./zkCli.sh #连接zk

安装Kafka

docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.111.101:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime  --network=kafka-zk kafka:v1.0

参数说明:

-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=172.17.0.2:2181/kafka根目录下新建一个kafka目录,来管理kafka

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.111.101:9092 改成本地的ip,让外界可以访问!

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

注意:KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092当前主机IP或地址!!!!(重点:如果是服务器部署则配服务器IP或域名否则客户端监听消息会报地址错误

测试

docker exec -it kafka /bin/bash #进入容器
cd /opt/kafka/bin
#--bootstrap-server 127.0.0.1:9092与上面的KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092息息相关!!!
./kafka-topics.sh  --bootstrap-server 127.0.0.1:9092 --create --topic first # 创建topic
./kafka-topics.sh  --bootstrap-server 127.0.0.1:9092 --list
./kafka-topics.sh  --bootstrap-server 127.0.0.1:9092 --describe --topic first

在这里插入图片描述

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first  # 生产者
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first  # 消费者监听

在这里插入图片描述
在这里插入图片描述

Linux安装Kafka

安装Zookeeper

tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz  -C /opt/module/ # 解压到指定目录
cd /opt/module/
mv apache-zookeeper-3.5.7-bin/ zookeeper

cd conf/
cp zoo_sample.cfg  zoo.cfg # 必须要有一份zoo.cfg!!!
mkdir /opt/module/zookeeper/data #存放数据的目录
vim zoo.cfg
#修改如下内容:
dataDir=/opt/module/zookeeper/data

在这里插入图片描述

./zkServer.sh  --config  ../conf start #启动
./zkCli.sh 	#连接zookeeper服务端

在这里插入图片描述

在这里插入图片描述

安装Kafka

tar -zxvf kafka_2.12-3.3.1.tgz -C /opt/module/
cd /opt/module/ 
mv kafka_2.12-3.3.1/ kafka # 更名为kafka
cd /opt/module/kafka/config #进目录
vim server.properties  # 进入到/opt/module/kafka 目录,修改配置文件

在这里插入图片描述

测试

./kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
./kafka-topics.sh  --bootstrap-server 127.0.0.1:9092 --create --topic first #创建主题
./kafka-topics.sh  --bootstrap-server 127.0.0.1:9092 --list 		#查看所有主题
./kafka-topics.sh  --bootstrap-server 127.0.0.1:9092 --describe --topic first

在这里插入图片描述

./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic=first # 生产者
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic=first #消费者

在这里插入图片描述
在这里插入图片描述

集群安装

参照普通安装,注意修改broker_id,保证zookeeper的地址相同即可!!

命令行操作

Kafka命令行操作

在这里插入图片描述

./kafka-topics.sh  --bootstrap-server 127.0.0.1:9092 --list #查看所有的topic
./kafka-topics.sh  --bootstrap-server 127.0.0.1:9092 --create --topic second #创建topic
./kafka-topics.sh  --bootstrap-server 127.0.0.1:9092 --describe --topic second #查看主题详情
./kafka-topics.sh  --bootstrap-server 127.0.0.1:9092 --delete --topic second #删除主题
#注意 设置分区,相当于数据分片,把大的拆成小的------replication副本数必须大于可用的broker!!!

在这里插入图片描述

注意:

  1. 分区数只能增加,不能减少!
  2. 必须在创建topic时指定副本数!

生产消费者命令行操作

生产者

在这里插入图片描述
在这里插入图片描述

./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic=first # 生产者

消费者

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --from-beginning --topic=first #消费者

监控平台搭建

监控工具需要MySQL作为持久化手段

安装mysql

my.cnf文件

[client]
default_character_set=utf8mb4
[mysqld]
collation_server = utf8mb4_general_ci
character_set_server = utf8mb4

创建容器

#  my.cnf配置
mkdir -p /app/mysql/conf
cd /app/mysql/conf
vim my.cnf #主机的my.cnf
docker run -d  -p 3306:3306 --privileged=true  -v  /app/mysql/log:/var/log/mysql   -v  /app/mysql/data:/var/lib/mysql   -v  /app/mysql/conf:/etc/mysql/conf.d   -e MYSQL_ROOT_PASSWORD=123456  --name=mysql   mysql:8.0

#查看是否启动
docker ps 

记得重启加载配置!

docker restart mysql  # 重启容器加载配置!

docker exec -it mysql /bin/bash
mysql -uroot -p123456
create database ke; #用于存储监控数据!

安装监控

JAVA_HOME 配置

vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin

然后,我们使用source /etc/profile使配置立即生效。

解压缩及配置

tar -zxvf efak-web-3.0.1-bin.tar.gz
mv efak-web-3.0.1 monitor
cd monitor/
cd conf/
vi system-config.properties

system-config.properties

######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
############Zookeeper的集群配置!!!##############
efak.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=127.0.0.1:2181/kafka
# 多个以逗号隔开 hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka
# 注意要设置/kafka,为kafka的配置管理目录!!!

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.efak.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=16

######################################
# 访问的端口号
######################################
efak.webui.port=8048

######################################
# EFAK enable distributed
######################################
efak.distributed.enable=false
efak.cluster.mode.status=master
efak.worknode.master.host=localhost
efak.worknode.port=8085

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456

######################################
# kafka offset storage
######################################
cluster1.efak.offset.storage=kafka
cluster2.efak.offset.storage=zk

######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15

######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10

######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address
######################################
#efak.driver=org.sqlite.JDBC
#efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#efak.username=root
#efak.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
########默认以mysql作为存储#########
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456

添加环境变量

sudo vim /etc/profile.d/my_env.sh
# kafkaEFAK
export KE_HOME=/app/monitor
export PATH=$PATH:$KE_HOME/bin

source /etc/profile立即生效!!!

启动测试

启动前应先启动Zk、Kafka!!!

ke.sh start
# ke.sh cluster restart 分布式启动!
# ke.sh stop 	#停止
# ke.sh restart #重启

启动报错,排查是否已经启动过kill掉相关进程!!!

在这里插入图片描述

集成SpringBoot

pom依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>            

生产者

yaml

# 应用名称
spring.application.name=Producer
server.port=8081
# =========生产者配置开始=========
# 指定 kafka 的地址
spring.kafka.bootstrap-servers=192.168.111.101:9092

# 指定 key 和 value 的序列化器!!
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

代码


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class Producer {

    // Kafka 模板用来向 kafka 发送数据
    @Autowired
    KafkaTemplate<String, String> kafka;

    @RequestMapping("/hello")
    public String data(String msg) {
        kafka.send("first", msg);
        return "ok";
    }
}

消费者

yaml

# 应用名称
spring.application.name=Consumer


# 指定 kafka 的地址
spring.kafka.bootstrap-servers=192.168.111.101:9092

#指定 key 和 value 的反序列化器!!

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的 group_id
spring.kafka.consumer.group-id=consumer1

代码

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class Consumer {
    //  指定要监听的 topic
    @KafkaListener(topics = "first")
    public void consumeTopic(String msg) { //  参数 :  收到的 value
        System.out.println(" 收到的信息: " + msg);
    }
}


测试

启动Producer和Consumer服务进行测试

请求

在这里插入图片描述

控制台输出

在这里插入图片描述

问题

最后

以上就是含糊小笼包为你收集整理的Kafka基础篇Docker安装KafkaLinux安装Kafka集群安装命令行操作监控平台搭建集成SpringBoot问题的全部内容,希望文章能够帮你解决Kafka基础篇Docker安装KafkaLinux安装Kafka集群安装命令行操作监控平台搭建集成SpringBoot问题所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(44)

评论列表共有 0 条评论

立即
投稿
返回
顶部