我是靠谱客的博主 震动外套,最近开发中收集的这篇文章主要介绍Map端join算法实现,解决Reduce端数据倾斜,负载不均(分布式缓存),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1.需求:

订单数据表t_order:

关系数据库表-

id

date

pid

amount

1001

20150710

P0001

2

1002

20150710

P0001

3

1002

20150710

P0002

3

抽象成文件数据:

1001,20150710,P0001,2
1002,20150710,P0001,3
1003,20150710,P0002,3
1002,20150710,P0003,3
1002,20150710,P0002,4

。。。。。。
商品信息表t_product:

id

pname



P0001

小米5



P0002

魅族



抽象成文件数据:
p0001,小米
p0002,魅族
p0003,oppo

 。。。。。。

现在要把2个文件通过pID进行关联,如果按照上一遍文章,在Reduce中做关联那么问题来了,如果小米手机的订单量远远大于魅族手机的订单量,这样就会产生处理小米手机的ReduceTask任务量很大,执行时间久,而处理魅族的ReduceTask任务轻很快就执行完了,出现了负载不均衡,数据倾斜。

2.实现

鉴于是小数据量的表和大数据量的表进行join,可以用分布式缓存把小表缓存到map节点,在map阶段直接使用,与大表进行join,不在reduce阶段进行join,避免数据倾斜,提高并发量和效率。

通过main函数先把缓存文件加载到节点(底层自动分发到map节点--分布式缓存):

job.addCacheFile(“文件路径”);

代码:

package join;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MapJoin {
static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{
Map<String,String> pmap=new HashMap<String,String>();//装产品数据
Text k=new Text();
/*重写setup方法,看Mapper源码里面有个线程的run方法,
方法执行顺序是setup()-->map(),所以会先加载setup方法
*/
@Override
protected void setup(Context context)throws IOException, InterruptedException {
//读取本地缓存中文件的数据
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("product")));
String line;
//如果读取的一行数据不为空,则把产品数据切分,存储在pmap中
while(StringUtils.isNotEmpty(line=br.readLine())){
String[] fields = line.split(",");
pmap.put(fields[0], fields[1]);//K为商品id,v为商品名
}
br.close();
}
//由于已经持有完整的产品信息表,所以直接在map中实现join操作,不需要reduce操作,就不会有数据倾斜
@Override
protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
String oline = value.toString();//获取订单信息
String[] fields = oline.split(",");
String pName = pmap.get(fields[2]);//根据pid获取pName
k.set(oline+","+pName);//数据拼接
context.write(k, new Text(""));//输出
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MapJoin.class);
job.setMapperClass(JoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));//订单文件地址
FileOutputFormat.setOutputPath(job, new Path(args[1]));//join后的文件数据地址
//指定需要缓存文件到所有mapTask运行节点工作目录
job.addCacheFile(new URI(args[2]));//args[2]缓存文件路径可以写死(product)
//由于Map就把事情搞定了,所以不需要reduce阶段,reduceTask为0
job.setNumReduceTasks(0);
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}

结果:

1001,20150710,P0001,2,小米

。。。。。。

。。。。。。

最后

以上就是震动外套为你收集整理的Map端join算法实现,解决Reduce端数据倾斜,负载不均(分布式缓存)的全部内容,希望文章能够帮你解决Map端join算法实现,解决Reduce端数据倾斜,负载不均(分布式缓存)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(46)

评论列表共有 0 条评论

立即
投稿
返回
顶部