C++11 起的多线程编程笔记
C++11 标准库并发组件:<thread> / <mutex> / <condition_variable> / <future> / <atomic> / <chrono>。目标:并发执行(性能) + 正确同步(安全) + 生命周期可控(可维护)。
1. 时间与计时 std::chrono 1.1 三个核心类型
clock:时钟(如 steady_clock、system_clock)
time_point:时间点(clock::time_point)
duration:时长(seconds/milliseconds/...)
1.2 计时:推荐 steady_clock(不会被系统调时影响)
#include <iostream>
#include <chrono>
#include <cstdint>

// Builds two steady_clock time points 30 s apart and prints their distance
// in whole seconds.
int main() {
    using clock = std::chrono::steady_clock;
    using std::chrono::seconds;

    const clock::time_point start = clock::now();
    const clock::time_point finish = start + seconds(30);

    // time_point - time_point yields a duration in the clock's native tick.
    const clock::duration elapsed = finish - start;
    const std::int64_t whole_seconds =
        std::chrono::duration_cast<seconds>(elapsed).count();

    std::cout << "dt = " << whole_seconds << " s\n";
    return 0;
}
1.3 常见时间单位
std::chrono::seconds(x)
std::chrono::milliseconds(x)
std::chrono::microseconds(x)
std::chrono::nanoseconds(x)
1.4 测耗时并转 double 毫秒
#include <iostream>
#include <chrono>
#include <cstdint>  // std::uint64_t

// Times a one-million-iteration summation loop with steady_clock and prints
// the cost as fractional milliseconds together with the computed sum.
int main() {
    auto t0 = std::chrono::steady_clock::now();

    std::uint64_t acc = 0;
    // Plain literal on purpose: the 1'000'000 digit separator is a C++14
    // feature and would not compile in C++11 mode.
    for (int i = 0; i < 1000000; ++i) acc += i;

    auto t1 = std::chrono::steady_clock::now();

    // A floating-point rep keeps the sub-millisecond part instead of
    // truncating it the way integer milliseconds would.
    using double_ms = std::chrono::duration<double, std::milli>;
    double ms = std::chrono::duration_cast<double_ms>(t1 - t0).count();

    std::cout << "cost = " << ms << " ms, acc=" << acc << "\n";
}
2. 线程休眠:避免忙等 2.1 sleep_for 1 2 3 4 #include <thread> #include <chrono> std::this_thread::sleep_for (std::chrono::milliseconds (400 ));
2.2 sleep_until(对齐节拍更合适) 1 2 3 4 5 #include <thread> #include <chrono> auto t = std::chrono::steady_clock::now () + std::chrono::milliseconds (400 );std::this_thread::sleep_until (t);
2.3 实例:固定频率循环(推荐写法)
#include <iostream>
#include <chrono>
#include <thread>

// Prints ten ticks, sleeping until an absolute deadline after each one.
int main() {
    typedef std::chrono::steady_clock steady;
    const std::chrono::milliseconds period(100);

    steady::time_point deadline = steady::now();
    int tick = 0;
    while (tick < 10) {
        // Each deadline is derived from the previous deadline rather than
        // from now(), so time spent in the loop body does not accumulate.
        deadline += period;
        std::cout << "tick " << tick << "\n";
        std::this_thread::sleep_until(deadline);
        ++tick;
    }
}
3. 基础线程 std::thread 3.1 启动线程与 join(必须) 1 2 3 4 5 6 7 8 9 #include <thread> #include <iostream> void work (int id) { std::cout << "worker " << id << "\n" ; }int main () { std::thread t (work, 1 ) ; t.join (); }
join():等待线程结束(最常用)
detach():放飞线程(风险高:对象生命周期/退出时机难控)
4. 异步与返回值:std::async / std::future 4.1 基本用法(带返回值)
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

// Simulates a slow operation: sleeps 200 ms, then returns the code 404.
int download() {
    const std::chrono::milliseconds latency(200);
    std::this_thread::sleep_for(latency);
    return 404;
}

