生产者-消费者模式与阻塞队列

生产者-消费者模式与阻塞队列

时间:2026/04/09

关键词:mutexcondition_variable、阻塞队列、bounded queue、shutdown、spurious wakeup
核心目标:写出一个正确、可复用的生产者-消费者队列,而不是“能跑但容易死锁或卡住”的版本。


1. 这个模式在解决什么问题

生产者-消费者模式适用于:

  • 生产方产生任务或消息
  • 消费方异步处理
  • 双方速度不一致

典型场景:

  • 日志队列
  • 任务队列
  • 网络消息分发
  • 线程池任务提交

它的核心不只是“一个队列”,而是三件事:

  • 互斥
  • 条件通知
  • 生命周期关闭

2. 最小阻塞队列骨架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <condition_variable>
#include <mutex>
#include <queue>

template <class T>
class BlockingQueue {
public:
void push(T value) {
{
std::lock_guard<std::mutex> lk(mutex_);
queue_.push(std::move(value));
}
cv_.notify_one();
}

T pop() {
std::unique_lock<std::mutex> lk(mutex_);
cv_.wait(lk, [&] { return !queue_.empty(); });
T value = std::move(queue_.front());
queue_.pop();
return value;
}

private:
std::mutex mutex_;
std::condition_variable cv_;
std::queue<T> queue_;
};

3. 为什么一定要用谓词版 wait

错误直觉是:

1
cv.wait(lock);

然后醒来就认为一定有数据。
这是不安全的,因为存在:

  • 虚假唤醒
  • 多个线程竞争同一个条件

正确写法:

1
cv.wait(lock, [&] { return !queue_.empty(); });

也就是:

  • 醒来后重新检查条件

4. bounded queue:为什么需要容量上限

如果生产速度远大于消费速度,无界队列会不断膨胀。
这时常需要有界队列:

  • 队列满时,生产者阻塞或失败
1
2
3
while (queue_.size() >= capacity_) {
not_full_.wait(lock);
}

这样可以建立:

  • 背压
  • 内存上限

5. 一个更完整的阻塞队列设计

更工程化的队列通常需要这些接口:

  • push
  • try_push
  • pop
  • try_pop
  • close

close() 很关键,因为消费者可能永远在等:

1
2
3
if (closed_ && queue_.empty()) {
return std::nullopt;
}

否则程序退出时很容易卡死在线程等待上。


6. 推荐的关闭语义

常见设计是:

  • 关闭后不允许再 push
  • 还能把队列里剩余任务消费完
  • 队列空且关闭时,pop 返回“结束”

示意:

1
2
3
4
5
6
std::optional<T> pop() {
std::unique_lock<std::mutex> lk(mutex_);
cv_.wait(lk, [&] { return closed_ || !queue_.empty(); });
if (queue_.empty()) return std::nullopt;
...
}

这比靠塞一个 "EXIT" 哨兵值更通用。


7. 什么时候用 notify_one,什么时候用 notify_all

经验上:

  • 普通入队,通常 notify_one
  • 全局状态变化,比如 close(),通常 notify_all

因为关闭时可能有多个线程都在等待,需要全部唤醒重新判断。


8. 一个更稳妥的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>

template <class T>
class BlockingQueue {
public:
explicit BlockingQueue(std::size_t capacity) : capacity_(capacity) {}

bool push(T value) {
std::unique_lock<std::mutex> lk(mutex_);
not_full_.wait(lk, [&] { return closed_ || queue_.size() < capacity_; });
if (closed_) return false;
queue_.push(std::move(value));
lk.unlock();
not_empty_.notify_one();
return true;
}

std::optional<T> pop() {
std::unique_lock<std::mutex> lk(mutex_);
not_empty_.wait(lk, [&] { return closed_ || !queue_.empty(); });
if (queue_.empty()) return std::nullopt;
T value = std::move(queue_.front());
queue_.pop();
lk.unlock();
not_full_.notify_one();
return value;
}

void close() {
std::lock_guard<std::mutex> lk(mutex_);
closed_ = true;
not_empty_.notify_all();
not_full_.notify_all();
}

private:
std::size_t capacity_;
std::queue<T> queue_;
bool closed_ = false;
std::mutex mutex_;
std::condition_variable not_empty_;
std::condition_variable not_full_;
};

9. 常见坑

9.1 if 代替 while/谓词

这是最常见的条件变量错误。

9.2 持锁太久

如果拿着锁做重计算或 I/O,会严重拖慢并发吞吐。

9.3 没有关闭语义

线程可能永远阻塞退出不了。

9.4 用哨兵值替代通用关闭协议

对简单 demo 可以,但扩展性差。


10. 一页总结

生产者-消费者模式的核心不是“有个队列”,而是:

  1. 用互斥保护共享队列
  2. 用条件变量等待状态变化
  3. 用关闭协议管理线程退出
  4. 必要时用容量上限建立背压

如果只记一句:

条件变量永远和“共享状态 + 谓词检查”一起使用,不能只靠通知本身。