概述
使用Java API 操作Kudu
- 概述
- Maven依赖
- 操作Kudu表
- 操作Kudu数据
概述
本文不面向零基础读者,因此不再赘述基本概念,直接上代码。测试时使用一个名为 KuduPlus 的小工具辅助测试。
Maven依赖
<!-- Extra artifact repositories, declared in this order: Aliyun, Cloudera, JBoss. -->
<!-- The kudu.version used by this POM carries a -cdh suffix; presumably those builds are served by the cloudera repository (verify). -->
<!-- NOTE(review): the aliyun and jboss URLs use plain http. Maven 3.8.1+ blocks external http repositories by default; confirm these still resolve in your environment or switch them to https. -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<!-- Build properties: Java 8 source/target level, plus the Kudu (1.9.0, CDH 6.2.1 build) and JUnit versions. -->
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<kudu.version>1.9.0-cdh6.2.1</kudu.version>
<junit.version>4.12</junit.version>
</properties>
<!-- Kudu client libraries plus JUnit; every version comes from the properties section so it is declared in exactly one place. -->
<dependencies>
    <!-- Kudu Java client (KuduClient, KuduSession, KuduScanner, ...). -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>${kudu.version}</version>
    </dependency>
    <!-- Kudu client tools, kept on the same version as the client. -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client-tools</artifactId>
        <version>${kudu.version}</version>
    </dependency>
    <!-- JUnit 4 runs the demo classes; the version now references junit.version instead of repeating the literal. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
    </dependency>
</dependencies>
<!-- Pins maven-compiler-plugin 3.1 and compiles for Java 8 (the same level as the maven.compiler.source/target properties). -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
操作Kudu表
代码较多,所以用 `//region` 和 `//endregion` 做了标记,在 IDEA 中可以折叠;即便如此还是很长,因此拆成了两个类来写(合并成一个也可以)。这样在需要的时候可以直接复制粘贴(Ctrl+C / Ctrl+V)后修改参数。
package com.aa.kudu.table;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
/**
 * JUnit 4 walkthrough of Kudu table management through the Java client:
 * connecting, creating tables (hash, range, and hash + range partitioning),
 * adding and dropping a column, and dropping a table.
 *
 * <p>A fresh {@link KuduClient} is built before each test and closed after it.
 * The tests are independent demos meant to be run one at a time against a live
 * cluster; the alter/drop tests operate on the table created by
 * {@link #createKuduTable()}.
 */
public class kuduTableDemo {

    /** Address (host:port) of the Kudu master the demos connect to. */
    private static final String MASTER_ADDRESSES = "192.168.88.20:7051";

    /** Hash-partitioned demo table; also the target of the alter/drop tests. */
    private static final String USERS_TABLE = "aa_users";

    /** Range-partitioned demo table. */
    private static final String USERS_RANGE_TABLE = "aa_users_range";

    /** Hash + range (multi-level) partitioned demo table. */
    private static final String USERS_MULTI_TABLE = "aa_users_multi";

    /** Client shared by the test methods; created in {@link #init()}. */
    private KuduClient kuduClient = null;

    //region Before: build the client
    /** Builds the Kudu client used by the test that is about to run. */
    @Before
    public void init() {
        kuduClient = new KuduClient.KuduClientBuilder(MASTER_ADDRESSES)
                // Timeout for Kudu operations (client default: 30 s).
                .defaultOperationTimeoutMs(10000)
                // Timeout for socket reads (client default: 10 s).
                // NOTE(review): newer Kudu client releases document this setting as
                // deprecated / without effect - confirm for the version in use.
                .defaultSocketReadTimeoutMs(6000)
                .build();
    }
    //endregion

    //region Connection smoke test
    /** Prints the client instance to show that it was constructed. */
    @Test
    public void testKuduClient() {
        System.out.println("kuduClient = " + kuduClient);//kuduClient = org.apache.kudu.client.KuduClient@6e1ec318
    }
    //endregion

    //region Shared helpers
    /**
     * Builds a column definition.
     *
     * @param name  column name
     * @param type  Kudu column type
     * @param isKey whether the column is part of the primary key
     * @return the built column schema
     */
    private static ColumnSchema newColumnSchema(String name, Type type, boolean isKey) {
        return new ColumnSchema.ColumnSchemaBuilder(name, type)
                .key(isKey)
                .build();
    }

