我是靠谱客的博主 畅快荔枝,最近开发中收集的这篇文章主要介绍python读取mqtt数据_Python之mqtt接收异步消息,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

由于系统上传图片有时候C端没有接收到消息,需要做一个同步功能。C端加载图片的时候不用请求远程图片库而是加载本地的图片,相当于做了个缓存,大大减少了C端加载图片的时间,提高了用户体验。

一、功能作用

mqtt是rabbitmq服务器的一个插件,可以用它发布与订阅主题。

这个同步功能,其实就是用了rabbitmq的应用场景之一异步处理。

二、流程步骤

1、设置mqtt唯一ID,因客户端id不能重复,所以选当前时间为唯一ID

client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))

client= mqtt.Client(client_id) #ClientId不能重复,所以使用当前时间

2、设置rabbitmq服务器的用户名和密码

client.username_pw_set("dev", "YTc4Mj")

3、订阅主题

client.subscribe("sync")

4、接收消息

recvmsg = msg.payload.decode("utf-8")

5、处理消息

三、demo源码

1、mqtt连接rabbitmq服务器

importpaho.mqtt.client as mqttimporttimeimportsocketclientimportloggerimportdemjsonimportcommonimportsyncfile

log= logger.Logger("info")

HOST = "127.0.0.1"PORT= 1883

defclient_loop():

client_id= time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))

client= mqtt.Client(client_id) #ClientId不能重复,所以使用当前时间

client.username_pw_set("dev", "YTc4Mj") #必须设置,否则会返回「Connected with result code 4」

client.on_connect =on_connect

client.on_message=on_message

log.info('开始连接mqtt' + HOST + ':' +str(PORT))

client.connect(HOST, PORT,60)

log.info('mqtt连接完成' + HOST + ':' +str(PORT))

client.loop_forever()defon_connect(client, userdata, flags, rc):

log.info("Connected with result code" +str(rc))

client.subscribe("projector-remote-control")

log.info("订阅消息 projector-remote-control")

client.subscribe("holo-file-sync")

log.info("订阅消息 holo-file-sync")

client.subscribe("sync")

log.info("订阅消息 sync")

client.subscribe("sync-single-work")

log.info("订阅消息 sync-single-work")defon_message(client, userdata, msg):

recvmsg= msg.payload.decode("utf-8")

log.info("收到消息" + recvmsg + ",开始执行命令")print(msg.topic + "" +recvmsg)if(msg.topic=='sync-single-work'):

common.insert_sql(recvmsg)elif(msg.topic=='sync'):

common.insert_sql(recvmsg)elif(msg.topic=='holo-file-sync'):

common.insert_sql(recvmsg)elif(msg.topic=='projector-remote-control'):

text=demjson.decode(msg)

command= text['command']

projectors= text['projectors']#socketclient.sendSocket(recvmsg)

socketclient.handlerMsg(command,projectors)if __name__ == '__main__':

syncfile.scheduletask()

syncfile.scheduleuncompletedtask()

client_loop()

2、消息处理

importloggerfrom threading importTimerimportosimportrequestsfrom io importBytesIOfrom PIL importImageimportcommon

log= logger.Logger("info")defscheduletask():

t= Timer(10,scheduletask)

t.start()#print('定时任务已开启,等待接收参数中...')

list = common.select_sql(3)if(list ==None):returngetimagesize(list[0],list[2])defscheduleuncompletedtask():

t= Timer(10,scheduleuncompletedtask)

t.start()#print('定时任务已开启,等待接收参数中...')

list = common.select_sql(2)if(list ==None):returngetimagesize(list[0],list[2])defgetimagesize(id,url):

savepath = common.readJson()['holoImageUrl']try:

response=requests.get(url)except:

common.delete_sql(id)returntmpIm=BytesIO(response.content)

im=Image.open(tmpIm)

imgpath= url[url.index('.com/')+5:]

dirpath= imgpath[:imgpath.rindex('/')].replace('/','\')

filename= imgpath[imgpath.rfind('/')+1:]

targetpath= savepath+'\'+dirpath+'\'filename1= filename[:filename.find('!')]if (os.path.exists(targetpath)):pass

else:os.makedirs(targetpath)

im.save(targetpath+filename1)if(os.path.exists(targetpath +filename)):

os.remove(targetpath+filename)

os.rename(targetpath+ filename1, targetpath +filename)#localfilesize = os.path.getsize(targetpath+filename)

#remotefilesize = dict(response.headers).get('Content-Length', 0)

#if(localfilesize == remotefilesize):

if(os.path.exists(targetpath+filename) and os.path.getsize(targetpath+filename) !=0):

common.delete_sql(id)

log.info("增量图片下载完成...")else:

common.update_sql(id)

log.info("增量图片部分下载 ...")if __name__ == '__main__':

scheduletask()

3、公共模块

importjsonimportsqlite3importrefrom threading importTimerimportlogger

log= logger.Logger("info")defreadJson():

with open('config.json', 'rt') as jsonFile:

val=jsonFile.read()

Config=json.loads(val)returnConfigdefcreate_table():

conn= sqlite3.connect('imagesnyc.xs')

curs=conn.cursor() #获取游标

try:

create_tb_cmd='''CREATE TABLE IF NOT EXISTS image_sync(id INTEGER PRIMARY KEY AUTOINCREMENT,imgsize INTEGER,url TEXT,status INTEGER)'''

#主要就是上面的语句

curs.execute(create_tb_cmd)

conn.commit()except:

log.info('Create table failed')returnFalsefinally:returnconndefinsert_sql(url):

conn=create_table()

curs=conn.cursor() #获取游标

imgurls = re.split(',',url)for imageurl inimgurls:

curs.execute("INSERT INTO image_sync(imgsize,url,status) VALUES('{}','{}','{}');".format(0,imageurl,3))#添加记录

conn.commit()

log.info("插入完成")defselect_sql(status):

conn=create_table()

curs= conn.cursor() #获取游标

curs.execute("select * from image_sync where status ='%s' order by id ASC LIMIT 1"%status)#查询记录

list =curs.fetchone()

conn.commit()returnlistdefupdate_sql(id):

conn=create_table()

curs=conn.cursor() #获取游标

try:

curs.execute("update image_sync set status = 2 where id = '%s'"%id)#更新记录

conn.commit()except:

log.info('根据主键id更新失败')finally:print('continue')defdelete_sql(id):

conn=create_table()

curs=conn.cursor() #获取游标

curs.execute("delete from image_sync where id = '%s'"%id)#删除记录

conn.commit()#log.info('成功删除已下载完成的记录')

四、技术难点

1、同步一个作品,接收到的作品字符串中可能包含多个图片地址,则需要分割字符串然后存储到内存数据库或者sqllite免安装数据库,如果同时下载多张图片会造成线程阻塞,所以用了python的定时器功能,设置好图片预计需要下载的时间。

2、由于接受到的消息字符串地址是预览图片格式,最后有!的,Image不能保存该图片格式的地址,所以先截取最后面一个!,然后保存,保存成功后再改变为预览地址,C端便可成功加载。

最后

以上就是畅快荔枝为你收集整理的python读取mqtt数据_Python之mqtt接收异步消息的全部内容,希望文章能够帮你解决python读取mqtt数据_Python之mqtt接收异步消息所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部