Overview
MultiTableOutputFormat lets a single MapReduce job write to several HBase tables. Three steps:
1. Set the output format: job.setOutputFormatClass(MultiTableOutputFormat.class);
2. In the reducer, declare each target table name as an output key, e.g. ImmutableBytesWritable putTable1 = new ImmutableBytesWritable(Bytes.toBytes("analyzer_w2"));
3. Write each Put keyed by its table: context.write(putTable1, put);
Example:
package example;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import util.HbaseServiceUtil;
/**
 * Multi-table output: reads rows from one HBase table and writes the
 * results to two other tables in a single MapReduce job.
 * @author lijl
 */
public class MultiTableOutputMR {
    static class MultiTableOutputMapper extends Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context) throws IOException, InterruptedException {
            // Emit the value of baseInfo:aa keyed by row key; skip rows
            // missing the column to avoid an NPE in the Text constructor
            byte[] cell = value.getValue("baseInfo".getBytes(), "aa".getBytes());
            if (cell != null) {
                context.write(key, new Text(cell));
            }
        }
    }
    static class MultiTableOutputReducer extends Reducer<ImmutableBytesWritable, Text, Writable, Put> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            // The output key names the destination table; MultiTableOutputFormat
            // routes each Put to the table given by its key.
            ImmutableBytesWritable putTable1 = new ImmutableBytesWritable(Bytes.toBytes("analyzer_w2"));
            ImmutableBytesWritable putTable2 = new ImmutableBytesWritable(Bytes.toBytes("analyzer_w3"));
            for (Text value : values) {
                Put put = new Put(key.get());
                put.add("baseInfo".getBytes(), "ab".getBytes(), value.toString().getBytes());
                context.write(putTable1, put); // written to analyzer_w2
                context.write(putTable2, put); // and to analyzer_w3
            }
        }
    }
    // Serialize the Scan via Writable and Base64-encode it so it can be
    // passed through the job Configuration (HBase 0.94-era API).
    static String convertScanToString(Scan scan) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(out);
        scan.write(dos);
        return Base64.encodeBytes(out.toByteArray());
    }
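    // Note (assumption about newer HBase, not from the original post): from
    // 0.96 on, Scan no longer implements Writable, so the hand-rolled
    // serialization above breaks. There, the bundled TableMapReduceUtil
    // helper does the same job:
    //   conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));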
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = HbaseServiceUtil.getConfiguration();
        String inputTableName = "analyzer_w1";
        Scan scan = new Scan();
        scan.setCaching(100);       // the default of 1 performs poorly for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        conf.set(TableInputFormat.SCAN, convertScanToString(scan));
        conf.set(TableInputFormat.INPUT_TABLE, inputTableName);
        Job job = new Job(conf);
        job.setInputFormatClass(TableInputFormat.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        // MultiTableOutputFormat replaces the usual TableOutputFormat and
        // dispatches each Put to the table named by its output key
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        job.setMapperClass(MultiTableOutputMapper.class);
        job.setReducerClass(MultiTableOutputReducer.class);
        job.setNumReduceTasks(2);
        job.setJarByClass(MultiTableOutputMR.class);
        job.setJobName("MultiTableOutPut");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
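One caveat: MultiTableOutputFormat only routes writes; it does not create the destination tables. analyzer_w2 and analyzer_w3 (each with a baseInfo column family) must exist before the job is submitted. Below is a minimal sketch of creating them up front, using the same 0.94-era client API as the job above and reusing the HbaseServiceUtil helper from the example; the class itself is illustrative and not part of the original post.
package example;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import util.HbaseServiceUtil;
/**
 * One-off setup: creates the two output tables if they are missing.
 * Sketch only, assuming the HBase 0.94-era HBaseAdmin API used elsewhere
 * in this example (later versions replace HBaseAdmin with Admin).
 */
public class CreateOutputTables {
    public static void main(String[] args) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(HbaseServiceUtil.getConfiguration());
        for (String name : new String[] { "analyzer_w2", "analyzer_w3" }) {
            if (!admin.tableExists(name)) {
                // Table descriptor with the single column family the job writes to
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("baseInfo"));
                admin.createTable(desc);
            }
        }
        admin.close();
    }
}
Also worth noting: MultiTableOutputFormat accepts both Put and Delete values, so the same table-name-as-key pattern can route deletes to different tables as well.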