概述
Join中数据倾斜问题解决
问题描述:
就是在一个reducer任务中累加的数量过大,而在另一个reducer任务累加的数量较少,这样就造成了数据倾斜
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-F2hcOloM-1632316508589)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918215036327.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GEvmJbm6-1632316508591)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918215625320.png)]
如上图,一个产品对应多个订单,但是假设iPhoneX卖得非常的好,而iPhone8P销量寥寥,那么我们在使用 mapreduce做数据分析的时候,我们的某个reducetask就会压力山大,而某些reducetask就很闲。 (这是把两个表放在一起进行处理)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Uroipon6-1632316508592)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918215734214.png)]
解决方案:
1、我们之所以产生数据倾斜,是因为我们使用reducetask这一个阶段来拼接pid相同的product和order,所以我们在reducetask才会产生数据倾斜
2、如果我们在maptask就能将product和order都join起 来,那么不需要reducetask就不会产生倾斜了
3、所以我们如果可以在map阶段就获取到产品的全表,那么读取到order表 就能够直接进行join了
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5Um2dtTT-1632316508593)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918221055288.png)]
分析:
遇到的问题
1、我们如何将product放在maptask?
2、我们并不确定maptask在哪
3、我们存放文件应该存放在maptask的job目录下,但是job目录是maptask启动之后才创建的。
4、把product拷贝到maptask下是一个难题
解决方案:
使用分布式缓存(Distributed Cache)在存储数据,然后maptask都从分布式缓存中读取,这样子就没有maptask不知道在哪里的问题以及redis的问题了。(也就是把两个表分开处理)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cbCdPvzI-1632316508594)(/Users/lifuwei/Library/Application Support/typora-user-images/image-20210918224052864.png)]
通过查看源码了解到,map最终是运行在run方法中的,而run方法其实是线程运行的方法,再继续观察发现,在调用map方法之前会调用一次setup方法
*****简单的说处理这个问题就是:
(这里的主要目的就是把两张表提前进行连接,这样久不会产生数据倾斜问题)
1、建立一个集合存放商品信息表,(当然了在创建之前需要创建对象)
然后把相应的信息加入到集合中,如果没有值久设为空
2、在通过map进行
再次把信息加入到集合中,然后在输出,不用reduce阶段的聚合
这里主要是对集合的使用
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class BeanInfo implements WritableComparable<BeanInfo> {
//正常是实现Writable接口的,但是Writable是用作value的
//WritableComparable是把这个对象看做key来用
private int orderId;//订单id
private String date;//订单时间
private String pid;//商品id
private int amount;//购买数量
private String pname;//商品名称
private
int category_id;//商品分类id
private double price;//商品价格
//创建构造方法
public BeanInfo() {
}
public void set(int orderId, String date, String pid, int amount, String pname, int category_id, double price) {
this.orderId = orderId;
this.date = date;
this.pid = pid;
this.amount = amount;
this.pname = pname;
this.category_id = category_id;
this.price = price;
}
//创建get、set方法
public int getOrderId() {
return orderId;
}
public void setOrderId(int orderId) {
this.orderId = orderId;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public int getCategory_id() {
return category_id;
}
public void setCategory_id(int category_id) {
this.category_id = category_id;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
//实现这个接口,需要重写compareTo放方法
//在这里的这个主要作用是判断两个表的id号是否相同
//如果相同则返回0
@Override
public int compareTo(BeanInfo o) {
return this.category_id = o.category_id;
}
//这个是实现这个接口的序列化
//序列化是将对象的字段信息写入输出流
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.orderId);
//如果是字符串,要实现序列化需要使用writeUTF
out.writeUTF(this.date);
out.writeUTF(this.pid);
out.writeInt(this.amount);
out.writeUTF(this.pname);
out.writeInt(this.category_id);
out.writeDouble(this.price);
}
//这个实现这个接口的反序列化
//从输入流中读取各字段的信息
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readInt();
this.date = in.readUTF();
this.pid = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.category_id = in.readInt();
this.price = in.readDouble();
}
//实现toString()方法
@Override
public String toString() {
return
orderId +" "+ date +" "+ pid +" "+ amount +" "+ pname +" "+ category_id +" "+ price;
}
}
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class JoinMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
//创建一个集合,把订单表信息先行加入到集合中,如果没有该值则设为空
List<BeanInfo> beanInfoList = new ArrayList<>();
private Text k = new Text();
//读取product表中的数据
@Override
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
//1、控制读取数据
//这里是把数据放到idea中,通过反射进行读取
BufferedReader br = new BufferedReader(new InputStreamReader(this.getClass().getClassLoader().getResourceAsStream("product.txt")));
//2、遍历这个文件中的数据
String line = null;
while ((line = br.readLine())!= null){
//实例化BeanInfo对象
BeanInfo beanInfo = new BeanInfo();
String[] words = line.split(",");
//对对象进行赋值
beanInfo.set(0,"",words[0],0,words[1],Integer.parseInt(words[2]),Double.parseDouble(words[3]));
beanInfoList.add(beanInfo);
}
//3、释放
br.close();
}
//读取order信息,读取订单表信息
@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(",");
int orderId = Integer.parseInt(words[0]);
String date = words[1];
String pid = words[2];
int amount = Integer.parseInt(words[3]);
//迭代product集合,用于比较集合中pid和order中的pid
Iterator<BeanInfo> iterator = beanInfoList.iterator();
while (iterator.hasNext()){
BeanInfo next = iterator.next();
//获取pid,通过pid可以把数据加到相应的集合中数据里
String pid1 = next.getPid();
if (pid1.equals(pid)){
next.setOrderId(orderId);
next.setDate(date);
next.setAmount(amount);
k.set(next.toString());
context.write(k,NullWritable.get());
}
}
}
}
最后
以上就是包容乌龟为你收集整理的Join中数据倾斜问题解决的全部内容,希望文章能够帮你解决Join中数据倾斜问题解决所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复