消息队列用于实现多个进程之间的信息共享
最先放进队列的信息被最先取出,最后放进队列的被最后取出【类似羽毛球筒】。
使用消息队列
继承multiprocessing.Queue()
来创建一个消息队列,最重要的参数是maxsize=
默认为0,设置了队列的最大长度,放入多于maxsize的数据会阻塞,直到有值从队列中取出。
- 放值(向队列头放)
- 使用
.put()
向队列里面加入元素,若队列已满,则会阻塞,直到有值从队列中取出。 - 使用
.put_nowait()
立即向队列里加入元素,若已满,则报错。
- 使用
- 取值(从队列尾取)
- 只用
get()
从队列中取出一个元素,若队列已空,则会阻塞,直到有值放入队列。 - 使用
get_nowait()
立即从队列中取出一个元素,若队列已空,则报错。
- 只用
常见判断[非准确值]
- 使用
.full()
判断队列是否已满 - 使用
.empty()
判断队列是否已空 - 使用
.qsize()
得到队列的当前长度
值明明已经放进去了,但.empty()
的返回值还是True。
sleep()一下就又正确了。
所以不要在取值或放值后立刻进行判断
实现进程间的通信
一个进程进行写,另一个进程读数据。
进程池中的进程通信
在已有进程池的情况下使用multiprocessing.Manager().Queue()
在进程池内部创建消息队列。
# 1、准备两个进程
# 2、准备一个队列 一个进程向队列中写入数据,然后吧队列传递到另外一个进程
# 3、另外一个进程读取数据
import time
import multiprocessing
# 1、写入数据到队列的函数
def write_queue(queue):
# for循环,向队列写入数据
for i in range(10):
# 判断队列是否已满
if queue.full():
print("队列已满!")
break
# 向队列中放入值
queue.put(i)
print("写入成功,已经写入:",i)
time.sleep(0.5)
# 2、读取队列数据并显示的函数
def read_queue(queue):
while True:
# 判断队列是否已经为空
if queue.qsize() == 0:
print("队列已空!")
break
# 从队列中读取数据
value = queue.get()
print("已经读取:", value)
if __name__ == '__main__':
# 1、创建进程池
pool = multiprocessing.Pool(2)
# 2、创建进程池中的队列
queue = multiprocessing.Manager().Queue(5)
# 3、使用进程池执行任务
# 3.1 同步方式
# pool.apply(write_queue, (queue, ))
# pool.apply(read_queue, (queue, ))
# 3.2 异步方式
# apply_async() 返回值 ApplyResult对象,该对象有一个 wait() 的方法
result = pool.apply_async(write_queue, (queue, ))
# wait() 方法类似join() 表示先让当前进程执行完毕,后续进程才能启动
result.wait()
pool.apply_async(read_queue, (queue, ))
# close()表示不再接收新的任务
pool.close()
# 主进程会等待进程池执行结束后再退出
pool.join()
记得向进程池的apply()
或apply_async()
中传入要使用的消息队列参数!