概述
一、使用多线程实现生产者与消费者模型
1、Condition模型
可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。但是notify和notify_all本身不会释放所占有的Condition内部的锁,所以随后需要调用condition.release()来显式地释放锁。
Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。
import threading
import time,random
# Condition variable under which every access to the shared buffer is made.
cond = threading.Condition()
# Shared buffer: producers append to the tail, the consumer removes the head.
product = []
class Producer(threading.Thread):
    """Thread that keeps appending random numbers to the shared ``product`` list.

    Each cycle produces ``speed`` items and then sleeps for one second.  An
    item is appended only while the list holds fewer than five entries;
    otherwise the thread waits on the module-level ``cond``.
    """

    def __init__(self, speed):
        """Remember ``speed``, the number of items produced per cycle."""
        super().__init__()
        self.speed = speed

    def run(self):
        """Produce items forever; this method never returns."""
        while True:
            for _ in range(self.speed):
                # ``with`` releases the lock even if the body raises, so a
                # failing producer cannot leave the other threads blocked.
                with cond:
                    # Re-check after every wake-up: the buffer may have been
                    # refilled before this thread re-acquired the lock.
                    while len(product) >= 5:
                        cond.wait()
                    num = random.random()
                    product.append(num)
                    print("生产者"+str(self.speed)+"生产了:"+str(num))
                    # notify_all() is the supported spelling of the
                    # deprecated camel-case alias.
                    cond.notify_all()
            time.sleep(1)
class Consumer(threading.Thread):
    """Thread that removes items from the head of the shared ``product`` list.

    Each cycle consumes a random batch of 1-5 items and then sleeps for one
    second.  After the 20th item the elapsed time since ``run`` started is
    printed once.
    """

    def __init__(self):
        super().__init__()

    def run(self):
        """Consume items forever; this method never returns."""
        count = 0
        # time.clock() no longer exists in Python 3.8+; perf_counter() is the
        # monotonic clock intended for measuring elapsed time.
        time_start = time.perf_counter()
        while True:
            speed = random.randint(1, 5)
            for _ in range(speed):
                # ``with`` guarantees the lock is released if the body raises.
                with cond:
                    # Re-check after every wake-up before touching the buffer.
                    while len(product) <= 0:
                        cond.wait()
                    num = product.pop(0)
                    print("消费者,消费了"+str(num))
                    count = count + 1
                    if count == 20:
                        time_end = time.perf_counter()
                        print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
                    # A slot was freed: wake one waiting thread.
                    cond.notify()
            time.sleep(1)
# Start the demo only when executed as a script, like the other examples in
# this file; importing the module no longer launches never-ending threads.
if __name__ == '__main__':
    Producer(1).start()
    Producer(2).start()
    Consumer().start()
2、Queue模型
(1)创建一个 queue.Queue() 的实例,来表示缓冲池。
(2)使用生产者线程向队列中填充数据,使用消费者线程从队列中取出数据。
import threading
import queue
import time
import random
class Producer(threading.Thread):
    """Worker thread that feeds random numbers into a shared queue.

    ``speed`` items are put on ``queue`` per cycle, and every cycle is
    followed by a one-second pause.
    """

    def __init__(self, speed, queue):
        super().__init__()
        self.queue = queue
        self.speed = speed

    def run(self):
        """Loop forever: enqueue one batch, then sleep for a second."""
        while True:
            remaining = self.speed
            while remaining > 0:
                value = random.random()
                self.queue.put(value)
                print("生产者",self.speed,"生产了:",str(value))
                remaining -= 1
            time.sleep(1)
class Consumer(threading.Thread):
    """Thread that drains items from a shared queue.

    Each cycle takes a random batch of 1-5 items (blocking while the queue
    is empty) and then sleeps for one second.  The time needed to consume
    the first 20 items is printed once.
    """

    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        """Consume items forever; this method never returns."""
        # time.clock() no longer exists in Python 3.8+; perf_counter() is the
        # monotonic clock intended for measuring elapsed time.
        time_start = time.perf_counter()
        count = 0
        while True:
            speed = random.randint(1, 5)
            for _ in range(speed):
                item = self.queue.get()
                print("消费者","消费:",item)
                count = count + 1
                if count == 20:
                    time_end = time.perf_counter()
                    print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
            time.sleep(1)
