概述
1. 要处理的数据(同时也是测试数据):tq.txt
1949-10-01 14:21:02 34c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c
2. 案例要求
找出每月气温最高的两天
3. 代码实现
首先创建温度天气的实体类TQ,并实现WritableComparable接口
/**
 * Composite key for one weather reading: a calendar date (year, month, day)
 * together with the temperature ("wd") recorded on that date.
 *
 * <p>The four fields are serialised as ints in the order year, month, day, wd;
 * {@link #readFields(DataInput)} reads them back in that same order.
 */
public class TQ implements WritableComparable<TQ> {

    private int year;
    private int month;
    private int day;
    private int wd;

    public int getYear() {
        return this.year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return this.month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return this.day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public int getWd() {
        return this.wd;
    }

    public void setWd(int wd) {
        this.wd = wd;
    }

    /**
     * Renders the date part only, as {@code year-month-day} without zero padding;
     * the temperature is not included.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(year).append("-").append(month).append("-").append(day);
        return text.toString();
    }

    /** Writes the fields; the order here must match {@link #readFields(DataInput)}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.year);
        out.writeInt(this.month);
        out.writeInt(this.day);
        out.writeInt(this.wd);
    }

    /** Restores the fields in exactly the order {@link #write(DataOutput)} emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        setYear(in.readInt());
        setMonth(in.readInt());
        setDay(in.readInt());
        setWd(in.readInt());
    }

    /**
     * Natural ordering: ascending by year, then month, then day.
     * The temperature does not take part in this comparison.
     */
    @Override
    public int compareTo(TQ o) {
        int byYear = Integer.compare(this.year, o.getYear());
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(this.month, o.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        return Integer.compare(this.day, o.getDay());
    }
}
创建TqMapper类
/**
 * Mapper that converts one line of weather text into a (TQ, temperature) pair.
 *
 * <p>Expected line shape: a {@code yyyy-MM-dd} date first and a temperature token
 * such as {@code 47c} last, separated by whitespace (a tab or spaces); a time of
 * day may sit in between and is ignored. Lines that do not fit this shape are
 * skipped and counted under the {@code TqMapper/MALFORMED_RECORDS} counter.
 */
public class TqMapper extends Mapper<LongWritable, Text, TQ, Text> {

    // Output objects are reused for every record instead of being re-allocated.
    TQ tq = new TQ();
    Text vwd = new Text();

    // Created once per mapper instance rather than once per input line.
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    private final Calendar cal = Calendar.getInstance();

    /**
     * Parses one input line and emits the date/temperature key with the
     * temperature as the value.
     *
     * @param key     the input key supplied by the input format (not used)
     * @param value   the raw text line
     * @param context Hadoop context used for output and counters
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // e.g. "1951-07-03 12:21:03 47c" -> ["1951-07-03", "12:21:03", "47c"]
        String[] fields = value.toString().trim().split("\\s+");
        if (fields.length < 2) {
            countMalformed(context);
            return;
        }
        try {
            cal.setTime(sdf.parse(fields[0]));
            int wd = parseTemperature(fields[fields.length - 1]);

            tq.setYear(cal.get(Calendar.YEAR));
            // Calendar.MONTH is zero-based (January == 0); store the 1-based month.
            tq.setMonth(cal.get(Calendar.MONTH) + 1);
            tq.setDay(cal.get(Calendar.DAY_OF_MONTH));
            tq.setWd(wd);
            vwd.set(Integer.toString(wd));

            context.write(tq, vwd);
        } catch (ParseException | NumberFormatException e) {
            // Best effort: a bad line must not fail the whole task, but it is counted.
            countMalformed(context);
        }
    }

    /** Parses a temperature token, dropping a single trailing unit character such as 'c'. */
    private static int parseTemperature(String token) {
        int end = token.length();
        if (end > 0 && !Character.isDigit(token.charAt(end - 1))) {
            end--;
        }
        return Integer.parseInt(token.substring(0, end));
    }

    /** Records one skipped input line in the job counters. */
    private static void countMalformed(Context context) {
        context.getCounter("TqMapper", "MALFORMED_RECORDS").increment(1);
    }
}
自定义排序比较器
package Tq;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Sort comparator for {@code TQ} keys: ascending by year, then ascending by
 * month, then descending by temperature.
 *
 * <p>Ordering by year and month keeps all records of one month adjacent, which
 * is what a year+month grouping needs; the descending temperature puts the
 * hottest reading of each month first. The day is deliberately not part of the
 * sort order.
 */
public class TSortComparator extends WritableComparator {

    public TSortComparator() {
        // 'true' asks WritableComparator to create TQ instances for deserialised comparison.
        super(TQ.class, true);
    }

    /**
     * @param a first key, expected to be a {@code TQ}
     * @param b second key, expected to be a {@code TQ}
     * @return negative, zero or positive per the ordering described on the class
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ left = (TQ) a;
        TQ right = (TQ) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(left.getMonth(), right.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        // Negated so that higher temperatures sort first.
        return -Integer.compare(left.getWd(), right.getWd());
    }
}
自定义分区器:需要定义一个继承Partitioner的类,并实现getPartition方法
/**
 * Partitioner that routes records by year, so every record of a given year
 * lands in the same partition.
 *
 * <p>The value type is {@code Text} because that is the map output value class
 * this job configures; declaring any other value type would fail with a
 * {@code ClassCastException} when the framework calls the partitioner.
 */
public class TPartioner extends Partitioner<TQ, Text> {

    /**
     * @param key           the weather key; only its year is used
     * @param value         the map output value (not used)
     * @param numPartitions number of partitions, i.e. the number of reduce tasks
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(TQ key, Text value, int numPartitions) {
        // The year is used directly instead of a hashCode: the hash of an int is the
        // int itself. The sign bit is masked so a negative year cannot produce a
        // negative (invalid) partition index.
        return (key.getYear() & Integer.MAX_VALUE) % numPartitions;
    }
}
创建分组比较器(GroupingComparator)
/**
 * Grouping comparator for {@code TQ} keys: two keys are treated as equal when
 * both their year and their month match; day and temperature are ignored.
 */
public class TGroupConparator extends WritableComparator {

    TQ t1 = null;
    TQ t2 = null;

    public TGroupConparator() {
        super(TQ.class, true);
    }

    /**
     * Compares by year first and falls back to the month only when the years are equal.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        t1 = (TQ) a;
        t2 = (TQ) b;

        int yearOrder = Integer.compare(t1.getYear(), t2.getYear());
        return yearOrder != 0
                ? yearOrder
                : Integer.compare(t1.getMonth(), t2.getMonth());
    }
}
创建Reduce类
/**
 * Reducer that writes at most two records per group: the first record of the
 * group, and then the first later record whose day differs from it.
 *
 * <p>The method reads {@code key.getDay()} and {@code key.getWd()} inside the
 * value loop, i.e. it relies on the framework updating the key object as the
 * value iterator advances.
 */
public class Treduce extends Reducer<TQ, Text, Text, Text> {

    Text rkey = new Text();
    Text rval = new Text();

    @Override
    protected void reduce(TQ key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The two leading records of a group may fall on the same day, so the day
        // of the first written record is remembered and later records on that
        // same day are passed over.
        boolean firstWritten = false;
        int firstDay = 0;

        for (Text ignored : values) {
            if (!firstWritten) {
                firstDay = key.getDay();
                emit(key, context);
                firstWritten = true;
                continue;
            }
            if (key.getDay() == firstDay) {
                continue; // same day as the record already written
            }
            emit(key, context);
            return; // both records are out; nothing more to do for this group
        }
    }

    /** Writes the key's date text as output key and its temperature as output value. */
    private void emit(TQ key, Context context) throws IOException, InterruptedException {
        rkey.set(key.toString());
        rval.set(String.valueOf(key.getWd()));
        context.write(rkey, rval);
    }
}
创建主类TQMR
/**
 * Driver that configures and submits the "tq" weather job.
 *
 * <p>Reads {@code /tq/input/tq.txt}, writes to {@code /tq/output.txt} (removing
 * any previous output first) and exits with status 0 on success, 1 on failure.
 */
public class TQMR {

    /**
     * @param args command-line arguments (not used)
     * @throws IOException            if the file system or job submission fails
     * @throws InterruptedException   if waiting for the job is interrupted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Configuration
        Configuration conf = new Configuration();
        // 2. Create the job
        Job job = Job.getInstance(conf);
        // 3. Class used to locate the job jar
        job.setJarByClass(TQMR.class);
        // 4. Job name
        job.setJobName("tq");

        // 5. Input path
        Path inPath = new Path("/tq/input/tq.txt");
        FileInputFormat.addInputPath(job, inPath);

        // Output path: delete it if it already exists, because the job refuses to
        // start when the output location is present.
        Path outPath = new Path("/tq/output.txt");
        if (outPath.getFileSystem(conf).exists(outPath)) {
            // Recursive delete; the single-argument delete(Path) is deprecated.
            outPath.getFileSystem(conf).delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // 6. Mapper
        job.setMapperClass(TqMapper.class);
        // 7./8. Map output key and value types
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(Text.class);

        // 9. Custom sort comparator
        job.setSortComparatorClass(TSortComparator.class);
        // 10. Custom partitioner
        job.setPartitionerClass(TPartioner.class);
        // 11. Custom grouping comparator
        job.setGroupingComparatorClass(TGroupConparator.class);
        // NOTE(review): no combiner is configured in this driver, so the next
        // setting is presumably inert - confirm before relying on it.
        job.setCombinerKeyGroupingComparatorClass(TGroupConparator.class);

        // 12. Number of reduce tasks (also the partition count seen by the partitioner)
        job.setNumReduceTasks(2);

        // 13. Reducer and its output types (Treduce emits Text/Text)
        job.setReducerClass(Treduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 14. Submit, wait, and report the outcome through the exit status
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
注意本次导入的类:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
最后
以上就是陶醉钻石为你收集整理的Hadoop的MapReduce作业实现筛选天气案例——代码实现的全部内容,希望文章能够帮你解决Hadoop的MapReduce作业实现筛选天气案例——代码实现所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复