C++20/23 并发工具

C++20/23 并发工具

时间:2026/05/08

关键词:jthread、stop_token、latch、barrier、semaphore、atomic wait、atomic_ref、shared_mutex、osyncstream
核心目标:在 C++11 基础线程、锁、条件变量之上,掌握现代标准库更安全、更直接的并发组件。


1. 为什么需要 C++20/23 并发工具

C++11 已经提供了:

  • std::thread
  • std::mutex
  • std::condition_variable
  • std::future
  • std::atomic

但工程里还会遇到一些不够顺手的问题:

  • std::thread 析构前忘记 join()std::terminate
  • 线程停止缺少统一取消协议
  • 一组线程需要等所有人到齐
  • 多个阶段反复同步
  • 想限制同时访问资源的线程数
  • 原子标志位变化前不想忙等
  • 多线程打印输出互相打乱

C++20/23 的新工具正好补这些空位。


2. std::jthread:自动 join 的线程

std::thread 的一个经典坑:

1
2
3
4
5
6
7
void f() {
std::thread t([] {
work();
});

// 如果这里忘了 join/detach,t 析构会 terminate
}

std::jthread 的析构函数会自动请求停止并 join:

1
2
3
4
5
6
7
8
#include <thread>
#include <iostream>

int main() {
std::jthread t([] {
std::cout << "work\n";
});
}

离开作用域时,t 会自动等待线程结束。
这和 RAII 的方向一致:资源生命周期交给对象管理。

适合:

  • 明确属于某个作用域的后台线程
  • 服务对象内部 worker
  • 测试代码中的临时线程

3. stop_token:协作式取消

jthread 可以把 std::stop_token 自动传给线程函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <thread>
#include <chrono>
#include <iostream>