# Bounded buffer (capacity five) shared by every worker thread.
q = queue.Queue(maxsize=5)
if __name__ == '__main__':
    # Two producers with different batch sizes and a single consumer.
    workers = [Producer(1, q), Producer(2, q), Consumer(q)]
    for worker in workers:
        worker.start()
3、信号量模型:
import sys, time
import random
from threading import Thread, Semaphore
# Shared buffer; it is only touched while ``mutex`` is held.
product = []
# Binary semaphore used as the mutual-exclusion lock (an RLock would work too).
mutex = Semaphore(1)
# ``full`` counts items ready to be consumed, ``empty`` counts the free slots.
full, empty = Semaphore(0), Semaphore(5)
class Producer(Thread):
    """Thread that appends random numbers (as strings) to the shared ``product`` list.

    A permit is taken from ``empty`` before each item and one is added to
    ``full`` afterwards.  ``speed`` items are produced per cycle, followed
    by a one-second sleep.
    """

    def __init__(self, speed):
        super().__init__()
        self.speed = speed

    def run(self):
        """Produce items forever; this method never returns."""
        while True:
            for _ in range(self.speed):
                # Wait for a free slot before entering the critical section.
                empty.acquire()
                # ``with`` releases the mutex even if the body raises, so a
                # failing producer cannot block every other thread.
                with mutex:
                    num = random.random()
                    ProductThing = str(num)+' '
                    product.append(str(num))
                    print ('%s: count=%d' % ("producer"+str(self.speed), len(product)))
                    print (' product things:'+ProductThing)
                # Announce that one more item is available.
                full.release()
            time.sleep(1)
class Consumer(Thread):
    """Thread that removes items from the head of the shared ``product`` list.

    A permit is taken from ``full`` before each item and one is added to
    ``empty`` afterwards.  Each cycle consumes a random batch of 1-5 items
    and then sleeps for one second; the time taken for the first 20 items
    is printed once.
    """

    def __init__(self):
        super().__init__()

    def run(self):
        """Consume items forever; this method never returns."""
        count = 0
        # time.clock() no longer exists in Python 3.8+; perf_counter() is the
        # monotonic clock intended for measuring elapsed time.
        time_start = time.perf_counter()
        while True:
            speed = random.randint(1, 5)
            for _ in range(speed):
                # Wait until at least one item is available.
                full.acquire()
                # ``with`` releases the mutex even if the body raises.
                with mutex:
                    consumeThing = str(product.pop(0))+' '
                    print('%s: count=%d' % ("consumer", len(product)))
                    print(' consume things:'+consumeThing)
                    count = count + 1
                    if count == 20:
                        time_end = time.perf_counter()
                        print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
                # Hand the freed slot back to the producers.
                empty.release()
            time.sleep(1)
if __name__ == '__main__':
    # Two producers (batch sizes 1 and 2) and one consumer.
    workers = [Producer(1), Producer(2), Consumer()]
    for worker in workers:
        worker.start()
4、Event模型
threading.Event机制类似于一个线程向其它多个线程发号施令的模式,其它线程都会持有一个threading.Event的对象,这些线程都会等待这个事件的“发生”,如果此事件一直不发生,那么这些线程将会阻塞,直至事件的“发生”。生产者生产完商品会立即通知消费者去消费,消费者消费完商品后会立即通知生产者去生产,适用于产品池数目为一的情况。
import threading
import random
import time
def produce(speed, e_p1, e_p2, e_c, product):
    """Append random numbers to ``product``, one per permission from the consumer.

    Args:
        speed: Items produced per cycle; also selects the event this producer
            waits on (1 -> ``e_p1``, 2 -> ``e_p2``).  Any other value
            produces without waiting.
        e_p1: Event granting producer 1 permission to produce.
        e_p2: Event granting producer 2 permission to produce.
        e_c: Event set after every item to tell the consumer one is ready.
        product: Shared list that receives the items.

    The function loops forever, sleeping one second after each cycle.
    """
    own_event = {1: e_p1, 2: e_p2}.get(speed)
    while True:
        for _ in range(speed):
            if own_event is not None:
                own_event.wait()
                # Use up the permission BEFORE signalling the consumer.
                # Clearing after ``e_c.set()`` could erase a permission the
                # consumer has already granted again, blocking everyone.
                own_event.clear()
            num = random.random()
            product.append(num)
            print("生产者"+str(speed)+",生产了"+str(num))
            e_c.set()
        time.sleep(1)
