概述
由于系统上传图片有时候C端没有接收到消息,需要做一个同步功能。C端加载图片的时候不用请求远程图片库而是加载本地的图片,相当于做了个缓存,大大减少了C端加载图片的时间,提高了用户体验。
一、功能作用
mqtt是rabbitmq服务器的一个插件,可以用它发布与订阅主题。
这个同步功能,其实就是用了rabbitmq的应用场景之一异步处理。
二、流程步骤
1、设置mqtt唯一ID,因客户端id不能重复,所以选当前时间为唯一ID
client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
client= mqtt.Client(client_id) #ClientId不能重复,所以使用当前时间
2、设置rabbitmq服务器的用户名和密码
client.username_pw_set("dev", "YTc4Mj")
3、订阅主题
client.subscribe("sync")
4、接收消息
recvmsg = msg.payload.decode("utf-8")
5、处理消息
三、demo源码
1、mqtt连接rabbitmq服务器
importpaho.mqtt.client as mqttimporttimeimportsocketclientimportloggerimportdemjsonimportcommonimportsyncfile
log= logger.Logger("info")
HOST = "127.0.0.1"PORT= 1883
defclient_loop():
client_id= time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
client= mqtt.Client(client_id) #ClientId不能重复,所以使用当前时间
client.username_pw_set("dev", "YTc4Mj") #必须设置,否则会返回「Connected with result code 4」
client.on_connect =on_connect
client.on_message=on_message
log.info('开始连接mqtt' + HOST + ':' +str(PORT))
client.connect(HOST, PORT,60)
log.info('mqtt连接完成' + HOST + ':' +str(PORT))
client.loop_forever()defon_connect(client, userdata, flags, rc):
log.info("Connected with result code" +str(rc))
client.subscribe("projector-remote-control")
log.info("订阅消息 projector-remote-control")
client.subscribe("holo-file-sync")
log.info("订阅消息 holo-file-sync")
client.subscribe("sync")
log.info("订阅消息 sync")
client.subscribe("sync-single-work")
log.info("订阅消息 sync-single-work")defon_message(client, userdata, msg):
recvmsg= msg.payload.decode("utf-8")
log.info("收到消息" + recvmsg + ",开始执行命令")print(msg.topic + "" +recvmsg)if(msg.topic=='sync-single-work'):
common.insert_sql(recvmsg)elif(msg.topic=='sync'):
common.insert_sql(recvmsg)elif(msg.topic=='holo-file-sync'):
common.insert_sql(recvmsg)elif(msg.topic=='projector-remote-control'):
text=demjson.decode(msg)
command= text['command']
projectors= text['projectors']#socketclient.sendSocket(recvmsg)
socketclient.handlerMsg(command,projectors)if __name__ == '__main__':
syncfile.scheduletask()
syncfile.scheduleuncompletedtask()
client_loop()
2、消息处理
importloggerfrom threading importTimerimportosimportrequestsfrom io importBytesIOfrom PIL importImageimportcommon
log= logger.Logger("info")defscheduletask():
t= Timer(10,scheduletask)
t.start()#print('定时任务已开启,等待接收参数中...')
list = common.select_sql(3)if(list ==None):returngetimagesize(list[0],list[2])defscheduleuncompletedtask():
t= Timer(10,scheduleuncompletedtask)
t.start()#print('定时任务已开启,等待接收参数中...')
list = common.select_sql(2)if(list ==None):returngetimagesize(list[0],list[2])defgetimagesize(id,url):
savepath = common.readJson()['holoImageUrl']try:
response=requests.get(url)except:
common.delete_sql(id)returntmpIm=BytesIO(response.content)
im=Image.open(tmpIm)
imgpath= url[url.index('.com/')+5:]
dirpath= imgpath[:imgpath.rindex('/')].replace('/','\')
filename= imgpath[imgpath.rfind('/')+1:]
targetpath= savepath+'\'+dirpath+'\'filename1= filename[:filename.find('!')]if (os.path.exists(targetpath)):pass
else:os.makedirs(targetpath)
im.save(targetpath+filename1)if(os.path.exists(targetpath +filename)):
os.remove(targetpath+filename)
os.rename(targetpath+ filename1, targetpath +filename)#localfilesize = os.path.getsize(targetpath+filename)
#remotefilesize = dict(response.headers).get('Content-Length', 0)
#if(localfilesize == remotefilesize):
if(os.path.exists(targetpath+filename) and os.path.getsize(targetpath+filename) !=0):
common.delete_sql(id)
log.info("增量图片下载完成...")else:
common.update_sql(id)
log.info("增量图片部分下载 ...")if __name__ == '__main__':
scheduletask()
3、公共模块
importjsonimportsqlite3importrefrom threading importTimerimportlogger
log= logger.Logger("info")defreadJson():
with open('config.json', 'rt') as jsonFile:
val=jsonFile.read()
Config=json.loads(val)returnConfigdefcreate_table():
conn= sqlite3.connect('imagesnyc.xs')
curs=conn.cursor() #获取游标
try:
create_tb_cmd='''CREATE TABLE IF NOT EXISTS image_sync(id INTEGER PRIMARY KEY AUTOINCREMENT,imgsize INTEGER,url TEXT,status INTEGER)'''
#主要就是上面的语句
curs.execute(create_tb_cmd)
conn.commit()except:
log.info('Create table failed')returnFalsefinally:returnconndefinsert_sql(url):
conn=create_table()
curs=conn.cursor() #获取游标
imgurls = re.split(',',url)for imageurl inimgurls:
curs.execute("INSERT INTO image_sync(imgsize,url,status) VALUES('{}','{}','{}');".format(0,imageurl,3))#添加记录
conn.commit()
log.info("插入完成")defselect_sql(status):
conn=create_table()
curs= conn.cursor() #获取游标
curs.execute("select * from image_sync where status ='%s' order by id ASC LIMIT 1"%status)#查询记录
list =curs.fetchone()
conn.commit()returnlistdefupdate_sql(id):
conn=create_table()
curs=conn.cursor() #获取游标
try:
curs.execute("update image_sync set status = 2 where id = '%s'"%id)#更新记录
conn.commit()except:
log.info('根据主键id更新失败')finally:print('continue')defdelete_sql(id):
conn=create_table()
curs=conn.cursor() #获取游标
curs.execute("delete from image_sync where id = '%s'"%id)#删除记录
conn.commit()#log.info('成功删除已下载完成的记录')
四、技术难点
1、同步一个作品,接收到的作品字符串中可能包含多个图片地址,则需要分割字符串然后存储到内存数据库或者sqllite免安装数据库,如果同时下载多张图片会造成线程阻塞,所以用了python的定时器功能,设置好图片预计需要下载的时间。
2、由于接受到的消息字符串地址是预览图片格式,最后有!的,Image不能保存该图片格式的地址,所以先截取最后面一个!,然后保存,保存成功后再改变为预览地址,C端便可成功加载。
最后
以上就是畅快荔枝为你收集整理的python读取mqtt数据_Python之mqtt接收异步消息的全部内容,希望文章能够帮你解决python读取mqtt数据_Python之mqtt接收异步消息所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复