    /**
     * Schema shared by the hash- and range-partitioned demo tables:
     * {@code id INT32 (primary key), name STRING, age INT8}.
     */
    private static Schema usersSchema() {
        List<ColumnSchema> columns = new ArrayList<>();
        columns.add(newColumnSchema("id", Type.INT32, true));
        columns.add(newColumnSchema("name", Type.STRING, false));
        columns.add(newColumnSchema("age", Type.INT8, false));
        return new Schema(columns);
    }

    /** Returns a range-partition bound row with the INT32 {@code column} set to {@code value}. */
    private static PartialRow intBound(Schema schema, String column, int value) {
        PartialRow bound = new PartialRow(schema);
        bound.addInt(column, value);
        return bound;
    }

    /** Returns a range-partition bound row with the INT8 {@code column} set to {@code value}. */
    private static PartialRow byteBound(Schema schema, String column, byte value) {
        PartialRow bound = new PartialRow(schema);
        bound.addByte(column, value);
        return bound;
    }
    //endregion

    //region Create table (hash partitioning)
    /*
      Equivalent DDL:
      create table aa_users(
        id int,
        name string,
        age byte,
        primary key(id)
      )
     */
    /**
     * Creates {@code aa_users}, hash-partitioned on {@code id} into 3 buckets with a
     * single replica, and prints the id of the new table.
     *
     * @throws KuduException if the master rejects the request
     */
    @Test
    public void createKuduTable() throws KuduException {
        Schema schema = usersSchema();

        CreateTableOptions options = new CreateTableOptions();
        options.addHashPartitions(Arrays.asList("id"), 3);
        options.setNumReplicas(1);

        KuduTable kuduTable = kuduClient.createTable(USERS_TABLE, schema, options);
        System.out.println("kuduTable.getTableId() = " + kuduTable.getTableId());//kuduTable.getTableId() = 291ebdc9de8e44c3a84a3601f13dcf94
    }
    //endregion

    //region Drop table (delete table if exists)
    /**
     * Drops {@code aa_users} when it exists. The existence check comes first so that
     * running the test against a missing table does not raise an error.
     *
     * @throws KuduException if the existence check or the delete fails
     */
    @Test
    public void dropKuduTable() throws KuduException {
        if (kuduClient.tableExists(USERS_TABLE)) {
            System.out.println("存在表" + USERS_TABLE);
            kuduClient.deleteTable(USERS_TABLE);
            System.out.println("已删除表" + USERS_TABLE);
        } else {
            System.out.println("不存在表" + USERS_TABLE);
        }
    }
    //endregion

    //region Create table (range partitioning)
    /**
     * Creates {@code aa_users_range}, range-partitioned on {@code id} into
     * {@code id < 100}, {@code 100 <= id < 500} and {@code id >= 500}, with a
     * single replica.
     *
     * @throws KuduException if the master rejects the request
     */
    @Test
    public void createKuduTableByRange() throws KuduException {
        Schema schema = usersSchema();

        CreateTableOptions options = new CreateTableOptions();
        options.setRangePartitionColumns(Arrays.asList("id"));
        // A PartialRow with no value set leaves that side of the range unbounded.
        options.addRangePartition(new PartialRow(schema), intBound(schema, "id", 100));        // id < 100
        options.addRangePartition(intBound(schema, "id", 100), intBound(schema, "id", 500));   // 100 <= id < 500
        options.addRangePartition(intBound(schema, "id", 500), new PartialRow(schema));        // id >= 500
        options.setNumReplicas(1);

        KuduTable kuduTable = kuduClient.createTable(USERS_RANGE_TABLE, schema, options);
        System.out.println("kuduTable.getTableId() = " + kuduTable.getTableId());
    }
    //endregion

