我是靠谱客的博主 贤惠白羊,最近开发中收集的这篇文章主要介绍阿里云RRpc请求消息普适发送端,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

在阿里云的open api调用界面,我们可以找到请求消息发送的页面,但如果每次都去访问页面调试效率不高,所以做了这个RRpc请求消息发送端,可以更快的向设备发送请求消息。应用界面如下:

线程中的RRpc发送程序是从阿里云的实例SDK上截取的,这里是利用Pyqt5给它做了发送界面。

源代码如下:

# -*- coding: utf-8 -*-

# Form implementation generated from reading ui file 'untitled1.ui'
#
# Created by: PyQt5 UI code generator 5.14.0
#
# WARNING! All changes made in this file will be lost!


from PyQt5 import QtCore,  QtWidgets
import base64
from typing import List
from alibabacloud_iot20180120.client import Client as Iot20180120Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_iot20180120 import models as iot_20180120_models
from alibabacloud_tea_util import models as util_models
import time
from PyQt5.QtGui import QIcon
from PyQt5.QtCore import QThread, pyqtSignal


userdata=[1,2,3,4,5,6,7]
# 创建线程1的类
class Worker(QThread):
    sig = pyqtSignal(str)

    def __init__(self, parent=None):
        super(Worker, self).__init__(parent)
        self.count = 0

    def run(self):

        try:
            # -*- coding: utf-8 -*-
            # This file is auto-generated, don't edit it. Thanks.

            class Sample:
                def __init__(self):
                    pass

                @staticmethod
                def create_client(
                        access_key_id: str,
                        access_key_secret: str,
                ) -> Iot20180120Client:
                    """
                    使用AK&SK初始化账号Client
                    @param access_key_id:
                    @param access_key_secret:
                    @return: Client
                    @throws Exception
                    """
                    config = open_api_models.Config(
                        # 必填,您的 AccessKey ID,
                        access_key_id=str(userdata[0]),
                        # 必填,您的 AccessKey Secret,
                        access_key_secret=str(userdata[1])
                    )
                    # 访问的域名
                    config.endpoint = f'iot.cn-shanghai.aliyuncs.com'
                    return Iot20180120Client(config)

                @staticmethod
                def main(
                        args: List[str],
                ) -> None:
                    # 初始化 Client,采用 AK&SK 鉴权访问的方式,此方式可能会存在泄漏风险,建议使用 STS 方式。鉴权访问方式请参考:https://help.aliyun.com/document_detail/378659.html
                    # 获取 AK 链接:https://usercenter.console.aliyun.com
                    client = Sample.create_client('accessKeyId', 'accessKeySecret')
                    rrpc_request = iot_20180120_models.RRpcRequest(
                        product_key=str(userdata[2]),
                        device_name=str(userdata[3]),
                        timeout=str(userdata[4]),
                        request_base_64byte=str(userdata[5]),
                        iot_instance_id=str(userdata[6])
                    )
                    runtime = util_models.RuntimeOptions()
                    try:
                        # 复制代码运行请自行打印 API 的返回值
                        self.sig.emit(base64.b64decode(
                            client.r_rpc_with_options(rrpc_request, runtime).body.payload_base_64byte).decode())

                    except Exception as error:
                        # 如有需要,请打印 error
                        self.sig.emit(error.message)

                @staticmethod
                async def main_async(
                        args: List[str],
                ) -> None:
                    # 初始化 Client,采用 AK&SK 鉴权访问的方式,此方式可能会存在泄漏风险,建议使用 STS 方式。鉴权访问方式请参考:https://help.aliyun.com/document_detail/378659.html
                    # 获取 AK 链接:https://usercenter.console.aliyun.com
                    client = Sample.create_client('accessKeyId', 'accessKeySecret')
                    rrpc_request = iot_20180120_models.RRpcRequest(
                        product_key=str(userdata[2]),
                        device_name=str(userdata[3]),
                        timeout=str(userdata[4]),
                        request_base_64byte=str(userdata[5]),
                        iot_instance_id=str(userdata[6])
                    )
                    runtime = util_models.RuntimeOptions()
                    try:
                        # 复制代码运行请自行打印 API 的返回值
                        await self.sig.emit(base64.b64decode(
                            client.r_rpc_with_options(rrpc_request, runtime).body.payload_base_64byte).decode())

                    except Exception as error:
                        # 如有需要,请打印 error
                        self.sig.emit(error.message)

            if __name__ == '__main__':
                Sample.main(sys.argv[1:])
        except:self.sig.emit('error:请查看输入信息是否正确')


