我是靠谱客的博主 细腻水壶,最近开发中收集的这篇文章主要介绍MapReduce作业3,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

第一题:请简述一下shuffle-map端的流程

每个map任务都会有一个环形缓冲区存储数据。环形缓冲区的默认大小为100MB,阈值为80MB。当达到阈值时,会有一个后台线程把内容溢写到磁盘中。剩下的数据继续写入剩余20MB中。如果20MB写满后会进入阻塞状态。

在溢写前,会先根据分区器的逻辑将数据分区。每个分区在内存中会进行排序(QuickSort,默认是字典排序)。如果指定了combiner函数,那么它就在排序后的输出上运行。使得map输出更加紧凑

如果至少存在3个溢出文件时,则combiner就会在输出文件写到磁盘之前再次运行。如果只有1或2个溢出文件的话。不会运行map。最终合并(归并算法)成一个最终的临时文件。然后写入到磁盘。

为了使磁盘读写的速度更快,节约磁盘空间。在溢写到磁盘前对数据进行压缩,在进行传输

第二题:请简述一下shuffle-reduce端的流程

在每个mapTask完成后,reduceTask会利用线程开始fetch属于自己要处理的分区的数据,线程默认是5个,线程数量是针对每一个节点来说的,线程通过http协议抓取数据

如果抓过来的数据相当小,会被复制到reduce任务JVM的内存中进行归并排序。输入给reduce函数

如果数据量过大,则会直接拷贝到本地磁盘上,然后讲多个文件合并成较大的文件,合并因子是10,合并时采用的算法是归并算法。(最后一次合并一定要满足10这个因子,而且不会溢写成文件,直接输出给reduce)

在归并算法合并时,如果map端的数据是压缩的,那么要在reduceTask的内存中解压缩在合并。

reduce处理后将数据存储到HFDS上。

第三题:问题描述

利用MR将所有的电影信息进行排序,排序规则:先按照uid排序,如果相同,按照电影id排序(这就是二次排序)

代码

package mr.MovieRating;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 根据数据格式:{"movie":"1193","rate":"5","datetime":"978300760","uid":"1"}
* 可以封装成一个hadoop类型RateBean,一个对象对应一行记录
*/
public class MovieBean implements WritableComparable<MovieBean> {
private int movie;
private int rate;
private String datetime;
private int uid;
public MovieBean() {
}
public MovieBean(int movie, int rate, String datetime, int uid) {
this.movie = movie;
this.rate = rate;
this.datetime = datetime;
this.uid = uid;
}
public int getMovie() {
return movie;
}
public void setMovie(int movie) {
this.movie = movie;
}
public int getRate() {
return rate;
}
public void setRate(int rate) {
this.rate = rate;
}
public String getDatetime() {
return datetime;
}
public void setDatetime(String datetime) {
this.datetime = datetime;
}
public int getUid() {
return uid;
}
public void setUid(int uid) {
this.uid = uid;
}
@Override
public int compareTo(MovieBean o) {
return uid == o.uid ? o.movie - movie : o.uid - uid;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(movie);
dataOutput.writeInt(rate);
dataOutput.writeUTF(datetime);
dataOutput.writeInt(uid);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
movie = dataInput.readInt();
rate = dataInput.readInt();
datetime = dataInput.readUTF();
uid = dataInput.readInt();
}
@Override
public String toString() {
return uid + "t" + movie + "t" + rate;
}
}
package mr.MovieRating;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
public class RateDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(RateDriver.class);
job.setMapperClass(RateMapper.class);
job.setReducerClass(RateReduce.class);
job.setOutputKeyClass(MovieBean.class);
job.setOutputValueClass(NullWritable.class);
job.setGroupingComparatorClass(RateComparator.class);
FileInputFormat.addInputPath(job, new Path("src\main\resources\rating.json"));
Path outputPath = new Path("D:/output");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
//使用args参数的第二个元素充当输出路径,灵活
FileOutputFormat.setOutputPath(job, outputPath);
//提交作业
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class RateMapper extends Mapper<LongWritable, Text, MovieBean, NullWritable> {
ObjectMapper mapper;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
mapper = new ObjectMapper();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
MovieBean movieBean = mapper.readValue(value.toString(), MovieBean.class);
//写出k2v2
context.write(movieBean, NullWritable.get());
}
}
public static class RateReduce extends Reducer<MovieBean, NullWritable, MovieBean, NullWritable> {
@Override
protected void reduce(MovieBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
for (NullWritable value : values) {
context.write(key, NullWritable.get());
}
}
}
//自定义分组器
public static class RateComparator extends WritableComparator {
public RateComparator() {//将要重新定义的分组的key的类型传进来
super(MovieBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
MovieBean m1 = (MovieBean) a;
MovieBean m2 = (MovieBean) b;
return m1.getUid() - m2.getUid();
}
}
}

第四题:问题描述

1、自定义FileInputFormat

