概述
flume
flume依赖于java环境,安装flume前先安装JDK
安装单机版flume
-
解压
tar -zxvf 包名 -C 解压到指定目录
-
改名(解压后的目录名,可不改)
-
配置环境变量
vim /root/.bash_profile
export FLUME_HOME=指定安装路径
export PATH=$PATH:$FLUME_HOME/bin
-
使配置环境立即生效
source /root/.bash_profile
-
创建配置文件
vim 下载目录/flume/conf/duankou.conf(文件名自取)
监听端口
-
添加内容(监听端口)
# 定义三个flume工作组件 a1.sources=r1 # 数据来源 a1.sinks=k1 # 数据到哪去 a1.channels=c1 # 中间的缓冲频道(一般设置到内存中) a1.sources.r1.type=netcat # 监听网络端口 a1.sources.r1.bind=hadoop101 # 绑定主机(地址)名 a1.sources.r1.port=9999 # 监听端口号 a1.sinks.k1.type=logger # 以日志形式打印 a1.channels.c1.type=memory # 数据缓存在内容中 a1.channels.c1.capacity=1000 # 容量 a1.channels.c1.transactionCapacity=100 # 事务容量 # 将a1,r1,k1关联到c1 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
-
启动监听
flume-ng agent -c conf -f 解压目录/conf/duankou.conf --name a1 -Dflume.root.logger=INFO,console
-
新建连接,创建监听端口
# 在新建连接下 nc hadoop101 9999
监听文件
-
创建配置文件
vim 下载目录/flume/conf/wenjian.conf(文件名自取)
-
添加内容(监听文件)
# 定义三个flume工作组件 a1.sources=r1 # 数据来源 a1.sinks=k1 # 数据到哪去 a1.channels=c1 # 中间的缓冲频道(一般设置到内存中) a1.sources.r1.type=exec # 监听文件 a1.sources.r1.command=tail -F /opt/data/myword.txt # 监听哪个文件 a1.sinks.k1.type=logger a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
-
创建文件
vim /opt/data/myword.txt
-
启动监听
flume-ng agent -c conf -f 解压目录/conf/wenjian.conf --name a1 -Dflume.root.logger=INFO,console
-
向文件写入内容
echo "my name whp" >> /opt/data/myword.txt
链接kafka配置文件
-
创建配置文件
vim 下载目录/flume/conf/kafka.conf(文件名自取)
-
添加内容(监听文件)
a1.sources=r1 a1.sinks=k1 a1.channels=c1 a1.sources.r1.type=exec a1.sources.r1.command=tail -F /opt/data/myword.txt # flume监听内容后丢到kafka中 a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink # 设置kafka的主题 a1.sinks.k1.topic=youxi # 设置kafka的broker地址以及端口号 a1.sinks.k1.kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092 # 设置kafka序列化方式 a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100 a1.sources.c1.channels=c1 a1.sinks.k1.channel=c1
-
flume链接kafka
zookeeper集群
默认通信端口:2181
-
解压压缩包至指定目录
tar -zxvf zookeeper… -C 指定目录
-
重命名压缩目录
-
配置环境变量
vim /root/.bash_profile
export ZOOKEEPER_HOME=/usr/local/src/zookeeper
export PATH:$ZOOKEEPER_HOME/bin
-
使环境变量生效
source /root/.bash_profile
-
在zookeeper目录下编辑zoo.cfg配置文件
-
拷贝一份zoo_sample.cfg文件
cp zoo_sample.cfg zoo.cfg
-
编辑zoo.cfg文件: vi/vim zoo.cfg
加入以下内容
server.1=hadoop101:2888:3888 server.2=hadoop102:2888:3888 server.3=hadoop103:2888:3888
-
在/tmp/zookeeper目录下创建myid
vim myid
-
或者将zoo.cfg中dataDir改为
dataDir=/usr/local/src/zookeeper/tmp/data
在zookeeper目录下创建tmp/data文件夹新建myid文件
vim myid
加入以下内容:
1
-
-
将zookeep目录拷贝至其它机器上
scp -r zookeeper/ hadoop102:/usr/local/src/ scp -r 需拷贝的目录路径/ 主机名:拷贝目录
-
远程登陆其他机器
ssh hadoop102
vim /usr/local/src/zookeeper/tmp/data/myid
改成2
退出
exit;
ssh hadoop103
vim /usr/local/src/zookeeper/tmp/data/myid
改成3
退出
exit;
-
更改另两台机器的环境变量
-
使配置生效: source /root/.bash_profile
-
启动三台机器的zookeeper集群(三台机器输入以下命令)
zkServer.sh start
-
查看三台机器状态
zkServer.sh status
kafka配置
-
解压压缩包至指定目录
-
更改解压目录为kafka
-
更改环境变量
export KAFKA_HOME=/usr/local/src/kafka export PATH=$PATH:$KAFKA_HOME/bin
-
使配置环境生效
source /root/.bash_profile
-
更改在kafka目录下的config目录下的server.properties
vim /usr/local/src/kafka/config/server.properties
# 每台机器上的id都不能一样 broker.id=1 # 添加主机名 host.name=192.168.110.101 # 取消以下注释 #listeners = PLAINTEXT://your.host.name:9092 # 改为 主机名或主机ip listenters = PLAINTEXT://192.168.110.101:9092 #更改log.dirs 改为: log.dirs=/usr/local/src/kafka/tmp/kafka-logs # 更改zookpper通信地址zookeeper.connect zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181
-
将kafka目录拷贝至其他机器
scp -r kafka/ hadoopo102:/usr/local/src/
-
配置另外两台机器的环境变量
-
更改另两台的配置文件
vim server.properties
broker.id=2 host.name=192.168.110.102 listeners = PLAINTEXT://192.168.110.102:9092
-
启动kafka(三台机器输入以下指令)
kafka-server-start.sh -daemon kafka/config/server.properties
-
kafka测试(任选一台机器)
# 创建主题 kafka-topics.sh --create --zookeeper 主机名:2181 --replication-factor 1 --partitions 1 --topic test (test为主题名称) # 查看主题 kafka-topics.sh --zookeeper hadoop101:2181 -list # 启动生产者 kafka-console-producer.sh --broker-list ip地址:9092 --topic test # 启动消费者 kafka-console-consumer.sh --bootstrap-server 地址:9092 --topic test --from-beginning # 删除主题 kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
-
最后
以上就是冷静翅膀为你收集整理的flume安装,配置及端口与文件监听flumeflume链接kafka的全部内容,希望文章能够帮你解决flume安装,配置及端口与文件监听flumeflume链接kafka所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复