我是靠谱客的博主 大气芒果,最近开发中收集的这篇文章主要介绍工具类:Hbase (Admin) Client常量ConstantsDDL 操作 : 建表 + 修改表HBaseClient:增删改查,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

常量Constants

package com.wj.hbase;
import org.apache.hadoop.hbase.util.Bytes;
/**
* 常量
*/
public interface Constants {
String ZOOKEEPER_LIST = "...";
String ZOOKEEPER_CLIENT_PORT = "2015";
String ZOOKEEPER_ZNODE_PARENT = "/hbase";
String USER_NAME = "wj";
String TABLE_NAME = "wj_table";
String COLUMN_FAMILY = "f";
byte[] ROWKEY = Bytes.toBytes("rowkey-1");
byte[] ROWKEY_2 = Bytes.toBytes("rowkey-2");
byte[] FAMILY = Bytes.toBytes("f");
byte[] QUALIFIER = Bytes.toBytes("c1");
byte[] QUALIFIER_2 = Bytes.toBytes("c2");
byte[] VALUE = Bytes.toBytes("v1");
byte[] VALUE_2 = Bytes.toBytes("v2");
}

DDL 操作 : 建表 + 修改表

package com.wj.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* DDL 操作 : 建表 + 修改表
*/
public class HBaseAdminClient implements Constants {
public static void main(String[] args) throws IOException {
HBaseAdminClient clientExample = new HBaseAdminClient();
// Step 1: 初始化HBase连接,返回Admin实例。
Admin admin = getConnection().getAdmin();
// Step 2: 设置表结构并建表。
clientExample.createSchemaTables(admin);
// Step 3: 修改表结构。
clientExample.modifySchema(admin);
// Step 4: 清空表数据
clientExample.truncateHBaseTable(admin);
}
/**
* 初始化HBase连接
*/
private static Connection getConnection() throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, ZOOKEEPER_CLIENT_PORT);
conf.set(HConstants.ZOOKEEPER_QUORUM, ZOOKEEPER_LIST);
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ZOOKEEPER_ZNODE_PARENT);
User user = User.create(UserGroupInformation.createRemoteUser(USER_NAME));
return ConnectionFactory.createConnection(conf, user);
}
/**
* 创建HBase表
*/
private void createSchemaTables(Admin admin) {
try {
HTableDescriptor table = new HTableDescriptor(TableName.valueOf(Constants.TABLE_NAME));
table.addFamily(new HColumnDescriptor(Constants.COLUMN_FAMILY).setCompressionType(Compression.Algorithm.LZO));
// 建表前需判断表是否存在,若存在,则删除
if (admin.tableExists(table.getTableName())) {
admin.disableTable(table.getTableName());
admin.deleteTable(table.getTableName());
}
admin.createTable(table);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 更改表结构:可修改 压缩方式、TTL 和 版本数
*/
private void modifySchema(Admin admin) {
try {
TableName tn = TableName.valueOf(Constants.TABLE_NAME);
// 表不存在,则不用修改
if (!admin.tableExists(tn)) {
System.out.println("Table does not exist...");
System.exit(-1);
}
// 创建表描述实例。
HTableDescriptor tableDesc = admin.getTableDescriptor(tn);
// 定义新列族属性,并添加到表数据实例。
HColumnDescriptor newColumn = new HColumnDescriptor("new_cf");
newColumn.setCompactionCompressionType(Compression.Algorithm.LZO);
// 设置最大版本数
newColumn.setMaxVersions(10);
// 设置TTL
newColumn.setTimeToLive(86400);
tableDesc.addFamily(newColumn);
// 更新列族属性
HColumnDescriptor existingColumn = new HColumnDescriptor(COLUMN_FAMILY);
existingColumn.setCompactionCompressionType(Compression.Algorithm.NONE);
existingColumn.setMaxVersions(3);
existingColumn.setTimeToLive(Integer.MAX_VALUE);
tableDesc.modifyFamily(existingColumn);
admin.modifyTable(tn, tableDesc);
// 删除某已存在的列族(当表仅有一个列族时无法进行此操作)
admin.disableTable(tn);
admin.deleteColumn(tn, COLUMN_FAMILY.getBytes(StandardCharsets.UTF_8));
admin.enableTable(tn);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 清空表数据
*/
private void truncateHBaseTable(Admin admin) throws IOException {
admin.disableTable(TableName.valueOf(Constants.TABLE_NAME));
admin.deleteTable(TableName.valueOf(Constants.TABLE_NAME));
}
}

HBaseClient:增删改查

package com.wj.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* 常见客户端操作
*/
public class HBaseClient implements Constants {
public static void main(String[] args) throws IOException {
HBaseClient client = new HBaseClient();
// 插入单条数据(建议优选批量插入数据)
client.singlePut();
// 批量插入数据(老版本 + 同步)
client.multiplePut();
// 批量插入数据(新版本 + 批量异步)
client.multiplePut2();
// 对 Rowkey 散列处理后插入数据,避免热点问题
client.hashPut();
// 根据rowkey查询 单条数据
client.singleGet();
// 根据 rowkey + family + column 查询数据
client.singleGetWithColumn();
// 批量查询 多个 Get
client.multipleGet();
// 扫描数据(设定 起始 / 结束 范围)
client.scanByStartAndStop();
// 根据 rowkey 删除记录(建议批量删除)
client.singleDelete();
// 删除某行某列数据
client.singleDeleteColumn();
// 根据 rowkeys 批量删除记录(优选)
client.multipleDelete();
}
/**
* 初始化HBase连接
*/
private static Connection getConnection() throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.ZOOKEEPER_QUORUM, ZOOKEEPER_LIST);
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, ZOOKEEPER_CLIENT_PORT);
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ZOOKEEPER_ZNODE_PARENT);
User user = User.create(UserGroupInformation.createRemoteUser(USER_NAME));
return ConnectionFactory.createConnection(conf, user);
}
/**
* 插入单条数据(建议优选批量插入数据)
*/
private void singlePut() throws IOException {
try (Table table = getConnection().getTable(TableName.valueOf(TABLE_NAME))) {
Put put = new Put(ROWKEY);
put.addColumn(FAMILY, QUALIFIER, VALUE);
put.addColumn(FAMILY, QUALIFIER_2, VALUE_2);
table.put(put);
}
}
/**
* 批量插入数据(老版本 + 同步)
*/
private void multiplePut() throws IOException {
try (HTable hTable = (HTable) getConnection().getTable(TableName.valueOf(TABLE_NAME))) {
List<Put> list = new ArrayList<>();
Put put1 = new Put(ROWKEY);
put1.addColumn(FAMILY, QUALIFIER, VALUE);
put1.addColumn(FAMILY, QUALIFIER_2, VALUE_2);
Put put2 = new Put(ROWKEY_2);
put2.addColumn(FAMILY, QUALIFIER, VALUE);
put2.addColumn(FAMILY, QUALIFIER_2, VALUE_2);
list.add(put1);
list.add(put2);
hTable.put(list);
/*
* 进行大量的Put的时候,务必确认 HTable 的 autoFlush = false,否则,每执行一个 Put 就要向 RegionServer 发一个请求
*/
hTable.setAutoFlush(false, false);
}
}
/**
* 批量插入数据(新版本 + 批量 + 异步)
*/
private void multiplePut2() throws IOException {
try (BufferedMutator bufferedMutator = getConnection().getBufferedMutator(TableName.valueOf(TABLE_NAME))) {
List<Put> list = new ArrayList<>();
Put put2 = new Put(ROWKEY_2);
put2.addColumn(FAMILY, QUALIFIER_2, VALUE_2);
put2.addColumn(FAMILY, QUALIFIER, VALUE);
Put put1 = new Put(ROWKEY);
put1.addColumn(FAMILY, QUALIFIER_2, VALUE_2);
put1.addColumn(FAMILY, QUALIFIER, VALUE);
list.add(put2);
list.add(put1);
bufferedMutator.mutate(list);
// 刷新
bufferedMutator.flush();
}
}
/**
* 对 Rowkey 散列处理后插入数据,避免热点问题
*/
private void hashPut() throws IOException {
try (Table table = getConnection().getTable(TableName.valueOf(TABLE_NAME))) {
byte[] row1 = Bytes.toBytes(reverse("zhongguo-beijing"));
byte[] row2 = Bytes.toBytes(md5AsHex("zhongguo-beijing"));
byte[] row3 = Bytes.toBytes(hash2("zhongguo-beijing"));
byte[] row4 = Bytes.toBytes(hash3("zhongguo-beijing"));
List<Put> list = new ArrayList<>();
Put put1 = new Put(row1);
put1.addColumn(FAMILY, QUALIFIER, VALUE);
Put put2 = new Put(row2);
put2.addColumn(FAMILY, QUALIFIER, VALUE);
Put put3 = new Put(row3);
put3.addColumn(FAMILY, QUALIFIER, VALUE);
Put put4 = new Put(row4);
put4.addColumn(FAMILY, QUALIFIER, VALUE);
list.add(put1);
list.add(put2);
list.add(put3);
list.add(put4);
table.put(list);
}
}
/**
* 根据 rowkey 查询数据
*/
private void singleGet() throws IOException {
try (Table table = getConnection().getTable(TableName.valueOf(TABLE_NAME))) {
Get get = new Get(ROWKEY);
Result result = table.get(get);
// 打印指定行,指定列族,指定列的单个Cell的值
System.out.println(Bytes.toString(result.getValue(FAMILY, QUALIFIER)));
// 遍历每个当前行每个cell的值,即:row,列族,列和value
printResult(result);
}
}
/**
* 根据 rowkey + family + column 查询数据
*/
private void singleGetWithColumn() throws IOException {
try (Table table = getConnection().getTable(TableName.valueOf(TABLE_NAME))) {
Get get = new Get(ROWKEY);
get.addColumn(FAMILY, QUALIFIER);
Result result = table.get(get);
// 遍历每个当前行每个cell的值,即:row,列族,列和value
printResult(result);
}
}
/**
* 批量查询 多个 Get
*/
private void multipleGet() throws IOException {
try (Table table = getConnection().getTable(TableName.valueOf(TABLE_NAME))) {
List<Get> list = new ArrayList<>();
Get get1 = new Get(ROWKEY);
Get get2 = new Get(ROWKEY_2);
list.add(get1);
list.add(get2);
Result[] results = table.get(list);
for (Result result : results) {
// 遍历每个当前行每个cell的值,即:row,列族,列和value
printResult(result);
}
}
}
/**
* 遍历每个当前行每个cell的值,即:row,列族,列和value
*/
private void printResult(Result result) {
for (Cell c : result.rawCells()) {
System.out.println(Bytes.toString(CellUtil.cloneRow(c))
+ "==> " + Bytes.toString(CellUtil.cloneFamily(c))
+ "{" + Bytes.toString(CellUtil.cloneQualifier(c))
+ ":" + Bytes.toString(CellUtil.cloneValue(c)) + "}");
}
System.out.println("---------------------");
}
/**
* 扫描数据( 设定 起始 / 结束 范围 )
*/
private void scanByStartAndStop() throws IOException {
try (Table table = getConnection().getTable(TableName.valueOf(TABLE_NAME))) {
Scan scan = new Scan();
scan.setStartRow(ROWKEY);
scan.setStopRow(ROWKEY_2);
// 设置单次rpc(最大)查询的数量。增大可以减少rpc次数,但是可能会导致timeout,减小会增加rpc次数,影响效率。
// 可理解为 cache 是面向行的优化处理
scan.setCaching(50);
// 用来控制 每次调用 next() 操作时会返回多少列
// 可理解为 batch 是面向列的优化处理
scan.setBatch(5);
// 是否缓存块,默认缓存
scan.setCacheBlocks(false);
SingleColumnValueFilter filter = new SingleColumnValueFilter(FAMILY, QUALIFIER, CompareFilter.CompareOp.EQUAL, VALUE);
scan.setFilter(filter);
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
// 遍历每个当前行每个cell的值,即:row,列族,列和value
printResult(result);
}
}
}
}
/**
* 根据 rowkey 删除记录(建议批量删除)
*/
private void singleDelete() throws IOException {
try (Table table = getConnection().getTable(TableName.valueOf(TABLE_NAME))) {
Delete delete = new Delete(ROWKEY);
table.delete(delete);
}
}
/**
* 指定 rowkey + family + column 删除指定列(建议批量删除)
*/
private void singleDeleteColumn() throws IOException {
try (Table table = getConnection().getTable(TableName.valueOf(TABLE_NAME))) {
Delete delete = new Delete(ROWKEY);
delete.addColumn(FAMILY, QUALIFIER);
table.delete(delete);
}
}
/**
* 根据 rowkeys 批量删除记录(优选)
*/
private void multipleDelete() throws IOException {
try (Table table = getConnection().getTable(TableName.valueOf(TABLE_NAME))) {
List<Delete> list = new ArrayList<>();
Delete delete = new Delete(ROWKEY);
Delete delete2 = new Delete(ROWKEY_2);
list.add(delete);
list.add(delete2);
table.delete(list);
}
}
// --------------------------------------------------------------------------------------------
// 以下为 Hbase Rowkey 的常见处理方式,以避免热点问题
// --------------------------------------------------------------------------------------------
/**
* 反转字符串(一般为数字类型的顺序或者随机字符串)
*
* @param str 处理前的 Rowkey
* @return 处理后的 Rowkey
*/
private static String reverse(String str) {
return new StringBuffer(str).reverse().toString();
}
/**
* md5处理(对长度差异很大的字符串,可以用此来做 定长和 salt)
*
* @param str 处理前的 Rowkey
* @return 处理后的 Rowkey
*/
private static String md5AsHex(String str) {
return MD5Hash.getMD5AsHex(Bytes.toBytes(str));
}
/**
* 使用hashCode后2位来加盐(适用于分区较少的表的key)
*
* @param str 处理前的 Rowkey
* @return 处理后的 Rowkey
*/
private static String hash2(String str) {
return String.format("%02d%s", Math.abs(str.hashCode() % 100), str);
}
/**
* 使用hashCode后3位来加盐(适用于分区较多的表的key)
*
* @param str 处理前的 Rowkey
* @return 处理后的 Rowkey
*/
private static String hash3(String str) {
return String.format("%03d%s", Math.abs(str.hashCode() % 1000), str);
}
}

最后

以上就是大气芒果为你收集整理的工具类:Hbase (Admin) Client常量ConstantsDDL 操作 : 建表 + 修改表HBaseClient:增删改查的全部内容,希望文章能够帮你解决工具类:Hbase (Admin) Client常量ConstantsDDL 操作 : 建表 + 修改表HBaseClient:增删改查所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(49)

评论列表共有 0 条评论

立即
投稿
返回
顶部