def consume(e_p1, e_p2, e_c, product):
    """Remove items from the head of ``product`` whenever ``e_c`` is set.

    Args:
        e_p1: Event that lets producer 1 produce the next item.
        e_p2: Event that lets producer 2 produce the next item.
        e_c: Event set by a producer when an item is ready.
        product: Shared list holding the items.

    After each item one of the two producer events is set at random.  The
    time needed for the first 20 items is printed once.  The function loops
    forever, sleeping one second after each random batch of 1-5 items.
    """
    count = 0
    # time.clock() no longer exists in Python 3.8+; perf_counter() is the
    # monotonic clock intended for measuring elapsed time.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            e_c.wait()
            # Reset our own event BEFORE waking a producer.  Clearing it
            # afterwards could erase the "item ready" signal of a producer
            # that has already run, blocking everyone.
            e_c.clear()
            num = product.pop(0)
            print("消费者,消费了"+str(num))
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费 "+str(count)+"个商品所用的时间: %f s" % (time_end - time_start))
            r = random.randint(1, 2)
            if r == 1:
                e_p1.set()
            if r == 2:
                e_p2.set()
        time.sleep(1)
if __name__ == '__main__':
    e_p1, e_p2, e_c = threading.Event(), threading.Event(), threading.Event()
    # Producer 1 is allowed to produce the very first item.
    e_p1.set()
    product = []
    events = (e_p1, e_p2, e_c, product)
    p1 = threading.Thread(target=produce, args=(1,) + events)
    p1.start()
    p2 = threading.Thread(target=produce, args=(2,) + events)
    p2.start()
    c1 = threading.Thread(target=consume, args=events)
    c1.start()
二、多进程实现生产者与消费者模型
1、 信号量和共享内存
'''
@author: jqy
'''
from multiprocessing import Process
import time
import random
import multiprocessing
def produce(speed, mutex, full, empty, product, pindex, cindex):
    """Write random numbers into the ring buffer ``product``.

    Args:
        speed: Number of items produced per one-second cycle.
        mutex: Lock protecting ``product`` and ``pindex``.
        full: Semaphore counting filled slots (released after each item).
        empty: Semaphore counting free slots (acquired before each item).
        product: Indexable shared buffer used as a ring.
        pindex: Object whose ``value`` is the next write position.
        cindex: Read position; not used by the producer.

    The function loops forever.
    """
    while True:
        for _ in range(speed):
            # Wait for a free slot before entering the critical section.
            empty.acquire()
            # ``with`` matches the consumer's style and releases the lock
            # even if the body raises.
            with mutex:
                num = random.random()
                ProductThing = str(num)+' '
                product[pindex.value] = num
                # Advance the write position, wrapping at the buffer end.
                pindex.value = (pindex.value+1) % len(product)
                print ('%s: product things:%s' % ("producer"+str(speed), ProductThing))
            full.release()
        time.sleep(1)
def consume(mutex, full, empty, product, pindex, cindex):
    """Read items from the ring buffer ``product``.

    Args:
        mutex: Lock protecting ``product`` and ``cindex``.
        full: Semaphore counting filled slots (acquired before each item).
        empty: Semaphore counting free slots (released after each item).
        product: Indexable shared buffer used as a ring.
        pindex: Write position; not used by the consumer.
        cindex: Object whose ``value`` is the next read position.

    Each cycle consumes a random batch of 1-5 items and then sleeps for one
    second.  The time needed for the first 20 items is printed once.  The
    function loops forever.
    """
    count = 0
    # time.clock() no longer exists in Python 3.8+; perf_counter() is the
    # monotonic clock intended for measuring elapsed time.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            full.acquire()
            with mutex:
                consumeThing = str(product[cindex.value])+' '
                # Advance the read position, wrapping at the buffer end.
                cindex.value = (cindex.value+1) % len(product)
                print('%s: consume things:%s' % ("consumer", consumeThing))
                count = count + 1
                if count == 20:
                    time_end = time.perf_counter()
                    print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
            empty.release()
        time.sleep(1)
