Overview
Implementation approach:
1. Access external resources (here, HDFS output streams) from within MapReduce.
2. Define a custom OutputFormat, override its RecordWriter, and override the write() method that actually writes the data out (a minimal skeleton of this pattern is sketched right below).
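The sketch below shows only the shape of the pattern that the full examples in this article implement; the class name MySketchOutputFormat and the /tmp/sketch/... paths are illustrative placeholders, not part of the original code.
package mapreduce.format.outputformat;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// Minimal sketch: getRecordWriter() opens one stream per target file,
// and write() routes each record to a stream based on its content.
public class MySketchOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(job.getConfiguration());
        // Placeholder paths; a real job must point these at its own HDFS locations.
        FSDataOutputStream out1 = fs.create(new Path("/tmp/sketch/part-a"));
        FSDataOutputStream out2 = fs.create(new Path("/tmp/sketch/part-b"));
        return new RecordWriter<Text, NullWritable>() {
            @Override
            public void write(Text key, NullWritable value) throws IOException {
                // Records tagged "1::" go to the first file, everything else to the second.
                if (key.toString().startsWith("1::")) {
                    out1.write((key.toString() + "\n").getBytes());
                } else {
                    out2.write((key.toString() + "\n").getBytes());
                }
            }
            @Override
            public void close(TaskAttemptContext context) throws IOException {
                IOUtils.closeStream(out1);
                IOUtils.closeStream(out2);
            }
        };
    }
}
The two complete programs that follow flesh this out: a driver plus mapper that tags each record, and the custom OutputFormat that writes the tagged records to separate files.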
package mapreduce.format.outputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MultipleOutputMR {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MultipleOutputMR.class);
job.setMapperClass(MultipleOutputMRMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// Use the custom OutputFormat component
job.setOutputFormatClass(LongOutputFormat.class);
FileInputFormat.setInputPaths(job, args[0]);
Path outPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)){
fs.delete(outPath,true);
}
FileOutputFormat.setOutputPath(job, outPath);
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1);
}
private static class MultipleOutputMRMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
// More than 7 exam attendances counts as qualified
String[] splits = value.toString().split("\t");
if(splits.length > 9){
context.write(new Text("1::" + value.toString()), NullWritable.get());
}else{
context.write(new Text("2::" + value.toString()), NullWritable.get());
}
}
}
}
The custom OutputFormat component:
package mapreduce.format.outputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LongOutputFormat extends FileOutputFormat<Text,NullWritable>{
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
// The job parameter gives access to the Configuration
Configuration configuration = job.getConfiguration();
// The Configuration gives access to the FileSystem
FileSystem fs = FileSystem.get(configuration);
// The target paths are left empty here; real HDFS paths must be supplied (as in the ScoreOutputFormatMR example below)
Path p1 = new Path("");
Path p2 = new Path("");
// The FileSystem gives access to the output streams
FSDataOutputStream out1 = fs.create(p1);
FSDataOutputStream out2 = fs.create(p2);
// Pass the output streams out1 and out2 that MyRecordWriter's write() method writes to into its constructor
return new MyRecordWriter(out1, out2);
}
static class MyRecordWriter extends RecordWriter<Text,NullWritable>{
FSDataOutputStream fsdout = null;
FSDataOutputStream fsdout1 = null;
public MyRecordWriter(FSDataOutputStream fsdout, FSDataOutputStream fsdout1) {
super();
this.fsdout = fsdout;
this.fsdout1 = fsdout1;
}
// The logic of write() is user-defined: a record tagged with 1 is written to the first file, through the stream fsdout;
// a record tagged with 2 is written to the second file, through the stream fsdout1
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String[] strs = key.toString().split("::");
if(strs[0].equals("1")){
fsdout.write((strs[1] + "\n").getBytes());
}else{
fsdout1.write((strs[1] + "\n").getBytes());
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(fsdout);
IOUtils.closeStream(fsdout1);
}
}
}
For example:
// More than 7 exam attendances counts as qualified
package mapreduce.format.outputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ScoreOutputFormatMR extends Configured implements Tool {
// This run() method effectively serves as the Driver
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");
Job job = Job.getInstance(conf);
job.setMapperClass(ScoreOutputFormatMRMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
// TextInputFormat is the default input component
job.setInputFormatClass(TextInputFormat.class);
// TextOutputFormat is the default output component; it is replaced here with the custom one
// job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputFormatClass(MyScoreOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
Path output = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(output)){
fs.delete(output,true);
}
FileOutputFormat.setOutputPath(job, output);
boolean status = job.waitForCompletion(true);
return status ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new ScoreOutputFormatMR(),args);
System.exit(run);
}
private static class ScoreOutputFormatMRMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
String[] split = value.toString().split(",");
if(split.length - 2 >= 6){
context.write(new Text("1::" + value.toString()), NullWritable.get());
}else{
context.write(new Text("2::" + value.toString()), NullWritable.get());
}
}
}
}
The custom OutputFormat:
package mapreduce.format.outputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.IOUtils;
public class MyScoreOutputFormat extends TextOutputFormat<Text,NullWritable>{
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
// The job parameter gives access to the Configuration
Configuration configuration = job.getConfiguration();
// The Configuration gives access to the FileSystem
FileSystem fs = FileSystem.get(configuration);
Path p1 = new Path("/input/score/outputFormat/output1");
Path p2 = new Path("/input/score/outputFormat/output2");
if(fs.exists(p1)){
fs.delete(p1, true);
}
if(fs.exists(p2)){
fs.delete(p2,true);
}
// The FileSystem gives access to the output streams
FSDataOutputStream fsout1 = fs.create(p1);
FSDataOutputStream fsout2 = fs.create(p2);
// Pass the output streams fsout1 and fsout2 that MyRecordWriter's write() method writes to into its constructor
return new MyRecordWriter(fsout1,fsout2);
}
private static class MyRecordWriter extends RecordWriter<Text, NullWritable>{
FSDataOutputStream dout1 = null;
FSDataOutputStream dout2 = null;
public MyRecordWriter(FSDataOutputStream dout1, FSDataOutputStream dout2) {
this.dout1 = dout1;
this.dout2 = dout2;
}
// The logic of write() is user-defined: a record tagged with 1 is written to the first file, through the stream dout1;
// a record tagged with 2 is written to the second file, through the stream dout2
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String[] strs = key.toString().split("::");
if(strs[0].equals("1")){
dout1.writeBytes(strs[1] + "\n");
}else{
dout2.writeBytes(strs[1] + "\n");
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(dout1);
IOUtils.closeStream(dout2);
}
}
}
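To confirm that the records were actually split into the two files, one can read them back from HDFS. The following small program is not part of the original article; it is a hypothetical verification sketch that assumes the same fs.defaultFS address, HADOOP_USER_NAME, and output paths used in the example above.
package mapreduce.format.outputformat;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class ReadClassifiedOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumes the same HDFS address and user as the job above
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);
        // The two paths written by MyScoreOutputFormat
        for (String p : new String[]{"/input/score/outputFormat/output1",
                                     "/input/score/outputFormat/output2"}) {
            System.out.println("=== " + p + " ===");
            try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(fs.open(new Path(p))))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }
}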