Overview
Hadoop writes its text output as UTF-8 without a BOM by default. On Windows, however, the default encoding for Chinese text is GBK, so when a file such as a CSV written in BOM-less UTF-8 is opened in Excel, the Chinese characters come out garbled; the file only displays correctly in editors such as UltraEdit or Notepad. Changing Hadoop's default output encoding to GBK is therefore a very common requirement.
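To see why the garbling happens, here is a small standalone sketch (not from the original article; the class name EncodingDemo and the sample string are illustrative). It encodes a Chinese string as BOM-less UTF-8 and then decodes the same bytes as GBK, which is roughly what a GBK-locale program such as Excel on Chinese Windows does:

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class EncodingDemo {
    public static void main(String[] args) {
        String original = "中文测试";
        // Hadoop's TextOutputFormat writes these bytes: UTF-8, no BOM.
        byte[] utf8Bytes = original.getBytes(StandardCharsets.UTF_8);
        // A GBK-locale reader interprets the UTF-8 bytes as GBK -> mojibake.
        System.out.println("UTF-8 bytes read as GBK: " + new String(utf8Bytes, Charset.forName("GBK")));
        // Writing the bytes in GBK in the first place (what GbkOutputFormat does) keeps them readable.
        byte[] gbkBytes = original.getBytes(Charset.forName("GBK"));
        System.out.println("GBK bytes read as GBK:   " + new String(gbkBytes, Charset.forName("GBK")));
    }
}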
A custom TextOutputFormat-style subclass
For reference, here is the stock TextOutputFormat class, annotated with what needs to change:
package main.java.util;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
/**
 *
 * <p>Title: fixing garbled Chinese output from Hadoop MapReduce</p>
 * <p>Description:</p>
 * <p>Company: adteach</p>
 * @version 1.0
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
protected static class LineRecordWriter<K, V>
extends RecordWriter<K, V> {
private static final String utf8 = "UTF-8"; // 将UTF-8转换成GBK
private static final byte[] newline;
static {
try {
newline = "n".getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
protected DataOutputStream out;
private final byte[] keyValueSeparator;
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try {
this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
public LineRecordWriter(DataOutputStream out) {
this(out, "t");
}
/**
* Write the object to the byte stream, handling Text as a special
* case.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o; // comment out this line
out.write(to.getBytes(), 0, to.getLength()); // comment out this line
} else { // comment out this line
out.write(o.toString().getBytes(utf8));
}
}
public synchronized void write(K key, V value)
throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}
public synchronized
void close(TaskAttemptContext context) throws IOException {
out.close();
}
}
public RecordWriter<K, V>
getRecordWriter(TaskAttemptContext job
) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get(SEPERATOR, "t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);
}
}
}
By default, the driver of an MR job selects this output format (and hence the encoding) with:
job.setOutputFormatClass(TextOutputFormat.class);
Note, however, that the line `private static final String utf8 = "UTF-8";` in the code above shows that Hadoop hard-codes UTF-8 for this output format. To change the encoding of Hadoop's text output, it is enough to define a class GbkOutputFormat that is identical to TextOutputFormat and likewise extends FileOutputFormat (specifically org.apache.hadoop.mapreduce.lib.output.FileOutputFormat), as in the following code:
package main.java.util;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
/**
 *
 * <p>
 * Title: GbkOutputFormat
 * </p>
 * <p>
 * Description:
 * Hadoop hard-codes UTF-8 in TextOutputFormat, so to change the encoding of its text output
 * it is enough to define a class GbkOutputFormat that mirrors TextOutputFormat and likewise
 * extends FileOutputFormat (note: org.apache.hadoop.mapreduce.lib.output.FileOutputFormat).
 * </p>
 * <p>
 * Company: adteach
 * </p>
 * @version 1.0
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class GbkOutputFormat<K, V> extends FileOutputFormat<K, V> {
public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
protected static class LineRecordWriter<K, V>
extends RecordWriter<K, V> {
private static final String utf8 = "GBK";
private static final byte[] newline;
static {
try {
newline = "n".getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
protected DataOutputStream out;
private final byte[] keyValueSeparator;
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try {
this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
public LineRecordWriter(DataOutputStream out) {
this(out, "t");
}
/**
* Write the object to the byte stream, handling Text as a special
* case.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
// The Text fast path is commented out so that every value, Text included,
// is re-encoded with the GBK charset instead of being written as raw UTF-8 bytes.
// if (o instanceof Text) {
// Text to = (Text) o;
// out.write(to.getBytes(), 0, to.getLength());
// } else {
out.write(o.toString().getBytes(utf8));
// }
}
public synchronized void write(K key, V value)
throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}
public synchronized
void close(TaskAttemptContext context) throws IOException {
out.close();
}
}
public RecordWriter<K, V>
getRecordWriter(TaskAttemptContext job
) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get(SEPERATOR, "t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);
}
}
}
Compared with TextOutputFormat, only two things changed: the charset constant now holds "GBK", and the Text fast path in writeObject is commented out. Finally, set the job's output format class to GbkOutputFormat.class, e.g.:
job.setOutputFormatClass(GbkOutputFormat.class);
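Since GbkOutputFormat reads the same mapreduce.output.textoutputformat.separator key as the stock class, the key/value separator can still be changed from the driver if a tab is not wanted. A minimal sketch (the comma is just an example value; set it on the Configuration before creating the Job, or via job.getConfiguration()):

Configuration conf = new Configuration();
// change the key/value separator used by GbkOutputFormat's LineRecordWriter
conf.set("mapreduce.output.textoutputformat.separator", ",");
Job job = Job.getInstance(conf, "gbk-output-demo");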
Full driver example:
package main.java.demo;
import main.java.util.GbkOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class DemoLogETL {
// LongWritable is used instead of Long because map input and output must be serialized and deserialized:
// the map results are written to disk and the reduce side deserializes them, hence the *Writable types.
// These data types implement the Writable interface so they can be serialized for network transfer and file storage.
// Static nested class (a static nested class can only access the outer class's static members).
public static class WordCountMapper extends Mapper<LongWritable, Text, LongWritable, Text> { // first two type parameters are the input key/value, the last two the output key/value
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
if(null != value){
// String row = value.toString(); // calling toString() directly gives garbled text
// String row = new String(value.getBytes(), 0, value.getLength(), "GBK"); // for reading the GBK-encoded file input/jsonLog1.txt
String row = new String(value.getBytes(), 0, value.getLength(), "utf-8"); // for reading the UTF-8-encoded file input/jsonLog.txt
String sub = row.substring(20, row.length() - 20);
context.write(key,new Text(sub));
}
}
}
// driver (the driver logic is factored out into run())
public int run(String[] args) throws Exception {
//1.get configuration
Configuration conf = new Configuration();
// conf.set("mapreduce.framework.name", "local");
//2.create job
Job job = Job.getInstance(conf,this.getClass().getSimpleName());
// run jar
job.setJarByClass(DemoLogETL.class);
//3.set job (input -> map -> reduce -> output)
//3.1 map
job.setMapperClass(WordCountMapper.class);
// set the map-side output key and value classes
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(GbkOutputFormat.class);
//3.3
//input path
FileInputFormat.addInputPath(job,new Path("input\jsonLog.txt"));
//output path
FileOutputFormat.setOutputPath(job,new Path("DemoLogETL1"));
// submit the job
boolean isSuccess = job.waitForCompletion(true); // true means print progress and log information
System.out.println(isSuccess);
return isSuccess ? 0 : 1;
}
// run the whole program
public static void main(String[] args) throws Exception{
int status = new DemoLogETL().run(args);
// exit the program
System.exit(status);
}
}
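To confirm the fix, the generated part file can be read back and decoded as GBK. A minimal local-filesystem sketch, assuming the default single reducer produced DemoLogETL1/part-r-00000 (the path and the CheckEncoding class name are illustrative):

import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CheckEncoding {
    public static void main(String[] args) throws Exception {
        // read the raw bytes of the job's output part file
        byte[] bytes = Files.readAllBytes(Paths.get("DemoLogETL1/part-r-00000"));
        // decoding with GBK should now show readable Chinese; decoding with UTF-8 should not
        System.out.println(new String(bytes, Charset.forName("GBK")));
    }
}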
Reference: https://blog.csdn.net/u014033218/article/details/75413332