if __name__ == '__main__':
    # Shared-memory ring buffer with five slots plus its two cursors.
    product = multiprocessing.Array('d',range(5))
    pindex = multiprocessing.Value('i',0)
    cindex = multiprocessing.Value('i',0)
    mutex = multiprocessing.RLock()
    full = multiprocessing.Semaphore(0)
    empty = multiprocessing.Semaphore(5)
    shared = (mutex, full, empty, product, pindex, cindex)
    workers = [
        Process(target=produce, args=(1,) + shared),
        Process(target=produce, args=(2,) + shared),
        Process(target=consume, args=shared),
    ]
    for worker in workers:
        worker.start()
2、服务进程管理器Manager
'''
@author: jqy
'''
from multiprocessing import Process
import time
import random
import multiprocessing
def produce(speed, mutex, full, empty, product, pindex, cindex):
    """Store random numbers in the ring buffer ``product``.

    Args:
        speed: Number of items produced per one-second cycle.
        mutex: Lock guarding ``product`` and ``pindex``.
        full: Semaphore counting filled slots (released after each item).
        empty: Semaphore counting free slots (acquired before each item).
        product: Indexable shared buffer used as a ring.
        pindex: Object whose ``value`` holds the next write position.
        cindex: Read position; unused here.

    The function never returns.
    """
    while True:
        for _ in range(speed):
            # Block until the consumer has freed a slot.
            empty.acquire()
            # The context manager releases the mutex on every exit path and
            # mirrors how the consumer takes the same lock.
            with mutex:
                num = random.random()
                ProductThing = str(num)+' '
                product[pindex.value] = num
                # Move the write cursor, wrapping at the buffer end.
                pindex.value = (pindex.value+1) % len(product)
                print ('%s: product things:%s' % ("producer"+str(speed), ProductThing))
            full.release()
        time.sleep(1)
def consume(mutex, full, empty, product, pindex, cindex):
    """Take items out of the ring buffer ``product``.

    Args:
        mutex: Lock guarding ``product`` and ``cindex``.
        full: Semaphore counting filled slots (acquired before each item).
        empty: Semaphore counting free slots (released after each item).
        product: Indexable shared buffer used as a ring.
        pindex: Write position; unused here.
        cindex: Object whose ``value`` holds the next read position.

    Consumes random batches of 1-5 items with a one-second pause between
    batches, and prints once how long the first 20 items took.  The
    function never returns.
    """
    count = 0
    # time.clock() no longer exists in Python 3.8+; perf_counter() is the
    # monotonic clock intended for measuring elapsed time.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            full.acquire()
            with mutex:
                consumeThing = str(product[cindex.value])+' '
                # Move the read cursor, wrapping at the buffer end.
                cindex.value = (cindex.value+1) % len(product)
                print('%s: consume things:%s' % ("consumer", consumeThing))
                count = count + 1
                if count == 20:
                    time_end = time.perf_counter()
                    print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
            empty.release()
        time.sleep(1)
if __name__ == '__main__':
    # Every shared object is a proxy served by one manager process.
    mgr = multiprocessing.Manager()
    product = mgr.Array('d',range(5))
    pindex = mgr.Value('i',0)
    cindex = mgr.Value('i',0)
    mutex = mgr.RLock()
    full = mgr.Semaphore(0)
    empty = mgr.Semaphore(5)
    shared = (mutex, full, empty, product, pindex, cindex)
    p1 = Process(target=produce, args=(1,) + shared)
    p2 = Process(target=produce, args=(2,) + shared)
    p3 = Process(target=consume, args=shared)
    for proc in (p1, p2, p3):
        proc.start()
    # Wait for the children in the same order they were started.
    for proc in (p1, p2, p3):
        proc.join()
