10-进程池Pool中进程间通信
目标¶
- 知道实现进程池中进程间通信
进程池中的Queue¶
如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue()
用法: queue = multiprocessing.Manager().Queue(3)
而不是multiprocessing.Queue(),否则会得到一条如下的错误信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.
阻塞式同步执行¶
下面的实例演示了进程池中的进程如何通信:
import multiprocessing
import time
# 写入数据的方法
def write_data(queue):
# for循环 向消息队列中写入值
for i in range(5):
# 判断队列是否已满
if queue.full():
print("队列已满!~")
break
queue.put(i)
print("正在写入:",i)
time.sleep(0.2)
# 创建读取数据的方法
def read_data(queue):
# 循环读取数据
while True:
# 判断队列是否为空
if queue.qsize() == 0:
print("队列为空~")
break
# 从队列中读取数据
result = queue.get()
print(result)
time.sleep(0.2)
if __name__ == '__main__':
# 创建进程池
pool = multiprocessing.Pool(2)
# 创建进程池队列
queue = multiprocessing.Manager().Queue(3)
# 在进程池中的进程间进行通信
# 使用线程池同步的方式,先写后读
# pool.apply(write_data, (queue, ))
# pool.apply(read_data, (queue, ))
# apply_async() 返回ApplyResult 对象
result = pool.apply_async(write_data, (queue, ))
# ApplyResult对象的wait() 方法,表示后续进程必须等待当前进程执行完再继续
result.wait()
pool.apply_async(read_data, (queue, ))
pool.close()
# 异步后,主线程不再等待子进程执行结束,再结束
# join() 后,表示主线程会等待子进程执行结束后,再结束
pool.join()
运行结果:
正在写入: 0
正在写入: 1
正在写入: 2
队列已满!~
正在获取: 0
正在获取: 1
正在获取: 2
队列已空!~
Process finished with exit code 0
非阻塞式异步执行¶
"""
任务1:每隔1s将数据写入queue
任务2:不断地从消息队列取数据
主进程:
创建进程池专用消息队列
创建进程池
使用进程池指派任务
"""
import multiprocessing
import time
# 任务1:每隔1s将数据写入 queue
def write_queue(queue):
for i in range(5):
print("写入queue数据:",i)
queue.put(i)
time.sleep(1)
# 任务2:不断地从消息队列取数据
def read_queue(queue):
while True:
try:
value = queue.get(timeout=3)
print("读取到queue的值:",value)
except:
print("超过3秒没获取到数据,停止循环")
break
# 主进程:
if __name__ == '__main__':
# 创建进程池专用消息队列
queue = multiprocessing.Manager().Queue(3)
# 创建进程池
pool = multiprocessing.Pool(2)
# 使用进程池指派任务
# pool.apply(func=write_queue, args=(queue,))
# pool.apply(func=read_queue, args=(queue,))
pool.apply_async(func=write_queue, args=(queue,))
pool.apply_async(func=read_queue, args=(queue,))
# 关闭pool
pool.close()
pool.join()
- 输出:
写入queue数据: 0
读取到queue的值: 0
写入queue数据: 1
读取到queue的值: 1
写入queue数据: 2
读取到queue的值: 2
写入queue数据: 3
读取到queue的值: 3
写入queue数据: 4
读取到queue的值: 4
超过3秒没获取到数据,停止循环