生产者-消费者模式与阻塞队列

生产者-消费者模式与阻塞队列

时间:2026/04/09

关键词:mutexcondition_variable、阻塞队列、bounded queue、shutdown、spurious wakeup
核心目标:写出一个正确、可复用的生产者-消费者队列,而不是“能跑但容易死锁或卡住”的版本。


1. 这个模式在解决什么问题

生产者-消费者模式适用于:

  • 生产方产生任务或消息
  • 消费方异步处理
  • 双方速度不一致

典型场景:

  • 日志队列
  • 任务队列
  • 网络消息分发
  • 线程池任务提交

它的核心不只是“一个队列”,而是三件事:

  • 互斥
  • 条件通知
  • 生命周期关闭

2. 最小阻塞队列骨架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <condition_variable>
#include <mutex>
#include <queue>

template <class T>
class BlockingQueue {
public:
void push(T value) {
{
std::lock_guard<std::mutex> lk(mutex_);
queue_.push(std::move(value));
}
cv_.notify_one();
}

T pop() {
std::unique_lock<std::mutex> lk(mutex_);
cv_.wait(lk, [&] { return !queue_.empty(); });
T value = std::move(queue_.front());
queue_.pop();
return value;
}

private:
std::mutex mutex_;
std::condition_variable cv_;
std::queue<T> queue_;
};

3. 为什么一定要用谓词版 wait

错误直觉是:

1
cv.wait(lock);

然后醒来就认为一定有数据。
这是不安全的,因为存在:

  • 虚假唤醒
  • 多个线程竞争同一个条件

正确写法:

1
cv.wait(lock, [&] { return !queue_.empty(); });

也就是:

  • 醒来后重新检查条件

4. bounded queue:为什么需要容量上限

如果生产速度远大于消费速度,无界队列会不断膨胀。
这时常需要有界队列:

  • 队列满时,生产者阻塞或失败
1
2
3
while (queue_.size() >= capacity_) {
not_full_.wait(lock);
}

这样可以建立:

  • 背压
  • 内存上限

5. 一个更完整的阻塞队列设计

更工程化的队列通常需要这些接口:

  • push
  • try_push
  • pop
  • try_pop
  • close

close() 很关键,因为消费者可能永远在等:

1
2
3
if (closed_ && queue_.empty()) {
return std::nullopt;
}

否则程序退出时很容易卡死在线程等待上。


6. 推荐的关闭语义

常见设计是:

  • 关闭后不允许再 push
  • 还能把队列里剩余任务消费完
  • 队列空且关闭时,pop 返回“结束”

示意:

1
2
3
4
5
6
std::optional<T> pop() {
std::unique_lock<std::mutex> lk(mutex_);
cv_.wait(lk, [&] { return closed_ || !queue_.empty(); });
if (queue_.empty()) return std::nullopt;
...
}

这比靠塞一个 "EXIT" 哨兵值更通用。


7. 什么时候用 notify_one,什么时候用 notify_all

经验上:

  • 普通入队,通常 notify_one
  • 全局状态变化,比如 close(),通常 notify_all

因为关闭时可能有多个线程都在等待,需要全部唤醒重新判断。


8. 一个更稳妥的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>

template <class T>
class BlockingQueue {
public:
explicit BlockingQueue(std::size_t capacity) : capacity_(capacity) {}

bool push(T value) {
std::unique_lock<std::mutex> lk(mutex_);
not_full_.wait(lk, [&] { return closed_ || queue_.size() < capacity_; });
if (closed_) return false;
queue_.push(std::move(value));
lk.unlock();
not_empty_.notify_one();
return true;
}

std::optional<T> pop() {
std::unique_lock<std::mutex> lk(mutex_);
not_empty_.wait(lk, [&] { return closed_ || !queue_.empty(); });
if (queue_.empty()) return std::nullopt;
T value = std::move(queue_.front());
queue_.pop();
lk.unlock();
not_full_.notify_one();
return value;
}

void close() {
std::lock_guard<std::mutex> lk(mutex_);
closed_ = true;
not_empty_.notify_all();
not_full_.notify_all();
}

private:
std::size_t capacity_;
std::queue<T> queue_;
bool closed_ = false;
std::mutex mutex_;
std::condition_variable not_empty_;
std::condition_variable not_full_;
};

9. 阻塞队列如何安全扩容

