1.同步调用场景
1.1 背景
MQTT协议是基于PUB/SUB的异步通信模式,无法实现服务端下发指令给设备端,同时需要设备端返回响应结果的场景。
IoT物联网平台基于MQTT协议制定了一套请求和响应的同步机制,无需改动MQTT协议即可实现同步通信。应用服务器通过POP API发起Rrpc调用,IoT设备端只需要在Timeout内,按照固定的格式回复Pub消息,服务端即可同步获取IoT设备端的响应结果。
具体流程如下:

1.2 Topic格式约定
请求:/sys/${productKey}/${deviceName}/rrpc/request/${messageId}
响应:/sys/${productKey}/${deviceName}/rrpc/response/${messageId}
$表示变量,每个设备不同messageId为IoT平台生成的消息ID,设备端回复responseTopic里的messageId要与requestTopic一致
示例:
设备端需要订阅:
/sys/${productKey}/${deviceName}/rrpc/request/+
运行中设备收到Topic:
/sys/PK100101/DN213452/rrpc/request/443859344534
收到消息后,在timeout时间内回复Topic:
/sys/PK100101/DN213452/rrpc/response/443859344534
2.同步调用RRPC示例
2.1 设备端代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23const mqtt = require('aliyun-iot-mqtt'); //设备属性 const options = require("./iot-device-config.json"); //建立连接 const client = mqtt.getAliyunIotMqttClient(options); client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`) client.on('message', function(topic, message) { if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){ handleRrpc(topic, message) } }) function handleRrpc(topic, message){ topic = topic.replace('/request/','/response/'); console.log("topic=" + topic) //普通Rrpc,响应payload自定义 const payloadJson = {code:200,msg:"handle ok"}; client.publish(topic, JSON.stringify(payloadJson)); }
2.2 服务端POP调用Rrpc
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32const co = require('co'); const RPCClient = require('@alicloud/pop-core').RPCClient; const options = require("./iot-ak-config.json"); //1.初始化client const client = new RPCClient({ accessKeyId: options.accessKey, secretAccessKey: options.accessKeySecret, endpoint: 'https://iot.cn-shanghai.aliyuncs.com', apiVersion: '2017-04-20' }); const payload = { "msg": "hello Rrpc" }; //2.构建request const params = { ProductKey:"a1gMu82K4m2", DeviceName:"h5@nuwr5r9hf6l@1532088166923", RequestBase64Byte:new Buffer(JSON.stringify(payload)).toString("base64"), Timeout:3000 }; co(function*() { //3.发起API调用 const response = yield client.request('Rrpc', params); console.log(JSON.stringify(response)); });
rrpc响应:
复制代码
1
2
3
4
5
6
7
8
9
10{ "MessageId": "1037292594536681472", "RequestId": "D2150496-2A61-4499-8B2A-4B3EC4B2A432", "PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6ImhhbmRsZSBvayJ9", "Success": true, "RrpcCode": "SUCCESS" } // PayloadBase64Byte 解码: {"code":200,"msg":"handle ok"}
3.物模型-服务同步调用InvokeThingService示例
注意:物模型 服务调用 接口InvokeThingService,不是Rrpc
3.1 物模型-同步服务定义

3.2 设备端实现
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32const mqtt = require('aliyun-iot-mqtt'); //设备属性 const options = require("./iot-device-config.json"); //建立连接 const client = mqtt.getAliyunIotMqttClient(options); client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`) client.on('message', function(topic, message) { if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){ handleRrpc(topic, message) } }) /* * 如果存在多个同步调用服务,需要通过payload里的method区分 */ function handleRrpc(topic, message){ topic = topic.replace('/request/','/response/'); console.log("topic=" + topic) //物模型 同步服务调用,响应payload结构: const payloadJson = { id: Date.now(), code:200, data: { currentMode: Math.floor((Math.random() * 20) + 10) } } client.publish(topic, JSON.stringify(payloadJson)); }
注意:设备端响应的payload要满足物模型定义的出参结构
3.3 服务端POP 接口InvokeThingService
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31const co = require('co'); const RPCClient = require('@alicloud/pop-core').RPCClient; const options = require("./iot-ak-config.json"); //1.初始化client const client = new RPCClient({ accessKeyId: options.accessKey, secretAccessKey: options.accessKeySecret, endpoint: 'https://iot.cn-shanghai.aliyuncs.com', apiVersion: '2018-01-20' }); const params = { ProductKey: "a1gMu82K4m2", DeviceName: "h5@nuwr5r9hf6l@1532088166923", Args: JSON.stringify({ "mode": "1" }), Identifier: "thing.service.setMode" }; co(function*() { try { //3.发起API调用 const response = yield client.request('InvokeThingService', params); console.log(JSON.stringify(response)); } catch (err) { console.log(err); } });
调用结果:
复制代码
1
2
3
4
5
6
7
8
9{ "Data":{ "Result": "{"currentMode":12}", "MessageId": "1536145625658" }, "RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F", "Success": true }
最后
以上就是精明故事最近收集整理的关于基于Pub/Sub模式的阿里云IoT同步调用详解的全部内容,更多相关基于Pub/Sub模式内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复