在阿里云的 OpenAPI 调试界面中,可以找到发送请求消息的页面,但每次都打开页面进行调试效率不高,所以做了这个 RRpc 请求消息发送端,可以更快地向设备发送请求消息。应用界面如下:
线程中的 RRpc 发送程序截取自阿里云的示例 SDK 代码,这里利用 PyQt5 给它做了一个发送界面。
源代码如下:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# -*- coding: utf-8 -*-
"""PyQt5 front end for sending Alibaba Cloud IoT RRpc requests to a device.

The window collects the credentials and request parameters, a background
``Worker`` thread performs the blocking RRpc call, and the decoded reply (or
an error description) is appended to the text browser.

The form layout was originally generated by the PyQt5 UI code generator
5.14.0 from 'untitled1.ui' and has since been edited by hand; regenerating
the file from the .ui source would discard those edits.
"""

import base64
import sys
import time
from typing import List

from PyQt5 import QtCore, QtWidgets
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtGui import QIcon, QTextCursor

from alibabacloud_iot20180120 import models as iot_20180120_models
from alibabacloud_iot20180120.client import Client as Iot20180120Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models

# Endpoint every request is sent to.
ENDPOINT = 'iot.cn-shanghai.aliyuncs.com'

# Request parameters shared between the GUI and the worker thread, by index:
#   0 AccessKey ID, 1 AccessKey secret, 2 ProductKey, 3 DeviceName,
#   4 Timeout (ms), 5 Base64-encoded request message, 6 IoT instance ID.
# The initial values are placeholders that are overwritten before each send.
userdata = [1, 2, 3, 4, 5, 6, 7]


class Worker(QThread):
    """Background thread that performs one RRpc call per ``start()``.

    The outcome is reported through ``sig`` as a single string: the decoded
    reply payload on success, otherwise a description of the failure.
    """

    sig = pyqtSignal(str)

    def __init__(self, parent=None):
        super(Worker, self).__init__(parent)
        self.count = 0

    @staticmethod
    def _create_client(access_key_id: str,
                       access_key_secret: str) -> Iot20180120Client:
        """Build an IoT client authenticated with the given AccessKey pair."""
        config = open_api_models.Config(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret,
        )
        config.endpoint = ENDPOINT
        return Iot20180120Client(config)

    def _send_rrpc(self) -> str:
        """Send the request described by ``userdata`` and return the reply text.

        Raises:
            ValueError: if ``userdata`` does not hold exactly seven values or
                the timeout is not an integer.
            Exception: whatever the SDK raises for a failed call.
        """
        # Snapshot the shared list once so that every part of the request is
        # built from the same set of values.
        (access_key_id, access_key_secret, product_key, device_name,
         timeout, request_b64, instance_id) = [str(v) for v in userdata]

        client = self._create_client(access_key_id, access_key_secret)
        request = iot_20180120_models.RRpcRequest(
            product_key=product_key,
            device_name=device_name,
            # The SDK model declares the timeout as an integer (milliseconds).
            timeout=int(timeout),
            request_base_64byte=request_b64,
            # Leave the field unset instead of sending an empty instance ID.
            iot_instance_id=instance_id or None,
        )
        response = client.r_rpc_with_options(
            request, util_models.RuntimeOptions())

        body = response.body
        payload = body.payload_base_64byte
        if not payload:
            # No reply payload: report the service's own status fields rather
            # than failing inside b64decode().
            return 'no payload returned (rrpc_code={}, error_message={})'.format(
                getattr(body, 'rrpc_code', None),
                getattr(body, 'error_message', None),
            )
        return base64.b64decode(payload).decode('utf-8', errors='replace')

    def run(self):
        """Thread entry point: perform the call and emit its outcome."""
        try:
            self.sig.emit(self._send_rrpc())
        except Exception as error:
            # Errors that carry a ``message`` attribute are shown as-is;
            # anything else gets the generic hint plus the actual cause.
            message = getattr(error, 'message', None)
            if message:
                self.sig.emit(str(message))
            else:
                self.sig.emit('error:请查看输入信息是否正确 ({}: {})'.format(
                    type(error).__name__, error))


