我是靠谱客的博主 纯真乐曲,最近开发中收集的这篇文章主要介绍HbaseAPI,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

hbaseAPI方法:
API本质就是调用对象的方法去实现的。
DDL 指对表结构、命名空间等层面的操作,这类操作都是调用 Admin 对象的方法实现的;Admin 对象通过 connection.getAdmin() 获取。
DML 指对数据的增、删、改、查操作,这类操作都是调用 Table 对象的方法实现的;Table 对象通过 connection.getTable() 获取。
步骤
①获取hbase的配置对象
②创建连接对象将配置对象传参
③如果是ddl的话就通过连接调用getAdmin创建admin对象,dml就应该连接调用getTable
④然后就想要什么操作就调用admin对象的对应方法去操作,按照里面的参数去创建对应的参数。
hbase的ddl的API
①判断表存在
/**
* 判断表存在
* @param tablename
* @return
* @throws IOException
*/
public static boolean isTableExist(String tablename) throws IOException {
    boolean exits = admin.tableExists(TableName.valueOf(tablename));
    close(null,admin);
    return exits;
}

②创建表
/**
* 创建表
* @param tableName
* @param cf
* @throws IOException
*/
public static void createTable(String tableName, String...cf) throws IOException {


    if(cf==null){
        System.out.println("输入列族");
    }
    if(isTableExist(tableName)){
        System.out.println("有这个表了");
    }


    HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));


    for (String s : cf) {
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(s);
        hTableDescriptor.addFamily(hColumnDescriptor);
    }


    admin.createTable(hTableDescriptor);


}

③创建命名空间
/**
* 创建命名空间
* @param ns
*/
public static void createNameSpace(String ns){


    NamespaceDescriptor nss = NamespaceDescriptor.create(ns).build();
    try {
        admin.createNamespace(nss);
    } catch (NamespaceExistException e){
        System.out.println("命名空间存在了");
    } catch (IOException e) {
        e.printStackTrace();
    }


}

④删除表
/**
* 删除表
* @param tableName
* @throws IOException
*/
public static void deleteTable(String tableName) throws IOException {
    if(!isTableExist(tableName)){
        System.out.println("表不存在");
    }
    admin.disableTable(TableName.valueOf(tableName));
    admin.deleteTable(TableName.valueOf(tableName));
}

ddl的全部代码:
package hbasetest;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;


import java.io.IOException;




public class hbaseapi01 {


    private static Connection connection = null;
    private static Admin admin = null;
    static{
        try {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "nn01,nn02,dn01,dn02,dn03");


            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        }catch (Exception e){
            new RuntimeException(e);
        }
    }


    /**
     * 此方法用来封装关闭方法
     */
    public static void close(Connection connection, Admin admin){
        if(admin!=null){
            try{
                admin.close();
            }catch (Exception e){
                new RuntimeException(e);
            }
        }
        if(connection!=null){
            try{
                connection.close();
            }catch (Exception e){
                new RuntimeException(e);
            }
        }
    }


    /**
     * 判断表存在
     * @param tablename
     * @return
     * @throws IOException
     */
    public static boolean isTableExist(String tablename) throws IOException {


        boolean exits = admin.tableExists(TableName.valueOf(tablename));


        close(null,admin);


        return exits;
    }


    /**
     * 创建表
     * @param tableName
     * @param cf
     * @throws IOException
     */
    public static void createTable(String tableName, String...cf) throws IOException {


        if(cf==null){
            System.out.println("输入列族");
        }
        if(isTableExist(tableName)){
            System.out.println("有这个表了");
        }


        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));


        for (String s : cf) {
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(s);
            hTableDescriptor.addFamily(hColumnDescriptor);
        }


        admin.createTable(hTableDescriptor);


    }


    /**
     * 删除表
     * @param tableName
     * @throws IOException
     */
    public static void deleteTable(String tableName) throws IOException {
        if(!isTableExist(tableName)){
            System.out.println("表不存在");
        }
        admin.disableTable(TableName.valueOf(tableName));
        admin.deleteTable(TableName.valueOf(tableName));
    }


    /**
     * 创建命名空间
     * @param ns
     */
    public static void createNameSpace(String ns){


        NamespaceDescriptor nss = NamespaceDescriptor.create(ns).build();
        try {
            admin.createNamespace(nss);
        } catch (NamespaceExistException e){
            System.out.println("命名空间存在了");
        } catch (IOException e) {
            e.printStackTrace();
        }


    }


    public static void main(String[] args) throws IOException {


        createNameSpace("ds1");






    }


}

