概述
注意点:
1: map输出的key必须是两表关联所用的外键(本例中为pid)
2:构造的信息bean要有一个标志位,用来判别现在的bean中的信息是属于哪个表的
下面是实现代码(已运行通过):
package join;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class InfoBean implements Writable {
public InfoBean() {
}
public void set(String pid, String data, String price, String amount, String productName, String orderId, String categoryId,String flag) {
this.pid = pid;
this.data = data;
this.price = price;
this.amount = amount;
this.productName = productName;
this.orderId = orderId;
this.categoryId = categoryId;
this.flag = flag;
}
private String pid;
private String data;
private String price;
private String amount;
private String productName;
private String orderId;
private String categoryId;
//0 order 1 product
private String flag;
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(pid);
out.writeUTF(data);
out.writeUTF(price);
out.writeUTF(amount);
out.writeUTF(productName);
out.writeUTF(orderId);
out.writeUTF(categoryId);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput input) throws IOException {
this.pid = input.readUTF();
this.data = input.readUTF();
this.price = input.readUTF();
this.amount = input.readUTF();
this.productName = input.readUTF();
this.orderId = input.readUTF();
this.categoryId = input.readUTF();
this.flag = input.readUTF();
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
public String getPrice() {
return price;
}
public void setPrice(String price) {
this.price = price;
}
public String getAmount() {
return amount;
}
public void setAmount(String amount) {
this.amount = amount;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getCategoryId() {
return categoryId;
}
public void setCategoryId(String categoryId) {
this.categoryId = categoryId;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public String toString() {
return "InfoBean{" +
"pid='" + pid + ''' +
", data='" + data + ''' +
", price='" + price + ''' +
", amount='" + amount + ''' +
", productName='" + productName + ''' +
", orderId='" + orderId + ''' +
", categoryId='" + categoryId + ''' +
", flag='" + flag + ''' +
'}';
}
}
这个bean包含了我们要输出的全部信息,另外加了一个标志位,用来说明当前这条记录来自哪张表(0表示order表,1表示product表)。
mr程序
package join;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Reduce-side join of order records with product records on the product id.
 *
 * <p>Both inputs are comma-separated text. Files whose name starts with
 * {@code "order"} are read as {@code orderId,data,pid,amount}; every other file
 * is read as {@code pid,productName,categoryId,price}. The mapper keys each
 * record by {@code pid}; the reducer then fills the product columns into every
 * order that shares that key.
 */
public class DataJoin {

    /** Counter group for records that are skipped instead of failing the task. */
    private static final String COUNTER_GROUP = "DataJoin";

    /** Both input layouts are indexed up to column 3, so four columns are required. */
    private static final int MIN_FIELDS = 4;

    /** Emits each input record keyed by the join column (the product id). */
    static class JoinMapper extends Mapper<LongWritable, Text, Text, InfoBean> {

        /**
         * Parses one line, tags it with its table of origin and emits it under its pid.
         *
         * @param number  the input key supplied by the input format (unused)
         * @param value   one comma-separated input line
         * @param context task context used to find the source file and emit output
         */
        @Override
        protected void map(LongWritable number, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length < MIN_FIELDS) {
                // Blank or truncated lines would otherwise kill the task with an
                // ArrayIndexOutOfBoundsException; count and skip them instead.
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }

            // InfoBean is a union of both tables, so the source file name decides
            // which columns get populated.
            FileSplit split = (FileSplit) context.getInputSplit();
            String filename = split.getPath().getName();

            InfoBean info = new InfoBean();
            if (filename.startsWith("order")) {
                info.set(fields[2], fields[1], "", fields[3], "", fields[0], "", "0");
            } else {
                info.set(fields[0], "", fields[3], "", fields[1], "", fields[2], "1");
            }
            context.write(new Text(info.getPid()), info);
        }
    }

    /** Merges the product columns into every order record that shares the same pid. */
    static class JoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable> {

        /**
         * Collects the orders and the product for one pid, then writes one joined
         * bean per order.
         *
         * @param key     the product id
         * @param values  all order and product records carrying that id
         * @param context task context used to emit output
         * @throws IOException if a record cannot be copied or written
         */
        @Override
        protected void reduce(Text key, Iterable<InfoBean> values, Context context)
                throws IOException, InterruptedException {
            List<InfoBean> orders = new ArrayList<>();
            InfoBean product = new InfoBean();

            for (InfoBean value : values) {
                // The framework reuses the value object between iterations, so each
                // record has to be copied before it is retained.
                if ("0".equals(value.getFlag())) {
                    InfoBean order = new InfoBean();
                    copyInto(order, value);
                    orders.add(order);
                } else {
                    copyInto(product, value);
                }
            }

            // A plain loop (rather than forEach with a lambda) lets write failures
            // propagate and fail the task instead of silently dropping rows.
            for (InfoBean order : orders) {
                order.setProductName(product.getProductName());
                order.setCategoryId(product.getCategoryId());
                order.setPrice(product.getPrice());
                context.write(order, NullWritable.get());
            }
        }

        /** Copies every bean property from {@code source} into {@code target}. */
        private static void copyInto(InfoBean target, InfoBean source) throws IOException {
            try {
                BeanUtils.copyProperties(target, source);
            } catch (ReflectiveOperationException e) {
                throw new IOException("Failed to copy join record " + source, e);
            }
        }
    }

    /**
     * Configures and runs the join job in local mode.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: DataJoin <input path> <output path>");
            System.exit(2);
            return;
        }

        Configuration conf = new Configuration();
        // Local debugging mode: run in-process against the local file system.
        conf.set("mapreduce.framework.name", "local");
        conf.set("fs.defaultFS", "file:///");

        // Pass conf explicitly; the no-argument getInstance() creates its own
        // Configuration and would ignore the settings above.
        Job job = Job.getInstance(conf);

        // Locate the job jar from this class.
        job.setJarByClass(DataJoin.class);

        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        // Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);

        // Key/value types of the final output.
        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output locations come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
在上述的例子中:
当某个商品卖得特别火的时候,order表中该商品对应的记录会非常多。由于reducer是根据key(pid)的hashcode来分配的,同一个pid的所有记录都会进入同一个reducer,这时就会出现数据倾斜:某一个reducer要处理大量数据,而其他reducer处理的数据却很少。
解决方法:
使用 map side join(不需要reducer,只使用map)
将产品信息表(比order表要小很多)放入distributedcache中,这样每个map task都可以在本地读到完整的产品信息,在map阶段就能完成关联;
hadoop已经为我们实现了distributedcache,直接使用即可。
所以代码改动如下:
1:新建一个product类
// Holder for one record of the product table, used by the map-side join.
// NOTE(review): this snippet is truncated in the article; the class body is never closed.
// The map-side JoinMapper calls p.set(...) with four arguments as well as getPrice(),
// getProductName() and getCategoryId(), so the complete class presumably also declares
// productName/categoryId fields and those methods. Confirm against the real source.
public class Product {
// Product id; presumably the column that order records are joined on (verify).
private String pid;
private String data;
private String price;
2:map端:
/**
 * Map-side join: the whole product table is loaded into memory once per task,
 * and every order line is joined against it inside {@code map}, so no reducer
 * is needed.
 *
 * <p>The output key is the concatenation {@code pid + productName + price};
 * the output value is always {@link NullWritable}.
 */
static class JoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /** File name of the cached product table as opened from the task's working directory. */
    private static final String PRODUCT_FILE = "product.txt";

    /** Both the product and the order layout are indexed up to column 3. */
    private static final int MIN_FIELDS = 4;

    /** Counter group for records that are skipped rather than emitted. */
    private static final String COUNTER_GROUP = "DataJoin";

    /** Every product from the cached file, keyed by product id. */
    private final Map<String, Product> productsByPid = new HashMap<>();

    /**
     * Loads every line of the product file into {@link #productsByPid}.
     *
     * <p>Reading all lines matters: keeping only the first line would join every
     * order with the same product regardless of its pid.
     *
     * @throws IOException if the product file cannot be opened or read
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(PRODUCT_FILE), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");
                if (fields.length < MIN_FIELDS) {
                    // Skip blank or truncated product lines.
                    continue;
                }
                Product product = new Product();
                // Same argument order as before: columns 0, 3, 2, 1 of the product line.
                product.set(fields[0], fields[3], fields[2], fields[1]);
                productsByPid.put(fields[0], product);
            }
        }
    }

    /**
     * Joins one order line with its product and emits the combined key.
     *
     * <p>Lines with fewer than four columns, and orders whose pid has no entry in
     * the product table, are counted and skipped.
     *
     * @param number  the input key supplied by the input format (unused)
     * @param value   one comma-separated order line; column 2 holds the pid
     * @param context task context used to emit output and update counters
     */
    @Override
    protected void map(LongWritable number, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length < MIN_FIELDS) {
            context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
            return;
        }

        String pid = fields[2];
        Product product = productsByPid.get(pid);
        if (product == null) {
            context.getCounter(COUNTER_GROUP, "ORDERS_WITHOUT_PRODUCT").increment(1);
            return;
        }

        Text key = new Text(pid + product.getProductName() + product.getPrice());
        context.write(key, NullWritable.get());
    }
}
3:job端
// Driver statements for the map-side join job.
Configuration conf = new Configuration();

