生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品,从而消耗掉生产的数据。达到供需平衡,不能生产多了浪费,也不能需要消耗资源的时候没有。
from multiprocessing import Process,Queue #多进程组件,队列 import time,random #生产者方法 def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) #模拟获取数据时间 f = '%s生产的%s%s'%(name,food,i) print(f) q.put(f) #添加进队列 #消费者方法 def consumer(q,name): while True: food = q.get() #如果获取不到,会一直阻塞进程不会结束子进程 # 当队列中的数据是None的时候结束while循环 if food is None: print('%s获取到一个空'%name) break f = '\033[31m%s消费了%s\033[0m' % (name, food) print(f) time.sleep(random.randint(1,3)) # 模拟消耗数据时间 if __name__ == '__main__': q = Queue() # 创建队列 # 模拟生产者 生产数据 p = Process(target=producer, args=('p', '包子', q)) #创建进程 p.start() #启动进程 p1 = Process(target=producer, args=('p1', '烧饼', q)) p1.start() #模拟消费者消费数据 c = Process(target=consumer, args=(q, 'c')) c.start() c1 = Process(target=consumer, args=(q, 'c1')) c1.start() p.join()#阻塞主进程 直到p和p1 子进程结束后才执行q.put() 方法 p1.join()#阻塞主进程 直到p和p1 子进程结束后才执行q.put() 方法 #为了确保生产者生产完所有数据后, #最后一个是None,方便结束子进程中的while循环, #否则会一直等待队列中加入新数据。 q.put(None) q.put(None)
使用Queue
组件实现的缺点就是,实现了多少个消费者consumer进程,就需要在最后往队列中添加多少个None
标识,方便生产完毕结束消费者consumer进程。否则,p.get()
不到任务会阻塞子进程,因为while
循环,直到队列q
中有新的任务加进来,才会再次执行。而我们的生产者只能生产这么多东西,所以相当于程序卡死。
from multiprocessing import JoinableQueue,Process import time,random #生产者方法 def producer(name,food,q): for i in range(4): time.sleep(random.randint(1, 2)) f = '%s生产的%s%s'%(name,food,i) q.put(f) print(f) q.join() #一直阻塞,等待消耗完所有的数据后才释放 #消费者方法 def consumer(name,q): while True: food = q.get() print('\033[31m%s消费了%s\033[0m' % (name, food)) time.sleep(random.randint(4,8)) q.task_done() #每次消耗减1 if __name__ == '__main__': q = JoinableQueue() #创建队列 #模拟生产者队列 p1 = Process(target=producer,args=('p1','包子',q)) p1.start() p2 = Process(target=producer,args=('p2','烧饼',q)) p2.start() #模拟消费者队列 c1 = Process(target=consumer,args=('c1',q)) c1.daemon = True #守护进程:主进程结束,子进程也会结束 c1.start() c2 = Process(target=consumer,args=('c2',q)) c2.daemon = True c2.start() p1.join() #阻塞主进程,等到p1子进程结束才往下执行 p2.join() # q.task_done() 每次消耗队列中的 任务数减1 # q.join() 一直阻塞,等待队列中的任务数消耗完才释放 # 因为有 q.join 所有一直会等待 c1,c2 消耗完毕。才会执行 p.join 后面的代码 # 因为 c1 c2 是守护进程,所以到这一步主进程代码执行完毕,主进程会释放死掉, # 所以 c1 c2 也会跟随 主进程释放死掉。
使用JoinableQueue
组件,是因为JoinableQueue
中有两个方法:task_done()
和join()
。首先说join()
和Process
中的join()
的效果类似,都是阻塞当前进程,防止当前进程结束。但是JoinableQueue
的join()
是和task_down()
配合使用的。Process
中的join()
是等到子进程中的代码执行完毕,就会执行主进程join()
下面的代码。而JoinableQueue
中的join()
是等到队列中的任务数量为0
的时候才会执行q.join()
下面的代码,否则会一直阻塞。task_down()
方法是每获取一次队列中的任务,就需要执行一次。直到队列中的任务数为0
的时候,就会执行JoinableQueue
的join()
后面的方法了。所以生产者生产完所有的数据后,会一直阻塞着。不让p1
和p2
进程结束。等到消费者get()
一次数据,就会执行一次task_down()
方法,从而队列中的任务数量减1
,当数量为0
后,执行JoinableQueue
的join()
后面代码,从而p1
和p2
进程结束。
因为p1
和p2
添加了join()
方法,所以当子进程中的consumer
方法执行完后,才会往下执行。从而主进程结束。因为这里把消费者进程c1
和c2
设置成了守护进程,主进程结束的同时,c1
和c2
进程也会随之结束,进程都结束了。所以消费者consumer
方法也会结束。
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!