Overview
1. Upload the file in ConsumerDemo
package com.zpark.kafka;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerDemo {
    private static KafkaConsumer<String, String> consumer;
    private static Properties props;

    static {
        props = new Properties();
        // Kafka broker address for the consumer
        props.put("bootstrap.servers", "hdp3:9092");
        // key and value deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // consumer group
        props.put("group.id", "yangk");
    }
    /**
     * Fetch data from Kafka (Spring Boot also integrates Kafka; a listener sketch follows below)
     */
    private static void ConsumerMessage() {
        // Allow automatic offset commits
        props.put("enable.auto.commit", true);
        consumer = new KafkaConsumer<String, String>(props);
        // subscribe: join the group and get partitions for the "animals" topic
        consumer.subscribe(Collections.singleton("animals"));
        // Pull data in a polling loop. The broker deletes messages after the configured
        // retention time; a message that was already consumed can be read again by seeking
        // to its offset (a replay sketch follows this class).
        try {
            // HDFS connection settings, only needed if you write straight to HDFS via the
            // commented-out FSDataOutputStream below; here the messages go to a local
            // temp file that the timer task later uploads.
            URI uri = new URI("hdfs://hdp-1:9000");
            Configuration conf = new Configuration();
            conf.set("dfs.replication", "2");
            conf.set("dfs.blocksize", "64m");
            String user = "root";
            FileSystem fs = FileSystem.get(uri, conf, user);
            // FSDataOutputStream out = fs.create(new Path("/dcf.txt"));

            // Local temp file that AccessTask uploads to HDFS once a minute
            File file = new File("D:/haha.txt");
            if (!file.exists()) {
                file.createNewFile();
            }
            FileOutputStream out = new FileOutputStream(file, true);
            while (true) {
                // poll pulls data from Kafka; the timeout is 100 milliseconds
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> r : records) {
                    String msg = r.topic() + r.offset() + r.key() + r.value();
                    System.out.printf("topic = %s, offset = %s, key = %s, value = %s%n",
                            r.topic(), r.offset(), r.key(), r.value());
                    // Append the message to the local temp file
                    out.write(msg.getBytes());
                    out.flush();
                }
            }
        } catch (IOException | URISyntaxException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
    public static void main(String[] args) {
        // Call the message-receiving method
        ConsumerMessage();
    }
}
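The comment inside ConsumerMessage notes that an already-consumed message can be read again by its offset. Below is a minimal, self-contained sketch of that idea; the class name, the partition number, and the starting offset are illustrative assumptions, not part of the original code.
package com.zpark.kafka;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class ReplayByOffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hdp3:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "yangk");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // assign() + seek() instead of subscribe(): start reading from a chosen offset
        TopicPartition tp = new TopicPartition("animals", 0);
        consumer.assign(Collections.singleton(tp));
        consumer.seek(tp, 0L); // the starting offset 0 is only an example
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> r : records) {
            System.out.printf("offset = %s, value = %s%n", r.offset(), r.value());
        }
        consumer.close();
    }
}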
// KafkaConsumer's poll() is what actually pulls the data
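As the Javadoc on ConsumerMessage notes, Spring Boot also integrates Kafka. Here is a minimal sketch of that route, assuming the spring-kafka dependency is on the classpath and spring.kafka.bootstrap-servers points at the same broker; the class and method names are hypothetical.
package com.zpark.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class AnimalsListener {
    // spring-kafka builds the KafkaConsumer and calls this method for every record
    @KafkaListener(topics = "animals", groupId = "yangk")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.printf("topic = %s, offset = %s, key = %s, value = %s%n",
                record.topic(), record.offset(), record.key(), record.value());
    }
}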
2. Define a timer task
package com.zpark.kafka;
import java.util.Timer;
public class TimeDemo {
    public static void main(String[] args) {
        // Define a timer task that runs the log upload once every minute:
        // delay 0 means start immediately, and the period is 1*60*1000 ms = one minute
        Timer tm = new Timer();
        tm.schedule(new AccessTask(), 0, 1 * 60 * 1000L);
    }
}
3. Upload the temporary file to HDFS
package com.zpark.kafka;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.TimerTask;
public class AccessTask extends TimerTask {
    public void run() {
        try {
            URI uri = new URI("hdfs://hdp1:9000");
            Configuration conf = new Configuration();
            conf.set("dfs.replication", "2");
            conf.set("dfs.blocksize", "64m");
            String user = "root";
            FileSystem fs = FileSystem.get(uri, conf, user);
            // Copy the local temp file written by ConsumerDemo up to HDFS
            Path src = new Path("D:/haha.txt");
            Path dst = new Path("/kafkalogs.txt");
            fs.copyFromLocalFile(src, dst);
            fs.close();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
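To confirm the upload worked, here is a minimal read-back sketch using the same FileSystem API; it reuses the URI, user, and target path from AccessTask, and the class name is hypothetical.
package com.zpark.kafka;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class ReadBackCheck {
    public static void main(String[] args) throws Exception {
        // Same HDFS URI and user as AccessTask
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp1:9000"), new Configuration(), "root");
        // Stream the uploaded file to the console to confirm the copy succeeded
        FSDataInputStream in = fs.open(new Path("/kafkalogs.txt"));
        IOUtils.copyBytes(in, System.out, 4096, false);
        in.close();
        fs.close();
    }
}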
Finally
That covers the whole flow of sending Flume-collected data through Kafka and on to HDFS; hopefully it helps with the same development problem.