生产者-消费者模式与阻塞队列
时间:2026/04/09
关键词:mutex、condition_variable、阻塞队列、bounded queue、shutdown、spurious wakeup
核心目标:写出一个正确、可复用的生产者-消费者队列,而不是“能跑但容易死锁或卡住”的版本。
1. 这个模式在解决什么问题
生产者-消费者模式适用于:
- 生产方产生任务或消息
- 消费方异步处理
- 双方速度不一致
典型场景:
它的核心不只是“一个队列”,而是三件事:
2. 最小阻塞队列骨架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| #include <condition_variable> #include <mutex> #include <queue>
template <class T> class BlockingQueue { public: void push(T value) { { std::lock_guard<std::mutex> lk(mutex_); queue_.push(std::move(value)); } cv_.notify_one(); }
T pop() { std::unique_lock<std::mutex> lk(mutex_); cv_.wait(lk, [&] { return !queue_.empty(); }); T value = std::move(queue_.front()); queue_.pop(); return value; }
private: std::mutex mutex_; std::condition_variable cv_; std::queue<T> queue_; };
|
3. 为什么一定要用谓词版 wait
错误直觉是:
然后醒来就认为一定有数据。
这是不安全的,因为存在:
正确写法:
1
| cv.wait(lock, [&] { return !queue_.empty(); });
|
也就是:
4. bounded queue:为什么需要容量上限
如果生产速度远大于消费速度,无界队列会不断膨胀。
这时常需要有界队列:
1 2 3
| while (queue_.size() >= capacity_) { not_full_.wait(lock); }
|
这样可以建立:
5. 一个更完整的阻塞队列设计
更工程化的队列通常需要这些接口:
push
try_push
pop
try_pop
close
close() 很关键,因为消费者可能永远在等:
1 2 3
| if (closed_ && queue_.empty()) { return std::nullopt; }
|
否则程序退出时很容易卡死在线程等待上。
6. 推荐的关闭语义
常见设计是:
- 关闭后不允许再
push
- 还能把队列里剩余任务消费完
- 队列空且关闭时,
pop 返回“结束”
示意:
1 2 3 4 5 6
| std::optional<T> pop() { std::unique_lock<std::mutex> lk(mutex_); cv_.wait(lk, [&] { return closed_ || !queue_.empty(); }); if (queue_.empty()) return std::nullopt; ... }
|
这比靠塞一个 "EXIT" 哨兵值更通用。
7. 什么时候用 notify_one,什么时候用 notify_all
经验上:
- 普通入队,通常
notify_one
- 全局状态变化,比如
close(),通常 notify_all
因为关闭时可能有多个线程都在等待,需要全部唤醒重新判断。
8. 一个更稳妥的示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| #include <condition_variable> #include <mutex> #include <optional> #include <queue>
template <class T> class BlockingQueue { public: explicit BlockingQueue(std::size_t capacity) : capacity_(capacity) {}
bool push(T value) { std::unique_lock<std::mutex> lk(mutex_); not_full_.wait(lk, [&] { return closed_ || queue_.size() < capacity_; }); if (closed_) return false; queue_.push(std::move(value)); lk.unlock(); not_empty_.notify_one(); return true; }
std::optional<T> pop() { std::unique_lock<std::mutex> lk(mutex_); not_empty_.wait(lk, [&] { return closed_ || !queue_.empty(); }); if (queue_.empty()) return std::nullopt; T value = std::move(queue_.front()); queue_.pop(); lk.unlock(); not_full_.notify_one(); return value; }
void close() { std::lock_guard<std::mutex> lk(mutex_); closed_ = true; not_empty_.notify_all(); not_full_.notify_all(); }
private: std::size_t capacity_; std::queue<T> queue_; bool closed_ = false; std::mutex mutex_; std::condition_variable not_empty_; std::condition_variable not_full_; };
|
9. 阻塞队列如何安全扩容
这里的“扩容”通常指:
有界队列原来最多只能放 capacity_ 个元素,现在允许它放更多元素。
扩容本身不需要搬迁队列数据,因为 std::queue 会自己管理内部存储。我们真正要保护的是:
capacity_ 的读写
- 正在等待
not_full_ 的生产者
- 队列的关闭状态
9.1 最小扩容接口
可以给上面的 BlockingQueue 加一个 reserve_capacity():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| bool reserve_capacity(std::size_t new_capacity) { std::unique_lock<std::mutex> lk(mutex_);
if (closed_) { return false; }
if (new_capacity <= capacity_) { return true; }
capacity_ = new_capacity; lk.unlock(); not_full_.notify_all(); return true; }
|
这里的关键点是:
- 修改
capacity_ 必须持有同一把 mutex_
- 扩容后要通知等待中的生产者
- 队列已经
close() 后,不再允许扩容
为什么用 notify_all()?
因为扩容可能一次释放多个可写入位置,多个生产者都可能从“队列满”变成“可以写入”。
9.2 按增量扩容
有时也会写成“增加多少容量”:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| bool grow_capacity(std::size_t extra) { std::unique_lock<std::mutex> lk(mutex_);
if (closed_) { return false; }
if (extra == 0) { return true; }
capacity_ += extra; lk.unlock(); not_full_.notify_all(); return true; }
|
工程里还要考虑:
- 最大容量上限
capacity_ + extra 是否溢出
- 是否允许扩容太频繁
- 扩容后内存压力是否可接受
否则有界队列可能又退化成“无界队列”。
9.3 自动扩容的思路
如果希望 push() 遇到满队列时自动扩容,可以把逻辑写在同一把锁里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| bool push(T value) { std::unique_lock<std::mutex> lk(mutex_); bool expanded = false;
if (!closed_ && queue_.size() >= capacity_ && capacity_ < max_capacity_) { capacity_ = std::min(capacity_ * 2, max_capacity_); expanded = true; }
not_full_.wait(lk, [&] { return closed_ || queue_.size() < capacity_; });
if (closed_) return false;
queue_.push(std::move(value)); lk.unlock(); not_empty_.notify_one(); if (expanded) { not_full_.notify_all(); } return true; }
|
对应成员变量可以是:
1 2
| std::size_t capacity_; std::size_t max_capacity_;
|
如果用 std::min,记得包含:
这个版本的意思是:
- 队列满时,先尝试扩到更大的容量
- 如果已经到最大容量,就继续阻塞等待
- 所有判断和容量更新都在锁内完成
- 扩容后唤醒其他可能还在等待的生产者
注意不要在持锁的 push() 里再调用一个内部也会加锁的 grow_capacity(),否则很容易把自己锁死。
9.4 缩容要更谨慎
扩容比较简单,因为它只会让等待生产者更容易继续。
缩容更麻烦,因为可能出现:
- 当前元素数量已经超过新容量
- 生产者和消费者对“满”的判断突然变化
- 缩容策略和关闭流程互相影响
一个保守策略是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| bool shrink_capacity(std::size_t new_capacity) { std::lock_guard<std::mutex> lk(mutex_);
if (closed_) { return false; }
if (new_capacity < queue_.size()) { return false; }
capacity_ = new_capacity; return true; }
|
也就是说:
不把容量缩到当前队列大小以下。
这样可以避免队列瞬间进入一种“已经超载但无法解释”的状态。
10. 常见坑
10.1 if 代替 while/谓词
这是最常见的条件变量错误。
10.2 持锁太久
如果拿着锁做重计算或 I/O,会严重拖慢并发吞吐。
10.3 没有关闭语义
线程可能永远阻塞退出不了。
10.4 用哨兵值替代通用关闭协议
对简单 demo 可以,但扩展性差。
11. 一页总结
生产者-消费者模式的核心不是“有个队列”,而是:
- 用互斥保护共享队列
- 用条件变量等待状态变化
- 用关闭协议管理线程退出
- 必要时用容量上限建立背压
- 动态扩容时,要在同一把锁内修改容量并唤醒等待生产者
如果只记一句:
条件变量永远和“共享状态 + 谓词检查”一起使用,不能只靠通知本身。