这里的“扩容”通常指:

有界队列原来最多只能放 capacity_ 个元素,现在允许它放更多元素。

扩容本身不需要搬迁队列数据,因为 std::queue 会自己管理内部存储。我们真正要保护的是:

  • capacity_ 的读写
  • 正在等待 not_full_ 的生产者
  • 队列的关闭状态

9.1 最小扩容接口

可以给上面的 BlockingQueue 加一个 reserve_capacity()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bool reserve_capacity(std::size_t new_capacity) {
std::unique_lock<std::mutex> lk(mutex_);

if (closed_) {
return false;
}

if (new_capacity <= capacity_) {
return true;
}

capacity_ = new_capacity;
lk.unlock();
not_full_.notify_all();
return true;
}

这里的关键点是:

  • 修改 capacity_ 必须持有同一把 mutex_
  • 扩容后要通知等待中的生产者
  • 队列已经 close() 后,不再允许扩容

为什么用 notify_all()

因为扩容可能一次释放多个可写入位置,多个生产者都可能从“队列满”变成“可以写入”。

9.2 按增量扩容

有时也会写成“增加多少容量”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bool grow_capacity(std::size_t extra) {
std::unique_lock<std::mutex> lk(mutex_);

if (closed_) {
return false;
}

if (extra == 0) {
return true;
}

capacity_ += extra;
lk.unlock();
not_full_.notify_all();
return true;
}

工程里还要考虑:

  • 最大容量上限
  • capacity_ + extra 是否溢出
  • 是否允许扩容太频繁
  • 扩容后内存压力是否可接受

否则有界队列可能又退化成“无界队列”。

9.3 自动扩容的思路

如果希望 push() 遇到满队列时自动扩容,可以把逻辑写在同一把锁里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
bool push(T value) {
std::unique_lock<std::mutex> lk(mutex_);
bool expanded = false;

if (!closed_ &&
queue_.size() >= capacity_ &&
capacity_ < max_capacity_) {
capacity_ = std::min(capacity_ * 2, max_capacity_);
expanded = true;
}

not_full_.wait(lk, [&] {
return closed_ || queue_.size() < capacity_;
});

if (closed_) return false;

queue_.push(std::move(value));
lk.unlock();
not_empty_.notify_one();
if (expanded) {
not_full_.notify_all();
}
return true;
}

对应成员变量可以是:

1
2
std::size_t capacity_;
std::size_t max_capacity_;

如果用 std::min,记得包含:

1
#include <algorithm>

这个版本的意思是:

  • 队列满时,先尝试扩到更大的容量
  • 如果已经到最大容量,就继续阻塞等待
  • 所有判断和容量更新都在锁内完成
  • 扩容后唤醒其他可能还在等待的生产者

注意不要在持锁的 push() 里再调用一个内部也会加锁的 grow_capacity(),否则很容易把自己锁死。

9.4 缩容要更谨慎

扩容比较简单,因为它只会让等待生产者更容易继续。

缩容更麻烦,因为可能出现:

  • 当前元素数量已经超过新容量
  • 生产者和消费者对“满”的判断突然变化
  • 缩容策略和关闭流程互相影响

一个保守策略是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bool shrink_capacity(std::size_t new_capacity) {
std::lock_guard<std::mutex> lk(mutex_);

if (closed_) {
return false;
}

if (new_capacity < queue_.size()) {
return false;
}

capacity_ = new_capacity;
return true;
}

也就是说:

不把容量缩到当前队列大小以下。

这样可以避免队列瞬间进入一种“已经超载但无法解释”的状态。


10. 常见坑

10.1 if 代替 while/谓词

这是最常见的条件变量错误。

10.2 持锁太久

如果拿着锁做重计算或 I/O,会严重拖慢并发吞吐。

10.3 没有关闭语义

线程可能永远阻塞退出不了。

10.4 用哨兵值替代通用关闭协议

对简单 demo 可以,但扩展性差。


11. 一页总结

生产者-消费者模式的核心不是“有个队列”,而是:

  1. 用互斥保护共享队列
  2. 用条件变量等待状态变化
  3. 用关闭协议管理线程退出
  4. 必要时用容量上限建立背压
  5. 动态扩容时,要在同一把锁内修改容量并唤醒等待生产者

如果只记一句:

条件变量永远和“共享状态 + 谓词检查”一起使用,不能只靠通知本身。