楚天

惟楚有材,于斯为盛

TBB 开启的并行编程之旅

time: 2025-12-23

1. 并发与并行:概念与 TBB 的定位

  • 并发(Concurrency):关注“任务结构”。多个任务在同一时间段内推进即可(单核也能并发)。
  • 并行(Parallelism):关注“同时执行”。多个任务在同一时刻运行(通常依赖多核)。

2. 并发任务:task_groupparallel_invoke

2.1 task_group:提交多个独立任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <tbb/task_group.h>
#include <iostream>
#include <string>
#include <thread>
#include <chrono>

void download(const std::string& file) {
for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Downloaded: " << file << "\n";
}

void interact() {
std::string name;
std::cin >> name;
std::cout << "Hello, " << name << "\n";
}

int main() {
tbb::task_group tg;
tg.run([&]{ download("hello.zip"); });
tg.run([&]{ interact(); });
tg.wait();
return 0;
}

2.2 parallel_invoke:更简洁的并发调用

当只是“并行执行几个函数”,优先用 parallel_invoke

1
2
3
4
5
6
#include <tbb/parallel_invoke.h>

tbb::parallel_invoke(
[&]{ download("hello.zip"); },
[&]{ interact(); }
);

3. 数据并行:parallel_for / parallel_for_each

3.1 手动分块(task_group)→ 推荐替换为 parallel_for

手动分块能跑,但属于“自己实现调度”。TBB 风格是把“范围”交给调度器切分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <tbb/parallel_for.h>
#include <tbb/blocked_range.h>
#include <vector>
#include <cmath>
#include <cstddef>

int main() {
const size_t n = 1u << 16;
std::vector<float> dp(n);

tbb::parallel_for(
tbb::blocked_range<size_t>(0, n),
[&](const tbb::blocked_range<size_t>& r) {
for (size_t i = r.begin(); i != r.end(); ++i) {
dp[i] = std::sinf(static_cast<float>(i));
}
}
);

return 0;
}

3.2 parallel_for(begin,end,body):最短常用写法

1
2
3
4
5
6
7
8
9
10
11
12
#include <tbb/parallel_for.h>
#include <vector>
#include <cmath>

int main() {
const size_t n = 1u << 16;
std::vector<float> dp(n);

tbb::parallel_for((size_t)0, n, [&](size_t i){
dp[i] = std::sinf((float)i);
});
}

3.3 parallel_for_each:并行遍历容器元素

语义:对 [first,last) 每个元素并行调用一次 func(element);要求元素之间无数据冲突。

1
2
3
4
5
6
7
8
9
10
#include <tbb/parallel_for_each.h>
#include <vector>
#include <cmath>

int main() {
std::vector<float> a(1<<16, 1.0f);
tbb::parallel_for_each(a.begin(), a.end(), [&](float& f){
f = std::sinf(f);
});
}

4. 多维并行:blocked_range2d / 3d

4.1 二维:blocked_range2d(rows/cols)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <tbb/parallel_for.h>
#include <tbb/blocked_range2d.h>
#include <vector>
#include <cmath>
#include <cstddef>

int main() {
const size_t n = 1024;
std::vector<float> a(n * n);

tbb::parallel_for(
tbb::blocked_range2d<size_t>(0, n, 0, n),
[&](const tbb::blocked_range2d<size_t>& r) {
for (size_t i = r.rows().begin(); i != r.rows().end(); ++i) {
for (size_t j = r.cols().begin(); j != r.cols().end(); ++j) {
a[i*n + j] = std::sinf((float)(i*n + j));
}
}
}
);
return 0;
}

4.2 维度记忆

  • 1D:tbb::blocked_range<T>
  • 2D:tbb::blocked_range2d<T>rows()cols()
  • 3D:tbb::blocked_range3d<T>pages()rows()cols()

5. 缩并(Reduce):从“共享变量”到 parallel_reduce

当任务存在“跨迭代依赖”(比如求和、统计),不要用共享变量直接累加,会数据竞争或锁开销巨大。
标准范式:局部累加 + 合并

