day40-进程-生产者消费者模型进阶
阅读原文时间:2023年07月10日阅读:2

#1、队列的数据是安全的,因为队列内置了一把锁,大家都来抢占资源的时候,A在操作数据的时候,B就无法操作该数据。

下面代码有两个生产者和三个消费者,包子吃完之后,接着放的两个None被marry和alex吃到了,所以吃饱了,剩下的牛奶,tom吃完。

放在队列里面的None的数量需要根据生产者和消费者的数量来计算,很不方便。

from multiprocessing import Process
from multiprocessing import Queue
import time
import random
def producer(q,food):
for i in range(5):
time.sleep(random.random())
q.put('%s-%s'%(food,i))
print('%s-%s'%(food,i))
q.put(None) #因为有三个消费者,需要三个None才能让consumer方法的循环结束。
q.put(None) #两个生产者有4个None

def consumer(q,name):
while True: #消费者要处理多少数据是不确定的,所以使用while循环。
food = q.get()
if food == None:
print('%s吃饱了'%name)
break #while是无法结束的,所以需要生产者发送None信号。这个信号的数量
print('%s吃了%s'%(name,food))#是根据生产者和消费者的数量来计算的,所以很不方便。

if __name__ == '__main__':
q = Queue()
p = Process(target=producer,args=(q,'包子'))
p.start()
p1 = Process(target=producer,args=(q,'牛奶'))
p1.start()
c = Process(target=consumer,args=(q,'alex'))
c.start()
c1 = Process(target=consumer,args=(q,'marry'))
c1.start()
c2 = Process(target=consumer,args=(q,'tom'))
c2.start()

牛奶-0

alex吃了牛奶-0

包子-0

tom吃了包子-0

包子-1

marry吃了包子-1

包子-2

alex吃了包子-2

牛奶-1

tom吃了牛奶-1

包子-3

marry吃了包子-3

牛奶-2

alex吃了牛奶-2

包子-4

tom吃了包子-4

marry吃饱了

alex吃饱了

牛奶-3

tom吃了牛奶-3

牛奶-4

tom吃了牛奶-4

tom吃饱了

#2、使用JoinableQueue(数据也是安全的),避免计算None的信号数量的麻烦:
#流程:数据全部被消费--->生产进程结束--->主进程结束--->消费守护进程结束。
from multiprocessing import Process
from multiprocessing import JoinableQueue
import time
import random
def producer(q,food):
for i in range(5):
time.sleep(random.random())
q.put('%s-%s'%(food,i))
print('%s-%s'%(food,i))
q.join() #等待数据全部被消费,当包子和牛奶全部被吃完,下面的q.task_done()会发送完成的信号给q.join(),
#q.join()收到信号之后,就不再阻塞,这样这个生产的进程就结束了。

def consumer(q,name):
while True:
food = q.get()
print('%s吃了%s'%(name,food))
q.task_done()

if __name__ == '__main__':
q = JoinableQueue()
p = Process(target=producer,args=(q,'包子'))
p.start()
p1 = Process(target=producer,args=(q,'牛奶'))
p1.start()
c = Process(target=consumer,args=(q,'alex'))
c.daemon = True
c.start()
c1 = Process(target=consumer,args=(q,'marry'))
c1.daemon = True
c1.start()
c2 = Process(target=consumer,args=(q,'tom'))
c2.daemon = True
c2.start()#p.join()和p1.join(),阻塞等待,主进程等待子进程结束之后才结束。
p.join() #生产的进程结束以后,p.join()和p1.join()也结束了,那么if语句下面的主进程都结束了,
p1.join()#接着消费者作为守护进程也随着主进程的结束而结束(循环结束)。这样就不需要计算None的信号数量了。

#3、func因为睡眠了1秒,陷入阻塞,系统会让func1先执行,然后func才执行。
from multiprocessing import Process
import time

def func():
time.sleep(1)
print('')

def func1():
print('')

if __name__ == '__main__':
p = Process(target=func)
p.start()
p1 = Process(target=func1)
p1.start()

#

#4、因为设置了队列,就算func设置了睡眠1秒,也不会阻塞:
from multiprocessing import Process
from multiprocessing import Queue
import time

def func(q):
time.sleep(1)
q.put('')
print('get1')

def func1(q):
print(q.get())
print('')

if __name__ == '__main__':
q = Queue()
p = Process(target=func,args=(q,))
p.start()
p1 = Process(target=func1,args=(q,))
p1.start()

get1

#