Python Multiprocessing Examples (Learning Notes)

Overview

Python processes:

Modules for working with processes:

os.fork()

subprocess

processing (the old third-party predecessor of multiprocessing)

multiprocessing

Ways for processes to communicate (IPC):

files

pipes

sockets

signals (see the sketch just after this list)

semaphores

shared memory
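A minimal, Unix-only sketch of the signal mechanism (not taken from the original notes): the parent forks a child, the child installs a handler for SIGUSR1 and waits, and the parent then sends it that signal.

import os
import signal
import time

def handler(signum, frame):
    print("child received SIGUSR1 from the parent")

pid = os.fork()
if pid == 0:                            # child: install the handler and block until a signal arrives
    signal.signal(signal.SIGUSR1, handler)
    signal.pause()
else:                                   # parent: give the child a moment to install the handler, then signal it
    time.sleep(1)
    os.kill(pid, signal.SIGUSR1)
    os.waitpid(pid, 0)                  # reap the child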

① Creating a process with fork on Linux

fork() is a very special call, exposed in Python as os.fork(), and it is only available on Unix-like systems.

An ordinary function is called once and returns once, but fork() is called once and returns twice: the operating system copies the current process (the parent) into a new process (the child), and the call then returns in both the parent and the child.

In the child, fork() always returns 0; in the parent it returns the child's PID.

The reason is that a parent can fork() many children, so it has to record the PID of each child, while a child only needs os.getppid() to get its parent's PID and os.getpid() to get its own process number.

"""pid=os.fork()

1.只用在Unix系统中有效,Windows系统中无效

2.fork函数调用一次,返回两次:

在父进程中返回值为子进程id,在子进程中返回值为0"""

importos

pid=os.fork() #生成了一个子进程,出现了2个进程同时开始向下执行:

if pid==0:print("执行子进程,子进程pid={pid},父进程ppid={ppid}".

format(pid=os.getpid(),ppid=os.getppid()))else:print("执行父进程,子进程pid={pid},父进程ppid={ppid}".

format(pid=pid,ppid=os.getpid()))

Walkthrough:

When os.fork() executes it creates a child process, while the main (parent) process also keeps running,

so from that point there are two processes: the parent and the child.

os.fork() guarantees that the parent gets the child's pid as the return value and the child gets 0.

In the parent process, pid = 19293 (the child's pid):

if pid == 0:
    print("in child process, pid={pid}, ppid={ppid}".format(pid=os.getpid(), ppid=os.getppid()))
else:
    print("in parent process, child pid={pid}, parent pid={ppid}".format(pid=pid, ppid=os.getpid()))

so the parent runs the else branch.

In the child process, pid = 0:

if pid == 0:
    print("in child process, pid={pid}, ppid={ppid}".format(pid=os.getpid(), ppid=os.getppid()))
else:
    print("in parent process, child pid={pid}, parent pid={ppid}".format(pid=pid, ppid=os.getpid()))

so the child runs the if branch.

Taken together: the two processes each execute one branch, so the if block is run once by the child and the else block once by the parent.

② # Creating processes (cross-platform) ---- multiprocessing

import multiprocessing

def do(n):   # the task function
    # get the name of the current process
    name = multiprocessing.current_process().name
    print(name, 'starting')
    print("worker", n)
    return

if __name__ == '__main__':
    numList = []
    # this loop runs the 5 child processes one after another (serially)
    for i in range(5):
        p = multiprocessing.Process(target=do, args=(i,))   # create a process; the loop creates 5 of them; args takes a tuple
        numList.append(p)   # keep the process object in a list
        p.start()           # start the process; after start() it is runnable and waits for the CPU to schedule it
        p.join()            # join blocks until this child finishes, so the children run serially
        print("Process end.")
    '''
    # to run the 5 child processes in parallel instead, remove the join above and join after the loop:
    for i in numList:
        i.join()   # wait for process 1 to finish before the next iteration, then process 2, ... then process 5
    '''
    print(numList)   # ------> at any moment this program has one child process and the main process running

Output:

E:\>py -3 a.py

Process-1 starting

worker  0

Process end.

Process-2 starting

worker  1

Process end.

Process-3 starting

worker  2

Process end.

Process-4 starting

worker  3

Process end.

Process-5 starting

worker  4

Process end.

[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

# Parallel version: join only makes the main process wait for the children before it exits

import multiprocessing

def do(n):   # the task function
    # get the name of the current process
    name = multiprocessing.current_process().name
    print(name, "starting")
    print("worker", n)
    return

if __name__ == '__main__':
    numList = []
    for i in range(5):
        p = multiprocessing.Process(target=do, args=(i,))   # create 5 processes; args takes a tuple
        numList.append(p)   # keep the process object in a list
        p.start()           # start the process; it is now runnable and waits for the CPU
        # p.join()          # joining here would make the children run serially
        print("Process end.")
    # to let all 5 children run in parallel, join them only after they have all been started:
    for i in numList:
        i.join()   # wait for process 1 to finish, then the next iteration waits for process 2, ... up to process 5
    # join makes the main process wait for the children before it prints the last line
    print(numList)

Output:

E:\>py -3 a.py

Process end.

Process end.

Process end.

Process end.

Process end.

Process-2 starting

worker  1

Process-1 starting

worker  0

Process-3 starting

worker  2

Process-4 starting

worker  3

Process-5 starting

worker  4

[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]   # with join, the main process only runs print(numList) after all the children have finished

# Without join, the main process may exit while the children are still running; if you want the children to stop together with the main process, make them daemon (watchdog) processes (a short sketch follows this example's output).

import multiprocessing

def do(n):   # the task function
    # get the name of the current process
    name = multiprocessing.current_process().name
    print(name, "starting")
    print("worker", n)
    return

if __name__ == '__main__':
    numList = []
    for i in range(5):
        p = multiprocessing.Process(target=do, args=(i,))   # create 5 processes; args takes a tuple
        numList.append(p)   # keep the process object in a list
        # p.start()         # this time the processes are not started inside the loop
        # p.join()
        print("Process end.")
    for i in numList:
        i.start()            # start all the children, but never join them
    print(numList)

Output:

E:\>py -3 a.py

Process end.

Process end.

Process end.

Process end.

Process end.

# the main process printed numList first and then exited

[<Process(Process-1, started)>, <Process(Process-2, started)>, <Process(Process-3, started)>, <Process(Process-4, started)>, <Process(Process-5, started)>]

# the child processes are still running

Process-2 starting

worker  1

Process-3 starting

worker  2

Process-1 starting

worker  0

Process-4 starting

worker  3

Process-5 starting

worker  4     # without join, the children keep running after the main process has exited
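If you want the opposite behaviour, i.e. the children are killed automatically when the main process exits, mark them as daemon processes before starting them. A minimal, self-contained sketch (the do() task here is only an illustration):

import multiprocessing
import time

def do(n):
    time.sleep(1)
    print("worker", n)   # never reached: the parent exits first and the daemon child is terminated

if __name__ == '__main__':
    p = multiprocessing.Process(target=do, args=(0,))
    p.daemon = True      # must be set before start(); the child is terminated when the parent exits
    p.start()
    print("main process exiting")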

③ # Combining the os.fork() ideas (pid/ppid) with multiprocessing -------- Process is cross-platform

# Linux version (on Linux the if __name__ == "__main__": guard can be omitted):

from multiprocessing import Process
import os
import time

def sleeper(name, seconds):
    print("Process ID# %s" % (os.getpid()))
    print("Parent Process ID# %s" % (os.getppid()))   # every process has its own ID and its parent's ID; os.getppid() used to be Unix-only, but it also works on Windows since Python 3.2
    print("%s will sleep for %s seconds" % (name, seconds))
    time.sleep(seconds)

# if __name__ == "__main__":
child_proc = Process(target=sleeper, args=('bob', 5))
child_proc.start()
print("in parent process after child process start")
print("parent process about to join child process")
child_proc.join()
print("in parent process after child process join")
print("the parent's parent process: %s" % (os.getppid()))

# Windows version (the if __name__ == "__main__": guard is required):

from multiprocessing import Process
import os
import time

def sleeper(name, seconds):
    print("Process ID# %s" % (os.getpid()))
    print("Parent Process ID# %s" % (os.getppid()))
    print("%s will sleep for %s seconds" % (name, seconds))
    time.sleep(seconds)

if __name__ == "__main__":
    child_proc = Process(target=sleeper, args=('bob', 5))
    child_proc.start()
    print("in parent process after child process start")
    print("parent process about to join child process")
    child_proc.join()
    print("in parent process after child process join")
    print("the parent's parent process: %s" % (os.getppid()))

Output:

E:\>py -3 a.py

in parent process after child process start

parent process about to join child process

Process ID# 39956

Parent Process ID# 40176

bob will sleep for 5 seconds

in parent process after child process join

the parent's parent process: 43868

④ # A multi-process template program

# coding=utf-8

import multiprocessing
import urllib.request
import time

def func1(url):
    response = urllib.request.urlopen(url)
    html = response.read()
    print(html[0:20])
    time.sleep(1)

def func2(url):
    response = urllib.request.urlopen(url)
    html = response.read()
    print(html[0:20])
    time.sleep(1)

if __name__ == '__main__':
    p1 = multiprocessing.Process(target=func1, args=("http://www.sogou.com",), name="gloryroad1")
    p2 = multiprocessing.Process(target=func2, args=("http://www.baidu.com",), name="gloryroad2")
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    time.sleep(1)
    print("done!")

Output:

E:\>py -3 a.py

(each child process prints the first 20 bytes of the page it fetched, and the main process then prints "done!")

Notes on the task-queue pattern used in the next example:

... the worker runs a task by calling the Task object's __call__ method;

5. when a task has been executed, the worker calls self.task_queue.task_done();

6. if any one of the tasks skips its self.task_queue.task_done() call,

7. tasks.join() will hang the main program, because it waits forever for the missing task_done() call.

4. Implementing a pool of worker processes by hand (multiprocessing queues are cross-process, so all the results end up in one shared result queue)

# encoding=utf-8

import multiprocessing
import time

class Consumer(multiprocessing.Process):   # a worker class derived from Process
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)   # call the parent class constructor
        self.task_queue = task_queue
        self.result_queue = result_queue

    # override the run method of Process
    def run(self):
        while 1:
            nexttask = self.task_queue.get()
            if nexttask is None:                 # None is the signal to stop this worker
                self.task_queue.task_done()
                return
            self.result_queue.put(nexttask())    # run the task and store its result
            self.task_queue.task_done()
        return

class Task(object):   # a task object
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)   # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

    def __str__(self):
        return '%s * %s' % (self.a, self.b)

if __name__ == "__main__":
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    for i in range(10):
        tasks.put(Task(i, i + 1))

    # number of CPU cores on this machine (4 here)
    num_consumers = multiprocessing.cpu_count()
    consumers = [Consumer(tasks, results) for i in range(num_consumers)]   # a list comprehension creates one Consumer per core, so several workers can take tasks

    # start every worker process
    for i in consumers:
        i.start()

    # put one None per worker on the queue so each of them stops
    for i in consumers:
        tasks.put(None)
    tasks.join()

    # finally print everything in the result queue
    num_jobs = 10
    while num_jobs:
        result = results.get()   # take one item from the result queue
        print('Result: %s' % result)
        num_jobs -= 1

Output:

D:\>py -3 a.py

Result: 0 * 1 = 0

Result: 1 * 2 = 2

Result: 2 * 3 = 6

Result: 3 * 4 = 12

Result: 4 * 5 = 20

Result: 5 * 6 = 30

Result: 6 * 7 = 42

Result: 7 * 8 = 56

Result: 8 * 9 = 72

Result: 9 * 10 = 90

Thread/process safety: with multiple threads or processes you add locks to keep shared data consistent; with a single thread, a single process, or coroutines there is nothing to protect, so you skip the locks and gain performance.

How a lock works:

Process 1: acquires the lock, reads a=1, does its computation, writes a=2, releases the lock...

Process 2: the lock is taken, so it waits; when the other side releases it, it acquires the lock, reads a=2, adds 1 to get a=3, releases the lock... (a minimal sketch of this scenario follows)
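A minimal sketch of that increment scenario, using a multiprocessing.Value as the shared variable a (the function name add_one is only an illustration):

from multiprocessing import Process, Value, Lock

def add_one(a, lock):
    lock.acquire()     # wait here if the other process is holding the lock
    a.value += 1       # read, add 1 and write back while holding the lock
    lock.release()

if __name__ == '__main__':
    a = Value('i', 1)   # shared integer, starts at 1
    lock = Lock()
    p1 = Process(target=add_one, args=(a, lock))
    p2 = Process(target=add_one, args=(a, lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(a.value)      # 3: each process incremented exactly once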

# Synchronizing processes -- with a lock

The lock turns the concurrent accesses into a linear (serialized) execution.

from multiprocessing import Process, Lock

def l(lock, num):
    lock.acquire()    # acquire the lock
    print("Hello Num: %s" % (num))
    lock.release()    # release the lock

if __name__ == '__main__':
    lock = Lock()     # create one shared lock instance
    for num in range(20):
        Process(target=l, args=(lock, num)).start()

# Without a lock --- concurrent execution

from multiprocessing import Process, Lock
import time

def l(lock, num):
    # lock.acquire()   # acquire the lock
    time.sleep(1)
    print("Hello Num: %s" % (num))
    # lock.release()   # release the lock

if __name__ == '__main__':
    lock = Lock()      # create one shared lock instance
    for num in range(20):
        Process(target=l, args=(lock, num)).start()

# A lock with several slots: a semaphore

# encoding=utf-8

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + " acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + " release")
    s.release()

if __name__ == "__main__":
    # allow at most 3 processes to access the shared resource at the same time
    s = multiprocessing.Semaphore(3)
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(s, i * 2))
        p.start()

D:\>py -3 a.py

Process-1 acquire

Process-1 release

Process-2 acquire

Process-4 acquire

Process-5 acquire

Process-2 release

Process-3 acquire

Process-4 release

Process-3 release

Process-5 release

Signalling between processes with an Event

# encoding=utf-8

import multiprocessing
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()   # block until the event is set
    print('wait_for_event: e.is_set()->', e.is_set())

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)  # wait at most t seconds; if the event is still not set, carry on anyway
    print('wait_for_event_timeout: e.is_set()->', e.is_set())
    e.set()    # set the internal flag to true

if __name__ == '__main__':
    e = multiprocessing.Event()
    print("begin,e.is_set()", e.is_set())
    w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))
    w1.start()
    # change the 2 below to 5 and compare the output
    w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2))
    w2.start()
    print('main: waiting before calling Event.set()')
    time.sleep(3)
    # e.set()   # comment this in or out to see the effect
    print('main: event is set')

Output:

D:\>py -3 a.py

begin,e.is_set() False

main: waiting before calling Event.set()

wait_for_event: starting

wait_for_event_timeout: starting

wait_for_event_timeout: e.is_set()-> False

wait_for_event: e.is_set()-> True

main: event is set

Process synchronization (using a pipe - Pipe)

# In the pipe pattern the two processes take turns talking to each other

# encoding=utf-8

import multiprocessing as mp

def proc_1(pipe):
    pipe.send('hello')
    print('proc_1 received: %s' % pipe.recv())
    pipe.send("what is your name?")
    print('proc_1 received: %s' % pipe.recv())

def proc_2(pipe):
    print('proc_2 received: %s' % pipe.recv())
    pipe.send('hello, too')
    print('proc_2 received: %s' % pipe.recv())
    pipe.send("I don't tell you!")

if __name__ == '__main__':
    # create a pipe; mp.Pipe() returns a pair of connection objects
    pipe = mp.Pipe()
    print(len(pipe))
    print(type(pipe))
    # give the first connection to process 1
    p1 = mp.Process(target=proc_1, args=(pipe[0],))
    # give the second connection to process 2
    p2 = mp.Process(target=proc_2, args=(pipe[1],))
    p2.start()
    p1.start()
    p2.join()
    p1.join()

Output:

C:\Users\dell>py -3 C:\Users\dell\Desktop\练习5518.py

2
<class 'tuple'>

proc_2 received: hello

proc_1 received: hello, too

proc_2 received: what is your name?

proc_1 received: I don't tell you!

Exercise: write every received message, in the order received, into a file char_history.txt.

# Idea: write each message as soon as it arrives

# encoding=utf-8

import multiprocessing as mp

def write_message(file_path, message):
    with open(file_path, "a") as fp:
        fp.write(message + "\n")

def proc_1(pipe):
    pipe.send('hello')
    message = pipe.recv()
    print('proc_1 received: %s' % message)
    write_message("d:\\a.txt", message)
    pipe.send("what is your name?")
    message = pipe.recv()
    print('proc_1 received: %s' % message)
    write_message("d:\\a.txt", message)

def proc_2(pipe):
    message = pipe.recv()
    print('proc_2 received: %s' % message)
    write_message("d:\\a.txt", message)
    pipe.send('hello, too')
    message = pipe.recv()
    print('proc_2 received: %s' % message)
    write_message("d:\\a.txt", message)
    pipe.send("I don't tell you!")

if __name__ == '__main__':
    # create a pipe object
    pipe = mp.Pipe()
    print(len(pipe))
    print(type(pipe))
    # give the first connection to process 1
    p1 = mp.Process(target=proc_1, args=(pipe[0],))
    # give the second connection to process 2
    p2 = mp.Process(target=proc_2, args=(pipe[1],))
    p2.start()
    p1.start()
    p2.join()
    p1.join()

Synchronizing processes (using a Condition)

# The producer/consumer pattern

# encoding=utf-8

import multiprocessing as mp
import time

def consumer(cond):
    with cond:
        print("consumer before wait")
        cond.wait()            # wait to be told that there is something to consume
        print("consumer after wait")

def producer(cond):
    with cond:
        print("producer before notifyAll")
        cond.notify_all()      # tell the consumers they may consume now
        print("producer after notifyAll")

if __name__ == '__main__':
    condition = mp.Condition()
    # p1 and p2 are consumers, p3 is the producer
    p1 = mp.Process(name="p1", target=consumer, args=(condition,))
    p2 = mp.Process(name="p2", target=consumer, args=(condition,))
    p3 = mp.Process(name="p3", target=producer, args=(condition,))
    p1.start()
    time.sleep(2)
    p2.start()
    time.sleep(2)
    p3.start()

Output:

E:\>py -3 a.py

consumer before wait

consumer before wait

producer before notifyAll

producer after notifyAll

consumer after wait

consumer after wait

Exercise: add a queue so the producer and consumers cooperate on a task.

For example: the producer cooks a dish and puts it on a queue; the consumers take dishes from the queue and eat them.

# encoding=utf-8

import multiprocessing as mp
from multiprocessing import Queue
import time
import random

def consumer(cond, q):
    for i in range(3):
        with cond:
            print("consumer before wait")
            cond.wait()                 # wait until there is something to consume
            food = q.get()
            print("consumer after wait,eat %s" % food)

def producer(cond, q):
    food_material = ["tomato", "egg", "lettuce", "potato"]
    for i in range(6):
        with cond:
            print("producer before notifyAll")
            cond.notify_all()           # tell the consumers they may consume now
            q.put(random.choice(food_material) + " " + random.choice(food_material))
            q.put(random.choice(food_material) + " " + random.choice(food_material))
            print("producer after notifyAll")

if __name__ == '__main__':
    condition = mp.Condition()
    q = Queue()
    p1 = mp.Process(name="p1", target=consumer, args=(condition, q))
    p2 = mp.Process(name="p2", target=consumer, args=(condition, q))
    p3 = mp.Process(name="p3", target=producer, args=(condition, q))
    p1.start()
    time.sleep(2)
    p2.start()
    time.sleep(2)
    p3.start()

E:\>py -3 a.py

consumer before wait

consumer before wait

producer before notifyAll

producer after notifyAll

consumer after wait,eat egg egg

consumer after wait,eat egg potato

producer before notifyAll

producer after notifyAll

consumer before wait

consumer before wait

producer before notifyAll

producer after notifyAll

consumer after wait,eat egg lettuce

consumer after wait,eat potato potato

producer before notifyAll

producer after notifyAll

consumer before wait

consumer before wait

producer before notifyAll

producer after notifyAll

consumer after wait,eat tomato tomato

consumer after wait,eat potato egg

producer before notifyAll

producer after notifyAll

Sharing global variables between processes

# First version: plain Python objects are NOT shared between processes

# encoding=utf-8

from multiprocessing import Process

def f(n, a):
    n = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]
        print(a[i])

if __name__ == '__main__':
    num = 0
    arr = list(range(10))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num)      # still 0: the child only changed its own copy
    print(arr[:])   # still [0, 1, ..., 9], for the same reason

# Second version: multiprocessing.Value and Array really are shared

# encoding=utf-8

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)        # create a shared number, initial value 0
    arr = Array('i', range(10))  # create a shared integer array, initialized from range(10)
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)   # read the value of the shared variable num
    print(arr[:])

Output of the first (non-shared) version:

C:\Users\dell>py -3 C:\Users\dell\Desktop\练习5518.py

0

-1

-2

-3

-4

-5

-6

-7

-8

-9

0

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

"""

小练习:

多进程访问6个网址,计算一下一共有多少个字符。用2个子进程来实现。

"""

# encoding=utf-8

from multiprocessing import Process, Queue, Value
import requests

url = ["http://www.sina.com.cn", "http://www.sohu.com", "http://www.163.com",
       "http://cn.bing.com", "http://www.baidu.com", "http://www.iciba.com"]

q = Queue()     # a cross-process queue
for i in url:   # put the urls on the queue
    q.put(i)

def f(count, q):
    while not q.empty():   # as long as the queue is not empty
        url = q.get()
        r = requests.get(url)
        count.value += len(r.text)
        print(url, len(r.text))
        # print(r.text)

if __name__ == "__main__":
    num = Value('i', 0)    # create a shared integer, initial value 0
    p1 = Process(target=f, args=(num, q))
    p2 = Process(target=f, args=(num, q))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num.value)

# A shared variable protected by a lock

# encoding=utf-8

import time
from multiprocessing import Process, Value, Lock

class Counter(object):   # a class that wraps the shared variable
    def __init__(self, initval=0):
        self.val = Value('i', initval)   # instance variable: a shared integer
        self.lock = Lock()               # a cross-process lock

    def increment(self):   # add 1 to the shared variable while holding the lock
        with self.lock:
            self.val.value += 1          # increment the shared variable
            # print("increment one time!", self.value())
            # calling self.value() here would deadlock, because the lock is already held

    def value(self):       # read the current value of the shared variable
        with self.lock:    # "with" calls the lock's __enter__ and __exit__ methods
            return self.val.value        # return the value while holding the lock

def func(counter):   # the task run by each process; counter is a Counter instance
    for i in range(50):   # 50 increments
        time.sleep(0.01)
        counter.increment()

if __name__ == '__main__':
    counter = Counter(0)   # one Counter instance shared by everyone
    procs = [Process(target=func, args=(counter,)) for i in range(10)]
    # 10 processes all run func and do their counting through the same counter instance;
    # equivalent to:
    # for i in range(10):
    #     Process(target=func, args=(counter,))
    for p in procs: p.start()   # start the 10 processes
    for p in procs: p.join()    # wait for the 10 processes
    print(counter.value())      # print the accumulated value
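The deadlock mentioned in that comment happens because multiprocessing.Lock is not re-entrant: increment() already holds the lock when value() tries to acquire it again. One possible way around it, sketched here rather than taken from the original notes, is to use a re-entrant multiprocessing.RLock:

from multiprocessing import Value, RLock

class ReentrantCounter(object):
    def __init__(self, initval=0):
        self.val = Value('i', initval)
        self.lock = RLock()   # re-entrant: the process that holds it may acquire it again

    def increment(self):
        with self.lock:
            self.val.value += 1
            print("increment one time!", self.value())   # no deadlock with an RLock

    def value(self):
        with self.lock:
            return self.val.value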

# Sharing a string variable between processes

# encoding=utf-8

from multiprocessing import Process, Manager
from ctypes import c_char_p

def greet(shareStr):
    shareStr.value = shareStr.value + ", World!"

if __name__ == '__main__':
    manager = Manager()
    shareStr = manager.Value(c_char_p, "Hello")
    process = Process(target=greet, args=(shareStr,))
    process.start()
    process.join()
    print(shareStr.value)

# Sharing different kinds of data structures between processes

# encoding=utf-8

from multiprocessing import Process, Manager

def f(shareDict, shareList):
    shareDict[1] = '1'
    shareDict['2'] = 2
    shareDict[0.25] = None
    shareList.reverse()   # reverse the shared list

if __name__ == '__main__':
    manager = Manager()
    shareDict = manager.dict()            # create a shared dict
    shareList = manager.list(range(10))   # create a shared list
    p = Process(target=f, args=(shareDict, shareList))
    p.start()
    p.join()
    print(shareDict)
    print(shareList)

A shared queue for a process pool

# Using a queue across pool processes

# encoding=utf-8

from multiprocessing import Pool, Manager

def func(q):
    print("*" * 10)
    q.put("12346")

if __name__ == "__main__":
    manager = Manager()
    q = manager.Queue()   # an ordinary multiprocessing.Queue cannot be passed to pool workers, but a Manager().Queue() can
    pool = Pool(processes=4)
    for i in range(5):
        pool.apply_async(func, (q,))
    pool.close()
    pool.join()
    print(q.qsize())

Sharing a class instance between processes

# encoding=utf-8

import time, os
import random
from multiprocessing import Pool, Value, Lock
from multiprocessing.managers import BaseManager

class MyManager(BaseManager):   # a manager that will hold the objects shared across processes
    pass

def Manager():
    m = MyManager()   # create a cross-process manager
    m.start()         # start it
    return m          # return the manager object

class Counter(object):   # the shared counter class
    def __init__(self, initval=0):
        self.val = Value('i', initval)
        self.lock = Lock()

    def increment(self):   # increment the shared number under the lock
        with self.lock:
            self.val.value += 1

    def value(self):
        with self.lock:    # read the shared number under the lock
            return self.val.value

# register the Counter class with the manager, i.e. into the shared space
MyManager.register('Counter', Counter)

def long_time_task(name, counter):   # the task run by the pool workers
    time.sleep(0.2)
    print('Run task %s (%s)...\n' % (name, os.getpid()))
    start = time.time()
    # time.sleep(random.random() * 3)
    for i in range(50):
        time.sleep(0.01)
        counter.increment()   # each task increments the shared instance 50 times
    end = time.time()         # measure how long this task took
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__ == '__main__':
    manager = Manager()
    # create the shared Counter instance, with an initial value of 0
    counter = manager.Counter(0)
    print('Parent process %s.' % os.getpid())
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(str(i), counter))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
    print(counter.value())

The logic of sharing an object:

1. Define a manager class that will hold the objects shared across processes:

class MyManager(BaseManager):
    pass

2. Register the class to be shared with that manager:

MyManager.register('Counter', Counter)

3. Start the shared manager and return it:

def Manager():
    m = MyManager()   # create a cross-process manager
    m.start()         # start it
    return m          # return the manager object

4. Create an instance of the shared object:

counter = manager.Counter(0)

5. Use the cross-process instance inside the worker processes:

p.apply_async(long_time_task, args=(str(i), counter))

Summary:

1. What a process is: a running instance of a program.

Each process has its own independent memory space, so ordinary variables are not shared

(not shared means: a process's variables can only be read and written inside that process, not from outside it).

2. Ways to create processes:

Process --> essentially 3 arguments: name, target (the task function), args (the arguments the function needs)

how a Process is used:

start

join

Pool --> parameter: how many worker processes to start, usually the number of CPU cores

ways to run work on a pool (a Pool.map sketch follows this summary):

pool.map(task, iterable object)

pool.apply_async(task, args)

pool.close()

pool.join()

3. Shared variables (5 kinds):

integers (Value 'i')

doubles (Value 'd')

strings (manager.Value)

lists and dicts (manager.list, manager.dict)

class instances (BaseManager)

4. Synchronization:

queues outside a pool: multiprocessing.Queue and JoinableQueue (with task_done())

queue used inside a pool:

manager = Manager()

q = manager.Queue()

wait -- e.set() (Event signalling)

wait -- notify_all() (producer/consumer with a Condition)

Pipe

5. Locks:

prevent races over shared resources.
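The summary mentions pool.map, which none of the examples above actually use; a minimal sketch of it (the square task is only an illustration):

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:             # 4 worker processes
        results = pool.map(square, range(10))   # blocks until every item has been processed
    print(results)                              # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]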

# Logging from processes

# encoding=utf-8

import multiprocessing
import logging
import sys

def worker():
    print('I am working....')
    sys.stdout.flush()

if __name__ == '__main__':
    # send the multiprocessing log to the console
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    # set the level of the log output
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

C:\Users\dell>py -3 C:\Users\dell\Desktop\练习5518.py

[INFO/Process-1] child process calling self.run()

I am working....

[INFO/Process-1] process shutting down

[INFO/Process-1] process exiting with exitcode 0

[INFO/MainProcess] process shutting down

# Daemon processes: make the children live and die with the main process

# encoding=utf-8

import multiprocessing
import time, logging
import sys

def daemon():   # the task of the daemon process
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()   # flush the buffer so the output reaches the terminal
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()

def non_daemon():   # the task of the non-daemon process
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()

if __name__ == '__main__':
    # send the multiprocessing log to the console
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    # set the level of the log output
    logger.setLevel(logging.DEBUG)
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True
    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False
    d.start()
    time.sleep(1)
    n.start()
    # d.join()   # joining both would let them finish before the main process exits
    # n.join()
    print('d.is_alive()', d.is_alive())
    print("n.is_alive()", n.is_alive())
    print("main Process end!")

# The subprocess module

# Writing to the child's standard input and reading its standard output

import subprocess

# with the default shell=False, the command and its arguments must be given as a list: [command, arg1, arg2]
obj = subprocess.Popen(["python"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
obj.stdin.write(b"print(1);\n")
obj.stdin.write(b"print(2);\n")
obj.stdin.write(b"print(3);\n")
obj.stdin.write(b"print(4);\n")
obj.stdin.close()

cmd_out = obj.stdout.read()
obj.stdout.close()
cmd_error = obj.stderr.read()
obj.stderr.close()

print(cmd_out)
print(cmd_error)
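For one-shot commands where you only need the output, subprocess.run is usually simpler than driving Popen by hand. A minimal sketch, assuming Python 3.7+ (for capture_output) and a python executable on the PATH:

import subprocess

# run a command, capture its output as text, and raise if it fails
result = subprocess.run(["python", "-c", "print(1 + 1)"],
                        capture_output=True, text=True, check=True)
print(result.stdout)      # "2\n"
print(result.returncode)  # 0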

# IPC example: run system commands in batch

Open a text file that contains only IP addresses or domain names, ping each entry, and write the results into ping.txt. The ip.txt file looks like: www.baidu.com www.taobao.com 123.45.5.34 127.0.0.1

import subprocess
import os

class Shell(object):
    def runCmd(self, cmd):
        res = subprocess.Popen(cmd, shell=True,
                               stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # read the child's standard output (stderr is merged into stdout here)
        sout, serr = res.communicate()
        # sout: the command's output; serr: None in this setup; res.pid: the child's process id
        return res.returncode, sout, serr, res.pid

shell = Shell()
fp = open('c:\\test\\ip.txt', 'r')
ipList = fp.readlines()
fp.close()

fp = open('c:\\test\\ping.txt', 'a')
print(ipList)
for i in ipList:
    i = i.strip()
    result = shell.runCmd('ping ' + i)
    if result[0] == 0:
        w = i + ': 0'
        fp.write(w + '\n')
    else:
        w = i + ': 1'
        fp.write(w + '\n')
    print(result[1].decode("gbk"))
fp.close()
