I am 单身小熊猫, a blogger at 靠谱客. This article shows how to compute the average age of all users in a specified HBase table with a MapReduce job and write the result to HDFS; I'm sharing it here as a reference.

The data in the HBase table was imported in the previous post: reading data from a file on HDFS and writing it into an HBase table.
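For reference, the job below expects that table to be named users and to store each person's age as a string in the info:age column. Here is a minimal sketch, assuming the HBase 1.x client API, of how one such row could be written; the class name, row key, and age value are only illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper, not part of the original post: inserts one sample row
// into the users table so the MapReduce job below has something to read.
public class PutSampleUser {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop2:2181,hadoop3:2181,hadoop4:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("users"))) {
            Put put = new Put(Bytes.toBytes("rk01")); // row key is illustrative
            // store the age as a string, matching the Bytes.toString(...) parsing in the mapper below
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("18"));
            table.put(put);
        }
    }
}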

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBASE2HDFS extends Configured implements Tool {

    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "hadoop2:2181,hadoop3:2181,hadoop4:2181";

    @Override
    public int run(String[] arg0) throws Exception {
        // HDFS configuration (HA cluster) used to check/clean the output directory.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://myha1");
        conf.addResource("config/core-site.xml");
        conf.addResource("config/hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);

        // HBase configuration: point the job at the ZooKeeper quorum.
        Configuration config = HBaseConfiguration.create(conf);
        config.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);

        Job job = Job.getInstance(config);
        job.setJarByClass(HBASE2HDFS.class);

        // Only scan the info:age column of the users table.
        Scan scan = new Scan();
        scan.addColumn("info".getBytes(), "age".getBytes());

        TableMapReduceUtil.initTableMapperJob("users".getBytes(), scan,
                H2H_Mapper.class, Text.class, IntWritable.class, job, false);

        job.setReducerClass(H2H_Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Remove a stale output directory so the job can rerun.
        if (fs.exists(new Path("/student/output"))) {
            fs.delete(new Path("/student/output"), true);
        }
        FileOutputFormat.setOutputPath(job, new Path("/student/output"));

        boolean isDone = job.waitForCompletion(true);
        return isDone ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int isDone = ToolRunner.run(new HBASE2HDFS(), args);
        System.exit(isDone);
    }

    /**
     * get 'student','rk01'  ====  Result
     *
     * Requirement: read every record (a Result) and extract its age value.
     *
     * Mapper stage:
     *   input  (from HBase): key = rowkey, value = Result
     *                        i.e. ImmutableBytesWritable, Result
     *   output: key = "age", value = the age as an int
     *
     * Reducer stage:
     *   input:  key = "age", values = the individual ages (e.g. 18)
     *   output (to HDFS): key = "age" (Text), value = the average age (DoubleWritable)
     */
    public static class H2H_Mapper extends TableMapper<Text, IntWritable> {

        Text outkey = new Text("age");

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            boolean containsColumn = value.containsColumn("info".getBytes(), "age".getBytes());
            if (containsColumn) {
                List<Cell> cells = value.getColumnCells("info".getBytes(), "age".getBytes());
                // Each map() call sees one row; take the age value out of that row.
                Cell cell = cells.get(0);
                byte[] cloneValue = CellUtil.cloneValue(cell);
                String age = Bytes.toString(cloneValue);
                context.write(outkey, new IntWritable(Integer.parseInt(age)));
            }
        }
    }

    public static class H2H_Reducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // All ages arrive under the single key "age"; sum and count them to get the average.
            int count = 0;
            int sum = 0;
            for (IntWritable iw : values) {
                count++;
                sum += iw.get();
            }
            double avg = sum * 1D / count;
            context.write(key, new DoubleWritable(avg));
        }
    }
}
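Once packaged into a jar, the job can be submitted in the usual way with hadoop jar (for example, hadoop jar hbase2hdfs.jar HBASE2HDFS, where the jar name is only illustrative). When it completes, the reducer output lands in a part file under /student/output as a single tab-separated line: the key "age" followed by the average age as a double.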

In closing

That wraps up 单身小熊猫's notes on computing the average age of all users in a specified HBase table and writing the result to HDFS. For more related content, see the other articles on 靠谱客.
