# RAII & Smart Pointers
In concurrent programming, processes and threads constantly interact, so the first priority is to make sure they do not conflict when accessing shared objects. RAII (Resource Acquisition Is Initialization) is a common C++ idiom for managing resources such as dynamic memory, file handles, and network connections. Its core idea is to bind the lifetime of a resource to the lifetime of an object: the resource is acquired in the constructor and released in the destructor. This removes the need for explicit resource management and reduces the chance of resource leaks.
RAII is typically used for memory management (smart pointers), file management, and mutex management. The standard smart pointers are std::unique_ptr and std::shared_ptr.
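As a quick illustration of the idea, here is a minimal sketch of an RAII file wrapper (the FileHandle class and the demo.txt path are made up for illustration): the constructor acquires the file and the destructor closes it, so the resource cannot leak even if the function exits early.

```cpp
#include <cstdio>
#include <stdexcept>

// Minimal RAII wrapper: the constructor acquires the resource,
// the destructor releases it, so the file handle can never leak.
class FileHandle {
    std::FILE* f;
public:
    explicit FileHandle(const char* path, const char* mode) : f(std::fopen(path, mode)) {
        if (!f) throw std::runtime_error("failed to open file");
    }
    ~FileHandle() { std::fclose(f); }
    FileHandle(const FileHandle&) = delete;             // non-copyable, like unique_ptr
    FileHandle& operator=(const FileHandle&) = delete;
    std::FILE* get() const { return f; }
};

int main(){
    FileHandle fh("demo.txt", "w");
    std::fprintf(fh.get(), "RAII demo\n");
    return 0;   // fh goes out of scope here and the file is closed automatically
}
```

std::unique_ptr, std::lock_guard, and the scoped_thread class later in these notes all follow exactly the same pattern.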
## Processes and Threads

Each process has its own independent memory space, and one process can contain multiple threads; the threads within the same process share that process's resources.
## Multithreaded Programming

### std::thread basics
```cpp
#include <iostream>
#include <thread>
#include <chrono>

void foo(int a, int b){
    std::cout << a << " " << b << "\n";
}

int main(){
    // Create a thread that runs foo(1, 2)
    std::thread t1(foo, 1, 2);
    if (t1.joinable())      // true until the thread is joined or detached
        t1.join();          // block until t1 finishes

    // A thread can also run a lambda; detach() lets it run on its own
    std::thread t2([](int a, int b){
        std::cout << a << " " << b << "\n";
    }, 1, 2);
    t2.detach();            // we no longer manage t2

    std::this_thread::sleep_for(std::chrono::milliseconds(100));   // give t2 time to finish
    return 0;
}
```
### Passing smart pointers as thread arguments
```cpp
#include <iostream>
#include <thread>
#include <memory>

// unique_ptr is move-only, so the thread takes it by value after std::move
void foo_unique(std::unique_ptr<int> p){
    std::cout << *p << "\n";
}

// shared_ptr can be copied; the reference count keeps the object alive
void foo_shared(std::shared_ptr<int> p){
    std::cout << *p << "\n";
}

int main(){
    std::unique_ptr<int> ptr_1 = std::make_unique<int>(520);
    std::shared_ptr<int> ptr_2 = std::make_shared<int>(520);

    std::thread t_unique(foo_unique, std::move(ptr_1));
    t_unique.join();

    std::thread t_shared(foo_shared, ptr_2);
    t_shared.join();

    return 0;
}
```
### Thread ownership management: std::move()
```cpp
#include <iostream>
#include <thread>

void foo_1(){}
void foo_2(){}

int main(){
    std::thread t_1(foo_1);
    std::thread t_2(std::move(t_1));   // ownership of the foo_1 thread moves to t_2

    t_1 = std::thread(foo_2);          // t_1 is empty again, so it may own a new thread

    std::cout << t_1.joinable() << "\n";   // 1
    std::cout << t_2.joinable() << "\n";   // 1

    std::thread t_3;
    t_3 = std::move(t_2);              // t_2 no longer owns a thread

    t_1.join();
    t_3.join();
    return 0;
}
```
### scoped_thread: an RAII wrapper around std::thread that joins automatically in its destructor [need to think about this some more]
```cpp
#include <iostream>
#include <thread>
#include <stdexcept>

class scoped_thread {
    std::thread t;
public:
    explicit scoped_thread(std::thread t_) : t(std::move(t_)){
        if (!t.joinable()){
            throw std::logic_error("thread is unjoinable.");
        }
    }
    ~scoped_thread(){
        if (t.joinable()){
            t.join();       // RAII: the destructor guarantees the thread is joined
        }
    }
    scoped_thread(scoped_thread const&) = delete;
    scoped_thread& operator=(scoped_thread const&) = delete;
};

void foo(int cnt){}

int main(){
    scoped_thread st(std::thread(foo, 100));
    return 0;   // st's destructor joins the thread here
}
```
### CPU cores & supported thread count
```cpp
#include <iostream>
#include <thread>
#include <vector>

void foo(){
    std::cout << std::this_thread::get_id() << "\n";
}

int main(){
    // Number of concurrent threads the hardware supports (may return 0 if unknown)
    size_t n = std::thread::hardware_concurrency();
    std::cout << "hardware_concurrency = " << n << "\n";

    std::vector<std::thread> pv(10);
    for (auto& t : pv)          // take the threads by reference
        t = std::thread(foo);
    for (auto& t : pv){
        if (t.joinable())
            t.join();
    }
    return 0;
}
```
## Mutual Exclusion

A mutex enforces that only one thread at a time may access the protected resource. When one thread holds the mutex, any other thread that tries to access the shared resource blocks until the holder releases the lock, which prevents data races. The basic type is std::mutex.
```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <queue>

class MyClass {
    std::queue<int> msg_que;
    std::mutex mtx;
public:
    void in_msg_que(int num){
        for (int i = 0; i < num; i++){
            mtx.lock();
            msg_que.push(i);
            mtx.unlock();
        }
    }
    void out_msg_que(int num){
        for (int i = 0; i < num; i++){
            int command;
            if (pop_command(command)){
                std::cout << "pop command: " << command << "\n";
            } else {
                std::cout << "queue is empty\n";
            }
        }
    }
    bool pop_command(int& command){
        mtx.lock();
        if (msg_que.empty()){
            mtx.unlock();
            return false;
        }
        command = msg_que.front();
        msg_que.pop();
        mtx.unlock();        // unlock on every return path
        return true;
    }
};

int main(){
    MyClass obj_a;
    std::thread out_msg_t(&MyClass::out_msg_que, &obj_a, 7);
    std::thread in_msg_t(&MyClass::in_msg_que, &obj_a, 5);
    out_msg_t.join();
    in_msg_t.join();
    return 0;
}
```
### std::lock_guard

std::lock_guard locks the mutex on construction and releases it automatically when the guard goes out of scope.
```cpp
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int count = 0;

void foo(int num){
    for (int i = 0; i < num; i++){
        std::lock_guard<std::mutex> lck(mtx);   // locks here, unlocks when lck is destroyed
        count++;
    }
}

int main(){
    std::thread t1(foo, 1000), t2(foo, 1000);
    t1.join();
    t2.join();
    std::cout << count << "\n";   // always 2000
    return 0;
}
```
### std::unique_lock

std::unique_lock is an upgraded std::lock_guard: it supports more operations (deferred locking, try-lock, manual unlock, moving the lock) but brings more overhead. For simple critical sections, prefer std::lock_guard.
```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <queue>

class MyClass {
    std::queue<int> msg_que;
    std::mutex mtx;
public:
    void in_msg_que(int num){
        for (int i = 0; i < num; i++){
            std::cout << "in_msg_que() running, push data: " << i << "\n";
            std::unique_lock<std::mutex> lck(mtx, std::try_to_lock);
            if (lck.owns_lock()){
                msg_que.push(i);
            } else {
                std::cout << "in_msg_que() running, but cannot get lock: " << i << "\n";
            }
        }
    }
    void out_msg_que(int num){
        int command = 0;
        for (int i = 0; i < num; i++){
            if (pop_command(command)){
                std::cout << "out_msg_que() running, command is: " << command << "\n";
            } else {
                std::cout << "out_msg_que() running, queue is empty: " << i << "\n";
            }
        }
    }
    bool pop_command(int& command){
        std::unique_lock<std::mutex> lck(mtx);
        if (msg_que.empty()){
            return false;
        }
        command = msg_que.front();
        msg_que.pop();
        return true;
    }
};

int main(){
    MyClass obj;
    std::thread out_msg_t(&MyClass::out_msg_que, &obj, 10000);
    std::thread in_msg_t(&MyClass::in_msg_que, &obj, 10000);
    out_msg_t.join();
    in_msg_t.join();
    return 0;
}
```
- std::defer_lock: construct the std::unique_lock without locking; the mutex is locked later.
- std::adopt_lock: the calling thread already owns the mutex; management is transferred to the std::unique_lock object.
- std::try_to_lock: try to lock the mutex without blocking; if the mutex is currently unavailable, the constructor returns immediately instead of blocking.
- lck.owns_lock(): reports whether the lock is currently held.
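A minimal sketch of std::defer_lock and std::adopt_lock, which the example above does not show (the defer_example and adopt_example function names are only for illustration):

```cpp
#include <mutex>

std::mutex m1, m2;

// std::defer_lock: construct the guards first, then lock both mutexes together
void defer_example(){
    std::unique_lock<std::mutex> lck1(m1, std::defer_lock);
    std::unique_lock<std::mutex> lck2(m2, std::defer_lock);
    std::lock(lck1, lck2);              // locks both without risking deadlock
}   // both unlocked here

// std::adopt_lock: the mutex is already locked; the guard only takes over unlocking
void adopt_example(){
    m1.lock();
    std::unique_lock<std::mutex> lck(m1, std::adopt_lock);
}   // unlocked by lck's destructor

int main(){
    defer_example();
    adopt_example();
    return 0;
}
```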
### std::timed_mutex

std::timed_mutex adds time-limited locking: try_lock_for() gives up after a duration, and try_lock_until() gives up once a time point has passed.
```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <chrono>

std::timed_mutex t_mtx;
int count = 0;

void foo(int id){
    std::chrono::milliseconds timeout(100);
    auto start = std::chrono::steady_clock::now();
    // Give up if the lock cannot be acquired within 100 ms
    if (t_mtx.try_lock_for(timeout)){
        auto end = std::chrono::steady_clock::now();
        std::chrono::duration<double, std::milli> dura = end - start;
        std::cout << "thread " << id << ": Successfully get the lock. "
                  << "waiting duration = " << dura.count() << "ms." << "\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(15));
        count++;
        t_mtx.unlock();
    }
}

int main(){
    std::vector<std::thread> threads(10);
    for (size_t i = 0; i < threads.size(); i++)
        threads[i] = std::thread(foo, static_cast<int>(i));
    for (auto& t : threads)
        t.join();
    std::cout << "count = " << count << "\n";
    return 0;
}
```
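try_lock_until() is the time-point counterpart of try_lock_for(); a minimal self-contained sketch (the foo_until function and the 100 ms deadline are illustrative, not from the original notes):

```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

std::timed_mutex t_mtx;

// Try to acquire t_mtx until an absolute deadline 100 ms from now
void foo_until(int id){
    auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
    if (t_mtx.try_lock_until(deadline)){
        std::cout << "thread " << id << ": got the lock before the deadline\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(15));
        t_mtx.unlock();
    } else {
        std::cout << "thread " << id << ": deadline passed without the lock\n";
    }
}

int main(){
    std::thread t1(foo_until, 1), t2(foo_until, 2);
    t1.join();
    t2.join();
    return 0;
}
```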
### std::recursive_mutex

std::recursive_mutex allows the same thread to lock the mutex repeatedly (recursively); it must then be unlocked the same number of times.
```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::recursive_mutex r_mtx;   // an ordinary std::mutex would deadlock here
int count = 0;

void dfs(int k){
    if (k == 0) return;
    r_mtx.lock();          // the same thread locks again at every recursion level
    count++;
    dfs(k - 1);
    r_mtx.unlock();        // unlocked as many times as it was locked
}

int main(){
    std::vector<std::thread> threads(3);
    for (auto& t : threads)
        t = std::thread(dfs, 5);
    for (auto& t : threads)
        t.join();
    std::cout << "count = " << count << "\n";
    return 0;
}
```
### std::shared_mutex

std::shared_mutex was formally introduced in C++17 and lives in the <shared_mutex> header. It offers two locking modes: it can be used as a shared lock or as an exclusive lock. A shared lock allows multiple threads to access the shared data at the same time and is typically used when several threads only read a shared resource; as long as the operations are read-only, many threads may hold the shared lock simultaneously, which improves concurrency in read-heavy scenarios. An exclusive lock allows only a single thread to operate on the shared data at any moment and is used when threads modify the data; while one thread holds the exclusive lock, no other thread can acquire the lock until it is released, which keeps the data consistent. Shared and exclusive locks feel like splitting locking into two broad categories, e.g. task a has to finish before task b may run.

Read concurrency, write blocking: when several reader threads hold the shared lock, they can read the shared resource concurrently. If a writer thread then tries to acquire the exclusive lock, it is blocked, and it stays blocked until every reader currently holding the shared lock has released it.

Write exclusivity, read blocking: while a writer thread holds the exclusive lock, new reader threads trying to acquire the shared lock are blocked; the exclusive lock takes priority over the shared lock. This design helps prevent write starvation: if readers kept acquiring the shared lock while a writer could never obtain the exclusive lock, the data could never be updated in time.
```cpp
#include <iostream>
#include <thread>
#include <shared_mutex>
#include <chrono>
#include <vector>

class Reader_Writer {
    std::shared_mutex s_mtx;
    int val = 0;
public:
    void read_data(int id){
        std::shared_lock<std::shared_mutex> lck(s_mtx);
        std::cout << "Reader " << id << " reads the value = " << val << "\n";
    }
    void reader(int id){
        for (int i = 0; i < 100; i++){
            read_data(id);
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
    void write_data(int id){
        std::unique_lock<std::shared_mutex> lck(s_mtx);
        val++;
        std::cout << "Writer " << id << " writes the value = " << val << "\n";
    }
    void writer(int id){
        for (int i = 0; i < 10; i++){
            write_data(id);
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
    }
};

int main(){
    Reader_Writer obj;
    std::vector<std::thread> readers(10);
    for (size_t i = 0; i < readers.size(); i++){
        readers[i] = std::thread(&Reader_Writer::reader, &obj, i);
    }
    std::vector<std::thread> writers(2);
    for (size_t i = 0; i < writers.size(); i++){
        writers[i] = std::thread(&Reader_Writer::writer, &obj, i);
    }
    for (auto& reader : readers) reader.join();
    for (auto& writer : writers) writer.join();
    return 0;
}
```
### std::call_once

The std::call_once function template guarantees that a function (callable object) is invoked exactly once, even in a multithreaded environment.
```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::once_flag flag;
int val = 0;

void init(){
    // The lambda runs exactly once, no matter how many threads call init()
    std::call_once(flag, [](){
        val = 1;
        std::cout << "initialized\n";
    });
}

int main(){
    std::vector<std::thread> threads(4);
    for (auto& t : threads) t = std::thread(init);
    for (auto& t : threads) t.join();
    std::cout << "val = " << val << "\n";
    return 0;
}
```
## The Producer-Consumer Problem

std::condition_variable is mainly used for synchronization between threads. Its key member function, std::condition_variable::wait(), makes a thread wait until some condition holds and is used together with a mutex and std::unique_lock. wait() can only be paired with std::unique_lock, not std::lock_guard, because wait() must release the mutex automatically while waiting and reacquire it when the thread is woken up, and std::lock_guard provides no such capability.
std::condition_variable::notify_one() is used when a condition becomes true: it notifies one thread that is blocked waiting on that condition and wakes it up. notify_all() wakes all waiting threads.
```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <chrono>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> buffer;
const int BUFFER_SIZE = 10;
const int PRODUCER_NUM = 2;
const int CONSUMER_NUM = 5;
int produce_finished_count = 0;

void producer(int id, int data_num){
    for (int i = 0; i < data_num; i++){
        std::unique_lock<std::mutex> lck(mtx);
        while (buffer.size() >= BUFFER_SIZE){   // buffer full: wait for a consumer
            cv.wait(lck);
        }
        buffer.push(i);
        cv.notify_all();
    }
    {
        std::lock_guard<std::mutex> lck(mtx);
        produce_finished_count++;
        if (produce_finished_count == PRODUCER_NUM){
            cv.notify_all();    // wake consumers so they can exit
        }
    }
}

void consumer(int id){
    while (true){
        std::unique_lock<std::mutex> lck(mtx);
        while (buffer.empty() && produce_finished_count < PRODUCER_NUM){
            cv.wait(lck);       // nothing to consume yet, producers still running
        }
        if (buffer.empty() && produce_finished_count == PRODUCER_NUM){
            break;              // all producers finished and the buffer is empty
        }
        int data = buffer.front();
        buffer.pop();
        lck.unlock();
        cv.notify_all();
        std::cout << "consumer " << id << " got " << data << "\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

int main(){
    std::vector<std::thread> producers(PRODUCER_NUM);
    std::vector<std::thread> consumers(CONSUMER_NUM);
    for (size_t i = 0; i < producers.size(); i++){
        producers[i] = std::thread(producer, i + 1, 1000);
    }
    for (size_t i = 0; i < consumers.size(); i++){
        consumers[i] = std::thread(consumer, i + 1);
    }
    for (auto& t : producers) t.join();
    for (auto& t : consumers) t.join();
    return 0;
}
```
## Asynchronous Tasks

std::async is a function template introduced in C++11 that launches a task asynchronously; it lives in the <future> header. A task can be launched with std::launch::async (run immediately on a new thread) or std::launch::deferred (run lazily on the calling thread when get() or wait() is called). The examples below show both policies and how to poll the task's status with wait_for().
```cpp
#include <iostream>
#include <future>

int foo(int a, int b){
    return a + b;
}

int main(){
    std::future<int> res = std::async(std::launch::async, foo, 1, 2);
    auto data = res.get();      // blocks until the result is ready
    std::cout << data << "\n";
    return 0;
}
```
```cpp
#include <iostream>
#include <thread>
#include <future>
#include <chrono>

int foo(int a, int b){
    std::cout << "foo(): thread id = " << std::this_thread::get_id() << "\n";
    std::cout << "foo(): starts calculating the res..." << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(4));
    std::cout << "foo(): ends calculating the res..." << "\n";
    return a + b;
}

int main(){
    std::cout << "main(): thread id = " << std::this_thread::get_id() << "\n";
    // std::launch::async: foo starts running on a new thread immediately
    std::future<int> res = std::async(std::launch::async, foo, 3, 4);
    std::cout << "main(): starts doing something else..." << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "main(): ends doing something else..." << "\n";
    auto data = res.get();      // blocks until foo finishes
    std::cout << "finally get the res = " << data << "\n";
    std::cout << "Kirisame Marisa." << "\n";
    return 0;
}
```
```cpp
#include <iostream>
#include <thread>
#include <future>
#include <chrono>

int foo(int a, int b){
    std::cout << "foo(): thread id = " << std::this_thread::get_id() << "\n";
    std::cout << "foo(): starts calculating the res..." << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(4));
    std::cout << "foo(): ends calculating the res..." << "\n";
    return a + b;
}

int main(){
    std::cout << "main(): thread id = " << std::this_thread::get_id() << "\n";
    // std::launch::deferred: foo runs lazily, on this thread, when get() is called
    std::future<int> res = std::async(std::launch::deferred, foo, 3, 4);
    std::cout << "main(): starts doing something else..." << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "main(): ends doing something else..." << "\n";
    auto data = res.get();
    std::cout << "finally get the res = " << data << "\n";
    std::cout << "Kirisame Marisa." << "\n";
    return 0;
}
```
```cpp
#include <iostream>
#include <thread>
#include <chrono>
#include <future>

int foo(int a, int b){
    std::cout << "foo(): thread id = " << std::this_thread::get_id() << "\n";
    std::cout << "foo(): starts calculating the res..." << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "foo(): ends calculating the res..." << "\n";
    return a + b;
}

int main(){
    std::cout << "main(): thread id = " << std::this_thread::get_id() << "\n";
    std::future<int> res = std::async(std::launch::async, foo, 3, 4);

    auto get_data = [](std::future<int>& res){
        auto data = res.get();
        std::cout << "finally get the res = " << data << "\n";
    };

    // Poll the task's state for up to 3 seconds
    std::future_status status = res.wait_for(std::chrono::seconds(3));
    if (status == std::future_status::timeout){
        std::cout << "WARNING: time out" << "\n";
    } else if (status == std::future_status::ready){
        std::cout << "successfully get result." << "\n";
        get_data(res);
    } else if (status == std::future_status::deferred){
        std::cout << "thread is deferred." << "\n";
        get_data(res);
    }
    return 0;
}
```
## Atomic Operations

Atomic operations are common in multithreaded code: because each operation completes as a single indivisible step, shared data can be accessed safely from multiple threads without the complexity of explicit locks.
The two basic atomic facilities are std::atomic and std::atomic_flag.

### std::atomic_flag

1. Behaves like an atomic bool with only two states, true and false.
2. test_and_set(): sets the std::atomic_flag to true, but returns the value it held before the call.
3. clear(): resets the flag from true back to false.
4. std::atomic_flag can be used to implement a spinlock, as in the example below.
```cpp
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>

class spinlock_mutex {
    std::atomic_flag flag;
public:
    spinlock_mutex() : flag(ATOMIC_FLAG_INIT) {}
    void lock(){
        while (flag.test_and_set());   // spin until the previous value was false
    }
    void unlock(){
        flag.clear();
    }
};

int count = 0;
spinlock_mutex mtx;

void foo(){
    mtx.lock();
    count++;
    mtx.unlock();
}

int main(){
    std::vector<std::thread> threads(10);
    for (auto& t : threads) t = std::thread(foo);
    for (auto& t : threads) t.join();
    std::cout << "count = " << count << "\n";
    return 0;
}
```
### std::atomic

- std::atomic<int> atm(0); constructs the atomic with an initial value of 0.
- load(): atomic read, e.g. int val = atm.load();
- store(): atomic write; operator= is overloaded to do the same thing.
- exchange(): note that atm.exchange(x) returns the value held before it was changed to x, not x itself.

A short sketch of these operations follows.
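A small sketch of these member functions in action (the variable names are only illustrative):

```cpp
#include <iostream>
#include <atomic>

std::atomic<int> atm(0);

int main(){
    atm.store(5);                 // same effect as atm = 5
    int val = atm.load();         // atomic read, val == 5
    int old = atm.exchange(10);   // atm becomes 10, old == 5 (the previous value)
    std::cout << val << " " << old << " " << atm.load() << "\n";   // prints: 5 5 10
    return 0;
}
```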
### std::memory_order_relaxed

std::memory_order_relaxed is the weakest memory ordering: it guarantees only the atomicity of each operation and imposes no ordering between operations on different atomic variables. In the example below, the loads and stores may be reordered, so it is even possible (on some architectures) for both res1 and res2 to end up as 1.
```cpp
#include <iostream>
#include <thread>
#include <atomic>

std::atomic<int> atm1(0), atm2(0);
int res1 = 0, res2 = 0;

void foo1(){
    res1 = atm2.load(std::memory_order_relaxed);
    atm1.store(1, std::memory_order_relaxed);
}

void foo2(){
    res2 = atm1.load(std::memory_order_relaxed);
    atm2.store(1, std::memory_order_relaxed);
}

int main(){
    std::thread t1(foo1);
    std::thread t2(foo2);
    t1.join();
    t2.join();
    std::cout << res1 << " " << res2 << "\n";
    return 0;
}
```
### std::memory_order_seq_cst

std::memory_order_seq_cst is the strictest memory ordering. Besides atomicity, it guarantees that all operations using seq_cst appear in a single total order that every thread observes consistently, so in the example below res1 and res2 can never both end up as 1.
```cpp
#include <iostream>
#include <thread>
#include <atomic>

std::atomic<int> atm1(0), atm2(0);
int res1 = 0, res2 = 0;

void foo1(){
    res1 = atm2.load(std::memory_order_seq_cst);
    atm1.store(1, std::memory_order_seq_cst);
}

void foo2(){
    res2 = atm1.load(std::memory_order_seq_cst);
    atm2.store(1, std::memory_order_seq_cst);
}

int main(){
    std::thread t1(foo1);
    std::thread t2(foo2);
    t1.join();
    t2.join();
    std::cout << res1 << " " << res2 << "\n";
    return 0;
}
```
## Summary: Speeding Up a Parallel Sum

Several ways to implement an accelerated parallel sum. The std::mutex approach:
```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <numeric>
#include <chrono>

const int NUM_THREADS = 24;
long long arr_sum = 0;
std::mutex mtx;

void get_sum(const std::vector<int>& arr, int start, int end){
    long long cur_sum = 0;
    for (int i = start; i < end; i++)
        cur_sum += arr[i];
    std::lock_guard<std::mutex> lck(mtx);   // only the final accumulation is locked
    arr_sum += cur_sum;
}

int main(){
    const int n = 1e9;
    std::vector<int> arr(n);
    std::iota(begin(arr), end(arr), 0);

    auto start_time = std::chrono::high_resolution_clock::now();

    const int step = n / NUM_THREADS;
    std::vector<std::thread> threads(NUM_THREADS);
    for (int i = 0; i < NUM_THREADS; i++){
        int start = i * step;
        int end = (i == NUM_THREADS - 1) ? n : (i + 1) * step;
        threads[i] = std::thread(get_sum, std::ref(arr), start, end);
    }
    for (auto& t : threads) t.join();

    auto end_time = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> dura = end_time - start_time;
    std::cout << "spend time = " << dura.count() << "ms" << "\n";
    std::cout << "finally the sum = " << arr_sum << "\n";
    return 0;
}
```
The std::async (asynchronous task) approach:
```cpp
#include <iostream>
#include <future>
#include <vector>
#include <numeric>
#include <chrono>

const int NUM_THREADS = 24;
long long arr_sum = 0;

long long get_sum(const std::vector<int>& arr, int start, int end){
    long long cur_sum = 0;
    for (int i = start; i < end; i++){
        cur_sum += arr[i];
    }
    return cur_sum;     // the partial sum comes back through the future
}

int main(){
    const int n = 1e9;
    std::vector<int> arr(n);
    std::iota(begin(arr), end(arr), 0);

    auto start_time = std::chrono::high_resolution_clock::now();

    const int step = n / NUM_THREADS;
    std::vector<std::future<long long>> futures(NUM_THREADS);
    for (int i = 0; i < NUM_THREADS; i++){
        int start = i * step;
        int end = (i == NUM_THREADS - 1) ? n : (i + 1) * step;
        futures[i] = std::async(std::launch::async, get_sum, std::ref(arr), start, end);
    }
    for (auto& res : futures){
        arr_sum += res.get();
    }

    auto end_time = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> dura = end_time - start_time;
    std::cout << "spend time = " << dura.count() << "ms" << "\n";
    std::cout << "finally the sum = " << arr_sum << "\n";
    return 0;
}
```
The std::promise approach:
```cpp
#include <iostream>
#include <thread>
#include <future>
#include <vector>
#include <numeric>
#include <chrono>

const int NUM_THREADS = 24;
long long arr_sum = 0;

void get_sum(const std::vector<int>& arr, int start, int end, std::promise<long long>& pro){
    long long cur_sum = 0;
    for (int i = start; i < end; i++){
        cur_sum += arr[i];
    }
    pro.set_value(cur_sum);
}

int main(){
    const int n = 1e9;
    std::vector<int> arr(n);
    std::iota(begin(arr), end(arr), 0);

    auto start_time = std::chrono::high_resolution_clock::now();

    const int step = n / NUM_THREADS;
    std::vector<std::promise<long long>> promises(NUM_THREADS);
    std::vector<std::future<long long>> futures(NUM_THREADS);
    std::vector<std::thread> threads(NUM_THREADS);
    for (int i = 0; i < NUM_THREADS; i++){
        int start = i * step;
        int end = (i == NUM_THREADS - 1) ? n : (i + 1) * step;
        futures[i] = promises[i].get_future();
        threads[i] = std::thread(get_sum, std::ref(arr), start, end, std::ref(promises[i]));
    }
    for (auto& res : futures){
        arr_sum += res.get();
    }
    for (auto& t : threads) t.join();

    auto end_time = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> dura = end_time - start_time;
    std::cout << "spend time = " << dura.count() << "ms" << "\n";
    std::cout << "finally the sum = " << arr_sum << "\n";
    return 0;
}
```
The std::atomic approach:
```cpp
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <numeric>
#include <chrono>

const int NUM_THREADS = 24;
std::atomic<long long> arr_sum(0);

void get_sum(const std::vector<int>& arr, int start, int end){
    long long cur_sum = 0;
    for (int i = start; i < end; i++){
        cur_sum += arr[i];
    }
    arr_sum += cur_sum;
}

int main(){
    const int n = 1e9;
    std::vector<int> arr(n);
    std::iota(begin(arr), end(arr), 0);

    auto start_time = std::chrono::high_resolution_clock::now();

    const int step = n / NUM_THREADS;
    std::vector<std::thread> threads(NUM_THREADS);
    for (int i = 0; i < NUM_THREADS; i++){
        int start = i * step;
        int end = (i == NUM_THREADS - 1) ? n : (i + 1) * step;
        threads[i] = std::thread(get_sum, std::ref(arr), start, end);
    }
    for (auto& t : threads) t.join();

    auto end_time = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> dura = end_time - start_time;
    std::cout << "spend time = " << dura.count() << "ms." << "\n";
    std::cout << "finally the sum = " << arr_sum << "\n";
    return 0;
}
```
## Thread Pool

```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <future>
#include <condition_variable>
#include <functional>
#include <vector>
#include <queue>

class ThreadPool {
    void work();
    bool stop;
    std::queue<std::function<void()>> tasks;
    std::vector<std::thread> threads;
    std::condition_variable cv;
    std::mutex mtx;
public:
    ThreadPool(int thread_num);
    ~ThreadPool();
    template<typename F, typename... Arg>
    auto submit(F&& f, Arg&&... args)
        -> std::future<typename std::result_of<F(Arg...)>::type>;
};

ThreadPool::ThreadPool(int thread_num) : stop(false){
    for (int i = 0; i < thread_num; i++){
        threads.emplace_back(&ThreadPool::work, this);
    }
}

ThreadPool::~ThreadPool(){
    {
        std::lock_guard<std::mutex> lck(mtx);
        stop = true;
    }
    cv.notify_all();            // wake every worker so it can see stop == true
    for (auto& t : threads){
        if (t.joinable()){
            t.join();
        }
    }
}

template<typename F, typename... Arg>
auto ThreadPool::submit(F&& f, Arg&&... args)
    -> std::future<typename std::result_of<F(Arg...)>::type>
{
    using func_type = typename std::result_of<F(Arg...)>::type;
    // packaged_task wraps the callable so its result can be fetched through a future
    auto task = std::make_shared<std::packaged_task<func_type()>>(
        std::bind(std::forward<F>(f), std::forward<Arg>(args)...)
    );
    {
        std::lock_guard<std::mutex> lck(mtx);
        if (stop){
            throw std::runtime_error("ERROR: The thread pool is stopped.");
        }
        tasks.emplace([task](){ (*task)(); });
    }
    cv.notify_one();
    return task->get_future();
}

void ThreadPool::work(){
    while (true){
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lck(mtx);
            while (tasks.empty() && !stop){
                cv.wait(lck);
            }
            if (tasks.empty() && stop){
                return;
            }
            task = std::move(tasks.front());
            tasks.pop();
        }
        task();
    }
}
```
Using the pool:

```cpp
#include <iostream>
#include <future>
#include <vector>
#include "Thread_Pool.cpp"   // pull in the thread pool implementation
using namespace std;

int main(){
    ThreadPool pool(4);      // create a pool with 4 worker threads

    // Submit some tasks
    vector<pair<future<int>, int>> results;   // store each task's future together with its index
    for(int i = 0; i < 12; i ++ ){
        // submit() returns a std::future for the task's result
        auto res = pool.submit([](int x){
            cout << "Task " << x << ": thread id = " << this_thread::get_id() << "\n";
            return x * x;
        }, i);
        results.emplace_back(move(res), i);   // std::future is move-only
    }

    // Collect the results
    for(auto& [res, id] : results){
        cout << "Task " << id << ": result = " << res.get() << "\n";
    }
    return 0;
}
```
## Supplement

### Constructors
## References
1. [RAII](https://blog.csdn.net/weixin_45031801/article/details/142737361)
2. [thread](https://marisamagic.github.io)