Level 1 (Data Cleaning): A MapReduce Application Case for Cleaning Weather Data

Following the prompts, complete the code in the editor on the right so that the data is cleaned according to the rules below.

The data files are described below.

a.txt

Field delimiter: one or more spaces

File location: /user/test/input/a.txt

2005 01 01 16 -6 -28 10157 260 31 8 0 -9999

Field meanings of the sample record, in order:

2005    year
01      month
01      day
16      hour
-6      temperature
-28     humidity
10157   pressure
260     wind direction
31      wind speed
8       sky condition
0       1-hour rainfall
-9999   6-hour rainfall

sky.txt

Field delimiter: comma

File location: data/sky.txt or /user/test/input/sky.txt

1,积云

Field meanings of the sample record:

1       sky condition code
积云    cloud genus (cumulus)

Cleaning rules:

  • Convert the field delimiter to a comma;
  • Discard invalid records: lines with too few fields, wind direction outside [0, 360], negative wind speed, negative pressure, sky condition outside [0, 10], humidity outside [0, 100], or temperature outside [-40, 50] (a standalone sketch of these checks follows this list);
  • Join the a.txt records with sky.txt on the sky condition field, replacing the sky condition code with its cloud genus;
  • Sort the records that fall into the same partition. Sort rules: (1) records with the same year, month, and day share a key; (2) sort by daily temperature in ascending order; (3) if temperatures are equal, sort by wind speed in ascending order; (4) if wind speeds are equal, sort by pressure in descending order.
  • Set the input and output paths: input data: /user/test/input/a.txt (HDFS); cleaned data: /user/test/output (HDFS).
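To make the second rule concrete, here is a minimal, illustrative helper that applies the same range checks to a record that has already been split into its 12 fields. It is not part of the graded answer, and the name isValid is invented for this sketch; the actual checks are done inline in the WeatherMap class further below.

// Illustrative only: the range checks from the cleaning rules, applied to a
// record already split on whitespace. Assumes the numeric fields parse as
// integers, as they do in a.txt.
public static boolean isValid(String[] f) {
    if (f.length != 12) {
        return false; // record is missing fields
    }
    int temperature = Integer.parseInt(f[4]);
    int humidity = Integer.parseInt(f[5]);
    int pressure = Integer.parseInt(f[6]);
    int windDirection = Integer.parseInt(f[7]);
    int windSpeed = Integer.parseInt(f[8]);
    int sky = Integer.parseInt(f[9]);
    return temperature >= -40 && temperature <= 50
            && humidity >= 0 && humidity <= 100
            && pressure >= 0
            && windDirection >= 0 && windDirection <= 360
            && windSpeed >= 0
            && sky >= 0 && sky <= 10;
}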

After cleaning, a record looks like this (卷云, cirrus, is the cloud genus joined in from sky.txt):

2005,01,01,16,-6,-28,10157,260,31,卷云,0,-9999

(1) Weather class code:

package com;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/** Encapsulates one weather record. */
public class Weather implements WritableComparable<Weather> {
    // year
    private String year;
    // month
    private String month;
    // day
    private String day;
    // hour
    private String hour;
    // temperature
    private int temperature;
    // humidity
    private String dew;
    // pressure
    private int pressure;
    // wind direction
    private String wind_direction;
    // wind speed
    private int wind_speed;
    // sky condition
    private String sky_condition;
    // 1-hour rainfall
    private String rain_1h;
    // 6-hour rainfall
    private String rain_6h;
    public String getYear() {
        return year;
    }
    public void setYear(String year) {
        this.year = year;
    }
    public String getMonth() {
        return month;
    }
    public void setMonth(String month) {
        this.month = month;
    }
    public String getDay() {
        return day;
    }
    public void setDay(String day) {
        this.day = day;
    }
    public String getHour() {
        return hour;
    }
    public void setHour(String hour) {
        this.hour = hour;
    }
    public int getTemperature() {
        return temperature;
    }
    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }
    public String getDew() {
        return dew;
    }
    public void setDew(String dew) {
        this.dew = dew;
    }
    public int getPressure() {
        return pressure;
    }
    public void setPressure(int pressure) {
        this.pressure = pressure;
    }
    public String getWind_direction() {
        return wind_direction;
    }
    public void setWind_direction(String wind_direction) {
        this.wind_direction = wind_direction;
    }
    public int getWind_speed() {
        return wind_speed;
    }
    public void setWind_speed(int wind_speed) {
        this.wind_speed = wind_speed;
    }
    public String getSky_condition() {
        return sky_condition;
    }
    public void setSky_condition(String sky_condition) {
        this.sky_condition = sky_condition;
    }
    public String getRain_1h() {
        return rain_1h;
    }
    public void setRain_1h(String rain_1h) {
        this.rain_1h = rain_1h;
    }
    public String getRain_6h() {
        return rain_6h;
    }
    public void setRain_6h(String rain_6h) {
        this.rain_6h = rain_6h;
    }
    /********** begin **********/
    @Override
    public String toString() {
        return year + "," + month + "," + day + "," + hour + "," + temperature + "," + dew + "," + pressure + ","
                + wind_direction + "," + wind_speed + "," + sky_condition + "," + rain_1h + "," + rain_6h;
    }
    /********** end **********/
    public Weather() {
    }
    public Weather(String year, String month, String day, String hour, int temperature, String dew, int pressure,
            String wind_direction, int wind_speed, String sky_condition, String rain_1h, String rain_6h) {
        this.year = year;
        this.month = month;
        this.day = day;
        this.hour = hour;
        this.temperature = temperature;
        this.dew = dew;
        this.pressure = pressure;
        this.wind_direction = wind_direction;
        this.wind_speed = wind_speed;
        this.sky_condition = sky_condition;
        this.rain_1h = rain_1h;
        this.rain_6h = rain_6h;
    }
    // Deserialization: read the fields in exactly the order they were written.
    public void readFields(DataInput in) throws IOException {
        year = in.readUTF();
        month = in.readUTF();
        day = in.readUTF();
        hour = in.readUTF();
        temperature = in.readInt();
        dew = in.readUTF();
        pressure = in.readInt();
        wind_direction = in.readUTF();
        wind_speed = in.readInt();
        sky_condition = in.readUTF();
        rain_1h = in.readUTF();
        rain_6h = in.readUTF();
    }
    // Serialization: write the fields in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(year);
        out.writeUTF(month);
        out.writeUTF(day);
        out.writeUTF(hour);
        out.writeInt(temperature);
        out.writeUTF(dew);
        out.writeInt(pressure);
        out.writeUTF(wind_direction);
        out.writeInt(wind_speed);
        out.writeUTF(sky_condition);
        out.writeUTF(rain_1h);
        out.writeUTF(rain_6h);
    }
    public int compareTo(Weather o) {
        /********** begin **********/
        // Sort order used by the shuffle: group by year, month and day, then
        // temperature ascending, then wind speed ascending, then pressure descending.
        int tmp = this.year.compareTo(o.year);
        if (tmp != 0) {
            return tmp;
        }
        tmp = this.month.compareTo(o.month);
        if (tmp != 0) {
            return tmp;
        }
        tmp = this.day.compareTo(o.day);
        if (tmp != 0) {
            return tmp;
        }
        tmp = this.temperature - o.temperature;
        if (tmp != 0) {
            return tmp;
        }
        tmp = this.wind_speed - o.wind_speed;
        if (tmp != 0) {
            return tmp;
        }
        return o.pressure - this.pressure;
        /********** end **********/
    }
}
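Because Weather is the map output key and implements WritableComparable, the framework uses compareTo during the shuffle to order records within each partition. As a quick illustration (not part of the exercise code), the same ordering can be observed locally by sorting a few hand-made Weather records with java.util.Collections.sort; all values below are invented for the example.

package com;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/** Illustration only: shows the sort order defined by Weather.compareTo(). */
public class WeatherSortDemo {
    public static void main(String[] args) {
        List<Weather> list = new ArrayList<Weather>();
        // Hour 16: warmer record, sorts last.
        list.add(new Weather("2005", "01", "01", "16", -6, "-28", 10157, "260", 31, "8", "0", "-9999"));
        // Hour 17: colder; same temperature and wind speed as hour 18, but higher pressure, so it sorts first.
        list.add(new Weather("2005", "01", "01", "17", -9, "-30", 10160, "250", 20, "8", "0", "-9999"));
        // Hour 18: colder; lower pressure than hour 17.
        list.add(new Weather("2005", "01", "01", "18", -9, "-30", 10140, "240", 20, "8", "0", "-9999"));
        Collections.sort(list);
        for (Weather w : list) {
            System.out.println(w);
        }
    }
}

Running it prints the hour-17 record, then hour-18, then hour-16, matching sort rules (2) through (4).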

(2) WeatherMap class code:

package com;
import java.io.*;
import java.util.HashMap;
import java.util.Map.Entry;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WeatherMap extends Mapper<LongWritable, Text, Weather, NullWritable> {
    /********** begin **********/
    // Join dictionary built from sky.txt: sky condition code -> cloud genus.
    HashMap<String, String> map = new HashMap<String, String>();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the sky condition dictionary from the local file data/sky.txt.
        File f = new File("data/sky.txt");
        InputStream inputStream = new FileInputStream(f);
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        String line = null;
        while ((line = bufferedReader.readLine()) != null) {
            String[] split = line.split(",");
            map.put(split[0], split[1]);
        }
        bufferedReader.close();
        inputStream.close();
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Fields are separated by one or more spaces.
        String[] split = line.split("\\s+");
        // Drop records that do not have all 12 fields before parsing anything.
        if (split.length != 12) {
            return;
        }
        String year = split[0];
        String month = split[1];
        String day = split[2];
        String hour = split[3];
        int temperature = Integer.valueOf(split[4]);
        String dew = split[5];
        int pressure = Integer.valueOf(split[6]);
        String wind_direction = split[7];
        int wind_speed = Integer.valueOf(split[8]);
        String sky_condition = split[9];
        String rain_1h = split[10];
        String rain_6h = split[11];
        // Drop records that violate the cleaning rules.
        if (pressure < 0
                || Integer.valueOf(wind_direction) < 0 || Integer.valueOf(wind_direction) > 360
                || Integer.valueOf(sky_condition) < 0 || Integer.valueOf(sky_condition) > 10
                || temperature < -40 || temperature > 50
                || Integer.valueOf(dew) < 0 || Integer.valueOf(dew) > 100
                || wind_speed < 0) {
            return;
        }
        // Join: replace the sky condition code with its cloud genus from sky.txt.
        for (Entry<String, String> entry : map.entrySet()) {
            if (sky_condition.equals(entry.getKey())) {
                sky_condition = entry.getValue();
            }
        }
        Weather weather = new Weather(year, month, day, hour, temperature, dew, pressure, wind_direction, wind_speed,
                sky_condition, rain_1h, rain_6h);
        context.write(weather, NullWritable.get());
    }
/********** end **********/
}
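In the sandboxed exercise environment the mapper can read data/sky.txt straight from the local file system. On a real cluster the dictionary would usually be shipped with the job instead, for example via the distributed cache. The following sketch is not part of the reference answer; it assumes sky.txt is available on HDFS at /user/test/input/sky.txt.

// Sketch only. In the driver, before submitting the job:
job.addCacheFile(new java.net.URI("/user/test/input/sky.txt"));

// In WeatherMap.setup(): the cached file is localized and symlinked into the
// task's working directory under its base name, so it can be read like a local file.
BufferedReader reader = new BufferedReader(new FileReader("sky.txt"));
String cacheLine;
while ((cacheLine = reader.readLine()) != null) {
    String[] parts = cacheLine.split(",");
    map.put(parts[0], parts[1]);
}
reader.close();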

(3) WeatherReduce class code:

package com;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class WeatherReduce extends Reducer<Weather, NullWritable, Weather, NullWritable> {
    /********** begin **********/
    @Override
    protected void reduce(Weather key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Emit the key once per value so duplicate records survive the shuffle.
        for (NullWritable nullWritable : values) {
            context.write(key, NullWritable.get());
        }
    }
    /********** end **********/
}

(4) Auto (partitioner) class code:

package com;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class Auto extends Partitioner<Weather, NullWritable> {
    /********** begin **********/
    // year -> partition number: 1980 and 1981 map to partitions 0 and 1,
    // every other year falls through to partition 2.
    public static Map<String, Integer> provinceDict = new HashMap<String, Integer>();
    static {
        int a = 0;
        for (int i = 1980; i <= 1981; i++) {
            provinceDict.put(i + "", a);
            a++;
        }
    }
    public int getPartition(Weather key, NullWritable nullWritable, int numPartitions) {
        // toString() starts with the year, so the first four characters identify it.
        Integer id = provinceDict.get(key.toString().substring(0, 4));
        return id == null ? 2 : id;
    }
    /********** end **********/
}
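The dictionary above only covers the years 1980 and 1981, so the 2005 records in a.txt all land in partition 2. A tiny check of that behaviour (illustration only, not part of the exercise) could look like this:

package com;
import org.apache.hadoop.io.NullWritable;
/** Illustration only: shows which reducer each year is routed to. */
public class AutoDemo {
    public static void main(String[] args) {
        Auto auto = new Auto();
        for (String year : new String[] { "1980", "1981", "2005" }) {
            Weather w = new Weather();
            w.setYear(year);
            w.setMonth("01");
            w.setDay("01");
            // getPartition reads the year from the first four characters of toString().
            System.out.println(year + " -> partition " + auto.getPartition(w, NullWritable.get(), 3));
        }
        // Prints: 1980 -> partition 0, 1981 -> partition 1, 2005 -> partition 2
    }
}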

(5) WeatherTest (driver) class code:

package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WeatherTest {
    public static void main(String[] args) throws Exception {
        /********** begin **********/
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(WeatherTest.class);
        job.setMapperClass(WeatherMap.class);
        job.setMapOutputKeyClass(Weather.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(WeatherReduce.class);
        // The reducer emits (Weather, NullWritable) pairs.
        job.setOutputKeyClass(Weather.class);
        job.setOutputValueClass(NullWritable.class);
        // Three reduce tasks, matching the Auto partitioner.
        job.setNumReduceTasks(3);
        job.setPartitionerClass(Auto.class);
        Path inPath = new Path("/user/test/input/a.txt");
        Path out = new Path("/user/test/output");
        // Remove any previous output so the job does not fail on an existing directory.
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, out);
        job.waitForCompletion(true);
        /********** end **********/
    }
}

Start Hadoop before running the job: # start-all.sh
