在阿里云的 OpenAPI 调试页面中,可以找到发送 RRpc 请求消息的入口;但每次都打开网页进行调试效率不高,所以我做了这个 RRpc 请求消息发送端,可以更快地向设备发送请求消息。应用界面如下:

线程中的 RRpc 发送程序截取自阿里云的示例 SDK,这里利用 PyQt5 为它制作了发送界面。
源代码如下:
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'untitled1.ui'
#
# Created by: PyQt5 UI code generator 5.14.0
#
# WARNING! All changes made in this file will be lost!
from PyQt5 import QtCore, QtWidgets
import base64
from typing import List
from alibabacloud_iot20180120.client import Client as Iot20180120Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_iot20180120 import models as iot_20180120_models
from alibabacloud_tea_util import models as util_models
import time
from PyQt5.QtGui import QIcon
from PyQt5.QtCore import QThread, pyqtSignal
# Shared request parameters, written by Ui_MainWindow.test() and read by the
# worker thread when it builds the RRpc call. Slot layout:
#   0: AccessKey ID        1: AccessKey Secret   2: ProductKey
#   3: DeviceName          4: timeout            5: Base64-encoded request
#   6: IoT instance id
# The initial 1..7 values are placeholders that are overwritten before use.
userdata=[1,2,3,4,5,6,7]
# Worker thread class: performs the RRpc call off the GUI thread
class Worker(QThread):
    """Background thread that sends one RRpc request and reports the outcome.

    The request parameters are read from the module-level ``userdata`` list
    at the moment the thread runs. The result is delivered through ``sig``:
    the decoded device reply on success, otherwise an error description.
    """

    # Carries either the decoded reply payload or an error message.
    sig = pyqtSignal(str)

    # Fallback text emitted when a failure carries no usable message.
    _GENERIC_ERROR = 'error:请查看输入信息是否正确'

    # Endpoint the client is pointed at.
    _ENDPOINT = 'iot.cn-shanghai.aliyuncs.com'

    def __init__(self, parent=None):
        super(Worker, self).__init__(parent)
        self.count = 0

    @staticmethod
    def _create_client() -> Iot20180120Client:
        """Build an IoT client from the AccessKey pair stored in ``userdata``."""
        config = open_api_models.Config(
            # Required: AccessKey ID
            access_key_id=str(userdata[0]),
            # Required: AccessKey Secret
            access_key_secret=str(userdata[1]),
        )
        config.endpoint = Worker._ENDPOINT
        return Iot20180120Client(config)

    @staticmethod
    def _build_request():
        """Build the RRpc request from the remaining ``userdata`` slots."""
        return iot_20180120_models.RRpcRequest(
            product_key=str(userdata[2]),
            device_name=str(userdata[3]),
            timeout=str(userdata[4]),
            request_base_64byte=str(userdata[5]),
            iot_instance_id=str(userdata[6]),
        )

    def run(self):
        """Send the request and emit exactly one ``sig`` with the outcome.

        The call is made unconditionally; it no longer depends on the module
        being executed as ``__main__`` or on a ``sys`` global being present.
        Any failure is reported through ``sig`` instead of being raised, so
        the thread never dies with an unhandled exception.
        """
        try:
            client = self._create_client()
            request = self._build_request()
            runtime = util_models.RuntimeOptions()
            response = client.r_rpc_with_options(request, runtime)
            # The reply payload is Base64 text; decode it for display.
            reply = base64.b64decode(
                response.body.payload_base_64byte).decode()
        except Exception as error:
            # Not every exception has a ``message`` attribute (the original
            # code assumed it did); fall back to the generic hint otherwise.
            message = getattr(error, 'message', None)
            self.sig.emit(str(message) if message else self._GENERIC_ERROR)
        else:
            self.sig.emit(reply)
