c++11开始的多线程编程

C++11 起的多线程编程笔记

C++11 标准库并发组件:<thread> / <mutex> / <condition_variable> / <future> / <atomic> / <chrono>
目标:并发执行(性能) + 正确同步(安全) + 生命周期可控(可维护)。

1. 时间与计时 std::chrono

1.1 三个核心类型

  • clock:时钟(如 steady_clocksystem_clock
  • time_point:时间点(clock::time_point
  • duration:时长(seconds/milliseconds/...

1.2 计时:推荐 steady_clock(不会被系统调时影响)

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <iostream>
#include <chrono>
#include <cstdint>

int main() {
auto t0 = std::chrono::steady_clock::now();
auto t1 = t0 + std::chrono::seconds(30);

auto dt = t1 - t0; // duration
std::int64_t sec = std::chrono::duration_cast<std::chrono::seconds>(dt).count();
std::cout << "dt = " << sec << " s\n";
return 0;
}

1.3 常见时间单位

  • std::chrono::seconds(x)
  • std::chrono::milliseconds(x)
  • std::chrono::microseconds(x)
  • std::chrono::nanoseconds(x)

1.4 测耗时并转 double 毫秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <chrono>

int main() {
auto t0 = std::chrono::steady_clock::now();

std::uint64_t acc = 0;
for (int i = 0; i < 1'000'000; ++i) acc += i;

auto t1 = std::chrono::steady_clock::now();
using double_ms = std::chrono::duration<double, std::milli>;
double ms = std::chrono::duration_cast<double_ms>(t1 - t0).count();

std::cout << "cost = " << ms << " ms, acc=" << acc << "\n";
}

2. 线程休眠:避免忙等

2.1 sleep_for

1
2
3
4
#include <thread>
#include <chrono>

std::this_thread::sleep_for(std::chrono::milliseconds(400));

2.2 sleep_until(对齐节拍更合适)

1
2
3
4
5
#include <thread>
#include <chrono>

auto t = std::chrono::steady_clock::now() + std::chrono::milliseconds(400);
std::this_thread::sleep_until(t);

2.3 实例:固定频率循环(推荐写法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <iostream>
#include <chrono>
#include <thread>

int main() {
using clock = std::chrono::steady_clock;
auto next = clock::now();

for (int i = 0; i < 10; ++i) {
next += std::chrono::milliseconds(100);
std::cout << "tick " << i << "\n";
std::this_thread::sleep_until(next);
}
}

3. 基础线程 std::thread

3.1 启动线程与 join(必须)

1
2
3
4
5
6
7
8
9
#include <thread>
#include <iostream>

void work(int id) { std::cout << "worker " << id << "\n"; }

int main() {
std::thread t(work, 1);
t.join(); // 必须 join 或 detach,否则析构会 std::terminate
}
  • join():等待线程结束(最常用)
  • detach():放飞线程(风险高:对象生命周期/退出时机难控)

4. 异步与返回值:std::async / std::future

4.1 基本用法(带返回值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

int download() {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
return 404;
}

int main() {
auto fut = std::async(std::launch::async, [] { return download(); });
int ret = fut.get(); // get 只能调用一次
std::cout << "ret=" << ret << "\n";
}

4.2 启动策略

  • std::launch::async:倾向于并行执行
  • std::launch::deferred:延迟到 get()/wait() 才在当前线程执行

4.3 wait / wait_for / wait_until

1
2
3
4
5
6
7
8
9
10
11
auto fut = std::async(std::launch::async, []{
std::this_thread::sleep_for(std::chrono::seconds(2));
return 123;
});

fut.wait(); // 等到完成

auto st = fut.wait_for(std::chrono::milliseconds(100));
if (st == std::future_status::timeout) {
// 还没完成
}

4.4 异常会在 get() 处重抛(非常实用)

1
2
3
4
5
6
auto fut = std::async(std::launch::async, []() -> int {
throw std::runtime_error("boom");
});

try { fut.get(); }
catch (const std::exception& e) { std::cout << e.what() << "\n"; }

5. 互斥锁 std::mutex:保护共享数据

5.1 lock_guard(RAII 自动解锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <mutex>
#include <thread>
#include <iostream>

int main() {
std::mutex m;
int counter = 0;

auto inc = [&] {
for (int i = 0; i < 100000; ++i) {
std::lock_guard<std::mutex> lk(m);
++counter;
}
};

std::thread t1(inc), t2(inc);
t1.join(); t2.join();
std::cout << "counter=" << counter << "\n";
}

5.2 死锁规避(两个锁)

C++11 可用 std::lock + std::adopt_lock

1
2
3
4
5
6
7
8
9
10
#include <mutex>

std::mutex m1, m2;

void f() {
std::lock(m1, m2);
std::lock_guard<std::mutex> lk1(m1, std::adopt_lock);
std::lock_guard<std::mutex> lk2(m2, std::adopt_lock);
// 临界区
}

6. 条件变量 std::condition_variable:生产者-消费者(比 sleep 正确)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <iostream>

int main() {
std::mutex m;
std::condition_variable cv;
std::queue<int> q;
bool done = false;

std::thread producer([&]{
for (int i = 0; i < 5; ++i) {
{ std::lock_guard<std::mutex> lk(m); q.push(i); }
cv.notify_one();
}
{ std::lock_guard<std::mutex> lk(m); done = true; }
cv.notify_one();
});

std::thread consumer([&]{
while (true) {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&]{ return done || !q.empty(); }); // 带谓词防虚假唤醒
while (!q.empty()) {
std::cout << "consume " << q.front() << "\n";
q.pop();
}
if (done) break;
}
});

producer.join();
consumer.join();
}

7. 原子 std::atomic:轻量同步(计数/标志位)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <atomic>
#include <thread>
#include <iostream>

int main() {
std::atomic<int> counter{0};

auto inc = [&] { for (int i = 0; i < 100000; ++i) ++counter; };

std::thread t1(inc), t2(inc);
t1.join(); t2.join();

std::cout << "counter=" << counter.load() << "\n";
}

8. 线程池/任务队列(C++11 可直接用)

线程池 = 固定 worker 线程 + 任务队列(阻塞队列)
典型价值:避免频繁创建/销毁线程,提高吞吐;统一任务提交接口,支持 future 返回值与异常回传。

8.1 任务队列:BlockingQueue(阻塞队列)

特性:

  • push() 入队并唤醒消费者
  • pop() 阻塞等待任务;队列关闭且为空时返回 false
  • close() 关闭队列并唤醒全部等待线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <utility>

template <class T>
class BlockingQueue {
public:
BlockingQueue() : closed_(false) {}

BlockingQueue(const BlockingQueue&) = delete;
BlockingQueue& operator=(const BlockingQueue&) = delete;

bool push(T value) {
{
std::lock_guard<std::mutex> lk(m_);
if (closed_) return false;
q_.push(std::move(value));
}
cv_.notify_one();
return true;
}

bool pop(T& out) {
std::unique_lock<std::mutex> lk(m_);
cv_.wait(lk, [&]{ return closed_ || !q_.empty(); });

if (q_.empty()) return false; // closed_ && empty
out = std::move(q_.front());
q_.pop();
return true;
}

void close() {
{
std::lock_guard<std::mutex> lk(m_);
closed_ = true;
}
cv_.notify_all();
}

private:
std::mutex m_;
std::condition_variable cv_;
std::queue<T> q_;
bool closed_;
};

8.2 线程池:ThreadPool(enqueue 返回 future)

特性:

  • 固定线程数 worker
  • enqueue(f, args...) 返回 std::future<R>
  • 任务抛异常 → 在 future.get() 时重抛
  • 析构/shutdown():停止接收新任务 + 关闭队列 + join 所有 worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#pragma once
#include <vector>
#include <thread>
#include <future>
#include <functional>
#include <stdexcept>
#include <type_traits>
#include <atomic>
#include <utility>

#include "blocking_queue.h"

class ThreadPool {
public:
explicit ThreadPool(size_t nthreads) : accept_(true) {
if (nthreads == 0) nthreads = 1;
workers_.reserve(nthreads);
for (size_t i = 0; i < nthreads; ++i) {
workers_.push_back(std::thread([this]{ worker_loop(); }));
}
}

ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;

~ThreadPool() { shutdown(); }

template <class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
typedef typename std::result_of<F(Args...)>::type R;

if (!accept_.load()) {
throw std::runtime_error("ThreadPool is not accepting new tasks.");
}

auto task_ptr = std::make_shared<std::packaged_task<R()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<R> fut = task_ptr->get_future();

bool ok = tasks_.push([task_ptr]{ (*task_ptr)(); });
if (!ok) throw std::runtime_error("Task queue is closed.");

return fut;
}

void shutdown() {
bool expected = true;
if (accept_.compare_exchange_strong(expected, false)) {
tasks_.close();
for (auto& t : workers_) {
if (t.joinable()) t.join();
}
}
}

private:
void worker_loop() {
std::function<void()> task;
while (tasks_.pop(task)) {
task();
}
}

private:
std::vector<std::thread> workers_;
BlockingQueue<std::function<void()>> tasks_;
std::atomic<bool> accept_;
};

8.3 线程池使用示例(返回值 + 异常回传)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <iostream>
#include <chrono>
#include <thread>
#include "thread_pool.h"

int slow_add(int a, int b) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
return a + b;
}

int main() {
ThreadPool pool(4);

auto f1 = pool.enqueue(slow_add, 1, 2);
auto f2 = pool.enqueue([](int x){ return x * x; }, 12);

auto f3 = pool.enqueue([]() -> int {
throw std::runtime_error("boom");
});

std::cout << "f1=" << f1.get() << "\n";
std::cout << "f2=" << f2.get() << "\n";

try {
std::cout << "f3=" << f3.get() << "\n";
} catch (const std::exception& e) {
std::cout << "caught: " << e.what() << "\n";
}

pool.shutdown(); // 可省略:析构也会 shutdown
}