概述
1.声明
当前内容主要为基于Session方式操作当前的IoTDB
2.pom依赖
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>0.11.1</version>
</dependency>
3.基本demo
主要的Recordable接口
public interface Recordable {
Record toRecord();
}
基本的Record类
public class Record{
private String deviceId;
private long time;
private List<String> measurements;
private List<String> values;
// 省略get、set、方法
}
主要的实体类
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
public class Machine implements Recordable {
private String name;
private Float temperature;
private Boolean status;
private Date timestamp;
public Date getTimestamp() {
return timestamp;
}
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Float getTemperature() {
return temperature;
}
public void setTemperature(Float temperature) {
this.temperature = temperature;
}
public Boolean getStatus() {
return status;
}
@Override
public String toString() {
return "Machine [name=" + name + ", temperature=" + temperature + ", status=" + status + "]";
}
public void setStatus(Boolean status) {
this.status = status;
}
public Machine() {
super();
// TODO Auto-generated constructor stub
}
public Machine(String name, Float temperature, Boolean status) {
super();
this.name = name;
this.temperature = temperature;
this.status = status;
this.timestamp = new Date();
}
public Record toRecord() {
// TODO Auto-generated method stub
DateFormat format = new SimpleDateFormat("yyyyMMddHHmmssSSS");
Record record = new Record();
record.setDeviceId("root.test.machine");
record.setTime(Long.parseLong(format.format(this.getTimestamp())));
record.setMeasurements(Arrays.asList("name", "temperature", "status"));
record.setValues(Arrays.asList(this.getName(), this.getTemperature().toString(), this.getStatus().toString()));
return record;
}
}
主要的测试类:
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.SessionUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import com.hy.springboot.iotdb.Machine;
import com.hy.springboot.iotdb.Record;
import com.hy.springboot.iotdb.Recordable;
/**
*
* @author hy
* @createTime 2021-03-06 15:04:38
* @description 当前内容为主要使用iotdb的session方式操作iotdb时序数据库(存在问题,无法搭建iotDB集群问题,很严重)
*
*/
public class IOTDBSessionConnectionTest {
private static final String host = "192.168.1.101";
private static final int rpcPort = 6667;
private static final String username = "root";
private static final String password = "root";
public static void main(String[] args) throws InterruptedException {
Session session = new Session(host, rpcPort, username, password);
try {
session.open();
String storageGroupId = "root.test";
// 1.创建一个存储组
session.setStorageGroup(storageGroupId);
// 2.创建一个时序
session.createTimeseries(storageGroupId + ".machine.name", TSDataType.TEXT, TSEncoding.PLAIN,
CompressionType.LZ4);
session.createTimeseries(storageGroupId + ".machine.temperature", TSDataType.FLOAT, TSEncoding.RLE,
CompressionType.LZ4);
session.createTimeseries(storageGroupId + ".machine.status", TSDataType.BOOLEAN, TSEncoding.PLAIN,
CompressionType.LZ4);
// 3.开始添加数据,这里需要休眠,否则数据都是一起的
List<Machine> machines = new ArrayList<Machine>();
machines.add(new Machine("机器1", 400.5f, true));
Thread.sleep(100);
machines.add(new Machine("机器1", 380.5f, true));
Thread.sleep(100);
machines.add(new Machine("机器1", 420.5f, true));
Thread.sleep(100);
machines.add(new Machine("机器1", 390.5f, true));
Thread.sleep(100);
// 4.开始添加数据
// session.insertRecord("root.test.machine", new Date().getTime(), measurements,
// values);
session.insertRecords(getDeviceIds(machines), getTimes(machines), getMeasurementsList(machines),
getValuesList(machines));
//session.insertTablet(tablet);
String sql = String.format("select * from %s.%s", storageGroupId, "machine");
SessionDataSet sessionDataSet = session.executeQueryStatement(sql);
int fetchSize = sessionDataSet.getFetchSize();
List<String> columnNames = sessionDataSet.getColumnNames();
List<TSDataType> columnTypes = sessionDataSet.getColumnTypes();
System.out.println(columnNames);
System.out.println(columnTypes);
if (fetchSize > 0) {
while (sessionDataSet.hasNext()) {
RowRecord next = sessionDataSet.next();
List<Field> fields = next.getFields();
// 查询结果第一个为时间搓
long timestamp = next.getTimestamp();
System.out.println(timestamp + "t");
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
// 这里的需要按照类型获取
System.out.println(field.getObjectValue(field.getDataType()));
}
System.out.println();
}
}
sessionDataSet.closeOperationHandle();
// 删除当前的storage group
sql = String.format("delete storage grup %s", storageGroupId);
session.executeNonQueryStatement(sql);
} catch (IoTDBConnectionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (StatementExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
try {
session.close();
} catch (IoTDBConnectionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private static List<String> getDeviceIds(List<? extends Recordable> records) {
List<String> deviceIds = new ArrayList<String>();
for (Recordable recordable : records) {
Record record = recordable.toRecord();
String deviceId = record.getDeviceId();
deviceIds.add(deviceId);
}
return deviceIds;
}
private static List<Long> getTimes(List<? extends Recordable> records) {
List<Long> times = new ArrayList<Long>();
for (Recordable recordable : records) {
Record record = recordable.toRecord();
times.add(record.getTime());
}
return times;
}
private static List<List<String>> getMeasurementsList(List<? extends Recordable> records) {
List<List<String>> measurementsList = new ArrayList<List<String>>();
for (Recordable recordable : records) {
Record record = recordable.toRecord();
measurementsList.add(record.getMeasurements());
}
return measurementsList;
}
private static List<List<String>> getValuesList(List<? extends Recordable> records) {
List<List<String>> valuesList = new ArrayList<List<String>>();
for (Recordable recordable : records) {
Record record = recordable.toRecord();
valuesList.add(record.getValues());
}
return valuesList;
}
}
执行结果:
[Time, root.test.machine.name, root.test.machine.temperature, root.test.machine.status]
[INT64, TEXT, FLOAT, BOOLEAN]
20210307084508369
机器1
400.5
true
20210307084508479
机器1
380.5
true
20210307084508589
机器1
420.5
true
20210307084508698
机器1
390.5
true
4.总结
1.在创建timeseries的时候还是需要填写全名称:root.储存组.存储数据库.字段
2.感觉session方式就是一种直接解析方式,不需要解析sql
3.通过查看官方文档,发现官方文档与实际的发布东西有点不匹配,无法搭建集群无法测试集群,并且发行版总没有一些启动或者配置文件(时间2021/03/07)
最后
以上就是疯狂裙子为你收集整理的Apache IoTDB:使用Session方式执行操作的全部内容,希望文章能够帮你解决Apache IoTDB:使用Session方式执行操作所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复