概述
为了确保遥测数据被thingsboard接收到,在设备以及thingsboard之间传递时序数据是有确认机制的,具体在transport-mqtt模块中,如下所示:
下面看下MqttTransportHandler类相关部分代码
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
if (!checkConnected(ctx, mqttMsg)) {
return;
}
String topicName = mqttMsg.variableHeader().topicName();
int msgId = mqttMsg.variableHeader().packetId();
log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
if (gatewaySessionHandler != null) {
// 通过网关上传数据
handleGatewayPublishMsg(topicName, msgId, mqttMsg);
transportService.reportActivity(sessionInfo);
}
} else {
// 通过设备上传数据
processDevicePublish(ctx, mqttMsg, topicName, msgId);
}
}
下面看下processDevicePublish相关部分,从下面的代码可以看到端倪:将遥测数据发送队列后会执行getPubAckCallback进行数据ack操作
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
try {
// 发送遥测数据
if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = adaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
// getPubAckCallback是成功回调
transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
}
private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) {
return new TransportServiceCallback<Void>() {
@Override
public void onSuccess(Void dummy) {
log.trace("[{}] Published msg: {}", sessionId, msg);
if (msgId > 0) {
// 发送回执
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
}
}
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
processDisconnect(ctx);
}
};
}
最后
以上就是高挑乌冬面为你收集整理的thingsboard遥测数据确认机制的全部内容,希望文章能够帮你解决thingsboard遥测数据确认机制所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复