    //region Create table (multi-level partitioning)
    /**
     * Creates {@code aa_users_multi} with a composite key {@code (id, age)}:
     * hash-partitioned on {@code id} into 5 buckets, then range-partitioned on
     * {@code age} into {@code < 21}, {@code [21, 41)} and {@code >= 41}.
     *
     * <p>All three range partitions are needed. The original author observed
     * {@code NonRecoverableException: overlapping range partitions} when a bound
     * was left out.
     *
     * @throws KuduException if the master rejects the request
     */
    @Test
    public void createKuduTableMulti() throws KuduException {
        List<ColumnSchema> columns = new ArrayList<>();
        columns.add(newColumnSchema("id", Type.INT32, true));
        columns.add(newColumnSchema("age", Type.INT8, true));
        columns.add(newColumnSchema("name", Type.STRING, false));
        Schema schema = new Schema(columns);

        CreateTableOptions tableOptions = new CreateTableOptions();

        // First level: hash on id.
        tableOptions.addHashPartitions(Arrays.asList("id"), 5);

        // Second level: range on age.
        tableOptions.setRangePartitionColumns(Arrays.asList("age"));
        tableOptions.addRangePartition(new PartialRow(schema), byteBound(schema, "age", (byte) 21));
        tableOptions.addRangePartition(byteBound(schema, "age", (byte) 21), byteBound(schema, "age", (byte) 41));
        tableOptions.addRangePartition(byteBound(schema, "age", (byte) 41), new PartialRow(schema));

        tableOptions.setNumReplicas(1);

        KuduTable userTable = kuduClient.createTable(USERS_MULTI_TABLE, schema, tableOptions);
        System.out.println(userTable.toString());//org.apache.kudu.client.KuduTable@17695df3
        /*
          Partition layout shown on the master web UI (node2:8051):
          HASH (id) PARTITIONS 5,
          RANGE (age) (
            PARTITION VALUES < 21,
            PARTITION 21 <= VALUES < 41,
            PARTITION VALUES >= 41
          )
         */
    }
    //endregion

    //region Add column
    /**
     * Adds a STRING column {@code address} with a default value to {@code aa_users}
     * and prints the table id from the response.
     *
     * @throws KuduException if the alter request fails
     */
    @Test
    public void alterKuduTableAddColumn() throws KuduException {
        AlterTableOptions alterTableOptions = new AlterTableOptions();
        alterTableOptions.addColumn("address", Type.STRING, "银河系");

        AlterTableResponse response = kuduClient.alterTable(USERS_TABLE, alterTableOptions);
        System.out.println(response.getTableId());//80a90f5ff44a4432a21fff322c8f1659
    }
    //endregion

    //region Drop column
    /**
     * Drops the {@code address} column from {@code aa_users} and prints the table id
     * from the response.
     *
     * @throws KuduException if the alter request fails
     */
    @Test
    public void alterKuduTableDropColumn() throws KuduException {
        AlterTableOptions alterTableOptions = new AlterTableOptions();
        alterTableOptions.dropColumn("address");

        AlterTableResponse response = kuduClient.alterTable(USERS_TABLE, alterTableOptions);
        System.out.println(response.getTableId());
    }
    //endregion

    //region Release resources
    /**
     * Closes the client after each test.
     *
     * @throws KuduException if closing the client fails
     */
    @After
    public void close() throws KuduException {
        if (kuduClient != null) {
            kuduClient.close();
        }
    }
    //endregion
}
操作Kudu数据
package com.aa.kudu.data;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Random;
/**
 * JUnit 4 walkthrough of row-level operations on the {@code aa_users} Kudu table
 * through the Java client: single and batched insert, full and filtered scans,
 * update, upsert and delete.
 *
 * <p>A fresh {@link KuduClient} is built before each test and closed after it.
 * Sessions and scanners are closed in {@code finally} blocks so that a failing
 * operation does not leak them. Per-row errors reported by the server are
 * printed to stderr instead of being silently dropped.
 */
public class kuduDataDemo {

    /** Address (host:port) of the Kudu master the demos connect to. */
    private static final String MASTER_ADDRESSES = "192.168.88.20:7051";

    /** Table every test in this class reads from or writes to. */
    private static final String USERS_TABLE = "aa_users";

    /** Client shared by the test methods; created in {@link #init()}. */
    private KuduClient kuduClient = null;

    //region Shared helpers
    /**
     * Prints the row error carried by a write response, if there is one.
     *
     * @param response response returned by {@code apply()} or contained in the list
     *                 returned by {@code flush()}; {@code null} is tolerated because
     *                 {@code apply()} has nothing to report while operations are
     *                 only being buffered
     */
    private static void reportRowError(OperationResponse response) {
        if (response != null && response.hasRowError()) {
            System.err.println("row error: " + response.getRowError());
        }
    }
    //endregion

