Storm + HBase: dynamically generating HBase table column names

Introduction

Storm can write the data it processes in real time into HBase through HBaseBolt, but the column names normally have to be fixed up front (for example temp:name, with the value of the tuple field name written under it). What if the column name has to be generated from the data itself (for example temp:zhuhai_temp, where zhuhai comes out of the incoming record)? How do we write that into HBase?
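For context, this is roughly how SimpleHBaseMapper is normally wired up (a fragment; the field names "province", "name" and "temperature" are illustrative, not from this article). The column qualifiers are fixed to tuple field names when the topology is built, which is exactly what breaks down when the qualifier must come from the data:

SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("province")                          // row key taken from a tuple field
        .withColumnFields(new Fields("name", "temperature"))  // qualifiers fixed at build time
        .withColumnFamily("temp");
HBaseBolt hbaseBolt = new HBaseBolt("tempStatis", mapper).withConfigKey("hbConfig");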

An example is used below to illustrate the approach. Note that the example exists only to support this article; no claim is made that it is a sensible real-world design.

Case: collect temperature statistics for cities across different provinces, with the HBase table designed as follows:

Table name: tempStatis

Row key: province name (in pinyin) + "_" + timestamp

Column family: temp

Column qualifier: city name (in pinyin) + "_temp"

Example commands:

put 'tempStatis','guangdong_1550906523','temp:zhuhai_temp','20'
put 'tempStatis','guangdong_1550906523','temp:guangzhou_temp','21'

Explanation: these commands write the temperatures of Zhuhai and Guangzhou (both in Guangdong province) into the HBase table tempStatis.
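Assuming the table does not already exist, it only needs to be created with the column family; qualifiers such as zhuhai_temp are created on the fly as data is written:

create 'tempStatis','temp'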

 

Solutions

1) Call the HBase API directly

Idea: write a bolt class that extends BaseBasicBolt and call the HBase client API inside it to write the data.
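A minimal sketch of this option, assuming an HBase 1.x client on the classpath and the Temper JSON class defined later in this article; the class name DirectHBaseBolt and the connection settings are placeholders, not part of the original code:

package com.wn;

import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

public class DirectHBaseBolt extends BaseBasicBolt {
    private transient Connection connection;
    private transient Table table;
    private transient Gson gson;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            // open the HBase connection once per task, not per tuple
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "******");            // placeholder
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            connection = ConnectionFactory.createConnection(conf);
            table = connection.getTable(TableName.valueOf("tempStatis"));
            gson = new Gson();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        try {
            Temper temper = gson.fromJson(tuple.getString(0), Temper.class);
            Put put = new Put(Bytes.toBytes(temper.provinceName + "_" + temper.tm));
            for (Temper.TemperData data : temper.temperDatas) {
                // the column qualifier is generated from the data itself
                put.addColumn(Bytes.toBytes("temp"),
                        Bytes.toBytes(data.cityName + "_temp"),
                        Bytes.toBytes(data.temperValue));
            }
            table.put(put);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void cleanup() {
        try {
            if (table != null) table.close();
            if (connection != null) connection.close();
        } catch (Exception ignored) {
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

The drawback of this option is that connection management, batching and failure handling are all on you, which is precisely what storm-hbase otherwise provides.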

2) Model a class on SimpleHBaseMapper and implement the HBaseMapper interface

Idea: using SimpleHBaseMapper as a reference, adapt the following code inside public ColumnList columns(Tuple tuple) {} to your needs.

cols.addColumn(this.columnFamily, field.getBytes(), Utils.toBytes(tuple.getValueByField(field)));
cols.addCounter(this.columnFamily, field.getBytes(), Utils.toLong(tuple.getValueByField(field)));
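A minimal sketch of such a mapper, again assuming the Temper JSON class defined later in this article (the class name TemperHBaseMapper is illustrative). The HBaseMapper interface has just two methods, rowKey() and columns(), and both can build their output from the tuple's payload:

package com.wn;

import com.google.gson.Gson;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.ColumnList;
import org.apache.storm.tuple.Tuple;

import static org.apache.storm.hbase.common.Utils.toBytes;

public class TemperHBaseMapper implements HBaseMapper {
    private final byte[] columnFamily = toBytes("temp");

    private Temper parse(Tuple tuple) {
        return new Gson().fromJson(tuple.getString(0), Temper.class);
    }

    @Override
    public byte[] rowKey(Tuple tuple) {
        Temper temper = parse(tuple);
        return toBytes(temper.provinceName + "_" + temper.tm); // provinceName_timestamp
    }

    @Override
    public ColumnList columns(Tuple tuple) {
        Temper temper = parse(tuple);
        ColumnList cols = new ColumnList();
        for (Temper.TemperData data : temper.temperDatas) {
            // the column qualifier is built from the city name carried in the data
            cols.addColumn(columnFamily, toBytes(data.cityName + "_temp"), toBytes(data.temperValue));
        }
        return cols;
    }
}

A mapper like this could then be passed to the stock HBaseBolt unchanged, so option 2 avoids subclassing the bolt at all.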

3) Model a class on HBaseBolt and extend AbstractHBaseBolt

Idea: using HBaseBolt as a reference, modify the code inside public void execute(Tuple tuple) {} to suit your needs.

The rest of this article walks through the third option in detail.

 

Implementation

1) Topology: a simple chain of SLSpout (simulated data source) → HbaseCusBolt (writes to HBase).

2) Create a new Maven project in IntelliJ; the pom.xml is shown below:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wn</groupId>
    <artifactId>TemperTestAnalyser</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <provided.scope>provided</provided.scope>
        <storm.topology>com.wn.TemperTopo</storm.topology>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.2</version>
            <scope>${provided.scope}</scope><!-- comment out this line when running locally -->
        </dependency>
        <!-- storm + hbase -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>1.0.2</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>${storm.topology}</mainClass>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
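With this pom.xml, building and submitting to a cluster would typically look like the following (the jar path follows the default Maven naming; the last argument is the topology name read in main()). For a local run, comment out the provided scope on storm-core as noted in the file and run com.wn.TemperTopo directly:

mvn clean package
storm jar target/TemperTestAnalyser-1.0-SNAPSHOT.jar com.wn.TemperTopo temperTopo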

3) Code

TemperTopo (topology definition):
package com.wn;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.apache.storm.topology.TopologyBuilder;

import java.util.HashMap;
import java.util.Map;

public class TemperTopo {
    public static void main(String[] args) {
        TemperTopo temperTopo = new TemperTopo();
        StormTopology stormTopology = temperTopo.buildTopology();
        Config config = new Config();
        // HBase configuration
        Map<String, Object> hbConfig = new HashMap<String, Object>();
        hbConfig.put("hbase.rootdir", "hdfs://******/hbase");
        hbConfig.put("hbase.zookeeper.quorum", "******");
        hbConfig.put("hbase.zookeeper.property.clientPort", "2181");
        config.put("hbConfig", hbConfig);
        config.setMessageTimeoutSecs(500);
        try {
            if (args != null && args.length > 0) {
                config.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0], config, stormTopology);
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("temperTopo", config, stormTopology);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Build the topology
    public StormTopology buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        System.setProperty("HADOOP_USER_NAME", "******");
        // spout that emits simulated source data
        String spoutId = "spoutId";
        builder.setSpout(spoutId, new SLSpout(), 1);
        // the mapper only satisfies the AbstractHBaseBolt constructor;
        // HbaseCusBolt builds the row key and columns itself
        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        String tableName = "tempStatis";
        HbaseCusBolt hbaseCusBolt = new HbaseCusBolt(tableName, mapper)
                .withBatchSize(10)
                .withConfigKey("hbConfig");
        builder.setBolt("boltId", hbaseCusBolt, 1).shuffleGrouping(spoutId);
        return builder.createTopology();
    }
}
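A note on the configuration: withConfigKey("hbConfig") tells the bolt which entry of the Storm config to read, and the map stored under that key is handed to the HBase client, so its keys are ordinary HBase client properties. The ****** placeholders (HDFS namenode address, ZooKeeper quorum, HADOOP_USER_NAME) must be replaced with the values for your own cluster before running.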
SLSpout (simulated data source):
package com.wn;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Emits test data.
 */
public class SLSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private String[] sentences = {
            "{\"provinceName\": \"guangdong\",\"tm\": \"1550806523\",\"temperDatas\": [{\"cityName\": \"zhuhai\",\"temperValue\": \"14\"},{\"cityName\": \"shenzhen\",\"temperValue\": \"16\"}]}",
            "{\"provinceName\": \"shanxi\",\"tm\": \"1550806523\",\"temperDatas\": [{\"cityName\": \"yuncheng\",\"temperValue\": \"12\"},{\"cityName\": \"taiyuan\",\"temperValue\": \"7\"},{\"cityName\": \"datong\",\"temperValue\": \"3\"}]}",
            "{\"provinceName\": \"henan\",\"tm\": \"1550806523\",\"temperDatas\": [{\"cityName\": \"luoyang\",\"temperValue\": \"9\"},{\"cityName\": \"zhengzhou\",\"temperValue\": \"11\"},{\"cityName\": \"xinyang\",\"temperValue\": \"11\"}]}"
    };
    private int index = 0;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // cycle through the three test records
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}
HbaseCusBolt (writes to HBase with dynamically generated column names):
package com.wn;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.storm.hbase.bolt.AbstractHBaseBolt;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.ColumnList;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.BatchHelper;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static org.apache.storm.hbase.common.Utils.toBytes;

public class HbaseCusBolt extends AbstractHBaseBolt {
    private static final Logger logger = LoggerFactory.getLogger(HbaseCusBolt.class);
    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;

    // whether to write to the write-ahead log (WAL); the WAL protects against data loss
    // after a crash such as a power failure, but it lowers throughput
    boolean writeToWAL = true;
    List<Mutation> batchMutations = new LinkedList<>();
    int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
    int batchSize;
    BatchHelper batchHelper;

    public HbaseCusBolt(String tableName, HBaseMapper mapper) {
        super(tableName, mapper);
    }

    public HbaseCusBolt writeToWAL(boolean writeToWAL) {
        this.writeToWAL = writeToWAL;
        return this;
    }

    public HbaseCusBolt withConfigKey(String configKey) {
        this.configKey = configKey;
        return this;
    }

    public HbaseCusBolt withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    public HbaseCusBolt withFlushIntervalSecs(int flushIntervalSecs) {
        this.flushIntervalSecs = flushIntervalSecs;
        return this;
    }

    public Map<String, Object> getComponentConfiguration() {
        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
    }

    // Compare with org.apache.storm.hbase.bolt.HBaseBolt: the way the row key and the ColumnList
    // are obtained has been changed
    public void execute(Tuple tuple) {
        try {
            if (this.batchHelper.shouldHandle(tuple)) {
                String initData = tuple.getString(0);
                // ************ parse the temperature data ***************
                Gson gson = new Gson();
                java.lang.reflect.Type typeClass = new TypeToken<Temper>() {}.getType();
                Temper tempers = gson.fromJson(initData, typeClass);
                List<Temper.TemperData> temperDatas = tempers.temperDatas;
                ColumnList cols = new ColumnList();
                String columnFamily = "temp"; // column family
                for (Temper.TemperData temperData : temperDatas) {
                    String cityName = temperData.cityName;
                    String temperValue = temperData.temperValue;
                    // the column qualifier is generated from the city name in the data
                    cols.addColumn(toBytes(columnFamily), toBytes(cityName + "_temp"), toBytes(temperValue));
                }
                String rowkey = tempers.provinceName + "_" + tempers.tm; // row key: provinceName_timestamp
                List<Mutation> mutations = this.hBaseClient.constructMutationReq(toBytes(rowkey), cols,
                        this.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
                this.batchMutations.addAll(mutations);
                this.batchHelper.addBatch(tuple);
            }
            if (this.batchHelper.shouldFlush()) {
                this.hBaseClient.batchMutate(this.batchMutations);
                logger.debug("acknowledging tuples after batchMutate");
                this.batchHelper.ack();
                this.batchMutations.clear();
            }
        } catch (Exception e) {
            this.batchHelper.fail(e);
            this.batchMutations.clear();
        }
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        super.prepare(map, topologyContext, collector);
        this.batchHelper = new BatchHelper(batchSize, collector);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
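For comparison, the stock org.apache.storm.hbase.bolt.HBaseBolt builds its mutations entirely from the mapper, roughly as follows (simplified from storm-hbase 1.x); the execute() above replaces this with JSON parsing and qualifiers generated from the data:

byte[] rowKey = this.mapper.rowKey(tuple);
ColumnList cols = this.mapper.columns(tuple);
List<Mutation> mutations = this.hBaseClient.constructMutationReq(rowKey, cols,
        this.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);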
Temper (JSON data model):
package com.wn;

import java.util.List;

public class Temper {
    public String provinceName;
    public String tm; // timestamp
    public List<TemperData> temperDatas;

    public static class TemperData {
        public String cityName;    // city name
        public String temperValue; // temperature value
    }
}

4) Result: after the topology has run, check the table in the HBase shell:

scan 'tempStatis'
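Given the three test records emitted by SLSpout, the scan should return rows along these lines (HBase cell timestamps omitted; exact formatting depends on the shell version):

ROW                       COLUMN+CELL
 guangdong_1550806523     column=temp:shenzhen_temp, value=16
 guangdong_1550806523     column=temp:zhuhai_temp, value=14
 henan_1550806523         column=temp:luoyang_temp, value=9
 henan_1550806523         column=temp:xinyang_temp, value=11
 henan_1550806523         column=temp:zhengzhou_temp, value=11
 shanxi_1550806523        column=temp:datong_temp, value=3
 shanxi_1550806523        column=temp:taiyuan_temp, value=7
 shanxi_1550806523        column=temp:yuncheng_temp, value=12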

Conclusion

That is the complete walkthrough of dynamically generating HBase table column names with Storm + HBase.
