Introduction
Storm can write real-time data into HBase through HBaseBolt, but normally the column name must be specified up front (e.g. temp:name, with the value of the name field written under it). What if the column name has to be generated dynamically from the data (e.g. temp:zhuhai_temp, where zhuhai is taken from the data itself)? How do you write that into HBase?
An example is used for illustration below. Note that the example exists only to support this article; no claim is made about how realistic the scenario is.
Case study: collect temperature statistics for cities across all provinces, with the HBase table designed as follows:
Table name: tempStatis
Row key: province name in pinyin + "_" + timestamp
Column family: temp
Column qualifier: city name in pinyin + "_temp"
Example commands:
put 'tempStatis','guangdong_1550906523','temp:zhuhai_temp','20'
put 'tempStatis','guangdong_1550906523','temp:guangzhou_temp','21'
Explanation: these write into the tempStatis table the temperatures of Zhuhai and Guangzhou, both in Guangdong province.
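Note that the table and its column family must already exist before those puts run; a minimal shell command to create it (assuming no pre-splitting or other table options are needed) would be:
create 'tempStatis','temp'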
Solutions
1) Call the HBase API directly
Approach: write a class that extends BaseBasicBolt and call the HBase client API inside it to perform the writes, as in the sketch below.
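A minimal sketch of this option, assuming an upstream component that emits rowkey, cityName and temperValue as separate tuple fields (the class and field names here are illustrative, not taken from the original project):
package com.wn;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.io.IOException;
import java.util.Map;
public class DirectHBaseBolt extends BaseBasicBolt {
    private transient Connection connection;
    private transient Table table;
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "******");// same placeholder as the topology config below
            connection = ConnectionFactory.createConnection(conf);
            table = connection.getTable(TableName.valueOf("tempStatis"));
        } catch (IOException e) {
            throw new RuntimeException("failed to open HBase connection", e);
        }
    }
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        try {
            // Both the row key and the column qualifier come from the data,
            // so the column name is fully dynamic.
            Put put = new Put(Bytes.toBytes(tuple.getStringByField("rowkey")));
            put.addColumn(Bytes.toBytes("temp"),
                    Bytes.toBytes(tuple.getStringByField("cityName") + "_temp"),
                    Bytes.toBytes(tuple.getStringByField("temperValue")));
            table.put(put);
        } catch (IOException e) {
            throw new FailedException(e);// makes the framework fail (and replay) the tuple
        }
    }
    @Override
    public void cleanup() {
        try { table.close(); connection.close(); } catch (IOException ignored) {}
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing is emitted downstream
    }
}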
2) Model a class on SimpleHBaseMapper and implement the HBaseMapper interface
Approach: using SimpleHBaseMapper as a reference, adapt the following code inside the method public ColumnList columns(Tuple tuple){} to your needs; a sketch of a complete mapper follows the two lines below.
cols.addColumn(this.columnFamily, field.getBytes(), Utils.toBytes(tuple.getValueByField(field)));
cols.addCounter(this.columnFamily, field.getBytes(), Utils.toLong(tuple.getValueByField(field)));
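A minimal sketch of such a mapper, assuming the tuple already carries provinceName, tm, cityName and temperValue as separate fields (the spout used later in this article emits a single raw JSON string instead, so this mapper is illustrative):
package com.wn;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.ColumnList;
import org.apache.storm.tuple.Tuple;
import static org.apache.storm.hbase.common.Utils.toBytes;
public class TemperHBaseMapper implements HBaseMapper {
    private final byte[] columnFamily = toBytes("temp");
    @Override
    public byte[] rowKey(Tuple tuple) {
        // row key: province name + "_" + timestamp, both taken from the data
        return toBytes(tuple.getStringByField("provinceName") + "_" + tuple.getStringByField("tm"));
    }
    @Override
    public ColumnList columns(Tuple tuple) {
        ColumnList cols = new ColumnList();
        // the column qualifier is built from the data instead of a fixed field name
        byte[] qualifier = toBytes(tuple.getStringByField("cityName") + "_temp");
        cols.addColumn(columnFamily, qualifier, toBytes(tuple.getValueByField("temperValue")));
        return cols;
    }
}
The advantage of this option is that the stock HBaseBolt can then be used unchanged.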
3) Model a class on HBaseBolt and extend AbstractHBaseBolt
Approach: using HBaseBolt as a reference, adapt the code in the method public void execute(Tuple tuple) {} to your needs.
This article walks through the third option in detail.
Implementation
1) Topology diagram
2) Create a new Maven project in IntelliJ; the pom.xml is shown below:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wn</groupId>
<artifactId>TemperTestAnalyser</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<provided.scope>provided</provided.scope>
<storm.topology>com.wn.TemperTopo</storm.topology>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
<scope>${provided.scope}</scope><!-- comment this line out when running locally -->
</dependency>
<!--storm+hbase-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hbase</artifactId>
<version>1.0.2</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass>${storm.topology}</mainClass>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
3) Code implementation
package com.wn;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.apache.storm.topology.TopologyBuilder;
import java.util.HashMap;
import java.util.Map;
public class TemperTopo {
public static void main(String[] args) {
TemperTopo temperTopo = new TemperTopo();
StormTopology stormTopology = temperTopo.buildTopology();
Config config = new Config();
//HBase configuration
Map<String, Object> hbConfig = new HashMap<String, Object>();
hbConfig.put("hbase.rootdir", "hdfs://******/hbase");
hbConfig.put("hbase.zookeeper.quorum", "******");
hbConfig.put("hbase.zookeeper.property.clientPort", "2181");
config.put("hbConfig", hbConfig);
config.setMessageTimeoutSecs(500);
try {
if (args != null && args.length > 0) {
config.setNumWorkers(1);
StormSubmitter.submitTopology(args[0], config, stormTopology);
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("temperTopo", config, stormTopology);
}
} catch (Exception e) {
e.printStackTrace();
}
}
//build the topology
public StormTopology buildTopology() {
TopologyBuilder builder = new TopologyBuilder();
System.setProperty("HADOOP_USER_NAME", "******");
//spout that simulates the data source
String spoutId = "spoutId";
builder.setSpout(spoutId, new SLSpout(),1);
SimpleHBaseMapper mapper = new SimpleHBaseMapper();
String tableName = "tempStatis";
HbaseCusBolt hbaseCusBolt = new HbaseCusBolt(tableName, mapper).withBatchSize(10).withConfigKey("hbConfig");
builder.setBolt("boltId", hbaseCusBolt, 1).shuffleGrouping(spoutId);
return builder.createTopology();
}
}
package com.wn;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* Generates test data.
*/
public class SLSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private String[] sentences = {
"{\"provinceName\": \"guangdong\",\"tm\": \"1550806523\",\"temperDatas\": [{\"cityName\": \"zhuhai\",\"temperValue\": \"14\"},{\"cityName\": \"shenzhen\",\"temperValue\": \"16\"}]}",
"{\"provinceName\": \"shanxi\",\"tm\": \"1550806523\",\"temperDatas\": [{\"cityName\": \"yuncheng\",\"temperValue\": \"12\"},{\"cityName\": \"taiyuan\",\"temperValue\": \"7\"},{\"cityName\": \"datong\",\"temperValue\": \"3\"}]}",
"{\"provinceName\": \"henan\",\"tm\": \"1550806523\",\"temperDatas\": [{\"cityName\": \"luoyang\",\"temperValue\": \"9\"},{\"cityName\": \"zhengzhou\",\"temperValue\": \"11\"},{\"cityName\": \"xinyang\",\"temperValue\": \"11\"}]}"
};
private int index = 0;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
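//note: Storm calls nextTuple() in a tight loop, so a real test would normally throttle here (e.g. with a short sleep)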
this.collector.emit(new Values(sentences[index]));
index++;
if (index >= sentences.length) {
index = 0;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("sentence"));
}
}
package com.wn;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.storm.hbase.bolt.AbstractHBaseBolt;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.ColumnList;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.BatchHelper;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import static org.apache.storm.hbase.common.Utils.toBytes;
public class HbaseCusBolt extends AbstractHBaseBolt {
private static final Logger logger = LoggerFactory.getLogger(HbaseCusBolt.class);
private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
boolean writeToWAL = true;//whether to write to the write-ahead log (WAL): the WAL lets HBase recover data after a crash such as a power failure, but lowers write throughput
List<Mutation> batchMutations = new LinkedList<>();
int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
int batchSize;
BatchHelper batchHelper;
public HbaseCusBolt(String tableName, HBaseMapper mapper) {
super(tableName,mapper);
}
public HbaseCusBolt writeToWAL(boolean writeToWAL) {
this.writeToWAL = writeToWAL;
return this;
}
public HbaseCusBolt withConfigKey(String configKey) {
this.configKey = configKey;
return this;
}
public HbaseCusBolt withBatchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}
public HbaseCusBolt withFlushIntervalSecs(int flushIntervalSecs) {
this.flushIntervalSecs = flushIntervalSecs;
return this;
}
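//Registers a tick tuple every flushIntervalSecs so that buffered mutations are flushed even when the input stream is idle.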
public Map<String, Object> getComponentConfiguration() {
return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
}
//Compare with org.apache.storm.hbase.bolt.HBaseBolt: only the derivation of the rowKey and the ColumnList is changed here.
public void execute(Tuple tuple) {
try {
if (this.batchHelper.shouldHandle(tuple)) {
String initData = tuple.getString(0);
//************ extract the temperature data ***************
Gson gson = new Gson();
java.lang.reflect.Type typeClass = new TypeToken<Temper>() {}.getType();
Temper tempers = gson.fromJson(initData, typeClass);
List<Temper.TemperData> temperDatas = tempers.temperDatas;
ColumnList cols = new ColumnList();
String columnFamily = "temp";//column family
for (Temper.TemperData temperData : temperDatas) {
String cityName = temperData.cityName;
String temperValue = temperData.temperValue;
cols.addColumn(toBytes(columnFamily), toBytes(cityName + "_temp"), toBytes(temperValue));
}
String rowkey = tempers.provinceName + "_" + tempers.tm;//row key
List<Mutation> mutations = this.hBaseClient.constructMutationReq(toBytes(rowkey), cols, this.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
this.batchMutations.addAll(mutations);
this.batchHelper.addBatch(tuple);
}
if (this.batchHelper.shouldFlush()) {
this.hBaseClient.batchMutate(this.batchMutations);
logger.debug("acknowledging tuples after batchMutate");
this.batchHelper.ack();
this.batchMutations.clear();
}
} catch (Exception e) {
this.batchHelper.fail(e);
this.batchMutations.clear();
}
}
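//BatchHelper buffers incoming tuples and signals a flush once batchSize tuples are pending or a tick tuple arrives.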
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
super.prepare(map, topologyContext, collector);
this.batchHelper = new BatchHelper(batchSize, collector);
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
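//terminal bolt: nothing is emitted downstream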
}
}
package com.wn;
import java.util.List;
public class Temper{
public String provinceName;
public String tm;//timestamp
public List<TemperData> temperDatas;
public static class TemperData {
public String cityName;//city name
public String temperValue;//temperature value
}
}
4) Results
scan 'tempStatis'
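Given the three test sentences emitted by SLSpout, the scan should return three rows (one per province), each with one dynamically named column per city, roughly as follows (HBase cell timestamps omitted; the listing is illustrative):
ROW                    COLUMN+CELL
guangdong_1550806523   column=temp:shenzhen_temp, value=16
guangdong_1550806523   column=temp:zhuhai_temp, value=14
henan_1550806523       column=temp:luoyang_temp, value=9
henan_1550806523       column=temp:xinyang_temp, value=11
henan_1550806523       column=temp:zhengzhou_temp, value=11
shanxi_1550806523      column=temp:datong_temp, value=3
shanxi_1550806523      column=temp:taiyuan_temp, value=7
shanxi_1550806523      column=temp:yuncheng_temp, value=12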