// Runs download() asynchronously and prints the value delivered by the future.
int main() {
    std::future<int> pending = std::async(std::launch::async, download);

    // get() blocks until the asynchronous task has produced its result.
    const int ret = pending.get();
    std::cout << "ret=" << ret << "\n";
}
4.2 启动策略
std::launch::async:任务一定会在另一个线程上异步执行(如同新建一个线程来运行)
std::launch::deferred:惰性执行,任务推迟到首次调用 get()/wait() 时,才在调用 get()/wait() 的那个线程上同步执行(wait_for/wait_until 不会触发执行,只会返回 future_status::deferred)
4.3 wait / wait_for / wait_until 1 2 3 4 5 6 7 8 9 10 11 auto fut = std::async (std::launch::async, []{ std::this_thread::sleep_for (std::chrono::seconds (2 )); return 123 ; }); fut.wait (); auto st = fut.wait_for (std::chrono::milliseconds (100 ));if (st == std::future_status::timeout) { }
4.4 异常会在 get() 处重抛(非常实用) 1 2 3 4 5 6 auto fut = std::async (std::launch::async, []() -> int { throw std::runtime_error ("boom" ); }); try { fut.get (); }catch (const std::exception& e) { std::cout << e.what () << "\n" ; }
5. 互斥锁 std::mutex:保护共享数据 5.1 lock_guard(RAII 自动解锁)
#include <mutex>
#include <thread>
#include <iostream>

// Two threads each increment a shared int 100000 times under a mutex,
// then the final count is printed.
int main() {
    std::mutex guard;
    int total = 0;

    auto bump = [&guard, &total] {
        int remaining = 100000;
        while (remaining-- > 0) {
            // The lock is taken per increment and released at the end of
            // each iteration when `hold` goes out of scope.
            std::lock_guard<std::mutex> hold(guard);
            ++total;
        }
    };

    std::thread first(bump);
    std::thread second(bump);
    first.join();
    second.join();

    std::cout << "counter=" << total << "\n";
}
5.2 死锁规避(两个锁) C++11 可用 std::lock + std::adopt_lock:
#include <mutex>

std::mutex m1, m2;

// Acquires both global mutexes without risking deadlock, then releases them
// on return.
void f() {
    // Construct the owners unlocked, then let std::lock take both mutexes
    // with its deadlock-avoidance algorithm.
    std::unique_lock<std::mutex> first(m1, std::defer_lock);
    std::unique_lock<std::mutex> second(m2, std::defer_lock);
    std::lock(first, second);
    // Destructors unlock m2, then m1.
}
6. 条件变量 std::condition_variable:生产者-消费者(比 sleep 正确)
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <iostream>

// One producer pushes the integers 0..4 into a shared queue and then raises
// `done`; one consumer prints every item and exits once `done` is set.
int main() {
    std::mutex m;
    std::condition_variable cv;
    std::queue<int> q;
    bool done = false;

    std::thread producer([&] {
        for (int item = 0; item < 5; ++item) {
            std::unique_lock<std::mutex> lk(m);
            q.push(item);
            lk.unlock();  // notify after releasing the mutex
            cv.notify_one();
        }
        std::unique_lock<std::mutex> lk(m);
        done = true;
        lk.unlock();
        cv.notify_one();
    });

    std::thread consumer([&] {
        for (;;) {
            std::unique_lock<std::mutex> lk(m);
            // Re-check the condition after every wake-up: wait() may return
            // spuriously.
            while (!done && q.empty()) {
                cv.wait(lk);
            }
            // Drain everything currently queued while still holding the lock.
            for (; !q.empty(); q.pop()) {
                std::cout << "consume " << q.front() << "\n";
            }
            if (done) return;
        }
    });

    producer.join();
    consumer.join();
}
7. 原子 std::atomic:轻量同步(计数/标志位)
#include <atomic>
#include <thread>
#include <iostream>

// Two threads each add 1 to a shared atomic counter 100000 times; the final
// value is printed.
int main() {
    std::atomic<int> hits(0);

    auto bump = [&hits] {
        int left = 100000;
        while (left-- > 0) {
            hits.fetch_add(1);  // atomic read-modify-write, no mutex needed
        }
    };

    std::thread a(bump);
    std::thread b(bump);
    a.join();
    b.join();

    const int total = hits.load();
    std::cout << "counter=" << total << "\n";
}
8. 线程池/任务队列(C++11 可直接用)
线程池 = 固定数量的 worker 线程 + 任务队列(阻塞队列)。典型价值:避免频繁创建/销毁线程,提高吞吐;统一任务提交接口,支持 future 返回值与异常回传。
8.1 任务队列:BlockingQueue(阻塞队列) 特性:
push() 入队并唤醒消费者
pop() 阻塞等待任务;队列关闭且为空时返回 false
close() 关闭队列并唤醒全部等待线程
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <utility>

