我是靠谱客的博主 帅气果汁,最近开发中收集的这篇文章主要介绍关于multiprocessing的Queue效率问题,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

今天大半天都在折腾着一个问题,就是从kafka消费消息后,后面的业务处理一直处理不过来,总是延后几个小时。为了解决这个问题,不断去调试代码,查找到相对耗时的位置,最终定位是Qeueue的问题。先上一段简化版的代码。

#!/usr/bin/env python
#-*- coding:utf-8 -*-
from cachetools import TTLCache
import os
import sys
import subprocess
from multiprocessing import Process,Queue
from datetime import datetime, timedelta
import config
from kafka import KafkaConsumer
from datetime import datetime, timedelta
import time
import socket
import struct
import logging
from logging.handlers import TimedRotatingFileHandler
def init_log():
# 建立一个 TimedRotatingFileHandler 对象
time_handler = TimedRotatingFileHandler(filename="./log/online_datarev.log", when="H", interval=1, backupCount=3)
# 设定 time_handler 文件扩展名的保存格式
# time_handler.suffix = '%Y-%m-%d-%H-%M-%S.log'
# 设定 WARNING 以上的内容输出到文件里面
time_handler.setLevel(logging.WARNING)
# 设定 time_handler 的输出参数
formatter = logging.Formatter('[%(levelname)s %(asctime)s %(filename)s %(lineno)s]:%(message)s')
time_handler.setFormatter(formatter)
# 创建一个 logger
logger = logging.getLogger()
# 将 time_handler 添加到 logger 中
logger.addHandler(time_handler)
return logger
logging = init_log()
def get_log_from_kafka(q, appid, app_boostrap_server):
while True:
try:
topic = 'ysec_anticheat_log_' + appid
consumer = KafkaConsumer(topic, bootstrap_servers=app_boostrap_server, request_timeout_ms=config.REQUEST_TIMEOUT_MS,auto_offset_reset=config.AUTO_OFFSET_RESET, group_id='anticheat_pic1')
for msg in consumer:
q.put(msg.value) # 将读取得到的数据,写入队列中
except Exception as e:
logging.error(str(e))
# 进行kafka数据的处理
def deal_log_task(q):
while True:
ip = ""
region = ""
code = 0
msg_list = []
try:
first_time = time.time()
print(q.qsize())
end_time = time.time()
print(end_time-first_time)
except Exception as e:
logging.error(e)
continue
if __name__ == '__main__':
q = Queue(200000000)
task_list = []
appid = 'act_present'
p = Process(target = get_log_from_kafka, args = (q, appid, config.log_boostrap_server,))
p.start()
task_list.append(p)
for i in range(5):
task = Process(target = deal_log_task, args = (q,))
task.start()
task_list.append(task)
for p in task_list:
p.join()

因为此业务每分钟维持在40万左右,而队列中的数据越来越多,最终占用了系统90%的内存了。我将接收进程接收消息的时间打印出来,没想到竟然需要60毫秒到200毫秒之间,接收一条消息竟然需要那么长时间,肯定有问题,直接去问google,发现Qeueue就个安全队列,那么想到put和get肯定都会加锁,这样频繁的加锁和解锁,那么时间都浪费在加解锁上了。

那我们是需要解决这加解锁的问题,那我们是不是可以减少在队列的交互次数呢?于是想到下面的一种方法来解决。直接上代码。

#!/usr/bin/env python
#-*- coding:utf-8 -*-
from cachetools import TTLCache
import os
import sys
import subprocess
from multiprocessing import Process,Queue
from datetime import datetime, timedelta
import config
from kafka import KafkaConsumer
from datetime import datetime, timedelta
import time
import socket
import struct
import logging
from logging.handlers import TimedRotatingFileHandler
def init_log():
# 建立一个 TimedRotatingFileHandler 对象
time_handler = TimedRotatingFileHandler(filename="./log/online_datarev.log", when="H", interval=1, backupCount=3)
# 设定 time_handler 文件扩展名的保存格式
# time_handler.suffix = '%Y-%m-%d-%H-%M-%S.log'
# 设定 WARNING 以上的内容输出到文件里面
time_handler.setLevel(logging.WARNING)
# 设定 time_handler 的输出参数
formatter = logging.Formatter('[%(levelname)s %(asctime)s %(filename)s %(lineno)s]:%(message)s')
time_handler.setFormatter(formatter)
# 创建一个 logger
logger = logging.getLogger()
# 将 time_handler 添加到 logger 中
logger.addHandler(time_handler)
return logger
logging = init_log()
def get_log_from_kafka(q, appid, app_boostrap_server, max_num):
while True:
try:
topic = 'ysec_anticheat_log_' + appid
consumer = KafkaConsumer(topic, bootstrap_servers=app_boostrap_server, request_timeout_ms=config.REQUEST_TIMEOUT_MS,auto_offset_reset=config.AUTO_OFFSET_RESET, group_id='anticheat_pic1')
msg_list = []
for msg in consumer:
msg_list.append(msg.value)
if len(msg_list) == max_num:
q.put(msg_list) # 将读取得到的数据,写入队列中
msg_list = []
except Exception as e:
logging.error(str(e))
# 进行kafka数据的处理
def deal_log_task(q):
while True:
ip = ""
region = ""
code = 0
msg_list = []
try:
first_time = time.time()
msg_list = q.get()
# 从队列获取信息
print(q.qsize())
end_time = time.time()
print(end_time-first_time)
except Exception as e:
logging.error(e)
continue
for msg in msg_list:
print(msg)
if __name__ == '__main__':
q = Queue(2000000)
task_list = []
appid = 'act_present'
p = Process(target = get_log_from_kafka, args = (q, appid, config.log_boostrap_server,1000,))
p.start()
task_list.append(p)
for i in range(5):
task = Process(target = deal_log_task, args = (q,))
task.start()
task_list.append(task)
for p in task_list:
p.join()

改版后的代码性能大大提高,原因是减少消息在队列中交互的数量,增加的是每条消息的大小而已,问题解决。在使用multiprocessing的Queue时需要注意,在数据交互频率较大时,不建议使用这种方式,非常影响性能。

最后

以上就是帅气果汁为你收集整理的关于multiprocessing的Queue效率问题的全部内容,希望文章能够帮你解决关于multiprocessing的Queue效率问题所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(43)

评论列表共有 0 条评论

立即
投稿
返回
顶部