我是靠谱客的博主 美丽流沙,这篇文章主要介绍MyEclipse+hadoop+气象数据清洗,现在分享给大家,希望可以做个参考。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package org.apache.hadoop.examples; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PMtest <BaseOutputKey, BaseOutputValue> { public static class AirMapper extends Mapper<LongWritable, Text, Text, Text>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { //将行数据转成字符串 String line = value.toString(); //切割整行数据 String[] filed = line.split(","); //过滤出PM2.5的数据 if (filed.length>4 && filed[2].equals("PM2.5")) { for (int i = 3,j=1; i < filed.length; i++,j++) { if ("".equals(filed[i])||filed[i].equals(null)||filed[i].equals(" ")) { filed[i]="0"; } context.write(new Text(filed[0]+","+j),new Text(filed[i])); } } } } public static class AirReducer extends Reducer<Text, Text, Text, LongWritable>{ @Override protected void reduce(Text k2, Iterable<Text> v2s, Reducer<Text, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { long sum=0l; for (Text text : v2s) { long pm = Long.parseLong(text.toString()); sum=sum+pm; } long avg=sum/24; context.write(k2, new LongWritable(avg)); } } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); //设置输出文件key和value分隔符 String input="hdfs://192.168.216.128:9000/input/"; String ouput="hdfs://192.168.216.128:9000/outut/"; conf.set("mapred.textoutputformat.separator", ","); Job job = Job.getInstance(conf,PMtest.class.getSimpleName()); //必须指定 job.setJarByClass(PMtest.class); //数据来自哪里 FileInputFormat.addInputPath(job, new Path(input)); //自定义的mapper在哪里 job.setMapperClass(AirMapper.class); //指定mapper输出的<k2,v2>的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //自定义的reducer在哪里 job.setReducerClass(AirReducer.class); //指定reducer输出的<k3,v3>的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileOutputFormat.setOutputPath(job, new Path(ouput)); job.waitForCompletion(true); } }

最后

以上就是美丽流沙最近收集整理的关于MyEclipse+hadoop+气象数据清洗的全部内容,更多相关MyEclipse+hadoop+气象数据清洗内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(58)

评论列表共有 0 条评论

立即
投稿
返回
顶部