概述
在阿里云的OpenAPI调试页面上,可以找到发送请求消息的入口,但每次都打开网页去调试效率不高,所以我做了这个RRpc请求消息发送端,可以更快地向设备发送请求消息。应用界面如下:
线程中的RRpc发送程序取自阿里云的示例SDK代码,这里利用PyQt5为它做了一个发送界面。
源代码如下:
# -*- coding: utf-8 -*-

# Form implementation generated from reading ui file 'untitled1.ui'
#
# Created by: PyQt5 UI code generator 5.14.0
#
# WARNING! All changes made in this file will be lost!
"""PyQt5 front end for sending Alibaba Cloud IoT RRpc requests to a device.

The window collects the request parameters, a background ``Worker`` thread
performs the RRpc call through the Alibaba Cloud IoT SDK, and the decoded
reply (or an error text) is appended to the text browser.
"""

import base64
import sys
import time  # unused here; kept so code relying on this module's imports still works
from typing import List

from alibabacloud_iot20180120 import models as iot_20180120_models
from alibabacloud_iot20180120.client import Client as Iot20180120Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from PyQt5 import QtCore, QtWidgets
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtGui import QIcon, QTextCursor

# Shared request parameters, filled in by Ui_MainWindow.test() and read by
# Worker when it runs. Index meaning:
#   0 AccessKey ID, 1 AccessKey secret, 2 ProductKey, 3 DeviceName,
#   4 timeout, 5 Base64-encoded request message, 6 IoT instance id.
userdata = [1, 2, 3, 4, 5, 6, 7]


class Worker(QThread):
    """Background thread that performs one RRpc call per run.

    The request parameters are read from the module-level ``userdata`` list.
    The outcome (decoded device reply or an error text) is delivered as a
    string through the ``sig`` signal.
    """

    sig = pyqtSignal(str)

    def __init__(self, parent=None):
        super(Worker, self).__init__(parent)
        self.count = 0

    def run(self):
        """Send the request and emit either the reply or an error text."""
        try:
            self.sig.emit(self._send_request())
        except Exception as error:
            # Thread boundary: report every failure to the GUI instead of
            # letting the exception escape the thread.
            self.sig.emit(self._describe_error(error))

    @staticmethod
    def _create_client() -> Iot20180120Client:
        """Build an SDK client from the AccessKey pair stored in ``userdata``.

        NOTE: AK/SK authentication carries a leakage risk; the SDK sample
        this code derives from recommends STS instead, see
        https://help.aliyun.com/document_detail/378659.html
        """
        config = open_api_models.Config(
            access_key_id=str(userdata[0]),
            access_key_secret=str(userdata[1]),
        )
        # Service endpoint (domain) the client talks to.
        config.endpoint = 'iot.cn-shanghai.aliyuncs.com'
        return Iot20180120Client(config)

    def _send_request(self) -> str:
        """Issue the RRpc call and return the Base64-decoded reply payload."""
        client = self._create_client()
        rrpc_request = iot_20180120_models.RRpcRequest(
            product_key=str(userdata[2]),
            device_name=str(userdata[3]),
            timeout=str(userdata[4]),
            request_base_64byte=str(userdata[5]),
            iot_instance_id=str(userdata[6]),
        )
        runtime = util_models.RuntimeOptions()
        response = client.r_rpc_with_options(rrpc_request, runtime)
        return base64.b64decode(response.body.payload_base_64byte).decode()

    @staticmethod
    def _describe_error(error) -> str:
        """Return the text shown to the user for a failed request.

        Exceptions that expose a non-empty ``message`` attribute are reported
        with that message (presumably the SDK's own exceptions -- verify
        against the SDK); anything else falls back to the generic hint asking
        the user to check the entered values.
        """
        message = getattr(error, 'message', None)
        if message:
            return str(message)
        return 'error:请查看输入信息是否正确'