class Ui_MainWindow(object):
    """Main window UI for the RRpc request sender.

    Builds the input form (credentials, product/device, payload, timeout,
    instance id), a send button that starts the worker thread, an exit
    button, and a text browser that shows every message the worker emits.
    """

    def __init__(self):
        super().__init__()
        # One worker is reused for every request; its signal feeds the log.
        self.thread = Worker()
        self.thread.sig.connect(self.updateLabel)

    def _add_widget(self, widget_cls, object_name, x, y, width, height):
        """Create a child of the central widget with a name and geometry."""
        widget = widget_cls(self.centralwidget)
        widget.setGeometry(QtCore.QRect(x, y, width, height))
        widget.setObjectName(object_name)
        return widget

    def setupUi(self, MainWindow):
        """Create all widgets on ``MainWindow`` and wire up the buttons."""
        MainWindow.setWindowIcon(QIcon("./birdicon.ico"))
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(800, 800)
        self.centralwidget = QtWidgets.QWidget(MainWindow)
        self.centralwidget.setObjectName("centralwidget")

        # Widgets are created in the original order (buttons, labels, line
        # edits, text browser) so default tab order and stacking stay the same.
        button = QtWidgets.QPushButton
        self.pushButton = self._add_widget(button, "pushButton", 80, 610, 141, 51)
        self.pushButton_2 = self._add_widget(button, "pushButton_2", 80, 680, 141, 51)

        label = QtWidgets.QLabel
        self.label = self._add_widget(label, "label", 10, 40, 150, 31)
        self.label_2 = self._add_widget(label, "label_2", 10, 140, 150, 31)
        self.label_3 = self._add_widget(label, "label_3", 10, 240, 150, 31)
        self.label_4 = self._add_widget(label, "label_4", 10, 340, 150, 31)
        self.label_timeout = self._add_widget(label, "label_timeout", 440, 540, 150, 31)
        self.label_6 = self._add_widget(label, "label_6", 10, 440, 150, 31)
        self.label_7 = self._add_widget(label, "label_7", 10, 540, 150, 31)

        edit = QtWidgets.QLineEdit
        self.lineEdit = self._add_widget(edit, "lineEdit", 160, 40, 260, 41)
        self.lineEdit_2 = self._add_widget(edit, "lineEdit_2", 160, 140, 260, 41)
        self.lineEdit_3 = self._add_widget(edit, "lineEdit_3", 160, 240, 260, 41)
        # NOTE: the object name "lineEdit_5" does not match the attribute
        # name; it is kept unchanged in case anything looks it up by name.
        self.lineEdit_4 = self._add_widget(edit, "lineEdit_5", 160, 340, 260, 41)
        self.lineEdit_timeout = self._add_widget(edit, "lineEdit_timeout", 600, 540, 151, 41)
        self.lineEdit_6 = self._add_widget(edit, "lineEdit_6", 160, 440, 260, 41)
        self.lineEdit_7 = self._add_widget(edit, "lineEdit_7", 160, 540, 260, 41)

        self.textBrowser = self._add_widget(
            QtWidgets.QTextBrowser, "textBrowser", 440, 40, 311, 431)

        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 26))
        self.menubar.setObjectName("menubar")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)

        # Connect the buttons here, once, rather than in retranslateUi():
        # calling retranslateUi() again would otherwise add duplicate
        # connections and fire each handler several times per click.
        self.pushButton.clicked.connect(self.test)
        self.pushButton_2.clicked.connect(self.ex)

        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        """Set the window title and every visible button/label text."""
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "阿里云物联网平台RRPC请求消息发送调试工具 元孜然 V:1.0.1"))
        self.pushButton.setText(_translate("MainWindow", "发送"))
        self.pushButton_2.setText(_translate("MainWindow", "退出"))
        self.label.setText(_translate("MainWindow", "用户名AccessKey"))
        self.label_2.setText(_translate("MainWindow", "密匙AccessSecret"))
        self.label_3.setText(_translate("MainWindow", "产品名ProductKey"))
        self.label_4.setText(_translate("MainWindow", "设备名DeviceName"))
        self.label_timeout.setText(_translate("MainWindow", "等待响应时间/msnTimeout(<8000)"))
        self.label_6.setText(_translate("MainWindow", "Base64编码消息nRequestBase64Byte"))
        self.label_7.setText(_translate("MainWindow", "实例名iotInstanceId"))

    def test(self):
        """Copy the form fields into ``userdata`` and start the worker.

        Clicks are ignored while a request is still running, so the shared
        parameters are not overwritten underneath the worker thread.
        """
        if self.thread.isRunning():
            return
        fields = (
            self.lineEdit,          # AccessKey ID
            self.lineEdit_2,        # AccessKey Secret
            self.lineEdit_3,        # ProductKey
            self.lineEdit_4,        # DeviceName
            self.lineEdit_timeout,  # timeout
            self.lineEdit_6,        # Base64 request payload
            self.lineEdit_7,        # IoT instance id
        )
        for index, field in enumerate(fields):
            userdata[index] = str(field.text())
        self.thread.start()

    def updateLabel(self, text):
        """Append a worker message to the log and scroll to the end."""
        self.textBrowser.append('received a message: '+text)
        # Move the cursor to the end so the newest entry is scrolled into
        # view. No sleep here: this slot runs on the GUI side and a sleep
        # would freeze the window for its whole duration.
        self.cursor = self.textBrowser.textCursor()
        self.textBrowser.moveCursor(self.cursor.End)

    def ex(self):
        """Exit the application."""
        # Imported locally: the module only imports sys under its
        # ``__main__`` guard, so the global may not exist when imported.
        import sys
        sys.exit()
if __name__ == '__main__':
    # Script entry point: build the Qt application, show the main window
    # and hand control to the event loop until the application exits.
    import sys
    from PyQt5.QtWidgets import QApplication, QMainWindow

    app = QApplication(sys.argv)

    main_window = QMainWindow()
    # Background image, matched by the object name that setupUi() assigns.
    main_window.setStyleSheet("#MainWindow{border-image:url(bak.jpg)}")

    ui = Ui_MainWindow()
    ui.setupUi(main_window)
    main_window.show()

    exit_code = app.exec_()
    sys.exit(exit_code)
百度网盘:百度网盘 请输入提取码
提取码:yzr1
最后
以上就是贤惠白羊最近收集整理的关于阿里云RRpc请求消息普适发送端的全部内容,更多相关阿里云RRpc请求消息普适发送端内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复