概述
整体思路:
写一个循环脚本制造假数据,通过flume将这些数据下沉到kafka中,然后通过java代码,把这些数据暂存到本地临时路径下,然后设置一个定时器,定时将这些数据上传到hdfs中,代码如下:
1、消费者:负责将数据放到本地路径暂存
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerDemo {
private static KafkaConsumer<String, String> consumer;
private static Properties props;
//static静态代码块,在main之前先执行
static {
props = new Properties();
//消费者kafka地址
props.put("bootstrap.servers", "hdp-2:9092");
//key反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//组
props.put("group.id", "yangk");
}
/**
* 从kafka中获取数据(SpringBoot也集成了kafka)
*/
private static void ConsumerMessage() {
//允许自动提交位移
props.put("enable.auto.commit", true);
consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Collections.singleton("first_kafka"));
//使用轮询拉取数据--消费完成之后会根据设置时长来清除消息,被消费过的消息,如果想再次被消费,可以根据偏移量(offset)来获取
try {
File file = new File("F:/xiu.txt");
//建文件输出流
FileOutputStream fos = new FileOutputStream(file);
while (true) {
//从kafka中读到了数据放在records中
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> r : records) {
String a = r.topic()+ r.offset()+ r.key()+ r.value() + "rn";
//将数据写到F:/xiu.txt
fos.write(a.getBytes());
System.out.printf("topic = %s, offset = %s, key = %s, value = %s", r.topic(), r.offset(),
r.key(), r.value() + "n");
}
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
public static void main(String[] args) {
//调用接收消息的方法
ConsumerMessage();
}
}
2、从本地临时路径上传到hdfs
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.TimerTask;
public class UploadData extends TimerTask{
public void run() {
URI uri = null;
try {
uri = new URI("hdfs://hdp-1:9000");
Configuration conf = new Configuration();
conf.set("dfs.replication", "2");//name,value 副本个数
conf.set("dfs.blocksize", "64m");//块的大小
String user = "root";
FileSystem fs = FileSystem.get(uri,conf,user);
Path src = new Path("F:/xiu.txt");//本地临时路径文件
Path dst = new Path("/hello_kafka.txt");//上传到hdfs的目标文件
fs.copyFromLocalFile(src,dst);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3、设置定时器
import java.util.Timer;
public class TimerDemo {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new UploadData(),0,4*1000);//测试为了方便4s钟重新加载一次UploalData类
}
}
4、每隔4s刷新一次50070页面,会看到文件在变化,前提是采集数据的脚本1s要设置生成许多假数据,不然效果不明显
最后
以上就是正直水池为你收集整理的flume采集下沉到kafka再上传到hdfs整体思路:的全部内容,希望文章能够帮你解决flume采集下沉到kafka再上传到hdfs整体思路:所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复