我是靠谱客的博主 闪闪香水,最近开发中收集的这篇文章主要介绍企业级分布式实时搜索模型研究与实现SorlCloud+HBase+Flume-ng 企业级分布式实时搜索模型研究与实现SorlCloud+HBase+Flume-ng 通常我们在电信项目中采用HBase来存储用户终端明细数据等,供前台页面即时查询。HBase无可置疑拥有其优势,但其本身只对rowkey支持毫秒级的快速检索,对于多字段的组合查询却无能为力。针对HBase的多条件查询也有多种方案,实时搜索已成为信息检索领域的热点问题之一。但是有些方案要么太复杂,要么效率太低,本文,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

企业级分布式实时搜索模型研究与实现SorlCloud+HBase+Flume-ng


通常我们在电信项目中采用HBase来存储用户终端明细数据等,供前台页面即时查询。HBase无可置疑拥有其优势,但其本身只对rowkey支持毫秒级的快速检索,对于多字段的组合查询却无能为力。针对HBase的多条件查询也有多种方案,实时搜索已成为信息检索领域的热点问题之一。但是有些方案要么太复杂,要么效率太低,本文将对基于SolrCloud的HBase多条件查询方案进行测试和验证,同时结合SparkStreaming对SolrCloud的索引进行定期维护操作。

SolrCloud+HBase 组合的原理:

基于Solr的HBase多条件查询原理很简单,将HBase表中涉及条件过滤的字段和rowkey在Solr中建立索引,通过Solr的多条件查询快速获得符合过滤条件的rowkey值,拿到这些rowkey之后在HBASE中通过指定rowkey进行查询。

1.    操作系统:centos6.5(64位)

2.    应用软件:CDH5.6.0(HDFS、Yarn、Zookeeper、Kafka、HBase、Flume-ng、SolrCloud)

3.    编程环境:oracle-j2sdk1.7-1.7.0+update67-1.x86_64




三、  Flume程序设计

数据是从消息中间件(kafka)中读取,由flume担任消费者(comsumer),同时将数据实时sink到hbase和solr cloud。用flume为hbase和solr cloud搭建起桥梁。代码如下:

public class MMSAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {

    private byte[] table;
    private byte[] cf;
    private byte[][] payload;
    private byte[][] payloadColumn;
    private final String payloadColumnSplit = "\|";
    private String SOLR_URL="";

    @Override
    public void configure(Context context) {
        String pCol = context.getString("payloadColumn","pCol");
        SOLR_URL=context.getString("solrurl");
        if(SOLR_URL.isEmpty()){
            //取solrURL地址,且不能为空
            System.out.println("Solr Url地址不能为空!");
            System.exit(-1);
        }
        if(pCol != null && !pCol.isEmpty()) {
            // 从配置文件中读出column。
            String[] pCols = pCol.replace(" ", "").split(",");
            payloadColumn = new byte[pCols.length][];
            for (int i = 0; i < pCols.length; i++) {
                // 列名转为小写
                payloadColumn[i] = pCols[i].toLowerCase().getBytes(Charsets.UTF_8);
            }
        }
    }

    @Override
    public void configure(ComponentConfiguration context) {
    }

    @Override
    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.cf = cf;
    }

    @Override
    public void setEvent(Event event) {
        String strBody = new String(event.getBody());
        String[] subBody = strBody.split(this.payloadColumnSplit);
        if (subBody.length == this.payloadColumn.length){
            this.payload = new byte[subBody.length][];
            for (int i = 0; i < subBody.length; i++){
                this.payload[i] = subBody[i].getBytes(Charsets.UTF_8);
            }
        }
    }

    @Override
    public List<PutRequest> getActions() {
        List<PutRequest> actions = new ArrayList<PutRequest>();
        List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
        if (payloadColumn != null) {
            try {
                // for 循环,提交所有列和对于数据的put请求。
                String keys = HashRowKeyGenerator.getRowKey().toString();
                String rowkey="";
                for (int i = 0; i < this.payload.length; i++) {
                    //HBase操作
                    rowkey = new String(payload[0], "UTF8") + keys;
                    PutRequest putRequest = new PutRequest(table,rowkey.getBytes("UTF8"), cf, payloadColumn[i],payload[i]);
                    actions.add(putRequest);
                }
                //solr操作
                SolrInputDocument doc1 = new SolrInputDocument();
                if(!rowkey.isEmpty()){
                    doc1.addField("id", rowkey, 1.0f);
                    doc1.addField("productName", new String(payload[6], "UTF8"),1.0f);
                    docs.add(doc1);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                CloudSolrServer solrserver = new CloudSolrServer(urlSolr);
                solrserver.setDefaultCollection(defaultCollection);
                solrserver.setZkClientTimeout(zkClientTimeOut);
                solrserver.setZkConnectTimeout(zkConnectTimeOut);           server.add(docs.iterator());
                server.commit();
            } catch (Exception e) {
                System.out.println(e);
            }
        }
        return actions;
    }

    @Override
    public List<AtomicIncrementRequest> getIncrements() {
        List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
        return actions;
    }

    @Override
    public void cleanUp() {
    }
}

