线程同步消息队列与线程池

线程同步消息队列与线程池

时间:2026/04/09

关键词:任务队列、worker thread、future、停止协议、背压、线程池
核心目标:理解线程池为什么几乎总是“队列 + 工作线程 + 生命周期管理”的组合。


1. 为什么线程池比“每个任务一个线程”更常见

直接为每个任务创建线程的问题在于:

  • 创建销毁开销高
  • 线程数不可控
  • 容易把系统调度器压爆

线程池的思路是:

  • 预先创建固定数量 worker
  • 任务进入共享队列
  • worker 从队列取任务执行

2. 线程池最小结构

一个线程池通常包含:

  • 任务队列
  • 多个工作线程
  • 停止标志
  • 提交接口

示意:

1
producer -> task queue -> workers

3. 推荐的任务表示

最常见的是:

1
std::function<void()>

这样线程池不关心任务具体类型,只负责执行。

如果要返回值,可以把真实任务包装进:

  • std::packaged_task
  • std::promise
  • std::future

4. 一个最小线程池骨架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
public:
explicit ThreadPool(std::size_t n) {
for (std::size_t i = 0; i < n; ++i) {
workers_.emplace_back([this] { worker_loop(); });
}
}

~ThreadPool() {
{
std::lock_guard<std::mutex> lk(mutex_);
stop_ = true;
}
cv_.notify_all();
for (auto& t : workers_) {
if (t.joinable()) t.join();
}
}

void submit(std::function<void()> task) {
{
std::lock_guard<std::mutex> lk(mutex_);
tasks_.push(std::move(task));
}
cv_.notify_one();
}

private:
void worker_loop() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lk(mutex_);
cv_.wait(lk, [&] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
}

bool stop_ = false;
std::mutex mutex_;
std::condition_variable cv_;
std::queue<std::function<void()>> tasks_;
std::vector<std::thread> workers_;
};

5. 为什么停止协议很重要

如果没有明确的停止逻辑,线程池很容易在析构时:

  • worker 永远等在 wait
  • 主线程 join 不回来

正确退出条件通常是:

  • stop_ == true
  • 并且队列已空

6. 返回值怎么做

常见写法是:

  • 把用户任务包装成 packaged_task
  • 返回对应 future

这样提交方既能异步执行,也能之后 get() 结果。

线程池的接口常见长这样:

1
2
template <class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<...>;

这也是完美转发的高频实战场景。


7. 有界任务队列与背压

如果任务生产速度远大于消费速度,线程池也可能把内存吃爆。
所以工程上经常要考虑:

  • 队列容量上限
  • 超限后阻塞
  • 超限后丢弃
  • 超限后降级

这其实就是背压策略。


8. 线程池不是越多线程越好

线程数通常取决于:

  • CPU 核心数
  • 任务是否 CPU 密集
  • 任务是否经常阻塞 I/O

经验上:

  • CPU 密集型:线程数通常接近核心数
  • I/O 密集型:线程数可适当更大

9. 线程池如何安全扩容

线程池扩容的本质是:

在不破坏任务队列、不影响已有 worker、不和析构/停止流程打架的前提下,增加新的 worker 线程。

只“扩容”通常比“缩容”简单,因为扩容不需要强行打断已有线程,只需要让更多线程开始消费同一个任务队列。

9.1 安全扩容要守住的几个点

  1. 扩容时要和停止状态互斥
  2. 新 worker 必须复用同一套 worker_loop
  3. 不要在任务执行期间持有队列锁
  4. 不允许线程池已经停止后继续扩容
  5. 如果有最大线程数,要在锁内检查和更新

最核心的判断是:

1
2
3
if (stop_) {
throw std::runtime_error("thread pool already stopped");
}

否则可能出现这种危险情况:

  1. 析构线程设置 stop_ = true
  2. 另一个线程又新增 worker
  3. 析构只 join 了旧线程或生命周期已经混乱

