概述
1. 概述
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。
当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。
Java的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系…),不便于在网络中高效传输;所以,hadoop自己开发了一套序列化机制(Writable),精简,高效。不用像java对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。
Writable是Hadoop的序列化格式,hadoop定义了这样一个Writable接口。
一个类要支持可序列化只需实现这个接口即可。
public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
2. Writable序列化接口
如需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序,此时,自定义的bean实现的接口应该是:
public class FlowBean implements WritableComparable<FlowBean>
需要自己实现的方法是:
/**
* 反序列化的方法,反序列化时,从流中读取到的各个字段的顺序应该与序列化时写出去的顺序保持一致
*/
@Override
public void readFields(DataInput in) throws IOException {
upflow = in.readLong();
dflow = in.readLong();
sumflow = in.readLong();
}
/**
* 序列化的方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upflow);
out.writeLong(dflow);
out.writeLong(sumflow);
}
@Override
public int compareTo(FlowBean o) {
//实现按照sumflow的大小倒序排序
return sumflow>o.getSumflow()?-1:1;
}
/**
* 反序列化的方法,反序列化时,从流中读取到的各个字段的顺序应该与序列化时写出去的顺序保持一致
*/
public void readFields(DataInput in) throws IOException {
upflow = in.readLong();
dflow = in.readLong();
sumflow = in.readLong();
}
/**
* 序列化的方法
*/
public void write(DataOutput out) throws IOException {
out.writeLong(upflow);
out.writeLong(dflow);
out.writeLong(sumflow);
}
public int compareTo(FlowBean o) {
//实现按照sumflow的大小倒序排序
return sumflow>o.getSumflow()?-1:1;
}
compareTo方法用于将当前对象与方法的参数进行比较。
如果指定的数与参数相等返回0。
如果指定的数小于参数返回 -1。
如果指定的数大于参数返回 1。
例如:o1.compareTo(o2);
返回正数的话,当前对象(调用compareTo方法的对象o1)要排在比较对象(compareTo传参对象o2)后面,返回负数的话,放在前面。
总结:
- 概念:对象在进程间或者网络传递的时候,需要转化为字节流 进行传递
- 序列化:对象===》字节流
- 反序列化:字节流===》对象
- java提供的序列化机制:Serializable 只要对象实现该接口就可以序列化
- 弊端:在序列化对象的时候,会附带很多额外的校验信息,包括继承体系,依赖关系等,臃肿 庞重
- 因此Hadoop自己封装了一套序列化机制 Writable
- Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。
- HashPartitioner是mapreduce的默认partitioner。计算方法是which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。
- (例子以jar形式运行)
案例实现:
DataBean类:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class DataBean implements Writable{
//电话号码
private String phone;
//上行流量
private Long upPayLoad;
//下行流量
private Long downPayLoad;
//总流量
private Long totalPayLoad;
public DataBean(){}
public DataBean(String phone,Long upPayLoad, Long downPayLoad) {
super();
this.phone=phone;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad=upPayLoad+downPayLoad;
}
/**
* 序列化
* 注意:序列化和反序列化的顺序和类型必须一致
*/
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(phone);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
out.writeLong(totalPayLoad);
}
/**
* 反序列化
*/
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.phone=in.readUTF();
this.upPayLoad=in.readLong();
this.downPayLoad=in.readLong();
this.totalPayLoad=in.readLong();
}
@Override
public String toString() {
return upPayLoad +"t"+ downPayLoad +"t"+
totalPayLoad;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public Long getUpPayLoad() {
return upPayLoad;
}
public void setUpPayLoad(Long upPayLoad) {
this.upPayLoad = upPayLoad;
}
public Long getDownPayLoad() {
return downPayLoad;
}
public void setDownPayLoad(Long downPayLoad) {
this.downPayLoad = downPayLoad;
}
public Long getTotalPayLoad() {
return totalPayLoad;
}
public void setTotalPayLoad(Long totalPayLoad) {
this.totalPayLoad = totalPayLoad;
}
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class DataBean implements Writable{
//电话号码
private String phone;
//上行流量
private Long upPayLoad;
//下行流量
private Long downPayLoad;
//总流量
private Long totalPayLoad;
public DataBean(){}
public DataBean(String phone,Long upPayLoad, Long downPayLoad) {
super();
this.phone=phone;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad=upPayLoad+downPayLoad;
}
/**
* 序列化
* 注意:序列化和反序列化的顺序和类型必须一致
*/
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(phone);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
out.writeLong(totalPayLoad);
}
/**
* 反序列化
*/
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.phone=in.readUTF();
this.upPayLoad=in.readLong();
this.downPayLoad=in.readLong();
this.totalPayLoad=in.readLong();
}
public String toString() {
return upPayLoad +"t"+ downPayLoad +"t"+ totalPayLoad;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public Long getUpPayLoad() {
return upPayLoad;
}
public void setUpPayLoad(Long upPayLoad) {
this.upPayLoad = upPayLoad;
}
public Long getDownPayLoad() {
return downPayLoad;
}
public void setDownPayLoad(Long downPayLoad) {
this.downPayLoad = downPayLoad;
}
public Long getTotalPayLoad() {
return totalPayLoad;
}
public void setTotalPayLoad(Long totalPayLoad) {
this.totalPayLoad = totalPayLoad;
}
}
DataCount类:
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.P;
public class DataCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Job job=Job.getInstance(new Configuration());
job.setJarByClass(DataCount.class);
job.setMapperClass(DataCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataBean.class);
FileInputFormat.setInputPaths(job, args[0]);
job.setReducerClass(DataCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setPartitionerClass(DataPartitioner.class);
job.setNumReduceTasks(Integer.parseInt(args[2]));
job.waitForCompletion(true);
}
public static class DataCountMapper extends Mapper<LongWritable, Text, Text, DataBean>{
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, DataBean>.Context context)
throws IOException, InterruptedException {
String hang=value.toString();
String[] strings=hang.split("t");
String phone=strings[1];
long up=Long.parseLong(strings[2]);
long down=Long.parseLong(strings[3]);
DataBean dataBean=new DataBean(phone,up, down);
context.write(new Text(phone), dataBean);
}
}
public static class DataCountReducer extends Reducer<Text, DataBean, Text, DataBean>{
@Override
protected void reduce(Text k2, Iterable<DataBean> v2,
Reducer<Text, DataBean, Text, DataBean>.Context context)
throws IOException, InterruptedException {
long upSum=0;
long downSum=0;
for(DataBean dataBean:v2){
upSum += dataBean.getUpPayLoad();
downSum += dataBean.getDownPayLoad();
}
DataBean dataBean=new DataBean(k2.toString(),upSum,downSum);
context.write(new Text(k2), dataBean);
}
}
public static class DataPartitioner extends Partitioner<Text, DataBean>{
private static Map<String,Integer> map=new HashMap<String,Integer>();
static{
/**
* 规则:1表示移动,2表示联通,3表示电信,0表示其他
*/
map.put("134", 1);
map.put("135", 1);
map.put("136", 1);
map.put("137", 1);
map.put("138", 2);
map.put("139", 2);
map.put("150", 3);
map.put("159", 3);
}
@Override
public int getPartition(Text key, DataBean value, int numPartitions) {
// TODO Auto-generated method stub
String tel=key.toString();
String tel_sub=tel.substring(0, 3);
Integer code=map.get(tel_sub);
if(code == null){
code = 0;
}
return code;
}
}
}
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.P;
public class DataCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Job job=Job.getInstance(new Configuration());
job.setJarByClass(DataCount.class);
job.setMapperClass(DataCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataBean.class);
FileInputFormat.setInputPaths(job, args[0]);
job.setReducerClass(DataCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setPartitionerClass(DataPartitioner.class);
job.setNumReduceTasks(Integer.parseInt(args[2]));
job.waitForCompletion(true);
}
public static class DataCountMapper extends Mapper<LongWritable, Text, Text, DataBean>{
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, DataBean>.Context context)
throws IOException, InterruptedException {
String hang=value.toString();
String[] strings=hang.split("t");
String phone=strings[1];
long up=Long.parseLong(strings[2]);
long down=Long.parseLong(strings[3]);
DataBean dataBean=new DataBean(phone,up, down);
context.write(new Text(phone), dataBean);
}
}
public static class DataCountReducer extends Reducer<Text, DataBean, Text, DataBean>{
protected void reduce(Text k2, Iterable<DataBean> v2,
Reducer<Text, DataBean, Text, DataBean>.Context context)
throws IOException, InterruptedException {
long upSum=0;
long downSum=0;
for(DataBean dataBean:v2){
upSum += dataBean.getUpPayLoad();
downSum += dataBean.getDownPayLoad();
}
DataBean dataBean=new DataBean(k2.toString(),upSum,downSum);
context.write(new Text(k2), dataBean);
}
}
public static class DataPartitioner extends Partitioner<Text, DataBean>{
private static Map<String,Integer> map=new HashMap<String,Integer>();
static{
/**
* 规则:1表示移动,2表示联通,3表示电信,0表示其他
*/
map.put("134", 1);
map.put("135", 1);
map.put("136", 1);
map.put("137", 1);
map.put("138", 2);
map.put("139", 2);
map.put("150", 3);
map.put("159", 3);
}
public int getPartition(Text key, DataBean value, int numPartitions) {
// TODO Auto-generated method stub
String tel=key.toString();
String tel_sub=tel.substring(0, 3);
Integer code=map.get(tel_sub);
if(code == null){
code = 0;
}
return code;
}
}
}
最后
以上就是震动帅哥为你收集整理的MapReduce的Partitioner编程案例实现:的全部内容,希望文章能够帮你解决MapReduce的Partitioner编程案例实现:所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复