概述
flume监控oracle实时增量并推送至kafka
- 启动kafka
- 检查topic
- 创建topic
- 开启消费消息
- 检查flume配置文件
- 启动agent
- 检查Kafka消费数据记录
该方式只能实现增量,修改删除不可用;
启动kafka
$KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh config/server.properties
检查topic
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
创建topic
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic testoracle
开启消费消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic testoracle
检查flume配置文件
(数据库相关、增量开始字段、自定义查询等)
a1.channels = c1
a1.sources = r1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
#source组件类型名称,必须为org.keedio.flume.source.SQLSource
a1.sources.r1.type = org.keedio.flume.source.SQLSource
#与远程数据库连接的网址,这里我是oracle
a1.sources.r1.hibernate.connection.url = jdbc:oracle:thin:@X.X.X.X:1521/orcl
# Hibernate Database connection properties
#用于连接数据库的用户名
a1.sources.r1.hibernate.connection.user = user
#连接数据库的密码
a1.sources.r1.hibernate.connection.password = pwd
#是否自动提交
a1.sources.r1.hibernate.connection.autocommit = true
#Dialect to use by hibernate
a1.sources.r1.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
#驱动类
a1.sources.r1.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
#在运行查询完成后等待时间(ms)
a1.sources.r1.run.query.delay=10
#保存状态文件的路径
a1.sources.r1.status.file.path = /opt/flume/flumestatus
#保存状态文件的名称
a1.sources.r1.status.file.name = sqlSource.status
# Custom query
#查询开始的标识,使用你增量的字段。这里我用的时间毫秒值来做增量判断,可以根据业务来更改
a1.sources.r1.start.from = 0
#自定义查询语句,一定要确保查询结果的第一位置返回增量字段
a1.sources.r1.custom.query = SELECT * FROM (SELECT TO_NUMBER(create_time-TO_DATE('2021-11-01 08:00:00','YYYY-MM-DD HH24:MI:SS'))*24*60*60*1000 AS SJHM,id,name FROM testflume) WHERE SJHM>$@$ ORDER BY SJHM ASC
#批量大小
a1.sources.r1.batch.size = 1000
#最大行数
a1.sources.r1.max.rows = 1000
#连接池驱动类
a1.sources.r1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
#连接池最小个数
a1.sources.r1.hibernate.c3p0.min_size=1
#连接池最大个数
a1.sources.r1.hibernate.c3p0.max_size=10
##############################
#channel的类型
a1.channels.c1.type = memory
#在channel中最多能保存多少个event
a1.channels.c1.capacity = 10000
#在每次从source获取数据后者将数据sink出去的一次事务操作中,最多处理的event数
a1.channels.c1.transactionCapacity = 10000
#在channel中最多容纳所有event body的总字节数
a1.channels.c1.byteCapacity = 800000
#这个值得含义跟上面一样,只不过这个计算event header跟最大可用内存的字节占比
a1.channels.c1.byteCapacityBufferPercentage = 20
##############################
#sink的类型
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#kafka topic
a1.sinks.k1.topic = testoracle
#brokerList
a1.sinks.k1.brokerList = localhost:9092
#ack机制:1表示producer只要收到一个leader分区副本成功写入的通知就认为推送消息成功了。0代表producer发送一次就不在发送,不管成不成功。-1代表producer只有收到分区内所有副本成功写入的通知才认为推送成功
a1.sinks.k1.requiredAcks = 1
#batch的大小
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
a1.sinks.k1.channel = c1
a1.sources.r1.channels=c1
启动agent
./bin/flume-ng agent --conf ./conf --conf-file ./conf/test.conf --name a1 -Dflume.root.logger=INFO,console
检查Kafka消费数据记录
最后
以上就是热情小笼包为你收集整理的【flume监控oracle实时增量并推送至kafka】的全部内容,希望文章能够帮你解决【flume监控oracle实时增量并推送至kafka】所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复