Overview
The data in the HBase table was loaded in the previous post: reading records from a file on HDFS and writing them into an HBase table.
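The MapReduce job below assumes the table is named users and that each row carries an age value in the info column family. The previous post loads that data with a MapReduce job from HDFS; as a quick way to seed a few test rows, something like the following client-side sketch could also be used (the row keys and ages here are made up, and the table is assumed to already exist):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
public class UsersSampleData {
    public static void main(String[] args) throws Exception {
        // Assumed ZooKeeper quorum, same as the job below.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop2:2181,hadoop3:2181,hadoop4:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("users"))) {
            // Hypothetical sample rows; each one gets an info:age value.
            String[][] rows = { {"rk01", "18"}, {"rk02", "22"}, {"rk03", "20"} };
            for (String[] r : rows) {
                Put put = new Put(Bytes.toBytes(r[0]));
                put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(r[1]));
                table.put(put);
            }
        }
    }
}

The full job is listed next.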
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HBASE2HDFS extends Configured implements Tool{
private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
private static final String ZK_CONNECT_VALUE = "hadoop2:2181,hadoop3:2181,hadoop4:2181";
@Override
public int run(String[] arg0) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://myha1");
conf.addResource("config/core-site.xml");
conf.addResource("config/hdfs-site.xml");
System.setProperty("HADOOP_USER_NAME", "hadoop");
FileSystem fs = FileSystem.get(conf);
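// Build the HBase client configuration on top of the HDFS configuration and point it at the ZooKeeper quorum.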
Configuration config = HBaseConfiguration.create(conf);
config.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
Job job = Job.getInstance(config);
job.setJarByClass(HBASE2HDFS.class);
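// Restrict the scan to the info:age column so the mapper only receives the data it needs.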
Scan scan = new Scan();
scan.addColumn("info".getBytes(), "age".getBytes());
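// Use the users table as the map input: each map call receives one (rowkey, Result) pair produced by the scan.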
TableMapReduceUtil.initTableMapperJob("users".getBytes(), scan, H2H_Mapper.class, Text.class, IntWritable.class, job, false);
job.setReducerClass(H2H_Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
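// Remove any pre-existing output directory so the job does not fail at submission.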
if(fs.exists(new Path("/student/output"))){
fs.delete(new Path("/student/output"),true);
}
FileOutputFormat.setOutputPath(job, new Path("/student/output"));
boolean isDone = job.waitForCompletion(true);
return isDone ? 0:1;
}
public static void main(String[] args) throws Exception {
int isDone = ToolRunner.run(new HBASE2HDFS(), args);
System.exit(isDone);
}
/**
 * In the HBase shell, get 'student','rk01' returns one Result.
 *
 * Requirement: read every record (Result) in the table and extract its age value.
 *
 * Mapper stage
 *   Input (from HBase):
 *     key   : rowkey                     (ImmutableBytesWritable)
 *     value : the row                    (Result)
 *   Output:
 *     key   : the literal string "age"   (Text)
 *     value : the age value              (IntWritable)
 *
 * Reducer stage
 *   Input:
 *     key   : "age"
 *     value : the age values, e.g. 18
 *   Output (to HDFS):
 *     key   : "age"                      (Text)
 *     value : the average age            (DoubleWritable)
 */
public static class H2H_Mapper extends TableMapper<Text, IntWritable>{
Text outkey = new Text("age");
@Override
protected void map(ImmutableBytesWritable key, Result value,Context context)
throws IOException, InterruptedException {
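// Skip rows that do not contain an info:age cell.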
boolean containsColumn = value.containsColumn("info".getBytes(), "age".getBytes());
if(containsColumn){
List<Cell> cells = value.getColumnCells("info".getBytes(), "age".getBytes());
// Each map call receives one row; take the first cell of info:age and parse its value.
Cell cell = cells.get(0);
byte[] cloneValue = CellUtil.cloneValue(cell);
String age = Bytes.toString(cloneValue);
context.write(outkey, new IntWritable(Integer.parseInt(age)));
}
}
}
public static class H2H_Reducer extends Reducer<Text, IntWritable, Text, DoubleWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
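// All age values arrive under the single key "age"; count and sum them to compute the average.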
int count = 0;
int sum = 0;
for (IntWritable iw : values) {
count ++;
sum += iw.get();
}
double avg = sum * 1D / count;
context.write(key, new DoubleWritable(avg));
}
}
}
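To run the job, package it into a jar and submit it to the cluster (for example with hadoop jar; the jar name is whatever your build produces). The reducer writes its result to /student/output on HDFS. Purely as an illustration, if the table held ages 18, 22 and 20, part-r-00000 would contain a single tab-separated line: age 20.0.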