hbase的dml的API
①插入数据
/**
* 插入数据,想要插入多条的话可以加个for循环传参数,或者将put对象创建然后加入到列表里面,然后将列表传入put方法
* @param tableNAme
* @param rowKey
* @param cf
* @param cn
* @param value
* @throws IOException
*/
public static void putData(String tableNAme,String rowKey,String cf,String cn,String value) throws IOException {


    Table table = connection.getTable(TableName.valueOf(tableNAme));


    Put put = new Put(Bytes.toBytes(rowKey));

    put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));


    table.put(put);


    table.close();


}

②查询数据(get)
/**查询数据
* ①构建table
* ②构建get,然后传rowkey,然后在get.addColumn()
* ③然后将get提交给get方法
* ④再打印
* @param tableName
* @param rowKey
* @param cf
* @param cn
* @throws IOException
*/
public static void getData(String tableName,String rowKey,String cf,String cn) throws IOException {


    Table table = connection.getTable(TableName.valueOf(tableName));
    
    Get get = new Get(Bytes.toBytes(rowKey));
    get.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn));
    
    Result result = table.get(get);
    
    for (Cell cell : result.rawCells()) {
        System.out.println("列族"+Bytes.toString(CellUtil.cloneFamily(cell))+",列"+Bytes.toString(CellUtil.cloneQualifier(cell))+",值:"+Bytes.toString(CellUtil.cloneValue(cell)));
    }


}

③查询数据(scan)
public static void scanData(String tableName) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableName));
    Scan scan = new Scan();
    scan.setMaxVersions(10);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("rawkey"+Bytes.toString(CellUtil.cloneRow(cell))+",cf:"+Bytes.toString(CellUtil.cloneFamily(cell))+",cl:"+Bytes.toString(CellUtil.cloneQualifier(cell))+",value:"+Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
}

④删数据
public static void deleteData(String tablename,String rowkey,String cf,String cn) throws IOException {




    Table table = connection.getTable(TableName.valueOf(tablename));


    Delete delete = new Delete(Bytes.toBytes(rowkey));
    delete.addColumns(Bytes.toBytes(cf),Bytes.toBytes(cn));
    table.delete(delete);
    
    table.close();


}

dml的全部代码

package hbasetest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;


/**
 * HBase DML helpers (put/get/scan/delete) backed by a single shared
 * Connection initialized from the ZooKeeper quorum.
 */
public class dmlAPI {

    private static Connection connection = null;
    private static Admin admin = null;

