题目:现有一张 emp 表,字段依次为:
员工编号(empno),员工姓名(ename),工作(job),上级经理编号(mgr),入职日期(hiredate),工资(sal),备注(comm),部门编号(deptno)
数据:
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/7/13,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/7/13,1100,,20
7783,BOB,ANALYST,7777,1983/7/13,3200,,10
7822,BING,PRESIDENT,,2001/12/17,3000,,20
1233,CINFG,PRESIDENT,,2001/12/17,4000,,30
1233,FFSFG,PRESIDENT,999999,2001/12/17,4000,999999,10
2312,SDA,CLERK,3422,1987/7/13,2222,,30
4353,DFDS,CLERK,4563,1987/7/13,3111,999999,20
4564,RTEW,CLERK,5645,1987/7/13,6753,,20
7783,WOOP,ANALYST,7777,1983/7/13,5500,,10
5675,COC,ANALYST,7777,1983/7/13,6750,,30
3222,DOD,ANALYST,3422,1983/7/13,8400,,20,12
3211,EOE,ANALYST,7777,1983/7/13,2500,,10,33
目标:
1.根据工作类型(job)进行分区,
2.分区之下对每个员工按照部门(deptno)进行分组,
3.分组内部保证工资(sal)是降序。
4.每个分区中拿到每个部门工资排名第二的人(no.2)的信息。
注意:
请注意字段个数(只保留按逗号分割后数组长度为 8 的记录)。
数据中的 999999 为无效数据(出现在 mgr、comm 字段中),请去除。
1.自定义javabean类
代码如下:
147public class JobBean implements WritableComparable<JobBean> { private int empno; private String ename; private String job; private String mgr; private String hiredate; private int sal; private String comm; private int deptno; public JobBean() { } public JobBean(int empno, String ename, String job, String mgr, String hiredate, int sal, String comm, int deptno) { this.empno = empno; this.ename = ename; this.job = job; this.mgr = mgr; this.hiredate = hiredate; this.sal = sal; this.comm = comm; this.deptno = deptno; } public int getEmpno() { return empno; } public void setEmpno(int empno) { this.empno = empno; } public String getEname() { return ename; } public void setEname(String ename) { this.ename = ename; } public String getJob() { return job; } public void setJob(String job) { this.job = job; } public String getMgr() { return mgr; } public void setMgr(String mgr) { this.mgr = mgr; } public String getHiredate() { return hiredate; } public void setHiredate(String hiredate) { this.hiredate = hiredate; } public int getSal() { return sal; } public void setSal(int sal) { this.sal = sal; } public String getComm() { return comm; } public void setComm(String comm) { this.comm = comm; } public int getDeptno() { return deptno; } public void setDeptno(int deptno) { this.deptno = deptno; } @Override public String toString() { return "JobBean{" + "empno=" + empno + ", ename='" + ename + ''' + ", job='" + job + ''' + ", mgr=" + mgr + ", hiredate='" + hiredate + ''' + ", sal=" + sal + ", comm=" + comm + ", deptno=" + deptno + '}'; } @Override public int compareTo(JobBean o) { int result; //分区之下对每个员工按照部门(deptno)进行分组, //分组内部保证工资(sal)是降序 if(this.deptno > o.getDeptno()){ result = 1; }else if(this.deptno < o.getDeptno()){ result = -1; }else{ //如果进入到这里 意味着两个部门(deptno)一样 此时根据sal倒序进行排序 result = sal > o.getSal() ? -1:(sal < o.getSal() ? 
1:0); } return result; } //序列化 @Override public void write(DataOutput out) throws IOException { out.writeInt(empno); out.writeUTF(ename); out.writeUTF(job); out.writeUTF(mgr); out.writeUTF(hiredate); out.writeInt(sal); out.writeUTF(comm); out.writeInt(deptno); } //反序列化 @Override public void readFields(DataInput in) throws IOException { this.empno = in.readInt(); this.ename = in.readUTF(); this.job = in.readUTF(); this.mgr = in.readUTF(); this.hiredate = in.readUTF(); this.sal = in.readInt(); this.comm = in.readUTF(); this.deptno = in.readInt(); } }
2.自定义分区类
代码如下:
22public class JobPartition extends Partitioner<JobBean, NullWritable> { public static HashMap<String,Integer> pr= new HashMap<String ,Integer>(); static { pr.put("CLERK",0); pr.put("SALESMAN",1); pr.put("MANAGER",2); pr.put("ANALYST",3); pr.put("PRESIDENT",4); } @Override public int getPartition(JobBean jobBean, NullWritable nullWritable, int numPartitions) { Integer integer = pr.get(jobBean.getJob()); if (integer != null) { return integer; } return 5; } }
3.自定义分组类
代码如下:
22public class JobGroupingComparator extends WritableComparator { protected JobGroupingComparator(){ super(JobBean.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { JobBean aBean = (JobBean) a; JobBean bBean = (JobBean) b; //本需求中 分组规则是,只要前后两个数据的job一样 就应该分到同一组。 //只要compare 返回0 mapreduce框架就认为两个一样 返回不为0 就认为不一样 //根据工作类型(job)进行分区, //分区之下对每个员工按照部门(deptno)进行分组, //分组内部保证工资(sal)是降序 if (aBean.getDeptno() == bBean.getDeptno()) { return 0; } else return 1; } }
4.Mapper类
代码如下:
44public class JobMapper extends Mapper<LongWritable, Text, JobBean, NullWritable> { JobBean keyOut = new JobBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split(","); int empno = Integer.parseInt(split[0]); String ename = split[1]; String job = split[2]; String hiredate = split[4]; int sal = Integer.parseInt(split[5]); int deptno = Integer.parseInt(split[7]); String m = split[3];//剔除999999 if(m.equals("999999")){ m = ""; }else{ m = split[3]; } String mgr = m; String co = split[6];//剔除999999 if(co.equals("999999")){ co = ""; }else{ co = split[6]; } String comm = co; keyOut.setEmpno(empno); keyOut.setEname(ename); keyOut.setJob(job); keyOut.setMgr(mgr); keyOut.setHiredate(hiredate); keyOut.setComm(comm); keyOut.setDeptno(deptno); keyOut.setSal(sal); context.write(keyOut,NullWritable.get()); } }
5.Reducer类
代码如下:
17public class JobReducer extends Reducer<JobBean, NullWritable,JobBean, NullWritable> { @Override protected void reduce(JobBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { int num = 0; //求第二名 for(NullWritable v: values){ //context.write(key, v); num ++; if(num == 2){ context.write(key, v); break; } } } }
6.Driver类实现
代码如下:
51public class JobTop02Driver { public static void main(String[] args) throws Exception { //配置文件对象 Configuration conf = new Configuration(); // 创建作业实例 Job job = Job.getInstance(conf, JobTop02Driver.class.getSimpleName()); // 设置作业驱动类 //conf.set("mapreduce.framework.name","yarn"); job.setJarByClass(JobTop02Driver.class); // 设置作业mapper reducer类 job.setMapperClass(JobMapper.class); job.setReducerClass(JobReducer.class); // 设置作业mapper阶段输出key value数据类型 job.setMapOutputKeyClass(JobBean.class); job.setMapOutputValueClass(NullWritable.class); //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型 job.setOutputKeyClass(JobBean.class); job.setOutputValueClass(NullWritable.class); //这里设置运行reduceTask的个数 //分区个数 == NumReduceTasks // 分区个数 < NumReduceTasks 程序可以执行 只不过有空文件产生 影响性能 // 分区个数 > NumReduceTasks 程序保存 Illegal partition非法分区 job.setGroupingComparatorClass(JobGroupingComparator.class); job.setPartitionerClass(JobPartition.class); job.setNumReduceTasks(6); // 配置作业的输入数据路径 FileInputFormat.addInputPath(job,new Path("D:\a_data\exercise\input")); // 配置作业的输出数据路径 FileOutputFormat.setOutputPath(job,new Path("D:\a_data\exercise\output")); //判断输出路径是否存在 如果存在删除 FileSystem fs = FileSystem.get(conf); if(fs.exists(new Path("D:\a_data\exercise\output"))){ fs.delete(new Path("D:\a_data\exercise\output"),true); } // 提交作业并等待执行完成 boolean b = job.waitForCompletion(true); //程序退出 System.exit(b?0:1); } }
结果展示:
最后
以上就是粗暴羊最近收集整理的关于hadoop学习整理——mapreduce数据分析案例(2)的全部内容,更多相关hadoop学习整理——mapreduce数据分析案例(2)内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复