storm+hbase: dynamically generating HBase table column names

Overview

Preface

The Storm framework can write real-time data into HBase through an HBaseBolt, but the column name normally has to be specified up front (for example temp:name, with the value of the name field written into it). What if the column name has to be generated dynamically from the data (for example temp:zhuhai_temp, where zhuhai is taken from the data itself)? How do you write such data into HBase?

The following example illustrates the idea. Note that the example only serves this article; whether the scenario itself is realistic is not a concern here.

Case study: collect temperature statistics for cities in different provinces across the country. The HBase table is designed as follows:

Table name: tempStatis

Row key: province name in pinyin + "_" + timestamp

Column family: temp

Column qualifier: city name in pinyin + "_temp"

Example commands:

put 'tempStatis','guangdong_1550906523','temp:zhuhai_temp','20'
put 'tempStatis','guangdong_1550906523','temp:guangzhou_temp','21'

Explanation: the two puts write the temperature of Zhuhai, Guangdong and the temperature of Guangzhou, Guangdong into the HBase table tempStatis.

 

Solutions

1) Call the HBase API directly

Idea: write a class that extends BaseBasicBolt and call the HBase client API inside it to write the data into HBase, as sketched below.
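A minimal sketch of this option, assuming the upstream component emits the tuple fields province, tm, city, and temper (the class name HbaseApiBolt and those field names are illustrative assumptions, not from the original article):

package com.wn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

public class HbaseApiBolt extends BaseBasicBolt {
    private transient Connection connection;
    private transient Table table;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            // hbase-site.xml (or explicit quorum settings) is assumed to be on the classpath
            Configuration conf = HBaseConfiguration.create();
            connection = ConnectionFactory.createConnection(conf);
            table = connection.getTable(TableName.valueOf("tempStatis"));
        } catch (Exception e) {
            throw new RuntimeException("failed to open HBase connection", e);
        }
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // row key: provinceName_timestamp; column qualifier built from the data: cityName_temp
        String rowKey = tuple.getStringByField("province") + "_" + tuple.getStringByField("tm");
        String qualifier = tuple.getStringByField("city") + "_temp";
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("temp"), Bytes.toBytes(qualifier),
                Bytes.toBytes(tuple.getStringByField("temper")));
        try {
            table.put(put);
        } catch (Exception e) {
            throw new RuntimeException("HBase put failed", e);
        }
    }

    @Override
    public void cleanup() {
        // release the HBase resources when the worker shuts down
        try {
            if (table != null) table.close();
            if (connection != null) connection.close();
        } catch (Exception ignored) {
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}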

2) Model it on SimpleHBaseMapper: write a class that implements the HBaseMapper interface

Idea: with SimpleHBaseMapper as a reference, adjust the following code inside the method public ColumnList columns(Tuple tuple){} to your needs (a complete sketch follows after these two lines).

cols.addColumn(this.columnFamily, field.getBytes(), Utils.toBytes(tuple.getValueByField(field)));

cols.addCounter(this.columnFamily, field.getBytes(), Utils.toLong(tuple.getValueByField(field)));
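A possible mapper along these lines, again assuming the tuple carries the fields province, tm, city, and temper (the class name TemperHBaseMapper and those field names are assumptions for illustration); an instance of it could then be handed to the stock HBaseBolt:

package com.wn;

import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.ColumnList;
import org.apache.storm.tuple.Tuple;

import static org.apache.storm.hbase.common.Utils.toBytes;

public class TemperHBaseMapper implements HBaseMapper {

    @Override
    public byte[] rowKey(Tuple tuple) {
        // row key: provinceName_timestamp
        return toBytes(tuple.getStringByField("province") + "_" + tuple.getStringByField("tm"));
    }

    @Override
    public ColumnList columns(Tuple tuple) {
        ColumnList cols = new ColumnList();
        // column qualifier generated from the data, e.g. "zhuhai_temp"
        String qualifier = tuple.getStringByField("city") + "_temp";
        cols.addColumn(toBytes("temp"), toBytes(qualifier), toBytes(tuple.getStringByField("temper")));
        return cols;
    }
}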

3) Model it on HBaseBolt: write a class that extends AbstractHBaseBolt

Idea: with HBaseBolt as a reference, adjust the code inside the method public void execute(Tuple tuple) {} to your needs.

This article walks through the concrete implementation of the third option.

 

Implementation

1) Topology: the spout SLSpout emits the test data and is wired to the bolt HbaseCusBolt, which writes it into HBase.

2) Create a new Maven project in IntelliJ; pom.xml is shown below:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wn</groupId>
    <artifactId>TemperTestAnalyser</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <provided.scope>provided</provided.scope>
        <storm.topology>com.wn.TemperTopo</storm.topology>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.2</version>
            <scope>${provided.scope}</scope><!--comment this line out when running locally-->
        </dependency>

        <!--storm+hbase-->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>1.0.2</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>${storm.topology}</mainClass>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
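A plausible workflow with this pom (the commands are standard Maven/Storm usage; the jar name follows from artifactId and version): build the shaded jar with mvn clean package, then submit it to a cluster with storm jar target/TemperTestAnalyser-1.0-SNAPSHOT.jar com.wn.TemperTopo temperTopo. For a local test, comment out the provided scope on storm-core as noted in the pom and run the main class directly, for example via mvn compile exec:java, which picks up the mainClass configured for the exec-maven-plugin.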

3) Code (four classes follow in order: TemperTopo, SLSpout, HbaseCusBolt, and Temper)

package com.wn;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.apache.storm.topology.TopologyBuilder;

import java.util.HashMap;
import java.util.Map;

public class TemperTopo {
    public static void main(String[] args) {
        TemperTopo temperTopo = new TemperTopo();
        StormTopology stormTopology = temperTopo.buildTopology();

        Config config = new Config();
        // HBase configuration
        Map<String, Object> hbConfig = new HashMap<String, Object>();
        hbConfig.put("hbase.rootdir", "hdfs://******/hbase");
        hbConfig.put("hbase.zookeeper.quorum", "******");
        hbConfig.put("hbase.zookeeper.property.clientPort", "2181");
        config.put("hbConfig", hbConfig);
        config.setMessageTimeoutSecs(500);

        try {
            if (args != null && args.length > 0) {
                config.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0], config, stormTopology);
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("temperTopo", config, stormTopology);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // build the topology
    public StormTopology buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        System.setProperty("HADOOP_USER_NAME", "******");

        // spout that simulates the data source
        String spoutId = "spoutId";
        builder.setSpout(spoutId, new SLSpout(),1);

        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        String tableName = "tempStatis";
        HbaseCusBolt hbaseCusBolt = new HbaseCusBolt(tableName, mapper).withBatchSize(10).withConfigKey("hbConfig");
        builder.setBolt("boltId", hbaseCusBolt, 1).shuffleGrouping(spoutId);

        return builder.createTopology();
    }
}
package com.wn;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Generates test data.
 */
public class SLSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private String[] sentences = {
            "{\"provinceName\": \"guangdong\",\"tm\": \"1550806523\",\"temperDatas\": [{\"cityName\": \"zhuhai\",\"temperValue\": \"14\"},{\"cityName\": \"shenzhen\",\"temperValue\": \"16\"}]}",
            "{\"provinceName\": \"shanxi\",\"tm\": \"1550806523\",\"temperDatas\": [{\"cityName\": \"yuncheng\",\"temperValue\": \"12\"},{\"cityName\": \"taiyuan\",\"temperValue\": \"7\"},{\"cityName\": \"datong\",\"temperValue\": \"3\"}]}",
            "{\"provinceName\": \"henan\",\"tm\": \"1550806523\",\"temperDatas\": [{\"cityName\": \"luoyang\",\"temperValue\": \"9\"},{\"cityName\": \"zhengzhou\",\"temperValue\": \"11\"},{\"cityName\": \"xinyang\",\"temperValue\": \"11\"}]}"
    };
    private int index = 0;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));

        index++;
        if (index >= sentences.length) {
            index = 0;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}
package com.wn;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.storm.hbase.bolt.AbstractHBaseBolt;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.ColumnList;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.BatchHelper;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static org.apache.storm.hbase.common.Utils.toBytes;

public class HbaseCusBolt extends AbstractHBaseBolt {
    private static final Logger logger = LoggerFactory.getLogger(HbaseCusBolt.class);
    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;

    boolean writeToWAL = true; // whether to write to the write-ahead log (the WAL lets HBase recover data after a crash such as a power failure, but it lowers write throughput)
    List<Mutation> batchMutations = new LinkedList<>();
    int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
    int batchSize;
    BatchHelper batchHelper;

    public HbaseCusBolt(String tableName, HBaseMapper mapper) {
        super(tableName,mapper);
    }

    public HbaseCusBolt writeToWAL(boolean writeToWAL) {
        this.writeToWAL = writeToWAL;
        return this;
    }

    public HbaseCusBolt withConfigKey(String configKey) {
        this.configKey = configKey;
        return this;
    }

    public HbaseCusBolt withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    public HbaseCusBolt withFlushIntervalSecs(int flushIntervalSecs) {
        this.flushIntervalSecs = flushIntervalSecs;
        return this;
    }

    public Map<String, Object> getComponentConfiguration() {
        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
    }

    // compare with org.apache.storm.hbase.bolt.HBaseBolt: the way the row key and the ColumnList are built has been changed
    public void execute(Tuple tuple) {
        try {
            if (this.batchHelper.shouldHandle(tuple)) {
                String initData = tuple.getString(0);

                // ************ parse the temperature data ************
                Gson gson = new Gson();
                java.lang.reflect.Type typeClass = new TypeToken<Temper>() {}.getType();
                Temper tempers = gson.fromJson(initData, typeClass);

                List<Temper.TemperData> temperDatas = tempers.temperDatas;
                ColumnList cols = new ColumnList();
                String columnFamily = "temp"; // column family
                for (Temper.TemperData temperData : temperDatas) {
                    String cityName = temperData.cityName;
                    String temperValue = temperData.temperValue;

                    cols.addColumn(toBytes(columnFamily), toBytes(cityName + "_temp"), toBytes(temperValue));
                }

                String rowkey = tempers.provinceName + "_" + tempers.tm; // row key
                List<Mutation> mutations = this.hBaseClient.constructMutationReq(toBytes(rowkey), cols, this.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
                this.batchMutations.addAll(mutations);
                this.batchHelper.addBatch(tuple);
            }

            if (this.batchHelper.shouldFlush()) {
                this.hBaseClient.batchMutate(this.batchMutations);
                logger.debug("acknowledging tuples after batchMutate");
                this.batchHelper.ack();
                this.batchMutations.clear();
            }
        } catch (Exception e) {
            this.batchHelper.fail(e);
            this.batchMutations.clear();
        }
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        super.prepare(map, topologyContext, collector);
        this.batchHelper = new BatchHelper(batchSize, collector);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}
package com.wn;

import java.util.List;

public class Temper{
    public String provinceName;
    public String tm; // timestamp
    public List<TemperData> temperDatas;
    public static class TemperData {
        public String cityName; // city name
        public String temperValue; // temperature value
    }
}

4) Result: check the table in the HBase shell with

scan 'tempStatis'
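Given the test data emitted by SLSpout, the scan should show three row keys (guangdong_1550806523, shanxi_1550806523, henan_1550806523), each with one dynamically named column per city, roughly along these lines (actual cell timestamps will differ):

guangdong_1550806523    column=temp:shenzhen_temp, value=16
guangdong_1550806523    column=temp:zhuhai_temp, value=14
henan_1550806523        column=temp:luoyang_temp, value=9
...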