    static {
        try {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "nn01,nn02,dn01,dn02,dn03");

            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        } catch (Exception e) {
            // Fix: the original created the RuntimeException but never threw it,
            // silently swallowing initialization failures.
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes the given admin and connection (either may be null).
     */
    public static void close(Connection connection, Admin admin) {
        if (admin != null) {
            try {
                admin.close();
            } catch (Exception e) {
                // Fix: was "new RuntimeException(e);" without throw — swallowed.
                throw new RuntimeException(e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Inserts a single cell. For batch inserts, build a List&lt;Put&gt; and
     * pass it to Table.put(List) instead of looping over this method.
     *
     * @param tableNAme target table
     * @param rowKey    row key
     * @param cf        column family
     * @param cn        column qualifier
     * @param value     cell value
     * @throws IOException on HBase communication failure
     */
    public static void putData(String tableNAme, String rowKey, String cf, String cn, String value) throws IOException {
        // try-with-resources closes the table even when put() throws
        try (Table table = connection.getTable(TableName.valueOf(tableNAme))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value));
            table.put(put);
        }
    }

    /**
     * Fetches one column of one row with a Get and prints every returned cell.
     *
     * Fix: the original never closed the Table.
     *
     * @param tableName table to read from
     * @param rowKey    row key
     * @param cf        column family
     * @param cn        column qualifier
     * @throws IOException on HBase communication failure
     */
    public static void getData(String tableName, String rowKey, String cf, String cn) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));

            Result result = table.get(get);

            for (Cell cell : result.rawCells()) {
                System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell))
                        + ",列" + Bytes.toString(CellUtil.cloneQualifier(cell))
                        + ",值:" + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }

    /**
     * Scans the whole table (up to 10 versions per cell) and prints every cell.
     *
     * Fixes: the original leaked both the Table and the ResultScanner;
     * the "rawkey" output label was a typo for "rowkey:".
     *
     * @param tableName table to scan
     * @throws IOException on HBase communication failure
     */
    public static void scanData(String tableName) throws IOException {
        Scan scan = new Scan();
        scan.setMaxVersions(10);
        try (Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                for (Cell cell : result.rawCells()) {
                    System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell))
                            + ",cf:" + Bytes.toString(CellUtil.cloneFamily(cell))
                            + ",cl:" + Bytes.toString(CellUtil.cloneQualifier(cell))
                            + ",value:" + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        }
    }

    /**
     * Deletes all versions of one column of one row.
     *
     * @param tablename target table
     * @param rowkey    row key
     * @param cf        column family
     * @param cn        column qualifier
     * @throws IOException on HBase communication failure
     */
    public static void deleteData(String tablename, String rowkey, String cf, String cn) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(tablename))) {
            Delete delete = new Delete(Bytes.toBytes(rowkey));
            // addColumns (plural) deletes ALL versions of the cell
            delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn));
            table.delete(delete);
        }
    }

    public static void main(String[] args) throws IOException {
//    getData("test","1001","cf","name");
        scanData("test");
    }
}

与mapreduce关联:
map类
package hbasetest.mr2;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper that copies only the "name"-qualifier cells of each scanned row
 * into a Put keyed by the row.
 */
public class mrMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value,
                       Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
            throws IOException, InterruptedException {

        Put put = new Put(value.getRow());

        for (Cell cell : value.rawCells()) {
            // keep only cells whose qualifier is "name"
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                put.add(cell);
            }
        }
        // Fix: the original emitted an empty Put for rows with no "name" cell;
        // TableOutputFormat rejects empty Puts ("No columns to insert").
        if (!put.isEmpty()) {
            context.write(key, put);
        }
    }
}

reduce类
package hbasetest.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * Reducer that writes every Put received from the mapper to the output table.
 *
 * Fix: the original post pasted the mapper class here a second time; the
 * driver references mrReducer.class, so this is the reducer it requires.
 */
public class mrReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
                          Context context) throws IOException, InterruptedException {
        // pass each Put straight through to the output table
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
driver类
package hbasetest.mr2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver for the HBase-to-HBase MapReduce job.
 * args[0] = source table, args[1] = target table.
 */
public class mrDriver implements Tool {

    private Configuration conf = null;

    @Override
    public int run(String[] strings) throws Exception {

        Job job = Job.getInstance(conf);
        job.setJarByClass(mrDriver.class);
        TableMapReduceUtil.initTableMapperJob(strings[0], new Scan(), mrMapper.class,
                ImmutableBytesWritable.class, Put.class, job);
        TableMapReduceUtil.initTableReducerJob(strings[1], mrReducer.class, job);
        boolean result = job.waitForCompletion(true);

        return result ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public static void main(String[] args) {
        try {
            Configuration conf = HBaseConfiguration.create();
            // Fix: the original discarded the job status; propagate it as the
            // process exit code so shell scripts / schedulers detect failure.
            int status = ToolRunner.run(conf, new mrDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

打包后提交到集群运行的命令
yarn jar jar包名 驱动类名 参数
hbase知识点:
预分区:
    三种创建方法:
        
第一种传递一个数组来分区
第二种创建15个分区然后以16进制字符串划分
第三种写一个文件里面是命名规则然后划分(会对文件先排序)

最后

以上就是纯真乐曲为你收集整理的HbaseAPI的全部内容,希望文章能够帮你解决HbaseAPI所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(62)

评论列表共有 0 条评论

立即
投稿
返回
顶部