Generating HFiles with MapReduce
1. Generating HFiles with MapReduce
- package insert.tools.hfile;
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- public class TestHFileToHBase {
- public static class TestHFileToHBaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- String[] values = value.toString().split("\t", 2);
- byte[] row = Bytes.toBytes(values[0]);
- ImmutableBytesWritable k = new ImmutableBytesWritable(row);
- KeyValue kvProtocol = new KeyValue(row, "PROTOCOLID".getBytes(), "PROTOCOLID".getBytes(), values[1]
- .getBytes());
- context.write(k, kvProtocol);
- // KeyValue kvSrcip = new KeyValue(row, "SRCIP".getBytes(),
- // "SRCIP".getBytes(), values[1].getBytes());
- // context.write(k, kvSrcip);
- // HFileOutputFormat.getRecordWriter
- }
- }
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- Configuration conf = HBaseConfiguration.create();
- Job job = new Job(conf, "TestHFileToHBase");
- job.setJarByClass(TestHFileToHBase.class);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(KeyValue.class);
- job.setMapperClass(TestHFileToHBaseMapper.class);
- job.setReducerClass(KeyValueSortReducer.class);
- // job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.class);
- job.setOutputFormatClass(HFileOutputFormat.class);
- // job.setNumReduceTasks(4);
- // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class);
- // HBaseAdmin admin = new HBaseAdmin(conf);
- HTable table = new HTable(conf, "hua");
- HFileOutputFormat.configureIncrementalLoad(job, table);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
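Since the driver never calls job.setInputFormatClass, the job uses Hadoop's default TextInputFormat, so each input line is expected to have the form rowkey<TAB>value, matching the split("\t", 2) in the mapper. args[0] is the HDFS input directory and args[1] is the directory under which the HFiles are written (one subdirectory per column family, as discussed in section 3 below).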
2. The Improved HFileOutputFormat
The stock HFileOutputFormat can only generate HFiles for one column family at a time; the improved HFileOutputFormat below generates HFiles for multiple column families in the same job. Lines between add markers are additions to the original source, and lines between revise markers are modifications of the original source. Reference: https://review.cloudera.org/r/1272/diff/1/?file=17977#file17977line93
- /**
- * Copyright 2009 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package insert.tools.hfile;
- import java.io.IOException;
- import java.net.URI;
- import java.net.URISyntaxException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.TreeMap;
- import java.util.TreeSet;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.HConstants;
- import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.io.hfile.Compression;
- import org.apache.hadoop.hbase.io.hfile.HFile;
- import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
- import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
- import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
- import org.apache.hadoop.hbase.regionserver.StoreFile;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.SequenceFile;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.RecordWriter;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import com.google.common.base.Preconditions;
- /**
- * Writes HFiles. Passed KeyValues must arrive in order. Unlike the stock
- * implementation, this modified version can write files for multiple column
- * families in the same job. Writes current time as the sequence id for the
- * file. Sets the major compacted attribute on created hfiles.
- *
- * @see KeyValueSortReducer
- */
- public class HFileOutputFormat extends
- FileOutputFormat<ImmutableBytesWritable, KeyValue> {
- static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
- public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(
- final TaskAttemptContext context) throws IOException,
- InterruptedException {
- // Get the path of the temporary output file
- final Path outputPath = FileOutputFormat.getOutputPath(context);
- final Path outputdir = new FileOutputCommitter(outputPath, context)
- .getWorkPath();
- Configuration conf = context.getConfiguration();
- final FileSystem fs = outputdir.getFileSystem(conf);
- // These configs. are from hbase-*.xml
- // revise
- // final long maxsize = conf.getLong("hbase.hregion.max.filesize",
- // 268435456);
- // final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536);
- final long maxsize = conf.getLong("hbase.hregion.max.filesize",
- HConstants.DEFAULT_MAX_FILE_SIZE);
- final int blocksize = conf.getInt("hfile.min.blocksize.size",
- HFile.DEFAULT_BLOCKSIZE);
- // -revise
- // Invented config. Add to hbase-*.xml if other than default
- // compression.
- final String compression = conf.get("hfile.compression",
- Compression.Algorithm.NONE.getName());
- return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
- // Map of families to writers and how much has been output on the
- // writer.
- private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>(
- Bytes.BYTES_COMPARATOR);
- private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
- private final byte[] now = Bytes
- .toBytes(System.currentTimeMillis());
- // add
- private boolean rollRequested = false;
- // -add
- public void write(ImmutableBytesWritable row, KeyValue kv)
- throws IOException {
- // add
- // null input == user explicitly wants to flush
- if (row == null && kv == null) {
- rollWriters();
- return;
- }
- byte[] rowKey = kv.getRow();
- // -add
- long length = kv.getLength();
- byte[] family = kv.getFamily();
- WriterLength wl = this.writers.get(family);
- // revise
- // if (wl == null
- // || ((length + wl.written) >= maxsize)
- // && Bytes.compareTo(this.previousRow, 0,
- // this.previousRow.length, kv.getBuffer(), kv
- // .getRowOffset(), kv.getRowLength()) != 0) {
- // // Get a new writer.
- // Path basedir = new Path(outputdir, Bytes.toString(family));
- // if (wl == null) {
- // wl = new WriterLength();
- // this.writers.put(family, wl);
- // if (this.writers.size() > 1)
- // throw new IOException("One family only");
- // // If wl == null, first file in family. Ensure family
- // // dir exits.
- // if (!fs.exists(basedir))
- // fs.mkdirs(basedir);
- // }
- // wl.writer = getNewWriter(wl.writer, basedir);
- // LOG
- // .info("Writer="
- // + wl.writer.getPath()
- // + ((wl.written == 0) ? "" : ", wrote="
- // + wl.written));
- // wl.written = 0;
- // }
- // If this is a new column family, verify that the directory
- // exists
- if (wl == null) {
- fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
- }
- // If any of the HFiles for the column families has reached
- // maxsize, we need to roll all the writers
- if (wl != null && wl.written + length >= maxsize) {
- this.rollRequested = true;
- }
- // This can only happen once a row is finished though
- if (rollRequested
- && Bytes.compareTo(this.previousRow, rowKey) != 0) {
- rollWriters();
- }
- // create a new HLog writer, if necessary
- if (wl == null || wl.writer == null) {
- wl = getNewWriter(family);
- }
- // we now have the proper HLog writer. full steam ahead
- // -revise
- kv.updateLatestStamp(this.now);
- wl.writer.append(kv);
- wl.written += length;
- // Copy the row so we know when a row transition.
- // revise
- // this.previousRow = kv.getRow();
- this.previousRow = rowKey;
- // -revise
- }
- // revise
- // /*
- // * Create a new HFile.Writer. Close current if there is one.
- // *
- // * @param writer
- // *
- // * @param familydir
- // *
- // * @return A new HFile.Writer.
- // *
- // * @throws IOException
- // */
- // private HFile.Writer getNewWriter(final HFile.Writer writer,
- // final Path familydir) throws IOException {
- // close(writer);
- // return new HFile.Writer(fs, StoreFile.getUniqueFile(fs,
- // familydir), blocksize, compression,
- // KeyValue.KEY_COMPARATOR);
- // }
- private void rollWriters() throws IOException {
- for (WriterLength wl : this.writers.values()) {
- if (wl.writer != null) {
- LOG.info("Writer="
- + wl.writer.getPath()
- + ((wl.written == 0) ? "" : ", wrote="
- + wl.written));
- close(wl.writer);
- }
- wl.writer = null;
- wl.written = 0;
- }
- this.rollRequested = false;
- }
- /*
- * Create a new HFile.Writer.
- *
- * @param family
- *
- * @return A WriterLength, containing a new HFile.Writer.
- *
- * @throws IOException
- */
- private WriterLength getNewWriter(byte[] family) throws IOException {
- WriterLength wl = new WriterLength();
- Path familydir = new Path(outputdir, Bytes.toString(family));
- wl.writer = new HFile.Writer(fs, StoreFile.getUniqueFile(fs,
- familydir), blocksize, compression,
- KeyValue.KEY_COMPARATOR);
- this.writers.put(family, wl);
- return wl;
- }
- // -revise
- private void close(final HFile.Writer w) throws IOException {
- if (w != null) {
- w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes
- .toBytes(System.currentTimeMillis()));
- w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, Bytes
- .toBytes(context.getTaskAttemptID().toString()));
- w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes
- .toBytes(true));
- w.close();
- }
- }
- // revise
- // public void close(TaskAttemptContext c) throws IOException,
- // InterruptedException {
- // for (Map.Entry<byte[], WriterLength> e : this.writers
- // .entrySet()) {
- // close(e.getValue().writer);
- // }
- // }
- public void close(TaskAttemptContext c) throws IOException,
- InterruptedException {
- for (WriterLength wl : this.writers.values()) {
- close(wl.writer);
- }
- }
- // -revise
- };
- }
- /*
- * Data structure to hold a Writer and amount of data written on it.
- */
- static class WriterLength {
- long written = 0;
- HFile.Writer writer = null;
- }
- /**
- * Return the start keys of all of the regions in this table, as a list of
- * ImmutableBytesWritable.
- */
- private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
- throws IOException {
- byte[][] byteKeys = table.getStartKeys();
- ArrayList<ImmutableBytesWritable> ret = new ArrayList<ImmutableBytesWritable>(
- byteKeys.length);
- for (byte[] byteKey : byteKeys) {
- ret.add(new ImmutableBytesWritable(byteKey));
- }
- return ret;
- }
- /**
- * Write out a SequenceFile that can be read by TotalOrderPartitioner that
- * contains the split points in startKeys.
- *
- * @param partitionsPath
- * output path for SequenceFile
- * @param startKeys
- * the region start keys
- */
- private static void writePartitions(Configuration conf,
- Path partitionsPath, List<ImmutableBytesWritable> startKeys)
- throws IOException {
- Preconditions.checkArgument(!startKeys.isEmpty(), "No regions passed");
- // We're generating a list of split points, and we don't ever
- // have keys < the first region (which has an empty start key)
- // so we need to remove it. Otherwise we would end up with an
- // empty reducer with index 0
- TreeSet<ImmutableBytesWritable> sorted = new TreeSet<ImmutableBytesWritable>(
- startKeys);
- ImmutableBytesWritable first = sorted.first();
- Preconditions
- .checkArgument(
- first.equals(HConstants.EMPTY_BYTE_ARRAY),
- "First region of table should have empty start key. Instead has: %s",
- Bytes.toStringBinary(first.get()));
- sorted.remove(first);
- // Write the actual file
- FileSystem fs = partitionsPath.getFileSystem(conf);
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- partitionsPath, ImmutableBytesWritable.class,
- NullWritable.class);
- try {
- for (ImmutableBytesWritable startKey : sorted) {
- writer.append(startKey, NullWritable.get());
- }
- } finally {
- writer.close();
- }
- }
- /**
- * Configure a MapReduce Job to perform an incremental load into the given
- * table. This
- * <ul>
- * <li>Inspects the table to configure a total order partitioner</li>
- * <li>Uploads the partitions file to the cluster and adds it to the
- * DistributedCache</li>
- * <li>Sets the number of reduce tasks to match the current number of
- * regions</li>
- * <li>Sets the output key/value class to match HFileOutputFormat's
- * requirements</li>
- * <li>Sets the reducer up to perform the appropriate sorting (either
- * KeyValueSortReducer or PutSortReducer)</li>
- * </ul>
- * The user should be sure to set the map output value class to either
- * KeyValue or Put before running this function.
- */
- public static void configureIncrementalLoad(Job job, HTable table)
- throws IOException {
- Configuration conf = job.getConfiguration();
- job.setPartitionerClass(TotalOrderPartitioner.class);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(KeyValue.class);
- job.setOutputFormatClass(HFileOutputFormat.class);
- // Based on the configured map output class, set the correct reducer to
- // properly
- // sort the incoming values.
- // TODO it would be nice to pick one or the other of these formats.
- if (KeyValue.class.equals(job.getMapOutputValueClass())) {
- job.setReducerClass(KeyValueSortReducer.class);
- } else if (Put.class.equals(job.getMapOutputValueClass())) {
- job.setReducerClass(PutSortReducer.class);
- } else {
- LOG.warn("Unknown map output value type:"
- + job.getMapOutputValueClass());
- }
- LOG.info("Looking up current regions for table " + table);
- List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
- LOG.info("Configuring " + startKeys.size() + " reduce partitions "
- + "to match current region count");
- job.setNumReduceTasks(startKeys.size());
- Path partitionsPath = new Path(job.getWorkingDirectory(), "partitions_"
- + System.currentTimeMillis());
- LOG.info("Writing partition information to " + partitionsPath);
- FileSystem fs = partitionsPath.getFileSystem(conf);
- writePartitions(conf, partitionsPath, startKeys);
- partitionsPath = partitionsPath.makeQualified(fs);
- URI cacheUri;
- try {
- cacheUri = new URI(partitionsPath.toString() + "#"
- + TotalOrderPartitioner.DEFAULT_PATH);
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
- DistributedCache.addCacheFile(cacheUri, conf);
- DistributedCache.createSymlink(conf);
- LOG.info("Incremental table output configured.");
- }
- }
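A note on the key design change: rather than throwing "One family only" when a second column family appears (the stock behaviour preserved in the commented-out block), write() keeps one WriterLength per family in a TreeMap. When any family's HFile reaches hbase.hregion.max.filesize it only sets rollRequested; rollWriters() is deferred until the row key changes, so all family writers are rolled together at a row boundary and the KeyValues of a single row are never split across two HFiles of the same family.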
3. Notes on Generating HFiles with MapReduce
1. Whether the map phase or the reduce phase produces the final output, the output key/value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put>.
2. A map/reduce output value type of KeyValue goes with KeyValueSortReducer, and Put goes with PutSortReducer.
3. In the MR example, job.setOutputFormatClass(HFileOutputFormat.class) uses the improved HFileOutputFormat, which can generate HFiles for multiple column families in one job; the stock implementation only handles a single column family at a time (see the mapper sketch after this list).
4. In the MR example, HFileOutputFormat.configureIncrementalLoad(job, table) configures the job automatically. The total-order partitioner (TotalOrderPartitioner, or SimpleTotalOrderPartitioner) first sorts the keys globally and then assigns them to reducers, so the key ranges handled by different reducers never overlap. This matters because, once the data is loaded into HBase, the keys within each Region must be in strictly sorted order.
5. In the MR example, the generated HFiles are stored on HDFS, with one subdirectory per column family under the output path. When the HFiles are later loaded into HBase, they are effectively moved into HBase's Regions, and the column-family subdirectories under the output path are emptied.
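To make notes 1 and 3 concrete, here is a minimal mapper sketch for the multi-family case, meant to drop into the TestHFileToHBase job from section 1 (same imports). The family names CF1/CF2, the qualifiers Q1/Q2, and the three-column tab-separated input layout are placeholders chosen for illustration only.
- public static class MultiFamilyMapper extends
-         Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
-     // Assumed input layout: rowkey \t value1 \t value2
-     private static final byte[] CF1 = Bytes.toBytes("CF1");
-     private static final byte[] CF2 = Bytes.toBytes("CF2");
-     @Override
-     protected void map(LongWritable key, Text value, Context context)
-             throws IOException, InterruptedException {
-         String[] fields = value.toString().split("\t", 3);
-         byte[] row = Bytes.toBytes(fields[0]);
-         ImmutableBytesWritable outKey = new ImmutableBytesWritable(row);
-         // Emit one KeyValue per column family; the improved HFileOutputFormat
-         // above splits them into per-family HFiles within the same job.
-         context.write(outKey, new KeyValue(row, CF1, Bytes.toBytes("Q1"),
-                 Bytes.toBytes(fields[1])));
-         context.write(outKey, new KeyValue(row, CF2, Bytes.toBytes("Q2"),
-                 Bytes.toBytes(fields[2])));
-     }
- }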
4. Loading HFiles into HBase
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
- import org.apache.hadoop.hbase.util.Bytes;
- public class TestLoadIncrementalHFileToHBase {
- // private static final byte[] TABLE = Bytes.toBytes("hua");
- // private static final byte[] QUALIFIER = Bytes.toBytes("PROTOCOLID");
- // private static final byte[] FAMILY = Bytes.toBytes("PROTOCOLID");
- public static void main(String[] args) throws IOException {
- Configuration conf = HBaseConfiguration.create();
- // byte[] TABLE = Bytes.toBytes("hua");
- byte[] TABLE = Bytes.toBytes(args[0]);
- HTable table = new HTable(conf, TABLE);
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
- loader.doBulkLoad(new Path(args[1]), table);
- // loader.doBulkLoad(new Path("/hua/testHFileResult/"), table);
- }
- }
5. Notes on Loading HFiles into HBase
1. The generated HFiles are loaded through the doBulkLoad method of HBase's LoadIncrementalHFiles. In the loader above, the first command-line argument is the table name and the second is the HFile path (the output path of the HFile-generating MR job above); the column families can also be loaded one by one into the corresponding families of the HBase table.
2. Related links on how to bulk load:
http://hbase.apache.org/docs/r0.89.20100726/bulk-loads.html
http://hbase.apache.org/docs/r0.20.6/api/org/apache/hadoop/hbase/mapreduce/package-summary.html#bulk
http://genius-bai.javaeye.com/blog/641927
3. Bulk loading can be driven from code or from a script. There are two code-based approaches: one is
hadoop jar hbase-VERSION.jar completebulkload /myoutput mytable;
the other is the TestLoadIncrementalHFileToHBase class above (a ToolRunner-based sketch of the first approach is shown below).
The script-based approach is: jruby $HBASE_HOME/bin/loadtable.rb hbase-mytable hadoop-hbase-hfile-outputdir.
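As a rough programmatic equivalent of the completebulkload command above, the following sketch runs LoadIncrementalHFiles through ToolRunner. It assumes the 0.90-era API in which LoadIncrementalHFiles implements Hadoop's Tool interface and expects two arguments, the HFile output directory and the table name; the class name BulkLoadDriver is just a placeholder.
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
- import org.apache.hadoop.util.ToolRunner;
- public class BulkLoadDriver {
-     public static void main(String[] args) throws Exception {
-         Configuration conf = HBaseConfiguration.create();
-         // args: <hfile-output-dir> <table-name>, mirroring
-         // "hadoop jar hbase-VERSION.jar completebulkload /myoutput mytable"
-         int exitCode = ToolRunner.run(new LoadIncrementalHFiles(conf), args);
-         System.exit(exitCode);
-     }
- }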