// Local debugging mode: run in-process against the local file system.
conf.set("mapreduce.framework.name", "local");
conf.set("fs.defaultFS", "file:///");
// Alternative (local submission against a live HDFS): keep
// "mapreduce.framework.name" = "local" and set "fs.defaultFS" to
// "hdfs://master:9000" instead of "file:///".

// Pass conf explicitly; the no-argument getInstance() creates its own
// Configuration and would ignore the settings above.
Job job = Job.getInstance(conf);

// Cluster runs: locate the job jar from the driver class.
job.setJarByClass(DataJoin.class);
// Debugging alternative: point at a pre-built jar instead, e.g.
// job.setJar("/home/willian/Desktop/project/java/hadoop/out/jar/word.jar");

job.setMapperClass(JoinMapper.class);

// Map-only job: no reducer class and no separate map-output types are set;
// the mapper's output is the final output.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);

// Distributed-cache options offered by Job:
//   addArchiveToClassPath(...) - cache a jar onto the task nodes' classpath
//   addFileToClassPath(...)    - cache a plain file onto the task nodes' classpath
//   addCacheFile(...)          - cache a plain file into the task's working directory
// The product table is shipped with addCacheFile; an HDFS path works as well.
job.addCacheFile(new URI("file:///home/willian/Desktop/project/java/hadoop/product.txt"));

// Only the order table is the job input; the product table comes from the cache.
FileInputFormat.setInputPaths(job, new Path("/home/willian/Desktop/project/java/hadoop/mrlocal/order.txt"));
FileOutputFormat.setOutputPath(job, new Path("/home/willian/Desktop/project/java/hadoop/mrlocal/out"));

boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
最后
以上就是爱笑烤鸡为你收集整理的hadoop join数据倾斜解决方法的全部内容,希望文章能够帮你解决hadoop join数据倾斜解决方法所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复