    //region Before: build the client
    /** Builds the Kudu client used by the test that is about to run. */
    @Before
    public void init() {
        kuduClient = new KuduClient.KuduClientBuilder(MASTER_ADDRESSES)
                // Timeout for Kudu operations (client default: 30 s).
                .defaultOperationTimeoutMs(10000)
                // Timeout for socket reads (client default: 10 s).
                // NOTE(review): newer Kudu client releases document this setting as
                // deprecated / without effect - confirm for the version in use.
                .defaultSocketReadTimeoutMs(6000)
                .build();
    }
    //endregion

    //region Single-row insert
    /**
     * Inserts one row {@code (10001, "张三", 25)}.
     *
     * <p>The insert is applied exactly once; submitting the same operation a second
     * time would only re-send an already inserted primary key.
     *
     * @throws KuduException if opening the table, applying or closing fails
     */
    @Test
    public void insertKuduSingleData() throws KuduException {
        KuduTable kuduTable = kuduClient.openTable(USERS_TABLE);
        KuduSession kuduSession = kuduClient.newSession();
        try {
            Insert insert = kuduTable.newInsert();
            PartialRow insertRow = insert.getRow();
            insertRow.addInt("id", 10001);
            insertRow.addString("name", "张三");
            insertRow.addByte("age", (byte) 25);

            // The session is left in its default flush mode, so apply() sends the row right away.
            reportRowError(kuduSession.apply(insert));
        } finally {
            kuduSession.close();
        }
    }
    //endregion

    //region Batched insert
    /**
     * Inserts 100 rows with ids 100..199, buffering them client-side and sending
     * them with one explicit flush.
     *
     * @throws KuduException if opening the table, applying, flushing or closing fails
     */
    @Test
    public void insertKuduBatchData() throws KuduException {
        KuduTable kuduTable = kuduClient.openTable(USERS_TABLE);
        KuduSession kuduSession = kuduClient.newSession();
        try {
            // Buffer operations until flush() is called explicitly.
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            // Number of operations the session may buffer; must cover the batch below.
            kuduSession.setMutationBufferSpace(1000);

            Random random = new Random();
            for (int i = 0; i < 100; i++) {
                // A new Insert is required per row: reusing an applied one fails with
                // java.lang.IllegalStateException: This row was already applied and cannot be modified.
                Insert insert = kuduTable.newInsert();
                PartialRow insertRow = insert.getRow();
                insertRow.addInt("id", 100 + i);
                insertRow.addString("name", "张三" + i);
                insertRow.addByte("age", (byte) (random.nextInt(10) + 21)); // age in 21..30
                kuduSession.apply(insert);
            }

            // Send the buffered rows and surface any per-row failures.
            for (OperationResponse response : kuduSession.flush()) {
                reportRowError(response);
            }
        } finally {
            kuduSession.close();
        }
    }
    //endregion

    //region Full scan
    /**
     * Scans every row of the table and prints {@code id}, {@code name} and {@code age}.
     *
     * @throws KuduException if opening the table or scanning fails
     */
    @Test
    public void queryKuduFullData() throws KuduException {
        KuduTable kuduTable = kuduClient.openTable(USERS_TABLE);
        KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
        KuduScanner kuduScanner = scannerBuilder.build();
        try {
            // Counts calls to nextRows(); each call returns one batch of rows. The
            // printed label says "tablet", but the counter is per batch.
            int batchIndex = 0;
            while (kuduScanner.hasMoreRows()) {
                batchIndex++;
                System.out.println("tablet index = " + batchIndex);
                RowResultIterator rowResults = kuduScanner.nextRows();
                while (rowResults.hasNext()) {
                    RowResult rowResult = rowResults.next();
                    System.out.println(
                            "id = " + rowResult.getInt("id") +
                                    ", name = " + rowResult.getString("name") +
                                    ", age = " + rowResult.getByte("age")
                    );
                }
            }
        } finally {
            kuduScanner.close();
        }
    }
    //endregion

    //region Filtered scan
    // Selecting columns is called projection; filtering rows is done with predicates.
    /**
     * Scans rows with {@code id > 150} and {@code age < 25}, projecting only
     * {@code id} and {@code age}.
     *
     * <p>Predicate column metadata is taken from the opened table's schema so it
     * always matches the real column types.
     *
     * @throws KuduException if opening the table or scanning fails
     */
    @Test
    public void queryKuduData() throws KuduException {
        KuduTable kuduTable = kuduClient.openTable(USERS_TABLE);
        KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);