3、 Condition模型
import multiprocessing
import random
import time
def produce(speed, cond, product):
    """Append random numbers to ``product`` while it holds fewer than five items.

    Args:
        speed: Number of items produced per one-second cycle.
        cond: Condition object guarding ``product``.
        product: Shared list-like buffer.

    The function loops forever.
    """
    while True:
        for _ in range(speed):
            # ``with`` releases the condition's lock even if the body raises,
            # so a failing producer cannot leave the other workers blocked.
            with cond:
                # Re-check after every wake-up before appending.
                while len(product) >= 5:
                    cond.wait()
                num = random.random()
                product.append(num)
                print("生产者"+str(speed)+",生产了:"+str(num))
                cond.notify()
        time.sleep(1)
def consume(cond, product):
    """Remove items from the head of ``product`` under ``cond``.

    Args:
        cond: Condition object guarding ``product``.
        product: Shared list-like buffer.

    Consumes random batches of 1-5 items with a one-second pause between
    batches, and prints once how long the first 20 items took.  The
    function loops forever.
    """
    count = 0
    # time.clock() no longer exists in Python 3.8+; perf_counter() is the
    # monotonic clock intended for measuring elapsed time.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            # ``with`` releases the condition's lock even if the body raises.
            with cond:
                # Re-check after every wake-up before reading.
                while len(product) <= 0:
                    cond.wait()
                num = product[0]
                del product[0]
                print("消费者,消费了"+str(num))
                count = count + 1
                if count == 20:
                    time_end = time.perf_counter()
                    print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
                cond.notify()
        time.sleep(1)
if __name__ == '__main__':
    c = multiprocessing.Condition()
    # The buffer is a manager-backed list proxy so every process sees it.
    product = multiprocessing.Manager().list()
    p1 = multiprocessing.Process(target=produce, args=(1,c,product))
    p1.start()
    p2 = multiprocessing.Process(target=produce, args=(2,c,product))
    p2.start()
    c1 = multiprocessing.Process(target=consume, args=(c,product))
    c1.start()
    # Keep the parent alive while the children run.
    for child in (p1, p2, c1):
        child.join()
4、 消息队列模型
'''
@author: jqy
'''
from multiprocessing import Process
import random
import time
import multiprocessing
def produce(speed, q):
    """Put ``speed`` random numbers on ``q`` every cycle, forever.

    A one-second pause follows each cycle.
    """
    while True:
        batch = (random.random() for _ in range(speed))
        for value in batch:
            q.put(value)
            print("生产者",speed,"生产了",str(value))
        time.sleep(1)
def consume(num, q):
    """Take items from ``q`` in random batches of 1-5, forever.

    Args:
        num: Unused by the function body.
        q: Queue to read from; ``get`` blocks while it is empty.

    A one-second pause follows each batch.  The time needed for the first
    20 items is printed once.
    """
    count = 0
    # time.clock() no longer exists in Python 3.8+; perf_counter() is the
    # monotonic clock intended for measuring elapsed time.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            item = q.get()
            print("消费者","消费了",item)
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
        time.sleep(1)
if __name__ == '__main__':
    # Bounded inter-process queue holding at most five items.
    q = multiprocessing.Queue(maxsize=5)
    jobs = [(produce, (1, q)), (produce, (2, q)), (consume, (1, q))]
    for target, arguments in jobs:
        Process(target=target, args=arguments).start()
5、 管道模型:
'''
Created on 2016/11/11
@author: jqy
'''
from multiprocessing import Process
import random
import time
import multiprocessing
def produce(speed, empty, p):
    """Send random numbers through connection ``p``, ``speed`` per cycle.

    One permit is taken from ``empty`` before every send; this function
    never gives permits back.  A one-second pause follows each cycle, and
    the function loops forever.

    NOTE(review): nothing here serialises ``p.send``.  If several processes
    share the same connection end, the multiprocessing documentation warns
    that concurrent writes may corrupt the data -- confirm against the caller.
    """
    while True:
        sent = 0
        while sent < speed:
            empty.acquire()
            value = random.random()
            p.send(value)
            print("生产者",speed,",生产了",str(value))
            sent += 1
        time.sleep(1)