案例需求:统计奇数行的sum和偶数行的sum,数据

f1.txt
12
13
24
123
234
56
1
35
6
使用默认的输入规则TextInputFormat不能完成上述需求

案例分析:

- k1不再是行偏移量,让其成为行号:1,2,3,4,5
- v1是行数据
- 数据扭转
<k1,v1>-->map--><k2,v2>
-->reduce-->
<k3,v3>
1
12
奇数,[12,24,234,...]
奇数:total
2
13
偶数,[13,123,56,...]
偶数:total
3
24
.....

代码

package mr.a;
import com.google.common.io.LineReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.io.InputStreamReader;
public class OddEvenLineDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//获取配置信息
Configuration conf = new Configuration();
//获取job对象
Job job = Job.getInstance(conf, "sum_count");
//设置驱动类型
job.setJarByClass(OddEvenLineDriver.class);
//设置mapper和reducer类型
job.setMapperClass(SumMapper.class);
job.setReducerClass(SumReducer.class);
//设置reduce的输出k3和v3的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置自定义输入规则
job.setInputFormatClass(OddEvenLineInputFormat.class);
//设置输入和输出路径
FileInputFormat.addInputPath(job, new Path("src\main\resources\f1.txt"));
Path outPath = new Path("D:/output");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);
//提交等待完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class SumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
Text k2 = new Text();
LongWritable v2 = new LongWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//转成数字
int num = Integer.parseInt(value.toString());
//判断奇偶
if (key.get() % 2 == 0) {
//设置k2
k2.set("偶数行");
} else {
k2.set("奇数行");
}
//设置v2
v2.set(num);
//写出
context.write(k2, v2);
}
}
public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;//统计所有奇数或者偶数的和
for (LongWritable value : values) {
//取出元素,累加到sum中
sum += value.get();
}
//将和与个数,拼接成字符串,并封装成v3
LongWritable v3 = new LongWritable(sum);
//写出去
context.write(key, v3);
}
}
}
/**
* 自定义输入规则, 继承FileInputFormat,泛型是K1,V1的类型
*/
class OddEvenLineInputFormat extends FileInputFormat<LongWritable, Text> {
/**
* 重写createRecordReader方法,返回一个自定义的阅读器
*
* @param split
* @param context
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new OddEvenLineRecordReader();
}
/**
* 重写isSplitable ,表示要统计的文件是否支持切分,如果支持切分,就会有很多分片,如果不支持,就只有一个分片
*
* @param context
* @param file
* @return
*/
@Override
protected boolean isSplitable(JobContext context, Path file) {
return true; //支持
}
}
/**
* 自定义记录阅读器类型。
*/
class OddEvenLineRecordReader extends RecordReader<LongWritable, Text> {
private long start;
//分片记录的开始位置
private long end;
//分片记录的结束位置
private LineReader in;
//读取一行的输入流
private FSDataInputStream fileIn; //读取hdfs上的文件的输入流
private LongWritable key;
private Text value;
private long lineNum;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
//获取路径
FileSplit fileSplit = (FileSplit) split;
start = fileSplit.getStart();
//初始化start
end = start + fileSplit.getLength();
//通过路径打开输入流
Path path = fileSplit.getPath();
Configuration conf = context.getConfiguration();
FileSystem fileSystem = FileSystem.get(conf);
fileIn = fileSystem.open(path); //初始化读取hdfs上的文件的输入流
fileIn.seek(start);
in = new LineReader(new InputStreamReader(fileIn)); //将filein包装成按行读取的流
key = new LongWritable();
value = new Text();
lineNum = 1;
}
/**
* 该方法是真正的在读取数据
*
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
//读取每行数据
key.set(lineNum++);
String str = "";
if ((str = in.readLine()) != null) {
value.set(str);
return true;
} else {
return false;
}
}
@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void close() throws IOException {
fileIn.close();
}
}

第五题:问题描述

1、求每个员工的基本信息及其所在部门信息(reduce-join,map-jion)

dept.txt

10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON

emp.txt

7369,SMITH,CLERK,7902,1980-12-17,800,null,20
7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30
7521,WARD,SALESMAN,7698,1981-02-22,1250,500,30
7566,JONES,MANAGER,7839,1981-04-02,2975,null,20
7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981-05-01,2850,null,30
7782,CLARK,MANAGER,7839,1981-06-09,2450,null,10
7788,SCOTT,ANALYST,7566,1987-04-19,3000,null,20
7839,KING,PRESIDENT,null,1981-11-17,5000,null,10
7844,TURNER,SALESMAN,7698,1981-09-08,1500,0,30
7876,ADAMS,CLERK,7788,1987-05-23,1100,null,20
7900,JAMES,CLERK,7698,1981-12-03,950,null,30
7902,FORD,ANALYST,7566,1981-12-02,3000,null,20
7934,MILLER,CLERK,7782,1982-01-23,1300,null,10
reduce-join代码
package mr.emp;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* reduce端进行join:
* 在map函数中读取数据时,行内容要打上标记,标记为文件名。
* K2为关联字段的值,也就是部门号。
* <p>
* <p>
* 10
员工信息
* 10
员工信息
* 10
部门信息
* <p>
* 在reduce端,判断这些数据时那些文件里的,将其封装到两个集合中。
* 然后使用双层for循环进行拼接到一起。
* <p>
* <p>
* 优点:
思路简单,容易编写
* 缺点:
在join时,不需要的数据可能很多,但是还是经过了shuffle阶段,传入到reduce端。产生了大量的网络IO。
*/
public class ReduceJoinDriver {
public static void main(String[] args) {
try {
//获取配置信息
Configuration conf = new Configuration();
//获取job对象
Job job = Job.getInstance(conf);
//设置驱动类型
job.setJarByClass(ReduceJoinDriver.class);
//设置mapper和reducer类型
job.setMapperClass(ReduceJoinMapper.class);
job.setReducerClass(ReduceJoinReducer.class);
//设置map的输出k2和v2的类型
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
//设置reduce的输出k3和v3的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置输入和输出路径
FileInputFormat.addInputPath(job, new Path("src\main\resources\input"));
Path outPath = new Path("D:/output");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);
//提交等待完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
static class ReduceJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取文件名
FileSplit fileSplit = (FileSplit) context.getInputSplit();
Path path = fileSplit.getPath();
//解析行内容
String line = value.toString().trim();
String[] data = line.split(",");
int deptno = -1;
String label = "";
if (path.toString().contains("emp")) {
deptno = Integer.parseInt(data[data.length - 1]);
label = "emp";
} else if (path.toString().contains("dept")) {
deptno = Integer.parseInt(data[0]);
label = "dept";
}
//封装K2
IntWritable k2 = new IntWritable(deptno);
line = label + "," + line;
//封装成v2
Text v2 = new Text(line);
context.write(k2, v2);
}
}
/**
* k2:
部门号
* v2:
emp,..........
或者是 dept,..........
*/
static class ReduceJoinReducer extends Reducer<IntWritable, Text, Text, NullWritable> {
@Override
protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
List<String> emps = new ArrayList<>();
List<String> depts = new ArrayList<>();
String line = "";
for (Text value : values) {
line = value.toString().trim();
if (line.startsWith("emp")) {
emps.add(line.substring(line.indexOf(",") + 1));
} else if (line.startsWith("dept")) {
depts.add(line.substring(line.indexOf(",") + 1));
}
}
//利用双层for循环,进行笛卡尔积拼接
for (String emp : emps) {
for (String dept : depts) {
context.write(new Text(emp + "," + dept), NullWritable.get());
}
}
}
}
}
map-jion代码
package mr.emp;
import com.google.common.io.LineReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.Buffer;
import java.util.HashMap;
import java.util.Map;
/**
* Map端的join操作:
* 由于reduce端join可能会产生不必要的大量的网络IO。所以可以提前在map端join。
* 思路:
* 可以使用io,先将小文件读取到mapTask运行前的内存中。然后在map函数中与其他文件进行join连接。连接后的数据输出给reduce端。
* 没有join上的数据,就直接过滤不要了。
*/
public class MapJoinDriver {
public static void main(String[] args) {
Configuration configuration = new Configuration();
try {
Job job = Job.getInstance(configuration, "map-join");
job.setJarByClass(MapJoinDriver.class);
//先将小文件加载到缓存中
Path path = new Path("src\main\resources\input\dept.txt");
URI uri = path.toUri();
job.addCacheFile(uri); //将要读取的小文件路径添加到缓存中。
job.setMapperClass(MapJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path("src\main\resources\input\emp.txt"));
Path outPath = new Path("D:/output");
FileSystem fs = FileSystem.get(configuration);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);
//提交等待完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
public static class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<Integer, String> map = new HashMap<>();
Text k2 = new Text();
/**
* 使用该方法,来提前加载小文件到内存中,存储到HashMap对象里,
关联字段作为key, 内容
*
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
URI uri = context.getCacheFiles()[0];
String path = uri.getPath();
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
String line = "";
while ((line = br.readLine()) != null) {
line = line.trim();
int k = Integer.parseInt(line.split(",")[0]);
map.put(k, line);
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString().trim();
String[] strs = line.split(","); //将emp的记录转成数组
int deptno = Integer.parseInt(strs[strs.length - 1]);
if (map.containsKey(deptno)) {
String newLine = line + "," + map.get(deptno);
k2.set(newLine);
context.write(k2, NullWritable.get());
}
}
}
}

最后

以上就是细腻水壶为你收集整理的MapReduce作业3的全部内容,希望文章能够帮你解决MapReduce作业3所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(43)

评论列表共有 0 条评论

立即
投稿
返回
顶部