        // Projection: only these columns are returned.
        scannerBuilder.setProjectedColumnNames(Arrays.asList("id", "age"));
        // Predicate: id > 150
        scannerBuilder.addPredicate(
                KuduPredicate.newComparisonPredicate(
                        kuduTable.getSchema().getColumn("id"),
                        KuduPredicate.ComparisonOp.GREATER,
                        150
                )
        );
        // Predicate: age < 25
        scannerBuilder.addPredicate(
                KuduPredicate.newComparisonPredicate(
                        kuduTable.getSchema().getColumn("age"),
                        KuduPredicate.ComparisonOp.LESS,
                        (byte) 25
                )
        );

        KuduScanner kuduScanner = scannerBuilder.build();
        try {
            // Counts calls to nextRows(); each call returns one batch of rows. The
            // printed label says "tablet", but the counter is per batch.
            int batchIndex = 0;
            while (kuduScanner.hasMoreRows()) {
                batchIndex++;
                System.out.println("tablet index = " + batchIndex);
                RowResultIterator rowResults = kuduScanner.nextRows();
                while (rowResults.hasNext()) {
                    RowResult rowResult = rowResults.next();
                    System.out.println(
                            "id = " + rowResult.getInt("id") +
                                    ", age = " + rowResult.getByte("age")
                    );
                }
            }
        } finally {
            kuduScanner.close();
        }
    }
    //endregion

    //region Update
    /**
     * Sets {@code name} to {@code "zhangsan153"} on the row with primary key 153.
     *
     * @throws KuduException if opening the table, applying or closing fails
     */
    @Test
    public void updateKuduData() throws KuduException {
        KuduTable kuduTable = kuduClient.openTable(USERS_TABLE);
        KuduSession kuduSession = kuduClient.newSession();
        try {
            Update newUpdate = kuduTable.newUpdate();
            PartialRow updateRow = newUpdate.getRow();
            updateRow.addInt("id", 153);                  // primary key of the row to change
            updateRow.addString("name", "zhangsan153");   // new value

            reportRowError(kuduSession.apply(newUpdate));
        } finally {
            kuduSession.close();
        }
    }
    //endregion

    //region Upsert (update when the key exists, insert otherwise)
    /**
     * Upserts the row {@code (25, "李四", 50)}.
     *
     * @throws KuduException if opening the table, applying or closing fails
     */
    @Test
    public void upsertKuduData() throws KuduException {
        KuduTable kuduTable = kuduClient.openTable(USERS_TABLE);
        KuduSession kuduSession = kuduClient.newSession();
        try {
            Upsert newUpsert = kuduTable.newUpsert();
            PartialRow upsertRow = newUpsert.getRow();
            upsertRow.addInt("id", 25);
            upsertRow.addString("name", "李四");
            upsertRow.addByte("age", (byte) 50);

            // Default flush mode: apply() already sends the row, so no explicit flush() is needed.
            reportRowError(kuduSession.apply(newUpsert));
        } finally {
            kuduSession.close();
        }
    }
    //endregion

    //region Delete by primary key
    /**
     * Deletes the row with primary key 153.
     *
     * @throws KuduException if opening the table, applying or closing fails
     */
    @Test
    public void deleteKuduData() throws KuduException {
        KuduTable kuduTable = kuduClient.openTable(USERS_TABLE);
        KuduSession kuduSession = kuduClient.newSession();
        try {
            Delete newDelete = kuduTable.newDelete();
            PartialRow deleteRow = newDelete.getRow();
            deleteRow.addInt("id", 153); // only the primary key is set for a delete

            // Default flush mode: apply() already sends the row, so no explicit flush() is needed.
            reportRowError(kuduSession.apply(newDelete));
        } finally {
            kuduSession.close();
        }
    }
    //endregion

    //region Release resources
    /**
     * Closes the client after each test.
     *
     * @throws KuduException if closing the client fails
     */
    @After
    public void close() throws KuduException {
        if (kuduClient != null) {
            kuduClient.close();
        }
    }
    //endregion
}
最后
以上就是体贴钢铁侠为你收集整理的《使用Java API 操作Kudu》(概述、Maven依赖、操作Kudu表、操作Kudu数据)的全部内容,希望文章能够帮你解决使用 Java API 操作 Kudu 时所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复