Flume 配置文件:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.plugins.KafkaSource
a1.sources.r1.zookeeper.connect=Master:2181,Slave1:2181,Slave2:2181
a1.sources.r1.group.id=test-consumer-group
a1.sources.r1.zookeeper.session.timeout.ms=400
a1.sources.r1.zookeeper.sync.time.ms=200
a1.sources.r1.auto.commit.interval.ms=1000
a1.sources.r1.custom.topic.name=test2
a1.sources.r1.custom.thread.per.consumer=4
a1.sources.r1.batchSize = 10000000

a1.sinks.k1.type  = org.apache.flume.sink.hbase.AsyncHBaseSink
a1.sinks.k1.table = test11
a1.sinks.k1.columnFamily = cf
a1.sinks.k1.serializer.pp = 11
a1.sinks.k1.serializer.payloadColumn=msg_id,service_id,msg_fmt,src_id,dest_phone,msg_length,msg_content_tmp,timestr,result
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.MMSAsyncHbaseEventSerializer

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000000
a1.channels.c1.transactionCapacity = 1000000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

四、HBase的RowKey设计

当处理由连续事件得到的数据时,即时间上连续的数据。它们显著的特点就是rowkey中含有事件发生时间。带来的一个问题便是HBase对于row的不均衡分布,它们被存储在一个唯一的rowkey区间中,被称为region,区间的范围被称为Start KeyEnd Key

对于单调递增的时间类型数据,很容易被散列到同一个Region中,这样它们会被存储在同一个服务器上,从而所有的访问和更新操作都会集中到这一台服务器上,从而在集群中形成一个hot spot,从而不能将集群的整体性能发挥出来。

要解决这个问题是非常容易的,只需要将所有的数据散列到全部的Region上即可。这是可以做到的,比如,在rowkey前面加上一个非线程序列,本案例的做法如下:

 HBase RowKey使用msg_id+ MD5Hash.getMD5AsHex(Bytes.toBytes(currentId)).substring(0,8).getBytes(),同时TTL保留24小时数据,轮训删除

COLUMNFAMILIES DESCRIPTION                                                                                                                                                                   

{NAME=> 'cf', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW',REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '5', TTL=> '86400 SECONDS', MIN_VERSIONS => '0', KEEP_DELETED_CE

LLS=> 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE=> 'true'}  

public class HashRowKeyGenerator {

   
public static String getRowKey(){
       
long currentId= 100L;
        byte
[]rowkey=Bytes.add(MD5Hash.getMD5AsHex(Bytes.toBytes(currentId)).substring(0, 8).getBytes(),Bytes.toBytes(currentId));
        return
rowkey.toString();
   
}
}

 

五、Cloudera Manger管理Solr Cloud

由于现在solrcloud上是没有collection的,我们先创建一个collection,命名为collection1,在创建collection1之前,首先要将配置信息上传到zookeeper上,我们这里就将solr自带的examples里面的那个solr目录下面的collection1的配置上传,对应cm安装的集群,这个目录在/etc/solr/conf/collection1

sudo-u solr solrctl instancedir --create collection1 /etc/solr/conf/collection1

SolrCloud提供近实时搜索,索引持久化,同时SolrCloud也能被配置成分片(shard)索引,这样集群主机可以把索引数据分布到各个主机上,多个shard可以组成一个Collection.