5.1 parallel_reduce(lambda 版:最常用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <tbb/parallel_reduce.h>
#include <tbb/blocked_range.h>
#include <cmath>
#include <cstddef>

int main() {
const size_t n = 1u << 26;

float sum = tbb::parallel_reduce(
tbb::blocked_range<size_t>(0, n),
0.0f, // identity
[&](const tbb::blocked_range<size_t>& r, float local) -> float {
for (size_t i = r.begin(); i != r.end(); ++i) {
local += std::sinf((float)i);
}
return local;
},
[](float a, float b) -> float { return a + b; } // combine
);

(void)sum;
return 0;
}

5.2 parallel_deterministic_reduce

浮点加法不满足结合律:合并顺序不同,末位可能不同。
若需要“每次运行更一致的合并顺序”,可考虑 deterministic 版本(可能略慢)。


6. 缩并(工程写法):Reducer 结构体

当归约逻辑变复杂(多个字段、多统计量、希望复用),用结构体 reducer 更稳、更清晰。

6.1 结构体 reducer:并行 sum sin(i)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <tbb/parallel_reduce.h>
#include <tbb/blocked_range.h>
#include <cmath>
#include <cstddef>

struct SinSumReducer {
float sum;

SinSumReducer() : sum(0.0f) {} // identity
SinSumReducer(SinSumReducer&, tbb::split) : sum(0.0f) {} // split ctor

void operator()(const tbb::blocked_range<size_t>& r) {
float local = sum;
for (size_t i = r.begin(); i != r.end(); ++i) {
local += std::sinf((float)i);
}
sum = local;
}

void join(const SinSumReducer& rhs) {
sum += rhs.sum;
}
};

int main() {
const size_t n = 1u << 26;
SinSumReducer body;
tbb::parallel_reduce(tbb::blocked_range<size_t>(0, n), body);
float result = body.sum;
(void)result;
return 0;
}

6.2 多字段统计:sum / min / max / count

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include <tbb/parallel_reduce.h>
#include <tbb/blocked_range.h>
#include <vector>
#include <limits>
#include <cstddef>

struct StatsReducer {
double sum;
float mn;
float mx;
size_t cnt;

StatsReducer()
: sum(0.0),
mn(std::numeric_limits<float>::infinity()),
mx(-std::numeric_limits<float>::infinity()),
cnt(0) {}

StatsReducer(StatsReducer&, tbb::split)
: sum(0.0),
mn(std::numeric_limits<float>::infinity()),
mx(-std::numeric_limits<float>::infinity()),
cnt(0) {}

const std::vector<float>* a = nullptr;

void operator()(const tbb::blocked_range<size_t>& r) {
double s = sum;
float lo = mn, hi = mx;
size_t c = cnt;

for (size_t i = r.begin(); i != r.end(); ++i) {
float v = (*a)[i];
s += v;
if (v < lo) lo = v;
if (v > hi) hi = v;
++c;
}
sum = s; mn = lo; mx = hi; cnt = c;
}

void join(const StatsReducer& rhs) {
sum += rhs.sum;
if (rhs.mn < mn) mn = rhs.mn;
if (rhs.mx > mx) mx = rhs.mx;
cnt += rhs.cnt;
}
};

int main() {
std::vector<float> a(1u<<20, 1.0f);

StatsReducer body;
body.a = &a;
tbb::parallel_reduce(tbb::blocked_range<size_t>(0, a.size()), body);

// body.sum/body.mn/body.mx/body.cnt
return 0;
}

7. 线程本地累加器:combinable / enumerable_thread_specific

当模式是“每线程一份局部值,最后合并”,这两者非常实用。

7.1 tbb::combinable<T>(标量/小对象)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <tbb/parallel_for.h>
#include <tbb/combinable.h>
#include <cmath>
#include <cstddef>

int main() {
const size_t n = 1u << 26;
tbb::combinable<double> tls_sum([]{ return 0.0; });

tbb::parallel_for((size_t)0, n, [&](size_t i){
tls_sum.local() += std::sin((double)i);
});

double sum = tls_sum.combine([](double a, double b){ return a + b; });
(void)sum;
return 0;
}

7.2 tbb::enumerable_thread_specific<T>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <tbb/parallel_for.h>
#include <tbb/enumerable_thread_specific.h>
#include <vector>
#include <cstddef>

int main() {
const size_t n = 1u << 20;
std::vector<int> data(n, 0); // 假设值域 [0,255]

tbb::enumerable_thread_specific<std::vector<size_t>> tls_hist(
[]{ return std::vector<size_t>(256, 0); }
);

tbb::parallel_for((size_t)0, n, [&](size_t i){
tls_hist.local()[(unsigned)data[i]] += 1;
});

std::vector<size_t> hist(256, 0);
for (auto& h : tls_hist)
for (int b = 0; b < 256; ++b) hist[b] += h[b];

return 0;
}

8. 扫描(Scan):parallel_scan(前缀和/累计输出)

parallel_scan 常用于:前缀和、累计概率、积分图等。
关键机制:两阶段(pre-scan / final-scan),用 is_final 控制是否写输出。

8.1 parallel_scan(lambda 版)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <tbb/parallel_scan.h>
#include <tbb/blocked_range.h>
#include <vector>
#include <cmath>
#include <cstddef>
#include <iostream>

int main() {
const size_t n = 1u << 20;
std::vector<float> prefix(n);

float total = tbb::parallel_scan(
tbb::blocked_range<size_t>(0, n),
0.0f,
[&](const tbb::blocked_range<size_t>& r, float running, bool is_final) -> float {
for (size_t i = r.begin(); i != r.end(); ++i) {
running += std::sinf((float)i);
if (is_final) prefix[i] = running;
}
return running;
},
[](float a, float b) -> float { return a + b; }
);

std::cout << prefix[n/2] << "\n";
std::cout << total << "\n";
return 0;
}

8.2 parallel_scan(结构体版:工程范式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <tbb/parallel_scan.h>
#include <tbb/blocked_range.h>
#include <vector>
#include <type_traits>
#include <cstddef>

struct PrefixScanBody {
const std::vector<float>& in;
std::vector<float>& out;
float sum;

PrefixScanBody(const std::vector<float>& in_, std::vector<float>& out_)
: in(in_), out(out_), sum(0.0f) {}

PrefixScanBody(PrefixScanBody& b, tbb::split)
: in(b.in), out(b.out), sum(0.0f) {}

template <typename Tag>
void operator()(const tbb::blocked_range<size_t>& r, Tag) {
float temp = sum;
for (size_t i = r.begin(); i != r.end(); ++i) {
temp += in[i];
if constexpr (std::is_same_v<Tag, tbb::final_scan_tag>) {
out[i] = temp;
}
}
sum = temp;
}

void reverse_join(PrefixScanBody& rhs) { sum += rhs.sum; }
void assign(PrefixScanBody& rhs) { sum = rhs.sum; }
};

int main() {
const size_t n = 1u << 20;
std::vector<float> in(n, 1.0f);
std::vector<float> out(n, 0.0f);

PrefixScanBody body(in, out);
tbb::parallel_scan(tbb::blocked_range<size_t>(0, n), body);
return 0;
}

9. 任务域与嵌套:task_arena / isolate

9.1 task_arena:限制并行度 / 隔离并行区域

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <tbb/task_arena.h>
#include <tbb/parallel_for.h>
#include <vector>
#include <cmath>

int main() {
const size_t n = 1u << 20;
std::vector<float> a(n);

tbb::task_arena arena(4); // 该区域最多 4 个线程参与
arena.execute([&]{
tbb::parallel_for((size_t)0, n, [&](size_t i){
a[i] = std::sinf((float)i);
});
});
return 0;
}

9.2 this_task_arena::isolate:禁止内部任务被窃取(隔离干扰)

1
2
3
4
5
#include <tbb/this_task_arena.h>

tbb::this_task_arena::isolate([&]{
// 这里 spawn 的任务更隔离,不易跨域被 steal
});

实务提醒:嵌套并行时不要依赖“线程固定/执行路径固定”。更推荐减少共享状态、用 reduce/tls 合并,或用 arena/isolate 控制并行边界。


10. 分块策略(Partitioner):性能与可预测性

TBB 切分范围时可以指定 partitioner:

  • tbb::static_partitioner:划分更固定、可预测
  • tbb::affinity_partitioner:记录历史映射,提高缓存命中(适合重复执行的相似循环)
  • tbb::simple_partitioner:简单切分策略

示例:观察每个线程拿到的块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <tbb/parallel_for.h>
#include <tbb/blocked_range.h>
#include <tbb/task_arena.h>
#include <tbb/this_task_arena.h>
#include <iostream>

int main() {
const size_t n = 32;
tbb::task_arena arena(4);

arena.execute([&]{
tbb::parallel_for(
tbb::blocked_range<size_t>(0, n),
[&](const tbb::blocked_range<size_t>& r){
std::cout
<< "tid=" << tbb::this_task_arena::current_thread_index()
<< " range=[" << r.begin() << "," << r.end() << ")"
<< " size=" << r.size() << "\n";
},
tbb::static_partitioner{}
);
});
return 0;
}

11. 全局并行度控制:global_control(工程常用)

当你不希望 TBB “吃满所有核”,可全局限制:

1
2
3
4
5
6
7
#include <tbb/global_control.h>

int main() {
tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 8);
// 后续 TBB 并行算法最多使用 8 个线程
return 0;
}

12. 并发容器:concurrent_vector

特点:并发 push 更友好,但实现上可能是分段存储,不等同于 std::vector 的严格连续内存语义。

1
2
3
4
5
6
7
8
9
10
11
12
#include <tbb/concurrent_vector.h>
#include <tbb/parallel_for.h>
#include <string>

int main() {
tbb::concurrent_vector<std::string> out;

tbb::parallel_for(0, 1000, [&](int i){
out.push_back("item_" + std::to_string(i));
});
return 0;
}

13. 流水线并行:parallel_pipeline(I/O + compute 的标准解法)

适用:读入→解析→计算→写出,多阶段、不同并行度需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <tbb/parallel_pipeline.h>
#include <tbb/flow_control.h>
#include <iostream>

int main() {
int x = 0;

tbb::parallel_pipeline(
4, // max live tokens

tbb::make_filter<void, int>(
tbb::filter_mode::serial_in_order,
[&](tbb::flow_control& fc) -> int {
if (x >= 20) { fc.stop(); return 0; }
return x++;
}
)
&
tbb::make_filter<int, int>(
tbb::filter_mode::parallel,
[&](int v) -> int {
return v * v; // heavy compute
}
)
&
tbb::make_filter<int, void>(
tbb::filter_mode::serial_in_order,
[&](int y) {
std::cout << y << "\n";
}
)
);

return 0;
}

补充

https://en.cppreference.com/w/cpp/language/access.html?utm_source=chatgpt.com
https://en.cppreference.com/w/cpp/language/operators.html?utm_source=chatgpt.com
https://mooshak.dcc.fc.up.pt/~oni-judge/doc/cppreference/reference/en/cpp/language/constructor.html?utm_source=chatgpt.com

C++11 起的多线程编程笔记

C++11 标准库并发组件:<thread> / <mutex> / <condition_variable> / <future> / <atomic> / <chrono>
目标:并发执行(性能) + 正确同步(安全) + 生命周期可控(可维护)。

1. 时间与计时 std::chrono

1.1 三个核心类型

  • clock:时钟(如 steady_clocksystem_clock
  • time_point:时间点(clock::time_point
  • duration:时长(seconds/milliseconds/...

1.2 计时:推荐 steady_clock(不会被系统调时影响)

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <iostream>
#include <chrono>
#include <cstdint>

int main() {
auto t0 = std::chrono::steady_clock::now();
auto t1 = t0 + std::chrono::seconds(30);

auto dt = t1 - t0; // duration
std::int64_t sec = std::chrono::duration_cast<std::chrono::seconds>(dt).count();
std::cout << "dt = " << sec << " s\n";
return 0;
}

1.3 常见时间单位

  • std::chrono::seconds(x)
  • std::chrono::milliseconds(x)
  • std::chrono::microseconds(x)
  • std::chrono::nanoseconds(x)

1.4 测耗时并转 double 毫秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <chrono>

int main() {
auto t0 = std::chrono::steady_clock::now();

std::uint64_t acc = 0;
for (int i = 0; i < 1'000'000; ++i) acc += i;

auto t1 = std::chrono::steady_clock::now();
using double_ms = std::chrono::duration<double, std::milli>;
double ms = std::chrono::duration_cast<double_ms>(t1 - t0).count();

std::cout << "cost = " << ms << " ms, acc=" << acc << "\n";
}

2. 线程休眠:避免忙等

2.1 sleep_for

1
2
3
4
#include <thread>
#include <chrono>

std::this_thread::sleep_for(std::chrono::milliseconds(400));

2.2 sleep_until(对齐节拍更合适)

1
2
3
4
5
#include <thread>
#include <chrono>

auto t = std::chrono::steady_clock::now() + std::chrono::milliseconds(400);
std::this_thread::sleep_until(t);

2.3 实例:固定频率循环(推荐写法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <iostream>
#include <chrono>
#include <thread>

int main() {
using clock = std::chrono::steady_clock;
auto next = clock::now();

for (int i = 0; i < 10; ++i) {
next += std::chrono::milliseconds(100);
std::cout << "tick " << i << "\n";
std::this_thread::sleep_until(next);
}
}

3. 基础线程 std::thread

3.1 启动线程与 join(必须)

1
2
3
4
5
6
7
8
9
#include <thread>
#include <iostream>

void work(int id) { std::cout << "worker " << id << "\n"; }

int main() {
std::thread t(work, 1);
t.join(); // 必须 join 或 detach,否则析构会 std::terminate
}
  • join():等待线程结束(最常用)
  • detach():放飞线程(风险高:对象生命周期/退出时机难控)

4. 异步与返回值:std::async / std::future

4.1 基本用法(带返回值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

int download() {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
return 404;
}

int main() {
auto fut = std::async(std::launch::async, [] { return download(); });
int ret = fut.get(); // get 只能调用一次
std::cout << "ret=" << ret << "\n";
}

4.2 启动策略

  • std::launch::async:倾向于并行执行
  • std::launch::deferred:延迟到 get()/wait() 才在当前线程执行

4.3 wait / wait_for / wait_until

1
2
3
4
5
6
7
8
9
10
11
auto fut = std::async(std::launch::async, []{
std::this_thread::sleep_for(std::chrono::seconds(2));
return 123;
});

fut.wait(); // 等到完成

auto st = fut.wait_for(std::chrono::milliseconds(100));
if (st == std::future_status::timeout) {
// 还没完成
}

4.4 异常会在 get() 处重抛(非常实用)

1
2
3
4
5
6
auto fut = std::async(std::launch::async, []() -> int {
throw std::runtime_error("boom");
});

try { fut.get(); }
catch (const std::exception& e) { std::cout << e.what() << "\n"; }

5. 互斥锁 std::mutex:保护共享数据

5.1 lock_guard(RAII 自动解锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <mutex>
#include <thread>
#include <iostream>

int main() {
std::mutex m;
int counter = 0;

auto inc = [&] {
for (int i = 0; i < 100000; ++i) {
std::lock_guard<std::mutex> lk(m);
++counter;
}
};

std::thread t1(inc), t2(inc);
t1.join(); t2.join();
std::cout << "counter=" << counter << "\n";
}

5.2 死锁规避(两个锁)

C++11 可用 std::lock + std::adopt_lock

1
2
3
4
5
6
7
8
9
10
#include <mutex>

std::mutex m1, m2;

void f() {
std::lock(m1, m2);
std::lock_guard<std::mutex> lk1(m1, std::adopt_lock);
std::lock_guard<std::mutex> lk2(m2, std::adopt_lock);
// 临界区
}

6. 条件变量 std::condition_variable:生产者-消费者(比 sleep 正确)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <iostream>

int main() {
std::mutex m;
std::condition_variable cv;
std::queue<int> q;
bool done = false;

std::thread producer([&]{
for (int i = 0; i < 5; ++i) {
{ std::lock_guard<std::mutex> lk(m); q.push(i); }
cv.notify_one();
}
{ std::lock_guard<std::mutex> lk(m); done = true; }
cv.notify_one();
});

std::thread consumer([&]{
while (true) {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&]{ return done || !q.empty(); }); // 带谓词防虚假唤醒
while (!q.empty()) {
std::cout << "consume " << q.front() << "\n";
q.pop();
}
if (done) break;
}
});

producer.join();
consumer.join();
}

7. 原子 std::atomic:轻量同步(计数/标志位)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <atomic>
#include <thread>
#include <iostream>

int main() {
std::atomic<int> counter{0};

auto inc = [&] { for (int i = 0; i < 100000; ++i) ++counter; };

std::thread t1(inc), t2(inc);
t1.join(); t2.join();

std::cout << "counter=" << counter.load() << "\n";
}

8. 线程池/任务队列(C++11 可直接用)

线程池 = 固定 worker 线程 + 任务队列(阻塞队列)
典型价值:避免频繁创建/销毁线程,提高吞吐;统一任务提交接口,支持 future 返回值与异常回传。

8.1 任务队列:BlockingQueue(阻塞队列)

特性:

  • push() 入队并唤醒消费者
  • pop() 阻塞等待任务;队列关闭且为空时返回 false
  • close() 关闭队列并唤醒全部等待线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <utility>

template <class T>
class BlockingQueue {
public:
BlockingQueue() : closed_(false) {}

BlockingQueue(const BlockingQueue&) = delete;
BlockingQueue& operator=(const BlockingQueue&) = delete;

bool push(T value) {
{
std::lock_guard<std::mutex> lk(m_);
if (closed_) return false;
q_.push(std::move(value));
}
cv_.notify_one();
return true;
}

bool pop(T& out) {
std::unique_lock<std::mutex> lk(m_);
cv_.wait(lk, [&]{ return closed_ || !q_.empty(); });

if (q_.empty()) return false; // closed_ && empty
out = std::move(q_.front());
q_.pop();
return true;
}

void close() {
{
std::lock_guard<std::mutex> lk(m_);
closed_ = true;
}
cv_.notify_all();
}

private:
std::mutex m_;
std::condition_variable cv_;
std::queue<T> q_;
bool closed_;
};

8.2 线程池:ThreadPool(enqueue 返回 future)

特性:

  • 固定线程数 worker
  • enqueue(f, args...) 返回 std::future<R>
  • 任务抛异常 → 在 future.get() 时重抛
  • 析构/shutdown():停止接收新任务 + 关闭队列 + join 所有 worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#pragma once
#include <vector>
#include <thread>
#include <future>
#include <functional>
#include <stdexcept>
#include <type_traits>
#include <atomic>
#include <utility>

#include "blocking_queue.h"

class ThreadPool {
public:
explicit ThreadPool(size_t nthreads) : accept_(true) {
if (nthreads == 0) nthreads = 1;
workers_.reserve(nthreads);
for (size_t i = 0; i < nthreads; ++i) {
workers_.push_back(std::thread([this]{ worker_loop(); }));
}
}

ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;

~ThreadPool() { shutdown(); }

template <class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
typedef typename std::result_of<F(Args...)>::type R;

if (!accept_.load()) {
throw std::runtime_error("ThreadPool is not accepting new tasks.");
}

auto task_ptr = std::make_shared<std::packaged_task<R()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<R> fut = task_ptr->get_future();

bool ok = tasks_.push([task_ptr]{ (*task_ptr)(); });
if (!ok) throw std::runtime_error("Task queue is closed.");

return fut;
}

void shutdown() {
bool expected = true;
if (accept_.compare_exchange_strong(expected, false)) {
tasks_.close();
for (auto& t : workers_) {
if (t.joinable()) t.join();
}
}
}

private:
void worker_loop() {
std::function<void()> task;
while (tasks_.pop(task)) {
task();
}
}

private:
std::vector<std::thread> workers_;
BlockingQueue<std::function<void()>> tasks_;
std::atomic<bool> accept_;
};

8.3 线程池使用示例(返回值 + 异常回传)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <iostream>
#include <chrono>
#include <thread>
#include "thread_pool.h"

int slow_add(int a, int b) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
return a + b;
}

int main() {
ThreadPool pool(4);

auto f1 = pool.enqueue(slow_add, 1, 2);
auto f2 = pool.enqueue([](int x){ return x * x; }, 12);

auto f3 = pool.enqueue([]() -> int {
throw std::runtime_error("boom");
});

std::cout << "f1=" << f1.get() << "\n";
std::cout << "f2=" << f2.get() << "\n";

try {
std::cout << "f3=" << f3.get() << "\n";
} catch (const std::exception& e) {
std::cout << "caught: " << e.what() << "\n";
}

pool.shutdown(); // 可省略:析构也会 shutdown
}

动态规划

入门

  1. 爬楼梯
  2. 使用最小花费爬楼梯
  3. 爬楼梯 II
    需要注意数组初始化问题
  4. 组合总和 Ⅳ
    翻译成爬楼梯的问题
    这个题可以在想想扩展一下
  5. 统计构造好字符串的方案数
    在dp数组不确定初始化的情况下
    采用dfs的方法通过规定边界实现初始化简化
  6. 统计打字方案数【1】
    好好在思考一下

打家劫舍

  1. 打家劫舍
    lambda 在递归中应用需要补一下
  2. 打家劫舍 II

Masked Autoencoders Are Scalable Vision Learners

论文地址:https://arxiv.org/pdf/2111.06377。
作者:何凯明
机构: Facebook

摘要

带掩码的自编码器是一个可拓展的视觉学习器
MAE基于两个核心设计,可以高效的训练大模型:
1、非对称的encoder-decoder架构编码器只作用在可见的patch中,对于丢掉的patch,编码器不会对它进行编码,这样能够节省一定的计算时间。解码器是一个比较轻量的解码器,拿到编码器的输出之后,重构被遮挡住的块。
2、使用较高的mask比例将mask比例设置为75%,迫使模型得到一个较好的自监督训练效果。如果只是遮住几块的话,只需要进行插值就可以出来了,模型可能学不到特别的东西。编码器只编码1/4大小的图片,可以降低计算量,训练加速3倍或者以上模型结构。最终作者只使用一个最简单的ViT-Huge模型(模型结构来自ViT这篇文章),在ImageNet-1K(100万张图片)上预训练,准确率能达到87.8%;这个模型主要是用来做迁移学习,它证明了在迁移学习的其他任务上表现也非常好注:在ViT这篇文章的最后一段有提到过怎样做自监督学习,作者说效果并不是很好,结论是:用有标号的模型和大的训练集可以得到很好的效果。本文亮点是,只使用小的数据集(ImageNet-1k,100万张图片),而且是通过自监督,就能够做到跟之前可能效果一样好的模型了。

网络结构

网络图

让扩散模型回归“去噪”本质

论文标题: Back to Basics: Let Denoising Generative Models Denoise
作者: Tianhong Li(黎天鸿), Kaiming He(何恺明)
机构: 麻省理工学院 (MIT)
论文地址: https://arxiv.org/abs/2511.13720
代码仓库: https://github.com/LTH14/JiT
总结:本文指出了一个反常识问题,就是传统的去噪模型每步预测的唯噪声,而噪声在高维度中的分布是很难观察的,但是如果将预测噪声改为预测真实纯洁的图像就能改变这一问题。

黄山

AI创作图黄山 奇松 山谷 云海 松树 绿色 自然 风景 旅行 摄影 中国 安徽省 地理 生态 宁静 壮丽nipic.com-昵图网

一、整体思路(预算版)

  • 出行方式:广州 → 黄山北 高铁往返,二等座一般单程 520–600 元左右,往返约 1100–1200 元。(携程火车票)

  • 住宿策略

    • 不住山上,两晚都住山下(汤口或者屯溪),选 100–150/晚的经济型酒店/客栈。
  • 景点选择

    • D1–D2:黄山一日半:走索道+步行的轻量版线路
    • D3:宏村半日 + 黄山市区随便逛,视预算可选
  • 控制花费的小技巧

    • 索道只坐一段(上山坐,视体力下山走部分路)。
    • 山上的水/零食尽量自带一些。
    • 宏村可以跟当地一日游/拼车,省心也省一点车费。

二、详细行程规划

▶ D1:广州 → 黄山北 → 汤口/屯溪(轻松抵达日)

上午 / 中午

  • 广州南/广州东/广州新塘 → 黄山北 坐高铁,尽量选中午前出发、下午两三点到黄山北的车次(车程约 5 小时)。(Trip.com)

  • 黄山北出站后:

    • 若第二天一早上黄山:建议直接去 汤口镇(黄山南大门),大巴或拼车约 1–1.5 小时。
    • 若想市区逛一逛:可以先住 屯溪老街附近,晚上吃吃逛逛。

下午 / 晚上

  • 办理入住(控制在 100–150 元/晚 的快捷酒店/客栈)。
  • 在屯溪的话:逛 屯溪老街,吃一顿徽菜(毛豆腐、臭鳜鱼、刀板香等)。
  • 在汤口的话:熟悉第二天上山的大巴、售票处位置,早点休息。

建议:预算紧 & 主要目标是黄山 → D1 晚上住汤口 会更高效,第二天可以早早上山。


▶ D2:黄山一日游(索道 + 精华步行,不住山上)

预算版选择:黄山一日“精华速刷版”,不上主峰也能感受到黄山特色。

早上

  1. 早起从酒店 → 汤口新国线换乘中心

  2. 买票:

  3. 坐大巴到 云谷寺(后山口) → 再坐 云谷索道上山,单程 80 元左右(平旺季)。(hf.bendibao.com)

上山后推荐线路(强度中等):

  • 索道上站(白鹅岭) → 始信峰(看奇松、怪石)
  • 北海景区:清凉台、猴子观海 等观景点
  • 喜欢走一点路的:可以往 西海入口 方向走一段,再折返,不进大峡谷深处(省体力)。
  • 中午在北海/光明顶附近简单吃点自带干粮或景区快餐。

下午下山

  • 体力不错:

    • 从北海 → 白鹅岭 → 步行下到 云谷寺(路程较长但一路风景不错)。
  • 体力一般:

    • 返回云谷索道上站 → 再坐索道下山(再花 1×80 元)。

然后:云谷寺 → 大巴 → 汤口 → 回酒店休息。

不上天都峰/莲花峰,可以大幅度降低体力和时间消耗,一天玩得比较从容。


▶ D3:宏村半日游 + 回广州

早上:汤口/屯溪 → 宏村

  • 从汤口/黄山市区报一个去 宏村的拼车/一日游,或者去汽车站坐车,车程约 1–1.5 小时。
  • 宏村门票:成人约 90–100 元左右。(去哪儿网)

宏村游玩重点:

  • 南湖、月沼:徽派建筑 + 水面倒影,拍照很好看。
  • 巷子里慢慢走走,找几家咖啡/小吃店坐坐,体验古村氛围。

下午

  • 午后从宏村 → 返回黄山北 / 屯溪。
  • 预留足够时间,傍晚或晚上坐高铁 黄山北 → 广州

三、人均费用粗算(按“2000 左右”来配)

下表是大致估算,实际以你订到的车票/酒店为准。

1. 大头:往返高铁

  • 广州 → 黄山北 二等座:约 520–600 元
  • 黄山北 → 广州 二等座:约 520–600 元
    👉 合计按 1100–1200 元 预估。(携程火车票)

2. 景区门票 + 索道 + 大巴

  • 黄山门票:190

  • 黄山景区大巴:38(往返)

  • 索道:

    • 只坐上行一次:80
    • 若上下都坐:160
  • 宏村门票:约 94(按网上成人价估算)(去哪儿网)

控制版(只坐单程索道)合计:
190 + 38 + 80 + 94 ≈ 402 元

如果你觉得体力一般,上下都坐索道:
190 + 38 + 160 + 94 ≈ 482 元

3. 住宿

  • D1:汤口/屯溪经济型酒店 100–150
  • D2:同上,再 100–150
    👉 住宿预估:200–300 元

4. 吃饭 + 当地交通

  • 吃饭按 80–100 元/天 算,3 天 ≈ 240–300 元
  • 市内/镇内小交通、拼车到宏村等:预留 100–150 元

5. 总体汇总(偏省钱方案)

按较省的方式估算(高铁 1100、索道只坐一次、酒店便宜点):

  • 高铁:1100
  • 黄山 + 宏村门票/车/索道:约 400
  • 住宿:约 250
  • 吃饭 + 小交通:约 350

合计 ≈ 2100 元左右

也就是说:“两千左右”是可以做到的,但会略微紧张,主要看你高铁能不能抢到稍便宜的车次、酒店选多便宜的、索道坐几次。
如果你想更轻松一点(多坐索道、吃好一点),建议心里预留到 2200–2300 更舒服。


四、给你一个能直接照抄的简短行程版

  • D1:广州 → 黄山北(高铁)→ 汤口,入住;晚上简单逛逛、准备上山物资。
  • D2:汤口 → 黄山(云谷索道上 → 北海 → 始信峰 → 光明顶附近 → 视体力部分步行下山/坐索道下),晚上回汤口 / 屯溪休息。
  • D3:早上去宏村(南湖、月沼拍照闲逛),下午返回黄山北,傍晚/晚上高铁回广州。

如果你愿意告诉我大概出行月份 + 更偏“省钱”还是“轻松”,我可以帮你把上面每一天细到:大概几点出门、建议坐的车次时段、黄山具体走哪条步行路线,给你一个“可直接发给同伴/直接订票用”的终版行程单。

第二次组会

一共四个人汇报,其中在投文献一篇方向为拉曼,三篇论汇报拉曼两片,层析一篇

在投文献

深度学习预测单细胞H2O2含量

数据集

细胞的数据集在预测问题上可以看作为时间序列,或者弱化其时间的数据看为序列。这种序列和文字序列有一定的相识性,在处理这方面数据可以更多的考虑循环神经网络和注意力机制进行处理

网络

注意力机制

损失函数

未知

评价指标

R^2

论文问题

网络结构不清晰,深度学习三要素 网络模型、损失函数和评价指标的选取有问题

论文汇报

transformer+RNN 超高清重建图像

网络

RNN(head)

GAN生成式对抗网络

GAN生成式对抗网络,GAN网络由生成网络和判别网络两个网络组成,生成式网络通过学习图片后将噪声转换为图片,而判别式网络将区分数据集图片和生成图片进行判别。生成式与判别式网络类似与小偷与警察,通过不断的博弈互相升级,在这场游戏中我们更加希望小偷取得胜利。既是生成式网络生成的图片可以让判别器难以判断。

网络结构

网络模型

损失函数

评价指标

pytorch实现

GAN缺陷与改进