1. Consume the Kafka data in ConsumerDemo and write it to a local staging file
package com.zpark.kafka;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {

    private static KafkaConsumer<String, String> consumer;
    private static Properties props;

    static {
        props = new Properties();
        // Kafka broker the consumer connects to
        props.put("bootstrap.servers", "hdp3:9092");
        // Key and value deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        props.put("group.id", "yangk");
    }

    /**
     * Pull data from Kafka (Spring Boot also ships a Kafka integration).
     */
    private static void consumeMessages() {
        // Allow offsets to be committed automatically
        props.put("enable.auto.commit", true);
        consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topic; poll() below pulls the records.
        // Kafka deletes messages after the configured retention time; a message that has
        // already been consumed can be read again by seeking back to its offset.
        consumer.subscribe(Collections.singleton("animals"));

        try {
            // Local staging file that AccessTask (step 3) later uploads to HDFS
            FileOutputStream out = new FileOutputStream("D:/haha.txt");
            while (true) {
                // poll() returns whatever records have arrived, waiting at most 100 ms
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> r : records) {
                    // Newline added so each record lands on its own line in the staging file
                    String msg = r.topic() + r.offset() + r.key() + r.value() + "\n";
                    System.out.printf("topic = %s, offset = %s, key = %s, value = %s%n",
                            r.topic(), r.offset(), r.key(), r.value());
                    out.write(msg.getBytes());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        // Start receiving messages
        consumeMessages();
    }
}

KafkaConsumer's poll() is what actually pulls the data from the broker.
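The original listing also contained commented-out fragments for writing straight to HDFS with an FSDataOutputStream instead of the local file. Below is a minimal sketch of that variant, assuming the NameNode address hdfs://hdp-1:9000, the user root, and the target path /dcf.txt that appear in those commented-out lines; it is an illustration, not a drop-in replacement for the working example above.

package com.zpark.kafka;

import java.net.URI;
import java.util.Collections;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class HdfsConsumerSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hdp3:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "yangk");
        props.put("enable.auto.commit", true);

        // NameNode address, user and output path are taken from the commented-out code (assumptions)
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "2");
        conf.set("dfs.blocksize", "64m");
        FileSystem fs = FileSystem.get(new URI("hdfs://hdp-1:9000"), conf, "root");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             FSDataOutputStream out = fs.create(new Path("/dcf.txt"))) {
            consumer.subscribe(Collections.singleton("animals"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> r : records) {
                    // One record per line, written straight into the HDFS file
                    String line = r.topic() + "," + r.offset() + "," + r.key() + "," + r.value() + "\n";
                    out.write(line.getBytes());
                }
                // hflush makes the written bytes visible to other HDFS readers
                out.hflush();
            }
        }
    }
}

Writing directly to HDFS would remove the need for the staging file and the timer in steps 2 and 3, but the staging-file approach used in this post is simpler to run on a single Windows development machine.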
2. Define a timer task
package com.zpark.kafka;

import java.util.Timer;

public class TimeDemo {
    public static void main(String[] args) {
        // Define a timer task that runs the log upload once every minute
        Timer tm = new Timer();
        tm.schedule(new AccessTask(), 0, 1 * 60 * 1000L);
    }
}
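Timer.schedule(task, delay, period) runs the task after delay milliseconds and then repeatedly every period milliseconds, so the call above fires AccessTask immediately and then once a minute. As a sketch of an alternative that is not in the original post, the same schedule can be expressed with a ScheduledExecutorService, which is easier to stop cleanly (AccessTask is usable here because TimerTask already implements Runnable):

package com.zpark.kafka;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulerDemo {
    public static void main(String[] args) {
        // Run AccessTask (step 3) immediately and then once every minute
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        pool.scheduleAtFixedRate(new AccessTask(), 0, 1, TimeUnit.MINUTES);
        // pool.shutdown() would end the periodic upload when it is no longer needed
    }
}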
3. Upload the temporary staging file to HDFS
package com.zpark.kafka;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.TimerTask;

public class AccessTask extends TimerTask {

    @Override
    public void run() {
        try {
            // Connect to the HDFS NameNode as user "root"
            URI uri = new URI("hdfs://hdp1:9000");
            Configuration conf = new Configuration();
            conf.set("dfs.replication", "2");
            conf.set("dfs.blocksize", "64m");
            String user = "root";
            FileSystem fs = FileSystem.get(uri, conf, user);

            // Copy the local staging file written by ConsumerDemo up to HDFS
            Path src = new Path("D:/haha.txt");
            Path dst = new Path("/kafkalogs.txt");
            fs.copyFromLocalFile(src, dst);
            fs.close();
        } catch (URISyntaxException | InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }
}
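Each run of the task re-uploads D:/haha.txt to the same HDFS path, so /kafkalogs.txt ends up holding the latest contents of the staging file. To verify the whole chain before Flume is producing into Kafka, a small test producer can feed the animals topic; the class below is an addition of mine, not part of the original post, and only reuses the broker address and topic from ConsumerDemo:

package com.zpark.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker as the consumer; serializers mirror the StringDeserializers used above
        props.put("bootstrap.servers", "hdp3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            // Send a handful of test records to the topic that ConsumerDemo subscribes to
            producer.send(new ProducerRecord<>("animals", "key-" + i, "test message " + i));
        }
        producer.close();
    }
}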
Finally
That is the whole flow for getting data collected by Flume through Kafka and onto HDFS: ConsumerDemo drains the animals topic into a local staging file, and the timer-driven AccessTask pushes that file up to HDFS once a minute.