SolrCloud集群的所有的配置存储在ZooKeeper. 一旦一个SolrCloud节点启动, 该节点的配置信息将发送到ZooKeeper上存储.
Shard Replica除了作为容灾备份存在, 另外一个作用就是分散查询请求, 提高整个集群的查询能力.

创建solr中心库product

 

public static void query(){
    try{
        HttpSolrServer server=new HttpSolrServer(SOLR_URL);
        SolrQuery query=new SolrQuery("productName:10元/");
        QueryResponse res=server.query(query);
        SolrDocumentList list=res.getResults();
        for(int i=0;i<list.size();i++){
            System.out.println(">>>>>>>>>>>>>>>>>"+list.get(i).get("rowkey"));
        }
    }catch(Exception e){
        e.printStackTrace();
    }
}

 通过solr cloud提供的rowkey进行filter查询

// 获取指定rowkey数据集
public static void getRows(String tableName, List<String> rows) throws Exception {
    table = connection.getTable(tableName);
    for(String row:rows){
        Get get = new Get(Bytes.toBytes(row));
        Result result = table.get(get);
        // 输出结果,raw方法返回所有keyvalue数组
        for (Cell cell : result.rawCells()) {
            System.out.print("行名:" + new String(CellUtil.cloneRow(cell)) + " ");
            System.out.print("时间戳:" + cell.getTimestamp()+ " ");
            System.out.print("列族名:" + new String(CellUtil.cloneFamily(cell)) + " ");
            System.out.print("列名:" + new String(CellUtil.cloneQualifier(cell)) + " ");
            System.out.println("值:" + new String(CellUtil.cloneValue(cell)));
        }
    }
}

 中文分词(Jcseg)设计

由于SolrCloud对中文分词的支持不是很好,经过对多个中文分词(IKAnalysis)、ansj_seg、mmseg4j等进行比较,要么对中文标点符号和特殊符号支持不佳,要么分词效率比较低。经多方面考虑采用Jcseg。

jcseg是使用Java开发的一款开源的中文分词器, 使用mmseg算法. 分词准确率高达

98.4%,支持中文人名识别, 同义词匹配, 停止词过滤..., 详情请查看jcseg官方首页.

官方首页:  https://code.google.com/p/jcseg/

下载地址: https://code.google.com/p/jcseg/downloads/list

 git   源码: http://git.oschina.net/lionsoul/jcseg

Jcseg详细功能介绍:(可以略过, 方便查看新版本功能变化)

1。目前最高版本:jcseg-1.9.5。兼容最高版本 lucene, solr, elasticsearch

2。三种切分模式:

(1).简易模式:FMM算法,适合速度要求场合。

(2).复杂模式-MMSEG四种过滤算法,具有较高的岐义去除,分词准确率达到了98.41%。

(3).检测模式:只返回词库中已有的词条,很适合某些应用场合。(1.9.4开始)

3。智能中文人名识别。中文人名识别正确率达94%以上。

4.。支持自定义词库。在lexicon文件夹下,可以随便添加/删除/更改词库和词库内容,并且对词库进行了分类。

具体参考官方文档。

操作步骤如下

1.在solrconfig.xml中配置引用的中文分词的jar包目录:

<lib dir="目录前缀/solr/product/lib/"regex=".*.jar" />

1.在shema.xml中配置

<field name="rowkey" type="string" indexed="true" stored="true" required="true" multiValued="false" />
<field name="Content" type="textComplex" indexed="true" stored="false" omitNorms="true" />

 

<!-- 复杂模式分词: -->
<fieldtype name="textComplex" class="solr.TextField">
<analyzer>
<tokenizer class="org.lionsoul.jcseg.solr.JcsegTokenizerFactory" mode="complex"/>
</analyzer>
</fieldtype>
<!-- 简易模式分词: -->
<fieldtype name="textSimple" class="solr.TextField">
<analyzer>
<tokenizer class="org.lionsoul.jcseg.solr.JcsegTokenizerFactory" mode="simple"/>
</analyzer>
</fieldtype>
<!-- 检测模式分词: -->
<fieldtype name="textSimple1" class="solr.TextField">
<analyzer>
<tokenizer class="org.lionsoul.jcseg.solr.JcsegTokenizerFactory" mode="detect"/>
</analyzer>
</fieldtype>

 