class Ui_MainWindow(object):
    """Main window: input fields, send/quit buttons and a reply log."""

    def __init__(self):
        super(Ui_MainWindow, self).__init__()
        self.thread = Worker()
        self.thread.sig.connect(self.updateLabel)

    def _add_widget(self, widget_cls, name, geometry):
        """Create a child of the central widget and expose it as ``self.<name>``."""
        widget = widget_cls(self.centralwidget)
        widget.setGeometry(QtCore.QRect(*geometry))
        widget.setObjectName(name)
        setattr(self, name, widget)
        return widget

    def setupUi(self, MainWindow):
        """Create all widgets on ``MainWindow`` and wire up the buttons."""
        MainWindow.setWindowIcon(QIcon("./birdicon.ico"))
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(800, 800)
        self.centralwidget = QtWidgets.QWidget(MainWindow)
        self.centralwidget.setObjectName("centralwidget")

        # Widgets are created in the same order as in the generated form.
        for name, geometry in (
                ("pushButton", (80, 610, 141, 51)),
                ("pushButton_2", (80, 680, 141, 51)),
        ):
            self._add_widget(QtWidgets.QPushButton, name, geometry)

        for name, geometry in (
                ("label", (10, 40, 150, 31)),
                ("label_2", (10, 140, 150, 31)),
                ("label_3", (10, 240, 150, 31)),
                ("label_4", (10, 340, 150, 31)),
                ("label_timeout", (440, 540, 150, 31)),
                ("label_6", (10, 440, 150, 31)),
                ("label_7", (10, 540, 150, 31)),
        ):
            self._add_widget(QtWidgets.QLabel, name, geometry)

        for name, geometry in (
                ("lineEdit", (160, 40, 260, 41)),
                ("lineEdit_2", (160, 140, 260, 41)),
                ("lineEdit_3", (160, 240, 260, 41)),
                # The object name now matches the attribute name; the original
                # registered this widget as "lineEdit_5".
                ("lineEdit_4", (160, 340, 260, 41)),
                ("lineEdit_timeout", (600, 540, 151, 41)),
                ("lineEdit_6", (160, 440, 260, 41)),
                ("lineEdit_7", (160, 540, 260, 41)),
        ):
            self._add_widget(QtWidgets.QLineEdit, name, geometry)

        self._add_widget(QtWidgets.QTextBrowser, "textBrowser",
                         (440, 40, 311, 431))

        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 26))
        self.menubar.setObjectName("menubar")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)

        # Connect once here rather than in retranslateUi(), so re-translating
        # the window does not stack duplicate connections.
        self.pushButton.clicked.connect(self.test)
        self.pushButton_2.clicked.connect(self.ex)

        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        """Set the window title and all user-visible widget texts."""
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate(
            "MainWindow", "阿里云物联网平台RRPC请求消息发送调试工具 元孜然 V:1.0.1"))
        self.pushButton.setText(_translate("MainWindow", "发送"))
        self.pushButton_2.setText(_translate("MainWindow", "退出"))
        self.label.setText(_translate("MainWindow", "用户名AccessKey"))
        self.label_2.setText(_translate("MainWindow", "密匙AccessSecret"))
        self.label_3.setText(_translate("MainWindow", "产品名ProductKey"))
        self.label_4.setText(_translate("MainWindow", "设备名DeviceName"))
        # NOTE(review): the source read "...msnTimeout..." and
        # "...消息nRequestBase64Byte"; the stray "n" is assumed to be a line
        # break whose backslash was lost -- confirm against the original.
        self.label_timeout.setText(_translate(
            "MainWindow", "等待响应时间/ms\nTimeout(<8000)"))
        self.label_6.setText(_translate(
            "MainWindow", "Base64编码消息\nRequestBase64Byte"))
        self.label_7.setText(_translate("MainWindow", "实例名iotInstanceId"))

    def test(self):
        """Copy the form fields into ``userdata`` and start the worker."""
        if self.thread.isRunning():
            # start() is a no-op on a running thread; say so instead of
            # silently dropping the click and overwriting the parameters.
            self.textBrowser.append('error:上一个请求尚未完成,请稍候再试')
            return
        fields = (
            self.lineEdit, self.lineEdit_2, self.lineEdit_3, self.lineEdit_4,
            self.lineEdit_timeout, self.lineEdit_6, self.lineEdit_7,
        )
        # Update the shared list in place; surrounding whitespace from
        # copy/paste is dropped.
        userdata[:] = [str(field.text()).strip() for field in fields]
        self.thread.start()

    def updateLabel(self, text):
        """Append a worker result to the log and scroll to the end."""
        self.textBrowser.append('received a message: ' + text)
        self.cursor = self.textBrowser.textCursor()
        # Moving the cursor to the end makes the newest entry visible.
        self.textBrowser.moveCursor(QTextCursor.End)

    def ex(self):
        """Quit the application."""
        QtWidgets.QApplication.quit()


def main():
    """Create the application and main window, then run the event loop."""
    app = QtWidgets.QApplication(sys.argv)
    MainWindow = QtWidgets.QMainWindow()
    MainWindow.setStyleSheet("#MainWindow{border-image:url(bak.jpg)}")
    ui = Ui_MainWindow()
    ui.setupUi(MainWindow)
    MainWindow.show()
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()
百度网盘:百度网盘 请输入提取码
提取码:yzr1
最后
以上就是贤惠白羊最近收集整理的关于阿里云RRpc请求消息普适发送端的全部内容,更多相关阿里云RRpc请求消息普适发送端内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复