
Python 中实现生产者消费者模型,核心是解决多线程(或多进程)间的数据协同问题:生产者生成数据放入缓冲区,消费者从缓冲区取出处理。最稳妥、推荐的方式是使用 queue.Queue —— 它是线程安全的,内置锁和条件变量,无需手动同步。
用 queue.Queue 实现线程版生产者消费者
queue.Queue 天然支持阻塞式读写(如 get() 和 put()),可自动处理“空时等待取”和“满时等待放”,非常适合该模型。
- 创建一个固定容量的队列:
q = queue.Queue(maxsize=5) - 生产者线程调用
q.put(item),若队列已满则阻塞,直到有空位 - 消费者线程调用
q.get(),若队列为空则阻塞,直到有新数据 - 每次
get()后必须调用q.task_done();所有任务提交后调用q.join()可等待全部消费完成
完整可运行示例
以下是一个含 2 个生产者、3 个消费者、共处理 10 个任务的小型演示:
import threading
import queue
import time
import random
<p>q = queue.Queue(maxsize=3)</p><p><span>立即学习</span>“<a href="https://pan.quark.cn/s/00968c3c2c15" style="text-decoration: underline !important; color: blue; font-weight: bolder;" rel="nofollow" target="_blank">Python免费学习笔记(深入)</a>”;</p><div class="aritcle_card flexRow">
<div class="artcardd flexRow">
<a class="aritcle_card_img" href="/ai/2393" title="LibLibAI"><img
src="https://img.php.cn/upload/ai_manual/001/246/273/176352304574139.png" alt="LibLibAI" onerror="this.onerror='';this.src='/static/lhimages/moren/morentu.png'" ></a>
<div class="aritcle_card_info flexColumn">
<a href="/ai/2393" title="LibLibAI">LibLibAI</a>
<p>国内领先的AI创意平台,以海量模型、低门槛操作与“创作-分享-商业化”生态,让小白与专业创作者都能高效实现图文乃至视频创意表达。</p>
</div>
<a href="/ai/2393" title="LibLibAI" class="aritcle_card_btn flexRow flexcenter"><b></b><span>下载</span> </a>
</div>
</div><p>def producer(name):
for i in range(5):
item = f"item-{name}-{i}"
q.put(item) # 自动阻塞
print(f"[P{name}] produced {item}")
time.sleep(random.uniform(0.1, 0.5))</p><p>def consumer(name):
while True:
try:
item = q.get(timeout=2) # 2秒超时,避免永久阻塞
print(f"[C{name}] consumed {item}")
time.sleep(random.uniform(0.2, 0.6))
q.task_done()
except queue.Empty:
print(f"[C{name}] timeout, exiting...")
break</p><h1>启动线程</h1><p>threads = []
for i in range(2):
t = threading.Thread(target=producer, args=(i,))
threads.append(t)
t.start()</p><p>for i in range(3):
t = threading.Thread(target=consumer, args=(i,))
threads.append(t)
t.start()</p><h1>等待所有生产者结束</h1><p>for t in threads[:2]:
t.join()</p><h1>等待队列中所有任务被消费完</h1><p>q.join()</p><p>print("All done.")</p>进阶:协程版(asyncio + asyncio.Queue)
若需高并发 I/O 密集型场景(如网络请求、文件读写),可用 asyncio.Queue 配合协程,资源开销更低:
-
await q.put(item)和await q.get()是异步阻塞操作 - 同样支持
maxsize、task_done()和join() - 注意:不能混用线程与协程原语;所有逻辑需在同一个事件循环中运行
不推荐的手动同步方式
虽然可用 threading.Lock + threading.Condition 手动实现队列,但容易出错(如忘记 notify、死锁、虚假唤醒)。除非学习底层原理或有特殊定制需求,否则没必要重复造轮子。