class Ui_MainWindow(object):
    """Builds the main window and wires it to the RRpc ``Worker`` thread."""

    def __init__(self):
        super(Ui_MainWindow, self).__init__()
        self.thread = Worker()
        self.thread.sig.connect(self.updateLabel)

    def _add_widget(self, widget_cls, name, rect):
        """Create a child of the central widget with the given name/geometry.

        ``rect`` is an ``(x, y, width, height)`` tuple.
        """
        widget = widget_cls(self.centralwidget)
        widget.setGeometry(QtCore.QRect(*rect))
        widget.setObjectName(name)
        return widget

    def setupUi(self, MainWindow):
        """Create all widgets on ``MainWindow`` and connect the buttons."""
        MainWindow.setWindowIcon(QIcon("./birdicon.ico"))
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(800, 800)
        self.centralwidget = QtWidgets.QWidget(MainWindow)
        self.centralwidget.setObjectName("centralwidget")

        # Creation order is kept as generated: buttons, labels, line edits,
        # text browser.
        button, label = QtWidgets.QPushButton, QtWidgets.QLabel
        edit = QtWidgets.QLineEdit
        self.pushButton = self._add_widget(button, "pushButton", (80, 610, 141, 51))
        self.pushButton_2 = self._add_widget(button, "pushButton_2", (80, 680, 141, 51))

        self.label = self._add_widget(label, "label", (10, 40, 150, 31))
        self.label_2 = self._add_widget(label, "label_2", (10, 140, 150, 31))
        self.label_3 = self._add_widget(label, "label_3", (10, 240, 150, 31))
        self.label_4 = self._add_widget(label, "label_4", (10, 340, 150, 31))
        self.label_timeout = self._add_widget(label, "label_timeout", (440, 540, 150, 31))
        self.label_6 = self._add_widget(label, "label_6", (10, 440, 150, 31))
        self.label_7 = self._add_widget(label, "label_7", (10, 540, 150, 31))

        self.lineEdit = self._add_widget(edit, "lineEdit", (160, 40, 260, 41))
        self.lineEdit_2 = self._add_widget(edit, "lineEdit_2", (160, 140, 260, 41))
        self.lineEdit_3 = self._add_widget(edit, "lineEdit_3", (160, 240, 260, 41))
        # Object name previously read "lineEdit_5"; aligned with the attribute.
        self.lineEdit_4 = self._add_widget(edit, "lineEdit_4", (160, 340, 260, 41))
        self.lineEdit_timeout = self._add_widget(edit, "lineEdit_timeout", (600, 540, 151, 41))
        self.lineEdit_6 = self._add_widget(edit, "lineEdit_6", (160, 440, 260, 41))
        self.lineEdit_7 = self._add_widget(edit, "lineEdit_7", (160, 540, 260, 41))

        self.textBrowser = self._add_widget(
            QtWidgets.QTextBrowser, "textBrowser", (440, 40, 311, 431))

        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 26))
        self.menubar.setObjectName("menubar")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)

        # Connected here rather than in retranslateUi so that re-running the
        # translation never connects a slot a second time.
        self.pushButton.clicked.connect(self.test)
        self.pushButton_2.clicked.connect(self.ex)

        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        """Set every user-visible caption of the window."""
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "阿里云物联网平台RRPC请求消息发送调试工具 元孜然 V:1.0.1"))
        self.pushButton.setText(_translate("MainWindow", "发送"))
        self.pushButton_2.setText(_translate("MainWindow", "退出"))
        self.label.setText(_translate("MainWindow", "用户名AccessKey"))
        self.label_2.setText(_translate("MainWindow", "密匙AccessSecret"))
        self.label_3.setText(_translate("MainWindow", "产品名ProductKey"))
        self.label_4.setText(_translate("MainWindow", "设备名DeviceName"))
        # The two captions below carried a stray "n" before the parameter
        # name; restored to the line break it evidently stood for.
        self.label_timeout.setText(_translate("MainWindow", "等待响应时间/ms\nTimeout(<8000)"))
        self.label_6.setText(_translate("MainWindow", "Base64编码消息\nRequestBase64Byte"))
        self.label_7.setText(_translate("MainWindow", "实例名iotInstanceId"))

    def _read_fields(self) -> List[str]:
        """Return the text of the seven input fields in ``userdata`` order."""
        fields = (
            self.lineEdit,
            self.lineEdit_2,
            self.lineEdit_3,
            self.lineEdit_4,
            self.lineEdit_timeout,
            self.lineEdit_6,
            self.lineEdit_7,
        )
        return [str(field.text()) for field in fields]

    def test(self):
        """Slot of the send button: store the inputs and start the worker."""
        if self.thread.isRunning():
            # A request is still in flight; leave the parameters it may be
            # reading untouched.
            return
        userdata[:] = self._read_fields()
        self.thread.start()

    def updateLabel(self, text):
        """Slot of ``Worker.sig``: append the result to the text browser."""
        self.textBrowser.append('received a message: ' + text)
        # Move the cursor to the end so the newest entry scrolls into view.
        self.textBrowser.moveCursor(QTextCursor.End)

    def ex(self):
        """Slot of the quit button: terminate the program."""
        sys.exit()


def main():
    """Create the application, show the main window and run the event loop."""
    app = QtWidgets.QApplication(sys.argv)
    window = QtWidgets.QMainWindow()
    window.setStyleSheet("#MainWindow{border-image:url(bak.jpg)}")
    ui = Ui_MainWindow()
    ui.setupUi(window)
    window.show()
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()
百度网盘:百度网盘 请输入提取码
提取码:yzr1
最后
以上就是贤惠白羊为你收集整理的阿里云RRpc请求消息普适发送端的全部内容,希望文章能够帮你解决阿里云RRpc请求消息普适发送端所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复