概述
题目:现有一张emp表,字段分别为
员工编号(empno),员工姓名(ename),工作(job),上级编号(mgr),入职日期(hiredate),工资(sal),奖金(comm),部门编号(deptno)
数据:
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/7/13,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/7/13,1100,,20
7783,BOB,ANALYST,7777,1983/7/13,3200,,10
7822,BING,PRESIDENT,,2001/12/17,3000,,20
1233,CINFG,PRESIDENT,,2001/12/17,4000,,30
1233,FFSFG,PRESIDENT,999999,2001/12/17,4000,999999,10
2312,SDA,CLERK,3422,1987/7/13,2222,,30
4353,DFDS,CLERK,4563,1987/7/13,3111,999999,20
4564,RTEW,CLERK,5645,1987/7/13,6753,,20
7783,WOOP,ANALYST,7777,1983/7/13,5500,,10
5675,COC,ANALYST,7777,1983/7/13,6750,,30
3222,DOD,ANALYST,3422,1983/7/13,8400,,20,12
3211,EOE,ANALYST,7777,1983/7/13,2500,,10,33
目标:
1.根据工作类型(job)进行分区,
2.分区之下对每个员工按照部门(deptno)进行分组,
3.分组内部保证工资(sal)是降序。
4.每个分区中拿到每个部门工资排名第二的人(no.2)的信息。
注意:
请注意字段个数:只处理按逗号分割后数组长度为8的记录。
数据中的999999为无效数据,请去除。
1.自定义javabean类
public class JobBean implements WritableComparable<JobBean> {
private int empno;
private String ename;
private String job;
private String mgr;
private String hiredate;
private int sal;
private String comm;
private int deptno;
public JobBean() {
}
public JobBean(int empno, String ename, String job, String mgr, String hiredate, int sal, String comm, int deptno) {
this.empno = empno;
this.ename = ename;
this.job = job;
this.mgr = mgr;
this.hiredate = hiredate;
this.sal = sal;
this.comm = comm;
this.deptno = deptno;
}
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public String getMgr() {
return mgr;
}
public void setMgr(String mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public String getComm() {
return comm;
}
public void setComm(String comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
@Override
public String toString() {
return "JobBean{" +
"empno=" + empno +
", ename='" + ename + ''' +
", job='" + job + ''' +
", mgr=" + mgr +
", hiredate='" + hiredate + ''' +
", sal=" + sal +
", comm=" + comm +
", deptno=" + deptno +
'}';
}
@Override
public int compareTo(JobBean o) {
int result;
//分区之下对每个员工按照部门(deptno)进行分组,
//分组内部保证工资(sal)是降序
if(this.deptno > o.getDeptno()){
result = 1;
}else if(this.deptno < o.getDeptno()){
result = -1;
}else{
//如果进入到这里 意味着两个部门(deptno)一样 此时根据sal倒序进行排序
result = sal > o.getSal() ? -1:(sal < o.getSal() ? 1:0);
}
return result;
}
//序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(empno);
out.writeUTF(ename);
out.writeUTF(job);
out.writeUTF(mgr);
out.writeUTF(hiredate);
out.writeInt(sal);
out.writeUTF(comm);
out.writeInt(deptno);
}
//反序列化
@Override
public void readFields(DataInput in) throws IOException {
this.empno = in.readInt();
this.ename = in.readUTF();
this.job = in.readUTF();
this.mgr = in.readUTF();
this.hiredate = in.readUTF();
this.sal = in.readInt();
this.comm = in.readUTF();
this.deptno = in.readInt();
}
}
2.自定义分区类
/**
 * Routes each record to a reduce task according to its job title.
 *
 * <p>The five known titles map to partitions 0-4. Any other title (or a null title) goes
 * to partition 5, so the job must be configured with at least 6 reduce tasks.
 */
public class JobPartition extends Partitioner<JobBean, NullWritable> {
    /** Job title to partition index. */
    public static HashMap<String,Integer> pr = new HashMap<String, Integer>();

    /** Partition used for titles that are not in {@link #pr}. */
    private static final int OTHER_PARTITION = 5;

    static {
        String[] titles = {"CLERK", "SALESMAN", "MANAGER", "ANALYST", "PRESIDENT"};
        for (int index = 0; index < titles.length; index++) {
            pr.put(titles[index], index);
        }
    }

    @Override
    public int getPartition(JobBean jobBean, NullWritable nullWritable, int numPartitions) {
        Integer mapped = pr.get(jobBean.getJob());
        return mapped == null ? OTHER_PARTITION : mapped;
    }
}
3.自定义分组类
/**
 * Grouping comparator: consecutive reduce-input keys with the same department number
 * are handed to a single {@code reduce()} call.
 *
 * <p>Within a department the keys are already sorted by salary descending, so the
 * reducer sees each department's employees highest salary first.
 */
public class JobGroupingComparator extends WritableComparator {
    protected JobGroupingComparator() {
        // true: WritableComparator creates JobBean instances so it can deserialize keys for compare().
        super(JobBean.class, true);
    }

    /**
     * Compares two keys by department number only.
     *
     * <p>Returns 0 for the same department, which places the keys in the same group. For
     * different departments it returns a proper three-way result: the sign flips when the
     * arguments are swapped, as the Comparator contract requires. The order is
     * deptno-ascending, the same order JobBean.compareTo uses.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        JobBean aBean = (JobBean) a;
        JobBean bBean = (JobBean) b;
        return Integer.compare(aBean.getDeptno(), bBean.getDeptno());
    }
}
4.Mapper类
/**
 * Parses one CSV line of the emp table into a {@link JobBean} key.
 *
 * <p>Lines that do not split into exactly 8 fields are skipped. The invalid marker
 * "999999" in the mgr and comm columns is replaced by an empty string.
 */
public class JobMapper extends Mapper<LongWritable, Text, JobBean, NullWritable> {
    /** Number of comma-separated fields in a valid record. */
    private static final int FIELD_COUNT = 8;
    /** Marker for invalid values in the input data. */
    private static final String INVALID = "999999";

    // Reused for every record; all fields are overwritten before each context.write.
    JobBean keyOut = new JobBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        // Only records that split into exactly 8 fields are valid; e.g. some sample lines have 10.
        if (split.length != FIELD_COUNT) {
            return;
        }
        keyOut.setEmpno(Integer.parseInt(split[0]));
        keyOut.setEname(split[1]);
        keyOut.setJob(split[2]);
        keyOut.setMgr(dropInvalid(split[3]));
        keyOut.setHiredate(split[4]);
        keyOut.setSal(Integer.parseInt(split[5]));
        keyOut.setComm(dropInvalid(split[6]));
        keyOut.setDeptno(Integer.parseInt(split[7]));
        context.write(keyOut, NullWritable.get());
    }

    /** Returns "" when {@code field} is the invalid marker, otherwise {@code field} unchanged. */
    private static String dropInvalid(String field) {
        return INVALID.equals(field) ? "" : field;
    }
}
5.Reducer类
/**
 * Emits the second record of each department group.
 *
 * <p>Within a group, keys arrive sorted by salary descending, so the second value
 * corresponds to the second-highest salary. While {@code values} is iterated, Hadoop
 * refreshes {@code key} with the current record's fields. That is why writing
 * {@code key} at the second step outputs the second employee. Groups with a single
 * record produce no output.
 */
public class JobReducer extends Reducer<JobBean, NullWritable,JobBean, NullWritable> {
    @Override
    protected void reduce(JobBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        int position = 0;
        for (NullWritable value : values) {
            position++;
            if (position < 2) {
                continue;
            }
            // position == 2: key now holds the second-ranked employee of this department.
            context.write(key, value);
            return;
        }
    }
}
6.Driver类实现
public class JobTop02Driver {
public static void main(String[] args) throws Exception {
//配置文件对象
Configuration conf = new Configuration();
// 创建作业实例
Job job = Job.getInstance(conf, JobTop02Driver.class.getSimpleName());
// 设置作业驱动类
//conf.set("mapreduce.framework.name","yarn");
job.setJarByClass(JobTop02Driver.class);
// 设置作业mapper reducer类
job.setMapperClass(JobMapper.class);
job.setReducerClass(JobReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(JobBean.class);
job.setMapOutputValueClass(NullWritable.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(JobBean.class);
job.setOutputValueClass(NullWritable.class);
//这里设置运行reduceTask的个数
//分区个数 == NumReduceTasks
// 分区个数 < NumReduceTasks 程序可以执行 只不过有空文件产生 影响性能
// 分区个数 > NumReduceTasks 程序保存 Illegal partition非法分区
job.setGroupingComparatorClass(JobGroupingComparator.class);
job.setPartitionerClass(JobPartition.class);
job.setNumReduceTasks(6);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job,new Path("D:\a_data\exercise\input"));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job,new Path("D:\a_data\exercise\output"));
//判断输出路径是否存在 如果存在删除
FileSystem fs = FileSystem.get(conf);
if(fs.exists(new Path("D:\a_data\exercise\output"))){
fs.delete(new Path("D:\a_data\exercise\output"),true);
}
// 提交作业并等待执行完成
boolean b = job.waitForCompletion(true);
//程序退出
System.exit(b?0:1);
}
}
结果展示:
最后
以上就是粗暴羊为你收集整理的hadoop学习整理——mapreduce数据分析案例(2)的全部内容,希望文章能够帮你解决hadoop学习整理——mapreduce数据分析案例(2)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复