10-进程池Pool中进程间通信

目标

  • 知道实现进程池中进程间通信

进程池中的Queue

image-20180822104905591

如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue()

用法: queue = multiprocessing.Manager().Queue(3)

而不是multiprocessing.Queue(),否则会得到一条如下的错误信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

image-20180910210121215

阻塞式同步执行

下面的实例演示了进程池中的进程如何通信:

import multiprocessing
import time

# 写入数据的方法
def write_data(queue):
    # for循环 向消息队列中写入值
    for i in range(5):
        # 判断队列是否已满
        if queue.full():
            print("队列已满!~")
            break
        queue.put(i)
        print("正在写入:",i)
        time.sleep(0.2)

# 创建读取数据的方法
def read_data(queue):
    # 循环读取数据
    while True:
        # 判断队列是否为空
        if queue.qsize() == 0:
            print("队列为空~")
            break

        # 从队列中读取数据
        result = queue.get()
        print(result)
        time.sleep(0.2)


if __name__ == '__main__':

    # 创建进程池
    pool = multiprocessing.Pool(2)

    # 创建进程池队列
    queue = multiprocessing.Manager().Queue(3)

    # 在进程池中的进程间进行通信
    # 使用线程池同步的方式,先写后读
    # pool.apply(write_data, (queue, ))
    # pool.apply(read_data, (queue, ))

    # apply_async() 返回ApplyResult 对象
    result = pool.apply_async(write_data, (queue, ))
    # ApplyResult对象的wait() 方法,表示后续进程必须等待当前进程执行完再继续
    result.wait()
    pool.apply_async(read_data, (queue, ))

    pool.close()
    # 异步后,主线程不再等待子进程执行结束,再结束
    # join() 后,表示主线程会等待子进程执行结束后,再结束
    pool.join()

运行结果:

正在写入: 0
正在写入: 1
正在写入: 2
队列已满!~
正在获取 0
正在获取 1
正在获取 2
队列已空!~

Process finished with exit code 0

非阻塞式异步执行

"""
任务1:每隔1s将数据写入queue
任务2:不断地从消息队列取数据

主进程:
    创建进程池专用消息队列
    创建进程池
    使用进程池指派任务

"""
import multiprocessing
import time

# 任务1:每隔1s将数据写入 queue
def write_queue(queue):
    for i in range(5):
        print("写入queue数据:",i)
        queue.put(i)
        time.sleep(1)


# 任务2:不断地从消息队列取数据
def read_queue(queue):
    while True:
        try:
            value = queue.get(timeout=3)
            print("读取到queue的值:",value)
        except:
            print("超过3秒没获取到数据,停止循环")
            break


# 主进程:
if __name__ == '__main__':
    # 创建进程池专用消息队列
    queue = multiprocessing.Manager().Queue(3)

    # 创建进程池
    pool = multiprocessing.Pool(2)

    # 使用进程池指派任务
    # pool.apply(func=write_queue, args=(queue,))
    # pool.apply(func=read_queue, args=(queue,))

    pool.apply_async(func=write_queue, args=(queue,))
    pool.apply_async(func=read_queue, args=(queue,))

    # 关闭pool
    pool.close()

    pool.join()
  • 输出:
写入queue数据 0
读取到queue的值 0
写入queue数据 1
读取到queue的值 1
写入queue数据 2
读取到queue的值 2
写入queue数据 3
读取到queue的值 3
写入queue数据 4
读取到queue的值 4
超过3秒没获取到数据停止循环