9.2 一个简单的扩容接口

可以给线程池加一个 add_workers()

1
2
3
4
5
6
7
8
9
10
11
void add_workers(std::size_t count) {
std::lock_guard<std::mutex> lk(mutex_);

if (stop_) {
throw std::runtime_error("thread pool already stopped");
}

for (std::size_t i = 0; i < count; ++i) {
workers_.emplace_back([this] { worker_loop(); });
}
}

这段代码的关键点是:

  • stop_workers_ 的修改放在同一把锁保护下
  • 新线程执行的还是原来的 worker_loop()
  • 新 worker 会自动从同一个 tasks_ 队列里抢任务

不过这只是最小写法。正式项目里通常还会加:

  • max_workers_
  • 当前线程数统计
  • 扩容失败处理
  • 线程池生命周期约束

9.3 带最大线程数的版本

更工程化一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void add_workers(std::size_t count) {
std::lock_guard<std::mutex> lk(mutex_);

if (stop_) {
throw std::runtime_error("thread pool already stopped");
}

const std::size_t current = workers_.size();
const std::size_t allowed = max_workers_ > current
? max_workers_ - current
: 0;

const std::size_t actual = std::min(count, allowed);

for (std::size_t i = 0; i < actual; ++i) {
workers_.emplace_back([this] { worker_loop(); });
}
}

对应成员变量:

1
std::size_t max_workers_ = std::thread::hardware_concurrency() * 2;

这里的重点不是公式,而是:

扩容不能无限扩,否则线程池会退化成“每个任务都创建线程”的混乱状态。

9.4 扩容后需要 notify_all()

通常不一定需要。

因为新 worker 创建后会进入 worker_loop(),它自己会检查队列:

1
cv_.wait(lk, [&] { return stop_ || !tasks_.empty(); });

如果队列里已经有任务,谓词为真,新 worker 不会一直睡着。

但如果你的实现不是带谓词的 wait,或者扩容逻辑还改变了其他调度状态,就要重新检查通知逻辑。

9.5 自动扩容的常见触发条件

如果做成动态线程池,常见策略是:

  • 队列积压超过阈值
  • 当前线程数小于最大线程数
  • 最近一段时间任务消费速度跟不上提交速度
  • 任务是 I/O 密集型,worker 经常阻塞

伪代码:

1
2
3
4
if (tasks_.size() > high_watermark &&
workers_.size() < max_workers_) {
add_workers(1);
}

注意这个判断必须在锁保护下完成,避免多个提交线程同时发现“需要扩容”,然后一起扩太多。

9.6 缩容比扩容更麻烦

扩容是“增加消费者”,一般比较安全。

缩容是“让某些 worker 退出”,要设计额外协议,例如:

  • 空闲超时退出
  • 投递特殊退出任务
  • 设置目标线程数,让多余 worker 在空闲时自然退出

不要粗暴强杀线程。C++ 标准线程没有安全的强制 kill 机制,强行终止线程很容易破坏锁、资源和对象状态。


10. 消息队列 vs 线程池

这两个概念经常一起出现,但不完全一样。

  • 消息队列:强调数据传递与同步
  • 线程池:强调任务执行与线程复用

线程池内部几乎总会用到任务队列,但消息队列本身不一定等于线程池。


11. 常见坑

11.1 任务里抛异常没人管

如果没有 future 或显式捕获,异常可能直接导致线程终止。

11.2 析构时仍允许提交任务

这会让生命周期变得混乱。

11.3 持锁执行任务

这是严重错误。
正确做法是:

  • 取出任务后释放锁
  • 再执行任务

11.4 线程池里再无限提交内部任务

这可能制造级联膨胀和死锁风险。


12. 一页总结

线程池最关键的不是模板技巧,而是三个工程点:

  1. 任务队列
  2. worker 生命周期
  3. 明确的停止与背压策略

如果只记一句:

线程池本质上是“用受控线程数去消费一个受控任务流”。