将编辑好的文件传到主机上,并提交Cloud Manager集群主机: sudo -u solr solrctl instancedir --create product/etc/solr/conf/product

 

 

、SolrCloud增量索引-删除业务

SolrCloud索引数据保存24小时内的数据,所以借住webapplication的listener监听器,做定时任务,每隔1分钟或者10秒,删除时间戳范围数据。

public class SolrSchedulJob implements ServletContextListener{

   
private Timertimer;
    private
MyTask task;

   
@Override
   
public void contextDestroyed(ServletContextEventarg0) {
       
timer.cancel();
       
System.out.println("定时器已销毁");
   
}

    @Override
   
public void contextInitialized(ServletContextEvent event) {
        timer =
new java.util.Timer(true);
       
task = new MyTask();
       
System.out.println("定时器已启动");
        
timer.schedule(task, 0, 10 * 1000);
       
System.out.println("已经添加任务调度表");
   
}

}

 

Task任务定时删除时间戳索引:

public class SolrRemoveIndexTask extends TimerTask {

   
private static final String SOLR_URL= "http://Master:8983/Solr/product";

   
@Override
   
public void run() {
       
long start= System.currentTimeMillis();
       
Date beginDate = new Date();
       
Calendar date = Calendar.getInstance();
       
date.setTime(beginDate);
       
date.set(Calendar.DATE, date.get(Calendar.DATE)- 1);
        long
end = date.getTime().getTime();
       
HttpSolrServer server = new HttpSolrServer(SOLR_URL);
       
SolrQuery query = new SolrQuery("createtime:[" + start + " to " + end + "]");
       
QueryResponse res;
        try
{
            res = server.query(query)
;
           
SolrDocumentList list = res.getResults();
           
List<String> ids = new ArrayList<String>();
            for
(int i = 0; i < list.size(); i++){
                ids.add(list.get(i).get(
"rowkey").toString());
            
}
            server.deleteById(ids)
;
           
server.commit();
       
} catch(Exception e) {
            e.printStackTrace()
;
       
}
        System.
out.println("timeelapsed(ms):" + (System.currentTimeMillis()- start));
   
}
}

 

在web.xml中配置的listener监听器:

<listener>
<listener-class>servlet
.SolrSchedulJob</listener-class>

</listener>

 

基于Solr的HBase多条件查询原理很简单,将HBase表中涉及条件过滤的字段和rowkey在Solr中建立索引,通过Solr的多条件查询快速获得符合过滤条件的rowkey值,拿到这些rowkey之后在HBASE中通过指定rowkey进行查询。当然实现rowkey共享机制还有很多种方法。此案例开发过程中也存在诸多问题。欢迎指正。谢谢!


最后

以上就是闪闪香水为你收集整理的企业级分布式实时搜索模型研究与实现SorlCloud+HBase+Flume-ng 企业级分布式实时搜索模型研究与实现SorlCloud+HBase+Flume-ng 通常我们在电信项目中采用HBase来存储用户终端明细数据等,供前台页面即时查询。HBase无可置疑拥有其优势,但其本身只对rowkey支持毫秒级的快速检索,对于多字段的组合查询却无能为力。针对HBase的多条件查询也有多种方案,实时搜索已成为信息检索领域的热点问题之一。但是有些方案要么太复杂,要么效率太低,本文的全部内容,希望文章能够帮你解决企业级分布式实时搜索模型研究与实现SorlCloud+HBase+Flume-ng 企业级分布式实时搜索模型研究与实现SorlCloud+HBase+Flume-ng 通常我们在电信项目中采用HBase来存储用户终端明细数据等,供前台页面即时查询。HBase无可置疑拥有其优势,但其本身只对rowkey支持毫秒级的快速检索,对于多字段的组合查询却无能为力。针对HBase的多条件查询也有多种方案,实时搜索已成为信息检索领域的热点问题之一。但是有些方案要么太复杂,要么效率太低,本文所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(76)

评论列表共有 0 条评论

立即
投稿
返回
顶部