/// FIFO queue with no capacity limit. Every operation takes the internal
/// mutex, and consumers block on a condition variable while the queue is
/// empty and still open.
template <class T>
class BlockingQueue {
public:
    BlockingQueue() : closed_(false) {}

    BlockingQueue(const BlockingQueue&) = delete;
    BlockingQueue& operator=(const BlockingQueue&) = delete;

    /// Appends `value` and wakes one waiting consumer.
    /// @return false (nothing is enqueued) if the queue has been closed,
    ///         true otherwise.
    bool push(T value) {
        std::unique_lock<std::mutex> guard(m_);
        if (closed_) {
            return false;
        }
        q_.push(std::move(value));
        guard.unlock();  // notify after releasing the mutex
        cv_.notify_one();
        return true;
    }

    /// Blocks until an element is available or the queue is closed.
    /// @param out receives the front element, moved out of the queue.
    /// @return true if an element was stored in `out`; false once the queue
    ///         is closed and empty.
    bool pop(T& out) {
        std::unique_lock<std::mutex> guard(m_);
        while (!closed_ && q_.empty()) {
            cv_.wait(guard);
        }
        if (q_.empty()) {
            return false;  // only reachable when closed and fully drained
        }
        out = std::move(q_.front());
        q_.pop();
        return true;
    }

    /// Marks the queue closed and wakes every waiting consumer. Elements that
    /// are already queued can still be popped.
    void close() {
        std::unique_lock<std::mutex> guard(m_);
        closed_ = true;
        guard.unlock();
        cv_.notify_all();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> q_;
    bool closed_;
};
8.2 线程池:ThreadPool(enqueue 返回 future) 特性:
固定线程数 worker
enqueue(f, args...) 返回 std::future<R>
任务抛异常 → 在 future.get() 时重抛
析构/shutdown():停止接收新任务 + 关闭队列 + join 所有 worker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 #pragma once #include <vector> #include <thread> #include <future> #include <functional> #include <stdexcept> #include <type_traits> #include <atomic> #include <utility> #include "blocking_queue.h" class ThreadPool {public : explicit ThreadPool (size_t nthreads) : accept_(true) { if (nthreads == 0 ) nthreads = 1 ; workers_.reserve (nthreads); for (size_t i = 0 ; i < nthreads; ++i) { workers_.push_back (std::thread ([this ]{ worker_loop (); })); } } ThreadPool (const ThreadPool&) = delete ; ThreadPool& operator =(const ThreadPool&) = delete ; ~ThreadPool () { shutdown (); } template <class F , class ... Args> auto enqueue (F&& f, Args&&... args) -> std::future<typename std::result_of<F (Args...) >::type> { typedef typename std::result_of<F (Args...)>::type R; if (!accept_.load ()) { throw std::runtime_error ("ThreadPool is not accepting new tasks." ); } auto task_ptr = std::make_shared<std::packaged_task<R ()>>( std::bind (std::forward<F>(f), std::forward<Args>(args)...) ); std::future<R> fut = task_ptr->get_future (); bool ok = tasks_.push ([task_ptr]{ (*task_ptr)(); }); if (!ok) throw std::runtime_error ("Task queue is closed." ); return fut; } void shutdown () { bool expected = true ; if (accept_.compare_exchange_strong (expected, false )) { tasks_.close (); for (auto & t : workers_) { if (t.joinable ()) t.join (); } } } private : void worker_loop () { std::function<void ()> task; while (tasks_.pop (task)) { task (); } } private : std::vector<std::thread> workers_; BlockingQueue<std::function<void ()>> tasks_; std::atomic<bool > accept_; };
8.3 线程池使用示例(返回值 + 异常回传) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 #include <iostream> #include <chrono> #include <thread> #include "thread_pool.h" int slow_add (int a, int b) { std::this_thread::sleep_for (std::chrono::milliseconds (200 )); return a + b; } int main () { ThreadPool pool (4 ) ; auto f1 = pool.enqueue (slow_add, 1 , 2 ); auto f2 = pool.enqueue ([](int x){ return x * x; }, 12 ); auto f3 = pool.enqueue ([]() -> int { throw std::runtime_error ("boom" ); }); std::cout << "f1=" << f1. get () << "\n" ; std::cout << "f2=" << f2. get () << "\n" ; try { std::cout << "f3=" << f3. get () << "\n" ; } catch (const std::exception& e) { std::cout << "caught: " << e.what () << "\n" ; } pool.shutdown (); }