概述
一、提要
multiprocessing— 基于进程的并行性
多处理是一个支持使用类似于线程模块的 API 生成进程的包。多处理包提供本地和远程并发,通过使用子进程而不是线程来有效地避开全局解释器锁。因此,多处理模块允许程序员充分利用给定机器上的多个处理器。它可以在 Unix 和 Windows 上运行。
二、多种方式
2.1 最简单Pool
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
output:
[1, 4, 9]
concurrent.futures.ProcessPoolExecutor 提供更高级别的接口来将任务推送到后台进程,而不会阻塞调用进程的执行。与直接使用池接口相比,concurrent.futures API 更容易将工作提交到底层进程池与等待结果分开。
2.2 关于 Process 类
在多处理中,通过创建 Process 对象然后调用其 start() 方法来生成进程。进程遵循 threading.Thread 的 API。多进程程序的一个简单示例是
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
为了显示所涉及的各个进程 ID,这是一个扩展示例:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
2.3 上下文和启动方法
根据平台的不同,多处理支持三种启动进程的方式。这些启动方法是
- 孵化(spawn)
父进程启动一个新的 Python 解释器进程。子进程只会继承运行进程对象的 run() 方法所需的资源。特别是,不会继承父进程中不必要的文件描述符和句柄。与使用 fork 或 forkserver 相比,使用此方法启动进程相当慢。
在 Unix 和 Windows 上可用。 Windows 和 macOS 上的默认设置。
- 叉子(fork)
父进程使用 os.fork() 来分叉 Python 解释器。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全地分叉多线程进程是有问题的。
仅在 Unix 上可用。 Unix 上的默认值。
- 分叉服务器(forkservice)
当程序启动并选择 forkserver 启动方法时,将启动一个服务器进程。从那时起,每当需要一个新进程时,父进程都会连接到服务器并请求它派生一个新进程。 fork 服务器进程是单线程的,因此使用 os.fork() 是安全的。没有不必要的资源被继承。
在支持通过 Unix 管道传递文件描述符的 Unix 平台上可用。
在 3.8 版中更改:在 macOS 上,spawn start 方法现在是默认方法。分叉启动方法应该被认为是不安全的,因为它可能导致子进程崩溃。请参阅 bpo-33725。
在 3.4 版更改:在所有 unix 平台上添加了 spawn,并为某些 unix 平台添加了 forkserver。子进程不再继承 Windows 上的所有父进程可继承句柄。
在 Unix 上,使用 spawn 或 forkserver 启动方法还将启动资源跟踪器进程,该进程跟踪由程序进程创建的未链接的命名系统资源(例如命名信号量或 SharedMemory 对象)。当所有进程都退出时,资源跟踪器会取消链接任何剩余的跟踪对象。通常应该没有,但如果一个进程被信号杀死,可能会有一些“泄露”的资源。 (泄漏的信号量和共享内存段都不会在下次重新启动之前自动取消链接。这对两个对象都是有问题的,因为系统只允许有限数量的命名信号量,而共享内存段会占用主内存中的一些空间。)
要选择启动方法,请在主模块的 if __name__ == '__main__' 子句中使用 set_start_method()。例如:
import multiprocessing as mp def foo(q): q.put('hello') if __name__ == '__main__': mp.set_start_method('spawn') q = mp.Queue() p = mp.Process(target=foo, args=(q,)) p.start() print(q.get()) p.join()
set_start_method()
不应在程序中多次使用。
或者,您可以使用 get_context() 来获取上下文对象。上下文对象具有与多处理模块相同的 API,并允许在同一程序中使用多个启动方法。
import multiprocessing as mp def foo(q): q.put('hello') if __name__ == '__main__': ctx = mp.get_context('spawn') q = ctx.Queue() p = ctx.Process(target=foo, args=(q,)) p.start() print(q.get()) p.join()
请注意,与一个上下文相关的对象可能与不同上下文的进程不兼容。特别是,使用 fork 上下文创建的锁不能传递给使用 spawn 或 forkserver 启动方法启动的进程。
想要使用特定启动方法的库可能应该使用 get_context() 以避免干扰库用户的选择。
警告
'spawn' 和 'forkserver' 启动方法目前不能与 Unix 上的“冻结”可执行文件(即由 PyInstaller 和 cx_Freeze 等软件包生成的二进制文件)一起使用。 “fork”启动方法确实有效。
2.4 进程间交换对象(Exchanging objects between processes)
multiprocessing supports two types of communication channel between processes:
Queues
The Queue class is a near clone of queue.Queue. For example:
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()Queues are thread and process safe.
Pipes
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()Pipe() 返回的两个连接对象代表管道的两端。每个连接对象都有 send() 和 recv() 方法(等等)。请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程没有损坏的风险。
2.5 进程之间的同步
多处理包含来自线程的所有同步原语的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Without using the lock output from the different processes is liable to get all mixed up.
2.6 进程之间共享状态(Sharing state between processes)
如上所述,在进行并发编程时,通常最好尽可能避免使用共享状态。在使用多个进程时尤其如此。
但是,如果您确实需要使用一些共享数据,那么多处理提供了几种这样做的方法。
1)共享内存法:Shared memory
Data can be stored in a shared memory map using Value or Array. For example, the following code
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])will print
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]创建 num 和 arr 时使用的“d”和“i”参数是数组模块使用的类型代码:“d”表示双精度浮点数,“i”表示有符号整数。这些共享对象将是进程和线程安全的。
为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意 ctypes 对象。
2)服务器进程法:Server process
Manager() 返回的管理器对象控制一个服务器进程,该进程保存 Python 对象并允许其他进程使用代理来操作它们。 Manager() 返回的管理器将支持列表、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue、Value 和 Array 类型。例如,from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)will print
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以由网络上不同计算机上的进程共享。但是,它们比使用共享内存要慢。
3)工作进程池法:Using a pool of workers
Pool 类代表一个工作进程池。它具有允许以几种不同方式将任务卸载到工作进程的方法。
For example:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
请注意:
- 池的方法只能由创建它的进程使用。
- 此包中的功能要求 __main__ 模块可由子模块导入。这在编程指南中有介绍,但在此值得指出。
这意味着某些示例(例如 multiprocessing.pool.Pool 示例)在交互式解释器中不起作用。例如:
>>>
>>> from multiprocessing import Pool >>> p = Pool(5) >>> def f(x): ... return x*x ... >>> with p: ... p.map(f, [1,2,3]) Process PoolWorker-1: Process PoolWorker-2: Process PoolWorker-3: Traceback (most recent call last): AttributeError: 'module' object has no attribute 'f' AttributeError: 'module' object has no attribute 'f' AttributeError: 'module' object has no attribute 'f'
(如果你尝试这个,它实际上会输出三个以半随机方式交错的完整回溯,然后你可能不得不以某种方式停止父进程。)
最后
以上就是传统睫毛膏为你收集整理的【python知识】多进程专题(2) 一、提要二、多种方式的全部内容,希望文章能够帮你解决【python知识】多进程专题(2) 一、提要二、多种方式所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复