MegaMU个人站

欢迎光临

落日黄沙 白帆秋水 你可知谁的记忆在时空里飞?


消息队列—多个进程之间的信息传递

目录

消息队列用于实现多个进程之间的信息共享

最先放进队列的信息被最先取出,最后放进队列的被最后取出【类似羽毛球筒】。

img

使用消息队列

继承multiprocessing.Queue()来创建一个消息队列,最重要的参数是maxsize=默认为0,设置了队列的最大长度,放入多于maxsize的数据会阻塞,直到有值从队列中取出。

img

  • 放值(向队列头放)
    • 使用.put()向队列里面加入元素,若队列已满,则会阻塞,直到有值从队列中取出。
    • 使用.put_nowait()立即向队列里加入元素,若已满,则报错。
  • 取值(从队列尾取)
    • 只用get()从队列中取出一个元素,若队列已空,则会阻塞,直到有值放入队列。
    • 使用get_nowait()立即从队列中取出一个元素,若队列已空,则报错。

常见判断[非准确值]

  • 使用.full()判断队列是否已满
  • 使用.empty()判断队列是否已空
  • 使用.qsize()得到队列的当前长度

img

值明明已经放进去了,但.empty()的返回值还是True。

img

sleep()一下就又正确了。

所以不要在取值或放值后立刻进行判断

实现进程间的通信

img

一个进程进行写,另一个进程读数据。

进程池中的进程通信

已有进程池的情况下使用multiprocessing.Manager().Queue()在进程池内部创建消息队列。

# 1、准备两个进程
# 2、准备一个队列 一个进程向队列中写入数据,然后吧队列传递到另外一个进程
# 3、另外一个进程读取数据

import time
import multiprocessing

# 1、写入数据到队列的函数
def write_queue(queue):
    # for循环,向队列写入数据
    for i in range(10):
        # 判断队列是否已满
        if queue.full():
            print("队列已满!")
            break
        # 向队列中放入值
        queue.put(i)
        print("写入成功,已经写入:",i)
        time.sleep(0.5)

# 2、读取队列数据并显示的函数
def read_queue(queue):
    while True:
        # 判断队列是否已经为空
        if queue.qsize() == 0:
            print("队列已空!")
            break

        # 从队列中读取数据
        value = queue.get()
        print("已经读取:", value)


if __name__ == '__main__':
    # 1、创建进程池
    pool = multiprocessing.Pool(2)

    # 2、创建进程池中的队列
    queue = multiprocessing.Manager().Queue(5)

    # 3、使用进程池执行任务
    #     3.1 同步方式
    # pool.apply(write_queue, (queue, ))
    # pool.apply(read_queue, (queue, ))
    #     3.2 异步方式
    # apply_async() 返回值 ApplyResult对象,该对象有一个 wait() 的方法
    result = pool.apply_async(write_queue, (queue, ))
    # wait() 方法类似join() 表示先让当前进程执行完毕,后续进程才能启动    
    result.wait()

    pool.apply_async(read_queue, (queue, ))
    # close()表示不再接收新的任务
    pool.close()
    # 主进程会等待进程池执行结束后再退出
    pool.join()

记得向进程池的apply()apply_async()中传入要使用的消息队列参数!

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