int main() {
std::jthread worker([](std::stop_token st) {
while (!st.stop_requested()) {
std::cout << "tick\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << "stopped\n";
});

std::this_thread::sleep_for(std::chrono::milliseconds(350));
worker.request_stop();
}

注意:

  • 停止请求不是强杀线程
  • 被取消的代码需要自己定期检查
  • 阻塞等待也要设计成可唤醒

这叫协作式取消。


4. stop_sourcestop_tokenstop_callback

三者关系:

  • stop_source:发起停止请求
  • stop_token:观察是否请求停止
  • stop_callback:停止请求发生时执行回调

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <stop_token>
#include <iostream>

int main() {
std::stop_source src;
std::stop_token tok = src.get_token();

std::stop_callback cb(tok, [] {
std::cout << "stop requested\n";
});

src.request_stop();
}

工程意义:

  • 可以把取消信号穿过多个模块
  • 不需要共享一个 atomic<bool>
  • 多个观察者可以响应同一次停止请求

5. condition_variable_any 与可取消等待

普通 condition_variable 等待时,如果没有通知,线程会一直睡。
使用 condition_variable_any 的 C++20 stop-token 版本,可以让等待响应取消:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <condition_variable>
#include <mutex>
#include <stop_token>
#include <thread>
#include <queue>

std::mutex m;
std::condition_variable_any cv;
std::queue<int> q;

void worker(std::stop_token st) {
std::unique_lock lk(m);

while (true) {
bool ready = cv.wait(lk, st, [] {
return !q.empty();
});

if (!ready) {
// stop requested
break;
}

int x = q.front();
q.pop();
lk.unlock();

process(x);

lk.lock();
}
}

这样线程既能等任务,也能被取消。


6. std::latch:一次性倒计时门闩

latch 适合“一组线程都完成某件事后,主线程继续”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <latch>
#include <thread>
#include <vector>
#include <iostream>

int main() {
constexpr int n = 4;
std::latch done(n);
std::vector<std::jthread> threads;

for (int i = 0; i < n; ++i) {
threads.emplace_back([&, i] {
work(i);
done.count_down();
});
}

done.wait();
std::cout << "all workers finished\n";
}

特点:

  • 计数只减少
  • 不能重复使用
  • 适合启动完成、初始化完成、一次性汇合

7. std::barrier:可重复阶段同步

barrier 适合多阶段迭代。

例如仿真:

1
2
3
4
5
阶段 1:每个线程计算局部更新
barrier
阶段 2:交换边界
barrier
阶段 3:进入下一轮

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <barrier>
#include <thread>
#include <vector>

int main() {
constexpr int n = 4;
std::barrier sync(n);
std::vector<std::jthread> threads;

for (int tid = 0; tid < n; ++tid) {
threads.emplace_back([&, tid] {
for (int step = 0; step < 100; ++step) {
compute_local(tid, step);
sync.arrive_and_wait();

exchange_boundary(tid, step);
sync.arrive_and_wait();
}
});
}
}

barrier 可以带完成函数:

1
2
3
std::barrier sync(n, [] {
finish_one_phase();
});

完成函数会在每一轮所有参与者到达后执行一次。


8. std::counting_semaphore:限制并发数量

信号量适合控制同时进入某个区域的线程数。

1
2
3
4
5
6
7
8
9
10
11
#include <semaphore>
#include <thread>
#include <vector>

std::counting_semaphore<8> slots(3); // 最多 3 个线程同时进入

void task() {
slots.acquire();
access_limited_resource();
slots.release();
}

适合:

  • 限制数据库连接数
  • 限制同时进行的 I/O
  • 控制 GPU/网络/文件句柄等稀缺资源
  • 生产者消费者中的空槽/满槽计数

二值信号量:

1
std::binary_semaphore sem(0);

它可以当作轻量事件使用。


9. atomic::wait / notify:原子上的阻塞等待

C++20 给原子变量增加了等待和通知。

传统写法可能忙等:

1
2
3
while (!ready.load(std::memory_order_acquire)) {
// spin
}

如果等待时间可能较长,可以用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <atomic>
#include <thread>

std::atomic<bool> ready = false;

void consumer() {
bool expected = false;
while (!ready.load(std::memory_order_acquire)) {
ready.wait(expected);
}
consume_data();
}

void producer() {
produce_data();
ready.store(true, std::memory_order_release);
ready.notify_one();
}

要点:

  • wait(old) 会在原子值仍等于 old 时阻塞
  • 被唤醒后仍要重新检查条件
  • 内存序仍然要写对
  • 适合状态位、序号、轻量事件

它经常能替代“原子标志 + 自旋 + sleep”的土办法。


10. 用原子序号做生产者通知

一个常见模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <atomic>
#include <cstdint>

std::atomic<std::uint64_t> version{0};

void publish() {
update_shared_data();
version.fetch_add(1, std::memory_order_release);
version.notify_all();
}

void wait_next(std::uint64_t seen) {
while (version.load(std::memory_order_acquire) == seen) {
version.wait(seen);
}
read_shared_data();
}

相比单纯 bool,版本号可以表达“状态变化了几次”。
适合配置刷新、数据快照发布、轻量通知。


11. std::atomic_ref:把已有对象当原子访问

atomic_ref 可以对一个已有对象建立原子视图:

1
2
3
4
5
6
7
8
#include <atomic>

int counter = 0;

void inc() {
std::atomic_ref<int> ref(counter);
ref.fetch_add(1, std::memory_order_relaxed);
}

适合:

  • 数据结构字段不能改成 std::atomic<T>
  • 需要临时以原子方式操作某个对象
  • 与 C API 或共享内存布局兼容

但要非常小心:

  1. 对象生命周期必须覆盖所有 atomic_ref
  2. 对象地址必须满足原子操作的对齐要求
  3. 同一对象不能同时被普通非原子访问造成数据竞争
  4. 它不是让任意复杂对象都 magically thread-safe

如果你能直接把字段设计成 std::atomic<T>,通常更清晰。


12. std::shared_mutex:读多写少

shared_mutex 是 C++17 引入的,但很适合补在并发工具里。

场景:

  • 很多线程读
  • 偶尔一个线程写

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <shared_mutex>
#include <mutex>
#include <unordered_map>
#include <string>

std::unordered_map<std::string, int> table;
std::shared_mutex m;

int get(const std::string& key) {
std::shared_lock lk(m);
auto it = table.find(key);
return it == table.end() ? -1 : it->second;
}

void put(std::string key, int value) {
std::unique_lock lk(m);
table[std::move(key)] = value;
}

注意:

  • 读锁太多可能饿死写者,具体策略看实现
  • 读操作必须真的不修改共享状态
  • 如果临界区很小,普通 mutex 可能更快

13. std::osyncstream:多线程输出不串行乱套

多线程直接写 std::cout,经常输出互相穿插:

1
std::cout << "thread " << id << " value " << x << "\n";

osyncstream 可以让一次输出在释放时整体写入:

1
2
3
4
5
6
7
#include <syncstream>
#include <iostream>

void log(int id, int value) {
std::osyncstream(std::cout)
<< "thread " << id << " value " << value << "\n";
}

它适合调试和日志示例。
性能敏感日志系统仍要用专门的异步日志方案。


14. 这些工具该怎么选

常见选择:

场景 优先工具
作用域内启动线程并自动等待 std::jthread
请求后台任务停下 stop_token
一组线程一次性汇合 std::latch
多阶段反复同步 std::barrier
限制同时访问资源数 std::counting_semaphore
等待原子状态变化 atomic::wait/notify
读多写少共享表 std::shared_mutex
多线程调试输出 std::osyncstream

经验:

  • 能用更高层并行算法或 TBB,就不要手写复杂同步
  • 手写同步时,优先使用表达意图明确的工具
  • 取消、生命周期、唤醒路径要一开始就设计好

15. 和线程池、TBB 的关系

这些标准组件是基础积木。
线程池和 TBB 是更高层的任务调度系统。

适合直接用标准组件的场景:

  • 少量长期线程
  • 简单后台 worker
  • 明确的阶段同步
  • 资源数量限制
  • 轻量状态通知

更适合 TBB/线程池的场景:

  • 大量短任务
  • 数据并行循环
  • 递归分治
  • work stealing
  • 复杂 pipeline

一句话:

标准并发工具负责“正确同步”,任务框架负责“高效调度”。


16. 常见误区

16.1 “jthread 会强制杀死线程”

不会。
它只会请求停止并 join,线程函数必须配合检查 stop_token

16.2 “barrier 到了就一定安全访问所有数据”

barrier 只解决阶段同步。
阶段内部仍然不能有数据竞争。

16.3 “semaphore 可以替代所有锁”

不行。
信号量控制数量,不直接保护复杂不变量。

16.4 “atomic::wait 不需要内存序”

仍然需要。
发布数据和读取数据时的 release/acquire 关系不能省。

16.5 “atomic_ref 可以给普通对象补上线程安全”

只能保证通过这个 atomic view 做的操作是原子的。
其他普通访问如果并发发生,仍然可能数据竞争。


17. 一页总结

现代标准并发工具补齐了 C++11 的几个工程短板:

  1. jthread 让线程生命周期更 RAII
  2. stop_token 提供协作式取消协议
  3. latchbarrier 表达汇合与阶段同步
  4. semaphore 表达资源数量限制
  5. atomic::wait/notify 避免粗糙忙等
  6. atomic_ref 能对既有对象做原子视图
  7. osyncstream 让多线程输出更干净

一句话:

C++20/23 并发工具的价值不是“更底层”,而是把常见同步意图写得更直接、更不容易漏生命周期。


18. 建议继续补充的相关主题

和本篇衔接最紧密的内容:

  1. jthread + condition_variable_any 实现可取消线程池
  2. barrier 在 stencil / 仿真中的阶段同步
  3. atomic::wait 实现轻量 MPMC 通知
  4. shared_mutex 与 RCU 思路对比
  5. C++ executors / sender-receiver 的后续演进

19. 参考资料

  1. cppreference: std::jthread
    https://en.cppreference.com/w/cpp/thread/jthread

  2. cppreference: std::barrier
    https://en.cppreference.com/w/cpp/thread/barrier

  3. cppreference: std::counting_semaphore
    https://en.cppreference.com/w/cpp/thread/counting_semaphore

  4. cppreference: atomic wait
    https://en.cppreference.com/w/cpp/atomic/atomic/wait