生产者-消费者模式与阻塞队列
时间:2026/04/09
关键词:mutex、condition_variable、阻塞队列、bounded queue、shutdown、spurious wakeup
核心目标:写出一个正确、可复用的生产者-消费者队列,而不是“能跑但容易死锁或卡住”的版本。
1. 这个模式在解决什么问题
生产者-消费者模式适用于:
- 生产方产生任务或消息
- 消费方异步处理
- 双方速度不一致
典型场景:
它的核心不只是“一个队列”,而是三件事:
2. 最小阻塞队列骨架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| #include <condition_variable> #include <mutex> #include <queue>
template <class T> class BlockingQueue { public: void push(T value) { { std::lock_guard<std::mutex> lk(mutex_); queue_.push(std::move(value)); } cv_.notify_one(); }
T pop() { std::unique_lock<std::mutex> lk(mutex_); cv_.wait(lk, [&] { return !queue_.empty(); }); T value = std::move(queue_.front()); queue_.pop(); return value; }
private: std::mutex mutex_; std::condition_variable cv_; std::queue<T> queue_; };
|
3. 为什么一定要用谓词版 wait
错误直觉是:
然后醒来就认为一定有数据。
这是不安全的,因为存在:
正确写法:
1
| cv.wait(lock, [&] { return !queue_.empty(); });
|
也就是:
4. bounded queue:为什么需要容量上限
如果生产速度远大于消费速度,无界队列会不断膨胀。
这时常需要有界队列:
1 2 3
| while (queue_.size() >= capacity_) { not_full_.wait(lock); }
|
这样可以建立:
5. 一个更完整的阻塞队列设计
更工程化的队列通常需要这些接口:
push
try_push
pop
try_pop
close
close() 很关键,因为消费者可能永远在等:
1 2 3
| if (closed_ && queue_.empty()) { return std::nullopt; }
|
否则程序退出时很容易卡死在线程等待上。
6. 推荐的关闭语义
常见设计是:
- 关闭后不允许再
push
- 还能把队列里剩余任务消费完
- 队列空且关闭时,
pop 返回“结束”
示意:
1 2 3 4 5 6
| std::optional<T> pop() { std::unique_lock<std::mutex> lk(mutex_); cv_.wait(lk, [&] { return closed_ || !queue_.empty(); }); if (queue_.empty()) return std::nullopt; ... }
|
这比靠塞一个 "EXIT" 哨兵值更通用。
7. 什么时候用 notify_one,什么时候用 notify_all
经验上:
- 普通入队,通常
notify_one
- 全局状态变化,比如
close(),通常 notify_all
因为关闭时可能有多个线程都在等待,需要全部唤醒重新判断。
8. 一个更稳妥的示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| #include <condition_variable> #include <mutex> #include <optional> #include <queue>
template <class T> class BlockingQueue { public: explicit BlockingQueue(std::size_t capacity) : capacity_(capacity) {}
bool push(T value) { std::unique_lock<std::mutex> lk(mutex_); not_full_.wait(lk, [&] { return closed_ || queue_.size() < capacity_; }); if (closed_) return false; queue_.push(std::move(value)); lk.unlock(); not_empty_.notify_one(); return true; }
std::optional<T> pop() { std::unique_lock<std::mutex> lk(mutex_); not_empty_.wait(lk, [&] { return closed_ || !queue_.empty(); }); if (queue_.empty()) return std::nullopt; T value = std::move(queue_.front()); queue_.pop(); lk.unlock(); not_full_.notify_one(); return value; }
void close() { std::lock_guard<std::mutex> lk(mutex_); closed_ = true; not_empty_.notify_all(); not_full_.notify_all(); }
private: std::size_t capacity_; std::queue<T> queue_; bool closed_ = false; std::mutex mutex_; std::condition_variable not_empty_; std::condition_variable not_full_; };
|
9. 常见坑
9.1 if 代替 while/谓词
这是最常见的条件变量错误。
9.2 持锁太久
如果拿着锁做重计算或 I/O,会严重拖慢并发吞吐。
9.3 没有关闭语义
线程可能永远阻塞退出不了。
9.4 用哨兵值替代通用关闭协议
对简单 demo 可以,但扩展性差。
10. 一页总结
生产者-消费者模式的核心不是“有个队列”,而是:
- 用互斥保护共享队列
- 用条件变量等待状态变化
- 用关闭协议管理线程退出
- 必要时用容量上限建立背压
如果只记一句:
条件变量永远和“共享状态 + 谓词检查”一起使用,不能只靠通知本身。