def consume(num, empty, p):
    """Receive items from connection ``p`` in random batches of 1-5, forever.

    Args:
        num: Unused by the function body.
        empty: Semaphore released once per received item.
        p: Connection end to read from; ``recv`` blocks until data arrives.

    A one-second pause follows each batch.  The time needed for the first
    20 items is printed once.
    """
    count = 0
    # time.clock() no longer exists in Python 3.8+; perf_counter() is the
    # monotonic clock intended for measuring elapsed time.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            item = p.recv()
            print("消费者"," 消费了",item)
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
            # Give one permit back so a producer may send another item.
            empty.release()
        time.sleep(1)
if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    # Five permits: producers take one per send, the consumer returns one per receive.
    empty = multiprocessing.Semaphore(5)
    jobs = [
        (produce, (1, empty, child_conn)),
        (produce, (2, empty, child_conn)),
        (consume, (1, empty, parent_conn)),
    ]
    for target, arguments in jobs:
        Process(target=target, args=arguments).start()
6、 Event模型
生产者生产完商品会立即通知消费者去消费,消费者消费完商品后会立即通知生产者去生产,适用于产品池数目为一的情况。
import multiprocessing
import random
import time
def produce(speed, e_p1, e_p2, e_c, product):
    """Append random numbers to ``product``, one per permission from the consumer.

    Args:
        speed: Items produced per cycle; also selects the event this producer
            waits on (1 -> ``e_p1``, 2 -> ``e_p2``).  Any other value
            produces without waiting.
        e_p1: Event granting producer 1 permission to produce.
        e_p2: Event granting producer 2 permission to produce.
        e_c: Event set after every item to tell the consumer one is ready.
        product: Shared list-like buffer that receives the items.

    The function loops forever, sleeping one second after each cycle.
    """
    if speed == 1:
        own_event = e_p1
    elif speed == 2:
        own_event = e_p2
    else:
        own_event = None
    while True:
        for _ in range(speed):
            if own_event is not None:
                own_event.wait()
                # Clear our permission BEFORE waking the consumer.  If it
                # were cleared after ``e_c.set()``, a consumer that had
                # already granted the next permission would have that grant
                # erased and all processes would block.
                own_event.clear()
            num = random.random()
            product.append(num)
            print("生产者"+str(speed)+",生产了"+str(num))
            e_c.set()
        time.sleep(1)
def consume(e_p1, e_p2, e_c, product):
    """Remove items from the head of ``product`` whenever ``e_c`` is set.

    Args:
        e_p1: Event that lets producer 1 produce the next item.
        e_p2: Event that lets producer 2 produce the next item.
        e_c: Event set by a producer when an item is ready.
        product: Shared list-like buffer holding the items.

    After each item one of the two producer events is set at random.  The
    time needed for the first 20 items is printed once.  The function loops
    forever, sleeping one second after each random batch of 1-5 items.
    """
    count = 0
    # time.clock() no longer exists in Python 3.8+; perf_counter() is the
    # monotonic clock intended for measuring elapsed time.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            e_c.wait()
            # Reset our own event BEFORE waking a producer, so the
            # producer's next "item ready" signal cannot be erased.
            e_c.clear()
            num = product[0]
            del product[0]
            print("消费者,消费了"+str(num))
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
            r = random.randint(1, 2)
            if r == 1:
                e_p1.set()
            if r == 2:
                e_p2.set()
        time.sleep(1)
if __name__ == '__main__':
    e_p1, e_p2, e_c = (multiprocessing.Event() for _ in range(3))
    # Producer 1 is allowed to produce the very first item.
    e_p1.set()
    # Manager-backed list proxy so every process sees the same buffer.
    product = multiprocessing.Manager().list()
    shared = (e_p1, e_p2, e_c, product)
    p1 = multiprocessing.Process(target=produce, args=(1,) + shared)
    p1.start()
    p2 = multiprocessing.Process(target=produce, args=(2,) + shared)
    p2.start()
    c1 = multiprocessing.Process(target=consume, args=shared)
    c1.start()
    for child in (p1, p2, c1):
        child.join()
