1.需求:
订单数据表t_order:
关系数据库表-
id | date | pid | amount |
1001 | 20150710 | P0001 | 2 |
1002 | 20150710 | P0001 | 3 |
1002 | 20150710 | P0002 | 3 |
1001,20150710,P0001,2
1002,20150710,P0001,3
1003,20150710,P0002,3
1002,20150710,P0003,3
1002,20150710,P0002,4
。。。。。。
商品信息表t_product:
id | pname | | |
P0001 | 小米5 | | |
P0002 | 魅族 | | |
p0001,小米
p0002,魅族
p0003,oppo
。。。。。。
现在要把2个文件通过pID进行关联,如果按照上一遍文章,在Reduce中做关联那么问题来了,如果小米手机的订单量远远大于魅族手机的订单量,这样就会产生处理小米手机的ReduceTask任务量很大,执行时间久,而处理魅族的ReduceTask任务轻很快就执行完了,出现了负载不均衡,数据倾斜。
2.实现
鉴于是小数据量的表和大数据量的表进行join,可以用分布式缓存把小表缓存到map节点,在map阶段直接使用,与大表进行join,不在reduce阶段进行join,避免数据倾斜,提高并发量和效率。
通过main函数先把缓存文件加载到节点(底层自动分发到map节点--分布式缓存):
job.addCacheFile(“文件路径”);
代码:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65package join; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MapJoin { static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{ Map<String,String> pmap=new HashMap<String,String>();//装产品数据 Text k=new Text(); /*重写setup方法,看Mapper源码里面有个线程的run方法, 方法执行顺序是setup()-->map(),所以会先加载setup方法 */ @Override protected void setup(Context context)throws IOException, InterruptedException { //读取本地缓存中文件的数据 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("product"))); String line; //如果读取的一行数据不为空,则把产品数据切分,存储在pmap中 while(StringUtils.isNotEmpty(line=br.readLine())){ String[] fields = line.split(","); pmap.put(fields[0], fields[1]);//K为商品id,v为商品名 } br.close(); } //由于已经持有完整的产品信息表,所以直接在map中实现join操作,不需要reduce操作,就不会有数据倾斜 @Override protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException { String oline = value.toString();//获取订单信息 String[] fields = oline.split(","); String pName = pmap.get(fields[2]);//根据pid获取pName k.set(oline+","+pName);//数据拼接 context.write(k, new Text(""));//输出 } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MapJoin.class); job.setMapperClass(JoinMapper.class); job.setMapOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0]));//订单文件地址 FileOutputFormat.setOutputPath(job, new Path(args[1]));//join后的文件数据地址 //指定需要缓存文件到所有mapTask运行节点工作目录 job.addCacheFile(new URI(args[2]));//args[2]缓存文件路径可以写死(product) //由于Map就把事情搞定了,所以不需要reduce阶段,reduceTask为0 job.setNumReduceTasks(0); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
结果:
1001,20150710,P0001,2,小米
。。。。。。
。。。。。。
最后
以上就是震动外套最近收集整理的关于Map端join算法实现,解决Reduce端数据倾斜,负载不均(分布式缓存)的全部内容,更多相关Map端join算法实现内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复