我是靠谱客的博主 想人陪酒窝,最近开发中收集的这篇文章主要介绍python 系列之 - 多进程,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

多进程 (multiprocessing)

1 创建一个多进程
创建: multiprocessing.Process([target=函数名], [ name = 别名], [args=(参数,)], [kwargs=调用对象的字典,])
方法:
is_alive() : 判断当前进程是否为活动状态
join([timeout]): 和多线程一样,等待其它子进程结束后主进程才继续执行
run(): 调用start()方法后自动调用run()方法
start(): Process以start()启动某个进程。
terminate(): 结束进程方法
属性:
daemon: 设置子进程为后台进程,默认为前台进程(False) ,父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。
pid: 获取进程的ID号
name: 获取进程的别名
exitcode: 进程在运行时为None、如果为–N,表示被信号N结束

import multiprocessing
import time
def do_process(num):
print("func in process %s" % num)
if __name__ == "__main__":
for i in range(3):
p = multiprocessing.Process(target=do_process, name="My_name-{0}".format(str(i)), args=(str(i), ))
p.start()
print("Process {0} is started, PID = {1}, is_alive = {2} ".format(p.name, p.pid, p.is_alive()))
run result:
Process My_name-0 is started, PID = 3440, is_alive = True
func in process 0
Process My_name-1 is started, PID = 11904, is_alive = True
func in process 1
Process My_name-2 is started, PID = 24396, is_alive = True
func in process 2

自定义进程类

import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
def __init__(self, interval):
super(ClockProcess, self).__init__()
self.interval = interval
def run(self):
n = 5
while n > 0:
print("the time is {0}".format(time.ctime()))
time.sleep(self.interval)
n -= 1
if __name__ == '__main__':
p = ClockProcess(3)
p.start()
注:进程p调用start()时,自动调用run()
run result:
the time is Tue Mar
8 14:34:21 2016
the time is Tue Mar
8 14:34:24 2016
the time is Tue Mar
8 14:34:27 2016
the time is Tue Mar
8 14:34:30 2016
the time is Tue Mar
8 14:34:33 2016

2 进程间数据共享
1) Queue
Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
put(obj, block=True, timeout=None) 方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。
如果blocked为True(默认值),无timeout参数,则阻塞,直到取走后再继续put
如果blocked为True,并且timeout为正值,当队列满了之后,该方法会阻塞timeout 的时间,直到该队列有剩余的空间。如果超时,队列仍满着,会抛出Queue.Full异常。
如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常,与 put_nowait()方法等价

get(block=True, timeout=None): 方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。
如果blocked为True(默认值),timeout无值,则阻塞等待有值以后再取
如果blocked为True,并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。
如果blocked为False,有两种情况存在: 如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常

import multiprocessing
from multiprocessing import Queue
import time
def dofunc(qname, num):
qname.put(num)
print("put %s to qname" % num)
time.sleep(1)
def get_queue(q1):
value = q1.get()
print("get value %s from q" % value)
time.sleep(2)
if __name__ == '__main__':
q = Queue(5)
for i in range(10):
p = multiprocessing.Process(target=dofunc, args=(q, i))
p.start()
while 1:
get_queue(q)
if q.empty():
break
print("Finished")

2)Pipe
Pipe方法返回(conn1, conn2)代表一个管道的两个端。
Pipe方法有duplex参数:
duplex 为 True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。
duplex 为 False,conn1只负责接受消息,conn2只负责发送消息。
send和recv方法分别是发送和接受消息的方法。
在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError

import multiprocessing
from multiprocessing import Queue, Pipe
import time
def pip_send(pip):
for i in range(10):
print("send to pip %s" % i)
pip.send(i)
time.sleep(0.5)
def pip_recv(pip):
while True:
value = pip.recv()
print("recv from pip %s" % value)
slice(1)
if not pip.recv():
break
if __name__ == "__main__":
pip = Pipe()
p1 = multiprocessing.Process(target=pip_send, args=(pip[0], ))
p2 = multiprocessing.Process(target=pip_recv, args=(pip[1], ))
p1.start()
p2.start()
p1.join()
p2.join()

3) Managers
Manager提供了一种不同进程间创建共享数据的方式,由Manager()创建来保存Python对象并允许其它进程来进行访问。支持的类型有 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array

import multiprocessing
import time
def worker(d, key, value):
d[key] = value
if __name__ == '__main__':
mgr = multiprocessing.Manager()
d = mgr.dict()
jobs = [multiprocessing.Process(target=worker, args=(d, i, i * 2)) for i in range(10)]
print(jobs)
for j in jobs:
j.start()
for j in jobs:
j.join()
print('Results:')
print(d)
for key, value in d.items():
print("%s=%s" % (key, value))
run result:
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}
0=0
1=2
2=4
3=6
4=8
5=10
6=12
7=14
8=16
9=18

4) Pool
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。
当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,不加限制的开启进程恐怕服务器就该挂了,
但我们手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

进程池方法:
apply(func[, args[, kwds]]): 阻塞的执行,比如创建一个有3个线程的线程池,当执行时是创建完一个 执行完函数 在创建另一个,变成一个线性的执行
apply_async(func[, args[, kwds[, callback]]]) : 它是非阻塞执行,同时创建3个线程的线城池,同时执行,只要有一个执行完立刻放回池子待下一个执行,并行的执行
close(): 关闭pool,使其不在接受新的任务。
terminate() : 结束工作进程,不在处理未完成的任务。
join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

非阻塞效果

import multiprocessing
import time
def pool_func(i):
print("start %s" %i)
time.sleep(3)
print("end -
%s" %i)
if __name__ == "__main__":
pool = multiprocessing.Pool(3)
for i in range(5):
pool.apply_async(pool_func, args=(i,))
#Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
pool.close()
pool.join() #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
run result:
start 0
start 1
start 2
end -
0
start 3
end -
1
start 4
end -
2
end -
3
end -
4

阻塞效果:

import multiprocessing
import time
def pool_func(i):
print("start %s" %i)
time.sleep(3)
print("end -
%s" %i)
if __name__ == "__main__":
pool = multiprocessing.Pool(3)
for i in range(5):
pool.apply(pool_func, args=(i,))
pool.close()
pool.join()
run result:
start 0
end -
0
start 1
end -
1
start 2
end -
2
start 3
end -
3
start 4
end -
4

最后

以上就是想人陪酒窝为你收集整理的python 系列之 - 多进程的全部内容,希望文章能够帮你解决python 系列之 - 多进程所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(51)

评论列表共有 0 条评论

立即
投稿
返回
顶部