三、不同主机上生产者消费者模型:
1、Manager:服务进程管理器可以通过网络由不同计算机上的进程共享
2、 socketTCP模型:
from multiprocessing import Process
import queue
import threading
import socket
import random
import time
# Capacity of the consumer-side queue; the socket buffer sizes requested in
# this script are derived from it as MaxSize*3.
MaxSize=5
def produce(speed, host, port):
    """Send a numbered stream of 3-character tokens to ``(host, port)`` over TCP.

    Args:
        speed: Tokens sent per one-second cycle.  Producer 1 pads the counter
            with ``0`` ("001"), every other producer pads with ``x`` ("xx1").
        host: Address of the listening consumer.
        port: TCP port of the listening consumer.

    The function loops until an error occurs; the socket is always closed.
    """
    address = (host, port)
    # Small buffers are requested on purpose (before connecting) so that
    # back-pressure from the consumer shows up quickly.
    buffer_size = MaxSize * 3
    while True:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, buffer_size)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, buffer_size)
        try:
            s.connect(address)
            break
        except ConnectionRefusedError:
            # The consumer may not be listening yet; retry with a new socket.
            s.close()
            time.sleep(0.1)
    template = "{0:0=3}" if speed == 1 else "{0:x=3}"
    item = 1
    try:
        while True:
            for _ in range(speed):
                token = template.format(item)
                print("生产者",speed,",生产了",token)
                # sendall() keeps writing until the whole token is sent;
                # a plain send() may transmit only part of it.
                # NOTE(review): tokens are 3 characters only while item < 1000.
                s.sendall(bytes(token, encoding="utf8"))
                item = item + 1
            time.sleep(1)
    finally:
        # Reached on any error, so the socket is never leaked.
        s.close()
def consume(host1, port1, host2, port2):
    """Consume tokens received from two TCP endpoints.

    Two ``getMessage`` threads listen on ``(host1, port1)`` and
    ``(host2, port2)`` and feed a shared queue of capacity ``MaxSize``.
    This function takes random batches of 1..MaxSize items from the queue
    with a one-second pause between batches, and prints once how long the
    first 20 items took.  It loops forever.
    """
    q = queue.Queue(maxsize=MaxSize)
    threading.Thread(target=getMessage, args=(q, host1, port1)).start()
    threading.Thread(target=getMessage, args=(q, host2, port2)).start()
    count = 0
    # time.clock() no longer exists in Python 3.8+; perf_counter() is the
    # monotonic clock intended for measuring elapsed time.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, MaxSize)
        for _ in range(speed):
            item = q.get()
            print("消费者"," 消费了",item)
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费"+str(count)+"个商品所用的时间: %f s" % (time_end - time_start))
        time.sleep(1)