class Ui_MainWindow(object):
    def __init__(self):
        super(Ui_MainWindow,self).__init__()
        self.thread = Worker()
        self.thread.sig.connect(self.updateLabel)


    def setupUi(self, MainWindow):
        MainWindow.setWindowIcon(QIcon("./birdicon.ico"))
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(800, 800)
        self.centralwidget = QtWidgets.QWidget(MainWindow)
        self.centralwidget.setObjectName("centralwidget")
        self.pushButton = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton.setGeometry(QtCore.QRect(80, 610, 141, 51))
        self.pushButton.setObjectName("pushButton")
        self.pushButton_2 = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton_2.setGeometry(QtCore.QRect(80, 680, 141, 51))
        self.pushButton_2.setObjectName("pushButton_2")
        self.label = QtWidgets.QLabel(self.centralwidget)
        self.label.setGeometry(QtCore.QRect(10, 40, 150, 31))
        self.label.setObjectName("label")
        self.label_2 = QtWidgets.QLabel(self.centralwidget)
        self.label_2.setGeometry(QtCore.QRect(10, 140, 150, 31))
        self.label_2.setObjectName("label_2")
        self.label_3 = QtWidgets.QLabel(self.centralwidget)
        self.label_3.setGeometry(QtCore.QRect(10, 240, 150, 31))
        self.label_3.setObjectName("label_3")
        self.label_4 = QtWidgets.QLabel(self.centralwidget)
        self.label_4.setGeometry(QtCore.QRect(10, 340, 150, 31))
        self.label_4.setObjectName("label_4")
        self.label_timeout = QtWidgets.QLabel(self.centralwidget)
        self.label_timeout.setGeometry(QtCore.QRect(440, 540, 150, 31))
        self.label_timeout.setObjectName("label_timeout")
        self.label_6 = QtWidgets.QLabel(self.centralwidget)
        self.label_6.setGeometry(QtCore.QRect(10, 440, 150, 31))
        self.label_6.setObjectName("label_6")
        self.label_7 = QtWidgets.QLabel(self.centralwidget)
        self.label_7.setGeometry(QtCore.QRect(10, 540, 150, 31))
        self.label_7.setObjectName("label_7")
        self.lineEdit = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit.setGeometry(QtCore.QRect(160, 40, 260, 41))
        self.lineEdit.setObjectName("lineEdit")
        self.lineEdit_2 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_2.setGeometry(QtCore.QRect(160, 140, 260, 41))
        self.lineEdit_2.setObjectName("lineEdit_2")
        self.lineEdit_3 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_3.setGeometry(QtCore.QRect(160, 240, 260, 41))
        self.lineEdit_3.setObjectName("lineEdit_3")
        self.lineEdit_4 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_4.setGeometry(QtCore.QRect(160, 340, 260, 41))
        self.lineEdit_4.setObjectName("lineEdit_5")
        self.lineEdit_timeout = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_timeout.setGeometry(QtCore.QRect(600, 540, 151, 41))
        self.lineEdit_timeout.setObjectName("lineEdit_timeout")
        self.lineEdit_6 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_6.setGeometry(QtCore.QRect(160, 440, 260, 41))
        self.lineEdit_6.setObjectName("lineEdit_6")
        self.lineEdit_7 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_7.setGeometry(QtCore.QRect(160, 540, 260, 41))
        self.lineEdit_7.setObjectName("lineEdit_7")
        self.textBrowser = QtWidgets.QTextBrowser(self.centralwidget)
        self.textBrowser.setGeometry(QtCore.QRect(440, 40, 311, 431))
        self.textBrowser.setObjectName("textBrowser")
        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 26))
        self.menubar.setObjectName("menubar")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)

        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "阿里云物联网平台RRPC请求消息发送调试工具 元孜然 V:1.0.1"))
        self.pushButton.setText(_translate("MainWindow", "发送"))
        self.pushButton.clicked.connect(self.test)
        self.pushButton_2.setText(_translate("MainWindow", "退出"))
        self.pushButton_2.clicked.connect(self.ex)
        self.label.setText(_translate("MainWindow", "用户名AccessKey"))
        self.label_2.setText(_translate("MainWindow", "密匙AccessSecret"))
        self.label_3.setText(_translate("MainWindow", "产品名ProductKey"))
        self.label_4.setText(_translate("MainWindow", "设备名DeviceName"))
        self.label_timeout.setText(_translate("MainWindow", "等待响应时间/msnTimeout(<8000)"))
        self.label_6.setText(_translate("MainWindow", "Base64编码消息nRequestBase64Byte"))
        self.label_7.setText(_translate("MainWindow", "实例名iotInstanceId"))



    def test(self):
        userdata[0] = str(self.lineEdit.text())
        userdata[1] = str(self.lineEdit_2.text())
        userdata[2] = str(self.lineEdit_3.text())
        userdata[3] = str(self.lineEdit_4.text())
        userdata[4] = str(self.lineEdit_timeout.text())
        userdata[5] = str(self.lineEdit_6.text())
        userdata[6] = str(self.lineEdit_7.text())
        self.thread.start()

    def updateLabel(self, text):
        self.textBrowser.append('received a message: '+text)
        self.cursor = self.textBrowser.textCursor()
        self.textBrowser.moveCursor(self.cursor.End)
        time.sleep(0.2)  # 光标移到最后,这样就会自动显示出来


    def ex(self):
        sys.exit()






if __name__ == '__main__':
    import sys
    from PyQt5.QtWidgets import QApplication, QMainWindow
    app = QApplication(sys.argv)
    MainWindow = QMainWindow()
    MainWindow.setStyleSheet("#MainWindow{border-image:url(bak.jpg)}")
    ui = Ui_MainWindow()
    ui.setupUi(MainWindow)
    MainWindow.show()
    sys.exit(app.exec_())

百度网盘:百度网盘 请输入提取码

 提取码:yzr1

最后

以上就是贤惠白羊为你收集整理的阿里云RRpc请求消息普适发送端的全部内容,希望文章能够帮你解决阿里云RRpc请求消息普适发送端所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(53)

评论列表共有 0 条评论

立即
投稿
返回
顶部