def _recv_exact(conn, size):
    """Read up to ``size`` bytes from ``conn``; a shorter result means EOF."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def getMessage(q, host, port):
    """Accept one TCP client on ``(host, port)`` and queue its 3-byte tokens.

    Args:
        q: Queue that receives each token decoded as a UTF-8 string; ``put``
            blocks while the queue is full.
        host: Local address to bind.
        port: Local TCP port to bind.

    Returns when the peer closes the connection or a socket error occurs.
    Both sockets are always closed.
    """
    address = (host, port)
    buffer_size = MaxSize * 3
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, buffer_size)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, buffer_size)
        sock.bind(address)
        sock.listen(0)
        tcpClientSock, addr = sock.accept()
        try:
            while True:
                try:
                    # TCP is a byte stream: keep reading until a whole
                    # token has arrived instead of trusting one recv().
                    data = _recv_exact(tcpClientSock, 3)
                except OSError:
                    print("exception")
                    break
                if len(data) < 3:
                    # recv() returned b"": the producer closed the
                    # connection.  Stop instead of queueing empty strings.
                    break
                try:
                    text = str(data, encoding="utf8")
                except UnicodeDecodeError:
                    print("exception")
                    continue
                q.put(text)
                print("count="+str(q.qsize()))
        finally:
            tcpClientSock.close()
    finally:
        sock.close()
if __name__ == '__main__':
    first, second = ('localhost', 8080), ('localhost', 8090)
    # One producer per endpoint, then the consumer that listens on both.
    Process(target=produce, args=(1,) + first).start()
    Process(target=produce, args=(2,) + second).start()
    Process(target=consume, args=first + second).start()
3、 远程调用模型:
先在主进程中注册获取产品的方法,消费者在取用商品时调用取用商品的远程方法来获取。取用商品有一定的延迟,使得程序的整个运行速度比较慢。
from multiprocessing import Process
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer
import random
import time
import multiprocessing
# Bounded queue (at most five items) that the producer processes fill and
# getAProduct() reads from.
q = multiprocessing.Queue(maxsize = 5)
def produce(speed, q):
    """Endlessly put ``speed`` random numbers on ``q`` per cycle.

    Each cycle is followed by a one-second pause.
    """
    while True:
        produced = 0
        while produced < speed:
            value = random.random()
            q.put(value)
            print("生产者",speed,"生产了",str(value))
            produced += 1
        time.sleep(1)
def getAProduct():
    """Remove and return the next item from the module-level queue ``q``.

    Blocks until an item is available.  This is the function the script
    registers with its XML-RPC server.
    """
    next_item = q.get()
    return next_item
def consume(host, port):
    """Fetch items through the remote ``getAProduct`` XML-RPC method.

    Args:
        host: Host name of the XML-RPC server.
        port: TCP port of the XML-RPC server.

    Fetches random batches of 1-5 items with a one-second pause between
    batches, and prints once how long the first 20 items took.  The
    function loops forever.
    """
    server = ServerProxy("http://"+host+":"+str(port))
    count = 0
    # time.clock() no longer exists in Python 3.8+; perf_counter() is the
    # monotonic clock intended for measuring elapsed time.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            print("开始远程调用")
            item = server.getAProduct()
            print("消费者","消费了",item)
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费"+str(count)+"个资源所需要的时间: %f s" % (time_end - time_start))
        time.sleep(1)
if __name__ == '__main__':
    s = SimpleXMLRPCServer(('localhost', 8000))
    s.register_function(getAProduct)
    print('注册获取产品方法完成')
    # Start the workers first: serve_forever() below blocks the main process.
    children = [
        Process(target=produce, args=(1,q)),
        Process(target=produce, args=(2,q)),
        Process(target=consume, args=('localhost',8000)),
    ]
    for child in children:
        child.start()
    s.serve_forever()
4、使用Redis发布订阅模式:
#encoding=utf-8
'''
@author: jqy
'''
from multiprocessing import Process
import random
import time
import redis
def produce(speed):
    """Publish ``speed`` random numbers per cycle on the Redis channel "hole".

    A new client with default connection settings is created once; each
    cycle is followed by a one-second pause.  The function loops forever.
    """
    client = redis.StrictRedis()
    while True:
        published = 0
        while published < speed:
            value = random.random()
            client.publish("hole",value)
            print("生产者",speed,"生产了",str(value))
            published += 1
        time.sleep(1)
def consume(speed):
    """Print every message published on the Redis channel "hole".

    Args:
        speed: Number of times the listening loop is entered.

    Entries whose type is not ``'message'`` are skipped.  The time needed
    for the first 20 messages is printed once.
    """
    count = 0
    # time.clock() no longer exists in Python 3.8+; perf_counter() is the
    # monotonic clock intended for measuring elapsed time.
    time_start = time.perf_counter()
    client = redis.StrictRedis()
    p = client.pubsub()
    p.subscribe("hole")
    for _ in range(speed):
        # NOTE(review): listen() presumably keeps yielding for as long as the
        # subscription exists, so the code after this inner loop may never
        # run -- confirm against the redis-py documentation.
        for msg in p.listen():
            if msg['type'] != 'message':
                continue
            print("消费者", "消费了", msg)
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费 " + str(count) + "个商品所用时间: %f s" % (time_end - time_start))
        time.sleep(1)
if __name__ == '__main__':
    # The consumer process is started before the two producers.
    jobs = [(consume, (2,)), (produce, (1,)), (produce, (2,))]
    for target, arguments in jobs:
        Process(target=target, args=arguments).start()
注:如果生产者存在,消费者不存在,那么消息直接会丢弃。生产者发送消息的时候,如果消费者突然断掉了,则消息丢失。
参考文献:
multiprocessing --- 基于进程的并行 — Python 3.10.1 文档
Python使用Manager对象实现不同机器上的进程跨网络传输数据 - 云+社区 - 腾讯云
最后
以上就是满意帽子为你收集整理的利用python实现生产者消费者的并发模型的全部内容,希望文章能够帮你解决利用python实现生产者消费者的并发模型所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复