楚天

惟楚有材,于斯为盛

数据处理与数据集评估笔记

1. 为什么数据决定模型上限

对句子嵌入、检索召回和 RAG 场景来说,模型当然重要,但很多时候真正决定上限的是数据。

常见原因有:

  • 正样本质量决定模型学到什么叫“相关”
  • 负样本质量决定模型能否真正学会区分
  • 样本分布决定模型上线后是否能泛化
  • 标签噪声决定训练信号是否稳定
  • 评估集质量决定你是否能看清真实问题

先记住一个判断:模型是在数据里学规律。如果数据本身混乱、偏斜或失真,训练出来的结果大概率也会带着同样的问题。


2. 数据处理到底包括什么

很多人一提数据处理,只想到“删乱码、去空值”,但在 embedding 项目里,数据处理通常至少包括下面这些部分:

  • 数据采集
  • 样本构造
  • 数据清洗
  • 去重和归一化
  • 标签治理
  • 样本分布分析
  • 训练 / 验证 / 测试切分
  • 评估集构建
  • 数据版本管理

换个更完整的说法:

数据处理 = 数据工程 + 数据质量控制 + 数据评估设计。


3. 先定义样本单位,再谈处理策略

在做数据处理之前,第一件事不是清洗,而是先明确“一个样本到底是什么”。

不同任务里,样本单位并不一样:

3.1 相似度任务

通常是句子对:

  • sentence_a
  • sentence_b
  • label

3.2 检索任务

通常是:

  • query
  • positive
  • negativehard_negative

3.3 文档向量库构建

通常是:

  • doc_id
  • chunk_id
  • text
  • metadata

如果样本单位没定义清楚,后面很多统计都会失真。
比如你以“句子对”为单位去算分布,和以“query”为单位去算分布,结论会完全不同。


4. 数据处理的主线流程

一个比较稳的流程通常是:

  1. 明确任务目标和样本单位
  2. 采集原始数据
  3. 构造正样本、负样本、hard negative
  4. 做文本清洗和标准化
  5. 做去重、去模板、去噪
  6. 分析样本分布和标签分布
  7. 切分 train / valid / test
  8. 构建标准评估集和 badcase 集
  9. 训练第一版模型
  10. 用 badcase 反推数据问题,再回流修正

这条链路里,最容易被忽视的不是清洗,而是:

  • 分布分析
  • 泄漏排查
  • 评估集设计

5. 数据清洗要做什么

5.1 基础清洗

最基础的一层通常包括:

  • 去空文本
  • 去乱码
  • 去无意义符号
  • 去极短文本
  • 去极长文本
  • 统一空白符
  • 统一全角半角
  • 统一大小写

如果是中文场景,还经常要考虑:

  • 简繁统一
  • 标点统一
  • URL、邮箱、手机号脱敏
  • 时间、数字、金额是否归一化

5.2 噪声清洗

很多业务数据不是“脏”,而是“有大量无效结构”。

例如:

  • 页眉页脚
  • 导航栏
  • 广告词
  • 模板开场白
  • 重复免责声明
  • OCR 识别错误文本

这些内容如果不清掉,会让模型学到大量无关模式。

5.3 去重

去重非常重要,因为重复样本会让训练和评估都失真。

常见去重对象:

  • 完全重复文本
  • 近重复文本
  • 同一 query 的重复点击对
  • 同一文档切块后的重复片段

如果不做去重,常见后果有:

  • 模型过度拟合高频表达
  • 评估结果虚高
  • 数据规模看起来很大,但有效信息量并不高

6. 样本分布为什么必须看

很多训练失败,不是模型没学会,而是数据分布本身有问题。

至少要看下面几类分布。

6.1 文本长度分布

要看:

  • query 长度分布
  • document / passage 长度分布
  • chunk 长度分布

如果训练集里大部分文本很短,但线上是长 query 或长段落,模型上线后就容易掉点。

6.2 标签分布

例如:

  • 正负样本比例
  • 2 / 1 / 0 多档标签比例
  • query 是否只有正样本没有负样本

如果标签极度失衡,模型往往会学到偏置策略,而不是学到真正的语义边界。

6.3 场景分布

要看数据是否覆盖不同业务场景,例如:

  • FAQ
  • 搜索
  • 商品检索
  • 知识库问答
  • 代码检索

如果训练样本只覆盖热门场景,长尾场景通常会明显掉效果。

6.4 热门与长尾分布

要区分:

  • 高频 query
  • 长尾 query
  • 高频文档
  • 稀有文档

如果训练集被热门 query 主导,模型可能对高频表达很强,但对长尾表达泛化很差。

6.5 正样本来源分布

正样本如果同时来自多种来源,例如:

  • 点击日志
  • 人工标注
  • FAQ 映射
  • 改写生成

最好分别统计占比,因为不同来源噪声强度不一样。

6.6 负样本难度分布

不能只看负样本数量,还要看难度结构。

通常可以分成:

  • 随机负样本
  • 同域负样本
  • hard negative

如果全是简单负样本,模型离线 loss 可能很好看,但上线区分能力依然不够。


7. 标签质量怎么检查

数据量大不代表标签质量高。

标签问题通常来自:

  • 点击不等于相关
  • 曝光位置偏置
  • 人工标注标准不一致
  • 自动构造规则太粗糙
  • 历史系统的错误被继承进新数据

可以从下面几方面检查:

7.1 抽样人工复核

最直接也最有效。

重点抽样:

  • 高频 query
  • 长尾 query
  • 高相似但错召回样本
  • 模型和标签明显冲突的样本

7.2 看标签一致性

例如同一个 query 是否出现下面这种情况:

  • 同一文档有时标正样本
  • 有时又标成负样本

这种冲突标签会直接污染训练信号。

7.3 看模型反常样本

如果一个样本长期表现为:

  • 标签是正样本,但模型始终给很低分
  • 标签是负样本,但模型和人工都觉得很相关

那就要怀疑数据或标签本身有问题,而不是先怀疑模型。


8. 正样本和负样本要怎么治理

8.1 正样本治理

正样本要优先保证“真相关”。

对于日志数据,常见过滤规则包括:

  • 去掉极短停留
  • 去掉误点击
  • 去掉曝光高但满意度低的内容
  • 去掉跳出率特别高的点击对

如果是结构化映射数据,还要检查:

  • 映射关系是否过期
  • 标题和正文是否错配
  • 类目标签是否长期维护

8.2 负样本治理

负样本的核心不是“越多越好”,而是“越像线上干扰项越好”。

常见做法:

  • 先用随机负样本保证基础区分
  • 再补同域负样本
  • 最后重点挖 hard negative

8.3 Hard Negative 是最值得花时间的部分

hard negative 常见来源:

  • BM25 高分误召回
  • 旧 embedding 模型的错召回
  • Cross-Encoder 高混淆样本
  • 人工 badcase 回流

很多时候,模型的真实提升不来自更多数据,而来自更强的 hard negative。


9. 训练集、验证集、测试集怎么切

切分数据时,重点不是比例,而是避免数据泄漏。

9.1 常见切分比例

  • 8:1:1
  • 9:0.5:0.5

9.2 更重要的是避免泄漏

例如:

  • 同一问题的改写版本,不要分到不同集合
  • 同一文档的多个 chunk,不要乱落到 train 和 test
  • 同一用户会话,不要一部分在训练、一部分在测试
  • 同一商品不同标题变体,不要跨集合乱分

如果泄漏存在,评估结果通常会明显虚高。

9.3 切分方式最好按业务实体来做

例如按下面这些维度切,通常更稳:

  • query_id
  • doc_id
  • 按会话
  • 按用户

而不是简单随机切行。


10. 评估集应该怎么设计

评估集不是简单从训练数据里抽一点出来,而是要刻意设计。

一个好的评估集通常要满足:

  • 覆盖核心业务场景
  • 覆盖热门 query 和长尾 query
  • 覆盖高混淆样本
  • 尽量有人工校验
  • 分布尽量接近线上真实请求

10.1 标准评估集

这是每次训练都要跑的稳定基线。

要求:

  • 版本稳定
  • 标注质量高
  • 用来做横向对比

10.2 Badcase 集

这是最重要的补充集。

来源通常是:

  • 线上错召回
  • 用户投诉
  • 人工复核发现的高价值错误
  • 新业务新术语

badcase 集的作用不是看平均分,而是防止关键错误反复出现。

10.3 分层评估

如果场景复杂,建议把评估集再分层:

  • 热门 query 集
  • 长尾 query 集
  • 专业术语集
  • 否定表达集
  • 数字、时间、版本号敏感集

这样你才能知道模型到底是“整体提升”,还是只在某一类场景提升。


11. 数据集评估到底评什么

除了看模型指标,还应该单独评估数据集本身。

至少可以看下面几类指标。

11.1 规模指标

  • 样本总量
  • 唯一 query 数
  • 唯一 doc 数
  • 平均每个 query 的正样本数
  • 平均每个 query 的负样本数

11.2 分布指标

  • 文本长度分布
  • 标签分布
  • 场景分布
  • 来源分布
  • 热门 / 长尾分布

11.3 质量指标

  • 重复率
  • 近重复率
  • 空文本比例
  • 模板文本比例
  • 乱码比例
  • 标签冲突率

11.4 难度指标

  • hard negative 占比
  • 高混淆样本占比
  • 多跳或长文本样本占比

如果这些指标长期不看,训练往往会变成“看 loss 猜问题”。


12. 一个很实用的数据诊断思路

如果模型效果不好,可以先按下面顺序排查:

  1. 看训练和测试是否泄漏
  2. 看是否有大量重复样本
  3. 看标签是否冲突
  4. 看负样本是不是太简单
  5. 看热门样本是否占比过高
  6. 看长尾 query 是否覆盖不足
  7. 看评估集是否真的接近线上

这条排查顺序很实用,因为很多问题根本不是模型结构导致的。


13. 数据版本管理为什么重要

如果你每次训练都在用“最新数据”,但没有快照和版本号,后面几乎一定会遇到这类问题:

  • 效果变好了,不知道是哪批数据带来的
  • 效果变差了,不知道是哪次清洗规则改坏了
  • badcase 修好了,但下个版本又回来了

所以至少要给下面这些东西做版本管理:

  • 原始数据快照
  • 清洗规则版本
  • 样本构造规则版本
  • hard negative 生成版本
  • 评估集版本
  • badcase 集版本

14. 一个适合落地的默认做法

如果你现在要从零做一套数据处理和数据集评估流程,可以先按这个 baseline 走:

  1. 先定义样本单位和标签规范
  2. 从业务日志和结构化关系里构造正样本
  3. 补随机负样本、同域负样本和 hard negative
  4. 做基础清洗、去重和归一化
  5. 统计长度分布、标签分布、场景分布和来源分布
  6. query_iddoc_id 切 train / valid / test
  7. 建一份小而稳定的标准评估集
  8. 再维护一份持续追加的 badcase 集
  9. 每次训练都固定跑离线评估和分层评估
  10. 把线上 badcase 持续回流

这套流程不花哨,但很稳,也最容易持续迭代。


15. 一份可直接抄的检查清单

  1. 是否明确了样本单位
  2. 是否定义了标注规则
  3. 是否做了基础清洗和噪声清洗
  4. 是否做了完全去重和近重复检查
  5. 是否统计了长度分布和标签分布
  6. 是否区分了热门和长尾样本
  7. 是否统计了正样本和负样本来源
  8. 是否有足够强的 hard negative
  9. 是否避免了 train / test 泄漏
  10. 是否有稳定的评估集和 badcase 集
  11. 是否给数据和规则做了版本管理

16. 一句话总结

数据处理和数据集评估的核心不是“把脏数据洗干净”,而是:

让训练数据更真实、分布更合理、标签更稳定、评估更可信。

对 embedding 项目来说,很多时候真正决定模型上限的,不是你换了多大的模型,而是你把数据治理做到了什么程度。

Drogon 学习笔记

1. Drogon 是什么

Drogon 是一个基于 C++ 的现代 Web 应用框架,适合用来开发:

  • 高性能 HTTP API 服务
  • C++ 网关服务
  • 需要低延迟的业务接口
  • WebSocket 服务
  • 需要接 MySQL / PostgreSQL / Redis 的后端系统

它的几个核心特点:

  • 性能高,底层是非阻塞 IO
  • 支持异步编程
  • 支持路由、控制器、过滤器、中间件
  • 支持数据库、Redis、Session、文件上传
  • 支持 HTTP Client,也能调用其他内部服务
  • 支持 C++20 协程

如果先抓核心区别,可以这样记:FastAPI 更偏 Python 生态里的高效 API 框架,Drogon 则更像适合高性能 C++ 服务的全功能 Web 框架。

2. 为什么很多后端项目会用 Drogon

如果你的系统有这些特点:

  • 接口并发高
  • 对延迟敏感
  • 业务层想用 C++ 实现
  • 需要把网络、数据库、缓存、会话统一放在一个服务里

那 Drogon 会很合适。

尤其是在你现在这类 AI 项目架构里,Drogon 很适合承担:

  • 对外 API 网关
  • 会话和鉴权层
  • 请求转发层
  • MySQL 持久化层
  • Redis 状态层

而 Python 生态仍然更适合承担:

  • 模型推理
  • 文档解析
  • RAG 流程
  • Celery 异步任务

因此,很多混合架构会拆成:

  • Drogon:对外服务、状态治理、性能敏感层
  • FastAPI:内部模型服务、任务接口
  • Celery:后台异步执行

3. 安装和工程初始化

Drogon 的安装方式比 FastAPI 重一些,因为它是 C++ 框架,需要编译环境和依赖。

常见方式有:

  • vcpkg
  • 源码编译
  • Docker

如果你想先快速上手,最常见的是源码安装:

1
2
3
4
5
6
7
8
git clone https://github.com/drogonframework/drogon.git
cd drogon
git submodule update --init
mkdir build
cd build
cmake ..
make -j4
sudo make install

安装完成后,通常还会有一个命令行工具:

1
drogon_ctl

它常用来:

  • 创建项目骨架
  • 创建 Controller
  • 创建 Filter
  • 生成 ORM Model

例如创建一个项目:

1
drogon_ctl create project demo

4. 最小可运行示例

和 FastAPI 的 main.py 一样,Drogon 也可以先从一个最小服务开始。

4.1 CMakeLists.txt

1
2
3
4
5
6
7
8
9
10
cmake_minimum_required(VERSION 3.16)
project(drogon_demo)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

find_package(Drogon CONFIG REQUIRED)

add_executable(${PROJECT_NAME} main.cc)
target_link_libraries(${PROJECT_NAME} PRIVATE Drogon::Drogon)

4.2 main.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <drogon/drogon.h>

using namespace drogon;

int main()
{
app().registerHandler(
"/",
[](const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback) {
Json::Value json;
json["message"] = "hello drogon";
callback(HttpResponse::newHttpJsonResponse(json));
},
{Get});

app().addListener("0.0.0.0", 8080);
app().setThreadNum(1);
app().run();
}

编译和运行:

1
2
3
cmake -S . -B build
cmake --build build
./build/drogon_demo

访问:

  • http://127.0.0.1:8080/

如果对照 FastAPI,可以这样看:

  • app = FastAPI() 对应 app()
  • @app.get("/") 对应 registerHandler("/", ...)
  • return {...} 对应 callback(HttpResponse::newHttpJsonResponse(...))

5. Drogon 的基本工作方式

可以把一次请求理解成下面这条链路:

  1. 客户端发来 HTTP 请求
  2. Drogon 根据路径和方法匹配路由
  3. 先经过中间件和过滤器
  4. 进入 Handler 或 Controller 方法
  5. 业务代码处理请求
  6. 构造 HttpResponse
  7. 通过 callback 返回给客户端

和 FastAPI 很像,但有一个非常重要的区别:

Drogon 不会像 FastAPI 那样自动把 Python 函数返回值转成 JSON,你需要更显式地构造响应对象。

6. 直接注册 Handler

最简单的方式就是直接在 main() 里注册路由。

1
2
3
4
5
6
7
8
9
app().registerHandler(
"/health",
[](const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback) {
Json::Value json;
json["status"] = "ok";
callback(HttpResponse::newHttpJsonResponse(json));
},
{Get});

适合:

  • 学习阶段
  • 很小的 demo
  • 临时测试接口

但项目一大,通常更推荐 Controller。

7. 三种常见 Controller

Drogon 常见的控制器主要有三类:

  • HttpSimpleController
  • HttpController
  • WebSocketController

7.1 HttpSimpleController

适合单个简单接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <drogon/HttpSimpleController.h>

class Ping : public drogon::HttpSimpleController<Ping>
{
public:
PATH_LIST_BEGIN
PATH_ADD("/ping", drogon::Get);
PATH_LIST_END

void asyncHandleHttpRequest(
const drogon::HttpRequestPtr &req,
std::function<void(const drogon::HttpResponsePtr &)> &&callback) override
{
Json::Value json;
json["message"] = "pong";
callback(drogon::HttpResponse::newHttpJsonResponse(json));
}
};

7.2 HttpController

适合一组 REST 风格接口,功能更完整。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <drogon/HttpController.h>

namespace api::v1
{
class User : public drogon::HttpController<User>
{
public:
METHOD_LIST_BEGIN
ADD_METHOD_TO(User::getUser, "/api/v1/users/{id}", drogon::Get);
ADD_METHOD_TO(User::createUser, "/api/v1/users", drogon::Post);
METHOD_LIST_END

void getUser(
const drogon::HttpRequestPtr &req,
std::function<void(const drogon::HttpResponsePtr &)> &&callback,
int id) const
{
Json::Value json;
json["id"] = id;
callback(drogon::HttpResponse::newHttpJsonResponse(json));
}

void createUser(
const drogon::HttpRequestPtr &req,
std::function<void(const drogon::HttpResponsePtr &)> &&callback) const
{
auto jsonPtr = req->getJsonObject();

Json::Value json;
json["name"] = jsonPtr ? (*jsonPtr)["name"].asString() : "";
callback(drogon::HttpResponse::newHttpJsonResponse(json));
}
};
} // namespace api::v1

7.3 WebSocketController

适合:

  • 实时聊天
  • 实时协作
  • 推送类业务

如果你后面要做双向实时通信,可以重点看这一类;如果只是 AI 输出流式结果,很多时候 SSE 就够了,不一定非要 WebSocket。

8. 路径参数、查询参数、JSON 请求体

这部分可以类比 FastAPI 的:

  • 路径参数
  • 查询参数
  • 请求体

8.1 路径参数

在 Drogon 里,路径参数可以直接映射到函数参数。

1
ADD_METHOD_TO(BookController::getBook, "/api/v1/books/{book_id}", drogon::Get);

对应的处理函数:

1
2
3
4
void getBook(
const drogon::HttpRequestPtr &req,
std::function<void(const drogon::HttpResponsePtr &)> &&callback,
int bookId) const;

放到 FastAPI 的语境里,大致对应:

1
2
3
@app.get("/books/{book_id}")
def get_book(book_id: int):
...

8.2 查询参数

查询参数通常从 req 里读取:

1
2
auto keyword = req->getParameter("keyword");
auto page = req->getParameter("page");

例如:

1
/api/v1/search?keyword=llm&page=1

8.3 JSON 请求体

如果客户端发的是 JSON,请求体常见写法是:

1
2
3
4
5
6
7
8
9
10
auto jsonPtr = req->getJsonObject();
if (!jsonPtr)
{
auto resp = drogon::HttpResponse::newHttpResponse();
resp->setStatusCode(drogon::k400BadRequest);
callback(resp);
return;
}

std::string name = (*jsonPtr)["name"].asString();

结论:

  • 路径参数:函数参数映射
  • 查询参数:req->getParameter()
  • JSON 请求体:req->getJsonObject()

9. JSON 响应和状态码

Drogon 里最常见的响应方式是手动构造 JSON。

1
2
3
4
5
6
7
Json::Value json;
json["message"] = "created";
json["id"] = 1001;

auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
resp->setStatusCode(drogon::k201Created);
callback(resp);

这和 FastAPI 的差异在于:

  • FastAPI 更自动
  • Drogon 更显式

显式的好处是:

  • 更清楚知道返回了什么
  • 更方便做细粒度控制
  • 更符合 C++ 项目风格

10. 配置文件 config.json

很多 Drogon 项目不会把所有配置都写死在 main() 里,而是放到配置文件。

最常见的主程序写法是:

1
2
3
4
5
6
7
#include <drogon/drogon.h>

int main()
{
drogon::app().loadConfigFile("config.json");
drogon::app().run();
}

一个简化版配置示意:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
{
"listeners": [
{
"address": "0.0.0.0",
"port": 8080
}
],
"db_clients": [
{
"name": "default",
"rdbms": "mysql",
"host": "127.0.0.1",
"port": 3306,
"dbname": "demo",
"user": "root",
"passwd": "123456",
"is_fast": false,
"number_of_connections": 4
}
],
"redis_clients": [
{
"name": "default",
"host": "127.0.0.1",
"port": 6379,
"number_of_connections": 2
}
],
"app": {
"number_of_threads": 4,
"enable_session": true,
"upload_path": "uploads",
"client_max_body_size": "50M"
}
}

这里最重要的点是:

  • listeners:监听地址和端口
  • db_clients:数据库连接
  • redis_clients:Redis 连接
  • app:线程数、Session、上传路径等全局设置

11. 过滤器和中间件

这部分很像 FastAPI 的依赖、鉴权逻辑、中间件,但 Drogon 拆得更明确。

简单理解:

  • Filter:更像“接口前置校验”
  • Middleware:更像“围绕请求处理链做前后处理”

例如做一个简单鉴权 Filter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <drogon/HttpFilter.h>

class AuthFilter : public drogon::HttpFilter<AuthFilter>
{
public:
void doFilter(const drogon::HttpRequestPtr &req,
FilterCallback &&fcb,
FilterChainCallback &&fccb) override
{
auto token = req->getHeader("x-api-key");
if (token != "secret")
{
Json::Value json;
json["error"] = "unauthorized";

auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
resp->setStatusCode(drogon::k401Unauthorized);
fcb(resp);
return;
}

fccb();
}
};

挂到某个接口上:

1
ADD_METHOD_TO(User::getUser, "/api/v1/users/{id}", drogon::Get, "AuthFilter");

适合抽出来做的逻辑通常有:

  • 登录校验
  • API Key 校验
  • 频率限制
  • 内网访问限制
  • 统一日志

12. Session

Drogon 内置了 Session 支持,但默认不是强制开启的。

它适合做:

  • 登录态
  • 短期会话数据
  • 访问频率控制
  • 一些轻量服务端状态

开启方式可以在配置文件里写:

1
2
3
4
5
{
"app": {
"enable_session": true
}
}

或者代码里显式开启:

1
drogon::app().enableSession(1200);

使用示例:

1
2
3
4
5
6
req->session()->insert("user_id", 1001);

if (req->session()->find("user_id"))
{
auto userId = req->session()->get<int>("user_id");
}

注意:

Session 依赖 Cookie。如果客户端不保存 Cookie,每次请求都可能被当成新会话。

13. 文件上传

在 AI 项目里,文件上传非常常见,比如:

  • 上传 PDF
  • 上传图片
  • 上传音频
  • 上传数据文件

Drogon 常用 MultiPartParser 来处理 multipart 上传。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <drogon/drogon.h>

void upload(const drogon::HttpRequestPtr &req,
std::function<void(const drogon::HttpResponsePtr &)> &&callback)
{
drogon::MultiPartParser parser;
if (parser.parse(req) != 0 || parser.getFiles().empty())
{
Json::Value json;
json["error"] = "no file";

auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
resp->setStatusCode(drogon::k400BadRequest);
callback(resp);
return;
}

auto file = parser.getFiles()[0];
file.save("./uploads");

Json::Value json;
json["filename"] = file.getFileName();
auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
callback(resp);
}

实际项目里要特别注意:

  • 不要直接信任原始文件名
  • 要限制文件大小
  • 要校验扩展名和 MIME 类型
  • 最好自己生成保存文件名

对于 AI 系统,更推荐的流程通常是:

  1. Drogon 接收上传
  2. 保存到磁盘或对象存储
  3. 写一条数据库记录
  4. 调内部 FastAPI / Celery 提交处理任务

14. 数据库访问:DbClient 和 ORM

Drogon 自带数据库支持,常见关系型数据库包括:

  • MySQL
  • PostgreSQL
  • SQLite3

最常见的是通过 DbClient 执行 SQL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
auto client = drogon::app().getDbClient();

client->execSqlAsync(
"select id, name from users where id = ?",
[callback](const drogon::orm::Result &result) {
Json::Value json;

if (result.empty())
{
json["error"] = "not found";
auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
resp->setStatusCode(drogon::k404NotFound);
callback(resp);
return;
}

json["id"] = result[0]["id"].as<int>();
json["name"] = result[0]["name"].as<std::string>();
callback(drogon::HttpResponse::newHttpJsonResponse(json));
},
[callback](const drogon::orm::DrogonDbException &e) {
Json::Value json;
json["error"] = e.base().what();
auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
resp->setStatusCode(drogon::k500InternalServerError);
callback(resp);
},
1001);

要点:

  • MySQL 占位符通常用 ?
  • PostgreSQL 占位符通常用 $1$2
  • 异步接口更符合 Drogon 的整体风格
  • 不要在回调里做很重的阻塞操作

如果你不想手写太多 SQL,还可以用 drogon_ctl 根据表结构生成 ORM Model。

适合:

  • 表结构比较稳定
  • 想减少手写样板代码
  • 想把数据库访问写得更结构化

15. Redis 支持

Drogon 也支持 Redis,而且同样是异步风格。

如果已经在 config.json 里配置了 redis_clients,运行后就可以获取客户端:

1
auto redisClient = drogon::app().getRedisClient();

执行命令示例:

1
2
3
4
5
6
7
8
9
redisClient->execCommandAsync(
[](const drogon::nosql::RedisResult &r) {
LOG_INFO << "redis ok: " << r.asString();
},
[](const std::exception &err) {
LOG_ERROR << "redis error: " << err.what();
},
"get session:%s",
"1001");

它适合做:

  • 缓存
  • 任务状态存储
  • 限流计数器
  • 会话辅助状态
  • 流式输出中间层

如果把你现在的整条链路连起来理解,就是:

  • Drogon:对外接口
  • Redis:缓存 / 状态 / 消息中间层
  • FastAPI / Celery:后台处理

16. C++20 协程 Task<>

如果你用的是较新的编译器,Drogon 支持协程,这可以让异步代码更接近同步写法。

例如数据库查询可以写成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <drogon/drogon.h>

drogon::Task<drogon::HttpResponsePtr> getUserCoro(int userId)
{
auto db = drogon::app().getDbClient();

try
{
auto result = co_await db->execSqlCoro(
"select id, name from users where id = ?",
userId);

Json::Value json;
json["id"] = result[0]["id"].as<int>();
json["name"] = result[0]["name"].as<std::string>();
co_return drogon::HttpResponse::newHttpJsonResponse(json);
}
catch (const drogon::orm::DrogonDbException &e)
{
Json::Value json;
json["error"] = e.base().what();
auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
resp->setStatusCode(drogon::k500InternalServerError);
co_return resp;
}
}

它的好处是:

  • 少写回调嵌套
  • 控制流更清晰
  • 复杂异步逻辑更好维护

但你要注意:

  • 需要合适的编译器和 C++20 支持
  • 协程让代码更好写,不代表可以随便阻塞线程

17. Drogon 和 FastAPI 怎么选

这两个框架不是简单的“谁替代谁”,而是各自适合不同层。

维度 Drogon FastAPI
语言 C++ Python
性能 更高,适合性能敏感层 足够高,开发更快
开发效率 相对慢一些 很高
参数校验 需要自己控制更多细节 自动化更强
生态 系统层、服务层强 AI、数据、模型生态强
适合位置 网关、核心服务、低延迟接口 模型服务、管理后台、内部 API

如果只抓结论,可以这样选:

  • 想更快把模型接口和内部服务跑起来,用 FastAPI
  • 想把网关、状态治理和性能敏感层放在 C++ 里,用 Drogon

18. Drogon + FastAPI + Celery 的典型配合方式

这是你当前项目最值得掌握的一块。

一个典型流程可以这样拆:

  1. 客户端请求先到 Drogon
  2. Drogon 做鉴权、Session、参数检查、落库
  3. Drogon 调用内部 FastAPI 接口提交任务
  4. FastAPI 再把重任务交给 Celery
  5. Celery Worker 真正处理文档、推理、索引构建
  6. 结果写回 MySQL / Redis
  7. Drogon 再把状态或结果返回给前端

你可以把它们理解成固定分工:

  • Drogon:网关和业务编排层
  • FastAPI:Python 服务入口
  • Celery:后台任务执行层
  • Redis:状态、缓存、队列中间层
  • MySQL:长期持久化

这套架构特别适合:

  • 文档上传处理
  • RAG 建库
  • 长耗时模型推理
  • 流式输出聊天系统

19. 一个比较常见的项目结构

如果用 Drogon 做 C++ 网关,一个比较实用的结构可以是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
cpp_gateway/
├── CMakeLists.txt
├── config.json
├── main.cc
├── controllers/
│ ├── HealthController.h
│ ├── HealthController.cc
│ ├── DocumentsController.h
│ └── DocumentsController.cc
├── filters/
│ ├── AuthFilter.h
│ └── AuthFilter.cc
├── services/
│ ├── TaskService.h
│ ├── TaskService.cc
│ ├── PythonClient.h
│ └── PythonClient.cc
├── repositories/
│ ├── DocumentRepo.h
│ └── DocumentRepo.cc
└── models/

建议职责划分:

  • controllers/:只处理 HTTP 层
  • filters/:鉴权、限流、前置校验
  • services/:业务编排、调内部服务
  • repositories/:数据库读写
  • models/:ORM 生成模型或业务数据结构

也就是说:

不要把所有逻辑都堆在 Controller 里。

20. 常见坑

20.1 在 Handler 里做阻塞重活

例如:

  • 大文件解析
  • 模型推理
  • 长时间等待第三方服务

这些都不应该直接堵在 Drogon 的请求处理线程里。

更合理的做法是:

  • Drogon 负责接请求
  • 重活交给 FastAPI / Celery

20.2 误以为性能高就应该把所有东西都写进 C++

不是。

对于 AI 项目:

  • 模型生态、向量库、文档处理库,大部分都在 Python 侧更成熟
  • Drogon 更适合做高性能 API 和状态治理

20.3 忽视文件上传安全

例如:

  • 直接使用原始文件名保存
  • 不限制 body 大小
  • 不校验扩展名

这在上传接口里很危险。

这样会导致服务端不断创建新 Session,白白浪费资源。

20.5 数据库回调里继续做阻塞操作

这会拖慢整个异步链路。

20.6 Controller 过胖

如果 Controller 里同时写:

  • 参数解析
  • SQL
  • Redis
  • 调内部 HTTP
  • 业务编排

后面会非常难维护。

21. 一套比较实用的学习顺序

如果你是为了做 AI 项目里的 C++ 网关,推荐这样学:

  1. 先跑通最小 Drogon 服务
  2. 学会 registerHandler()HttpController
  3. 学会 JSON 请求和响应
  4. 学会 config.json
  5. 学会 Filter 和 Session
  6. 学会文件上传
  7. 学会 DbClient 和 Redis
  8. 最后再接 FastAPI 和 Celery

22. 总结

Drogon 的核心价值在于:

  • 用 C++ 写高性能 Web 服务
  • 把 HTTP、数据库、Redis、Session、文件上传放到一套框架里
  • 适合做网关层、业务层、低延迟接口层

对于 AI 项目,可以把它理解成:

  • 对外 API 入口
  • 会话治理层
  • 状态协调层
  • 数据持久化入口
  • 调用 Python 服务的编排层

如果你把前面几份笔记连起来看,可以把它们理解成一条链:

  • Drogon:对外网关 / C++ 服务层
  • FastAPI:Python API 层
  • Redis:缓存 / 状态 / 中间层
  • Celery:后台异步任务层
  • MySQL:持久化数据层

23. 参考

Celery 学习笔记

1. Celery 是什么

Celery 是一个基于 Python 的分布式任务队列,用来把“耗时任务”从主业务流程里拆出去异步执行。

典型场景:

  • 发送邮件、短信、消息通知
  • 文件转码、图片处理、PDF 解析
  • 大模型推理、批量数据处理
  • 定时任务、周期性任务
  • 需要失败重试的后台任务

先记住最实用的分工:FastAPI 负责接请求和返回结果,Celery 负责把耗时任务放到后台慢慢执行。

2. 为什么要用 Celery

如果一个接口内部要做很久的事情,比如:

  • 调第三方接口要 10 秒
  • 处理一批文件要 2 分钟
  • 模型推理要占满 CPU / GPU

那就不适合一直阻塞 HTTP 请求。更合理的做法是:

  1. FastAPI 收到请求
  2. 把任务丢给 Celery
  3. 立刻返回一个 task_id
  4. 前端或调用方再通过 task_id 查询执行结果

这样接口响应会更快,系统也更容易扩展。

3. Celery 核心架构

Celery 主要由 3 个角色组成:

3.1 Broker

消息中间件,负责存放“待执行任务”。

常见选择:

  • Redis
  • RabbitMQ

它就是任务排队和等待消费的地方。

3.2 Worker

执行任务的工作进程。

它会不断从 Broker 中取任务,然后真正执行 Python 函数。

3.3 Result Backend

结果存储,用来保存任务状态和返回值。

常见选择:

  • Redis
  • Database
  • RPC

如果你只关心“任务有没有执行”,不关心返回结果,也可以不配置结果后端;但实际项目里通常还是会配置,方便查状态。

4. 工作流程

Celery 的完整链路可以这样理解:

  1. 客户端或 FastAPI 调用一个 Celery 任务
  2. 任务消息被发送到 Broker
  3. Worker 从 Broker 取出任务
  4. Worker 执行任务函数
  5. 执行结果和状态写入 Result Backend
  6. 业务侧通过 task_id 查询任务状态和结果

对应的状态通常有:

  • PENDING:任务还没开始,或者还查不到
  • STARTED:任务已开始执行
  • SUCCESS:执行成功
  • FAILURE:执行失败
  • RETRY:正在重试
  • REVOKED:任务被撤销

说明:

  • 默认不一定会看到 STARTED
  • 如果想更明确地看到运行中状态,可以开启 task_track_started=True

5. 安装

1
pip install celery redis

如果使用 Redis 作为 Broker 和 Result Backend,需要先启动 Redis。

6. 最小可运行示例

6.1 编写 Celery 应用

文件:celery_app.py

1
2
3
4
5
6
7
8
9
10
11
12
from celery import Celery

app = Celery(
"demo",
broker="redis://127.0.0.1:6379/1",
backend="redis://127.0.0.1:6379/2",
)

app.conf.update(
task_track_started=True,
timezone="Asia/Shanghai",
)

说明:

  • /1 表示 Redis 的第 1 个逻辑库,用来当 Broker
  • /2 表示 Redis 的第 2 个逻辑库,用来存任务结果
  • Broker 和 Backend 分开写更清晰,排查问题也更方便

6.2 定义任务

文件:tasks.py

1
2
3
4
5
6
7
8
import time

from celery_app import app

@app.task
def add(a: int, b: int) -> int:
time.sleep(3)
return a + b

这里的 add 虽然是普通 Python 函数,但加了 @app.task 之后,它就成了 Celery 任务。

6.3 启动 Worker

1
celery -A tasks worker -l info

含义:

  • -A tasks:表示 Celery 应用和任务定义在 tasks.py
  • worker:启动 worker 进程
  • -l info:日志级别为 info

6.4 提交任务

1
2
3
4
from tasks import add

result = add.delay(3, 5)
print(result.id)

注意:

  • delay() 只是“提交任务”
  • 它不会在当前进程里同步执行
  • 返回的是 AsyncResult
  • 真正执行任务的是独立的 Worker

6.5 查询任务结果

1
2
3
4
5
6
7
8
9
10
11
from celery.result import AsyncResult

from celery_app import app

task = AsyncResult("任务ID", app=app)

print(task.status)
print(task.successful())

if task.successful():
print(task.get())

常用属性和方法:

  • task.id:任务 ID
  • task.status:任务状态
  • task.successful():是否执行成功
  • task.failed():是否执行失败
  • task.ready():任务是否结束
  • task.get():获取结果,可能会阻塞

7. delay()apply_async() 的区别

7.1 delay()

最简单的调用方式,其实就是 apply_async() 的简写。

1
add.delay(3, 5)

适合:

  • 普通异步提交
  • 不需要额外参数时

7.2 apply_async()

更灵活,支持倒计时、定时执行、路由到指定队列等。

1
add.apply_async(args=[3, 5], countdown=10)

常见参数:

  • args / kwargs:任务参数
  • countdown=10:10 秒后执行
  • eta=...:指定某个时间点执行
  • expires=60:60 秒后任务过期
  • queue="email":指定进入哪个队列
  • routing_key="task.email":指定路由键

结论:

  • 简单任务用 delay()
  • 有调度需求时用 apply_async()

8. 任务重试

很多后台任务依赖外部资源,比如:

  • 第三方 API
  • 数据库
  • 对象存储
  • 模型服务

这些服务不稳定时,直接失败往往不合理,更适合自动重试。

1
2
3
4
5
6
7
8
9
from celery_app import app

@app.task(bind=True, max_retries=3, default_retry_delay=5)
def fetch_remote_data(self, url: str):
try:
# 这里假设会调用外部接口
raise RuntimeError("temporary error")
except Exception as exc:
raise self.retry(exc=exc)

说明:

  • bind=True 后,任务第一个参数会变成 self
  • self.retry(...) 可以重新入队
  • max_retries=3 表示最多重试 3 次
  • default_retry_delay=5 表示默认每次间隔 5 秒

适合有“短暂失败可能恢复”的任务,不适合参数本身就错误的任务。

9. 队列与路由

当任务类型越来越多时,通常不会把所有任务都丢到一个队列里。

常见拆法:

  • default:普通任务
  • email:发邮件
  • file:文件处理
  • gpu:模型推理

这样可以让不同 Worker 只处理自己擅长的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from celery import Celery
from kombu import Exchange, Queue

app = Celery("demo")

app.conf.task_queues = (
Queue("default", Exchange("default"), routing_key="default"),
Queue("file", Exchange("file"), routing_key="file.process"),
Queue("gpu", Exchange("gpu"), routing_key="gpu.infer"),
)

app.conf.task_routes = {
"tasks.process_pdf": {
"queue": "file",
"routing_key": "file.process",
},
"tasks.run_inference": {
"queue": "gpu",
"routing_key": "gpu.infer",
},
}

启动指定队列的 Worker:

1
2
celery -A tasks worker -Q file -l info
celery -A tasks worker -Q gpu -l info

这样做的好处:

  • 避免耗时任务阻塞普通任务
  • 可以按机器能力拆分 Worker
  • GPU 任务和 CPU 任务可以隔离

10. 定时任务

Celery 可以配合 celery beat 做周期任务。

例如:

  • 每天凌晨清理临时文件
  • 每小时刷新缓存
  • 每 10 分钟同步一次数据
1
2
3
4
5
6
7
8
from celery.schedules import crontab

app.conf.beat_schedule = {
"cleanup-temp-files": {
"task": "tasks.cleanup_temp_files",
"schedule": crontab(hour=3, minute=0),
},
}

启动:

1
celery -A tasks beat -l info

如果要实际执行任务,除了 beat 还要有 worker 在跑。

11. FastAPI 和 Celery 怎么配合

最常见的配合方式是:

  1. FastAPI 暴露 HTTP 接口
  2. 接口中调用 Celery 提交任务
  3. Celery Worker 在后台执行
  4. 再通过一个查询接口返回任务状态/结果

11.1 什么时候用 BackgroundTasks,什么时候用 Celery

FastAPI 自带 BackgroundTasks,但它和 Celery 不是一类东西。

BackgroundTasks 更适合:

  • 很轻量的后台操作
  • 跟当前服务进程强绑定
  • 不要求失败重试
  • 不要求多机器扩展

Celery 更适合:

  • 任务耗时长
  • 任务量大
  • 需要重试
  • 需要队列隔离
  • 需要多个 Worker 扩展
  • 需要独立于 Web 进程运行

一句话:

轻任务用 BackgroundTasks,重任务用 Celery。

12. FastAPI + Celery 案例 1:提交任务并查询状态

这是最常见的模式。

12.1 目录结构

1
2
3
4
project/
├── main.py
├── celery_app.py
└── tasks.py

12.2 celery_app.py

1
2
3
4
5
6
7
8
9
10
11
12
from celery import Celery

celery_app = Celery(
"fastapi_demo",
broker="redis://127.0.0.1:6379/1",
backend="redis://127.0.0.1:6379/2",
)

celery_app.conf.update(
task_track_started=True,
result_expires=3600,
)

12.3 tasks.py

1
2
3
4
5
6
7
8
import time

from celery_app import celery_app

@celery_app.task(name="tasks.long_time_add")
def long_time_add(a: int, b: int) -> int:
time.sleep(10)
return a + b

12.4 main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from fastapi import FastAPI
from celery.result import AsyncResult

from celery_app import celery_app
from tasks import long_time_add

app = FastAPI()

@app.post("/tasks/add")
def create_add_task(a: int, b: int):
task = long_time_add.delay(a, b)
return {
"message": "task submitted",
"task_id": task.id,
}

@app.get("/tasks/{task_id}")
def get_task_result(task_id: str):
task = AsyncResult(task_id, app=celery_app)

response = {
"task_id": task.id,
"status": task.status,
}

if task.successful():
response["result"] = task.result
elif task.failed():
response["error"] = str(task.result)

return response

12.5 启动方式

启动 FastAPI:

1
uvicorn main:app --reload

启动 Worker:

1
celery -A tasks worker -l info

请求流程:

  1. POST /tasks/add?a=3&b=5
  2. 接口立即返回 task_id
  3. 再调 GET /tasks/{task_id}
  4. 看任务是 PENDINGSTARTED 还是 SUCCESS

这个模式非常适合前后端分离项目。

13. FastAPI + Celery 案例 2:文件处理任务

这个场景很常见,比如:

  • 上传 PDF 后做文本提取
  • 上传图片后做缩略图
  • 上传音频后做转写

13.1 设计原则

不要把 UploadFile、数据库连接、请求对象直接传给 Celery。

原因:

  • Celery 任务参数通常需要能序列化
  • 这些对象往往不能直接被序列化
  • 跨进程后对象本身也失效了

正确做法:

  1. FastAPI 先把文件保存到磁盘或对象存储
  2. file_pathfile_url 传给 Celery
  3. Worker 根据路径再去处理文件

13.2 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from pathlib import Path
import shutil
import uuid

from fastapi import FastAPI, File, UploadFile

from tasks import process_pdf

app = FastAPI()

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

@app.post("/files/pdf")
def upload_pdf(file: UploadFile = File(...)):
file_id = f"{uuid.uuid4()}_{file.filename}"
save_path = UPLOAD_DIR / file_id

with save_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)

task = process_pdf.delay(str(save_path))

return {
"file_path": str(save_path),
"task_id": task.id,
}

对应的 Celery 任务:

1
2
3
4
5
6
7
8
9
from celery_app import celery_app

@celery_app.task(name="tasks.process_pdf")
def process_pdf(file_path: str):
# 这里做 PDF 文本抽取、切分、向量化等操作
return {
"file_path": file_path,
"message": "processed",
}

这个思路很适合 AI 项目里的文档解析链路。

14. FastAPI + Celery 案例 3:模型推理任务

对于 AI 模型开发,这个场景更有代表性。

例如:

  • 文本摘要
  • 图片分类
  • 批量 embedding
  • 长文本抽取

14.1 为什么适合 Celery

因为模型推理通常具备这些特点:

  • 耗时不稳定
  • 资源消耗高
  • 可能需要排队
  • 可能要分 CPU / GPU Worker

14.2 一个简化版例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from fastapi import FastAPI

from tasks import run_inference

app = FastAPI()

@app.post("/infer")
def create_inference_task(text: str):
task = run_inference.apply_async(
kwargs={"text": text},
queue="gpu",
routing_key="gpu.infer",
)
return {"task_id": task.id}

对应任务:

1
2
3
4
5
6
7
8
9
10
11
12
import time

from celery_app import celery_app

@celery_app.task(name="tasks.run_inference")
def run_inference(text: str):
time.sleep(5)
return {
"text": text,
"label": "positive",
"score": 0.98,
}

这里的重点不在模型代码本身,而在架构思路:

  • API 层只负责接收请求
  • 推理层在 Worker 中执行
  • GPU 任务路由到专门队列
  • 可以单独扩容推理 Worker

如果将来一张 GPU 卡只跑一个 Worker,也很好管理。

15. 实战里很重要的细节

15.1 任务参数尽量简单

建议只传这些:

  • 字符串
  • 数字
  • 布尔值
  • 列表 / 字典
  • 文件路径
  • 数据库主键 ID

不建议直接传:

  • 数据库 Session
  • 请求对象 Request
  • UploadFile
  • 大模型实例
  • 打开的文件句柄

原则:

任务参数最好是“可序列化、可重建、可追踪”的。

15.2 任务尽量幂等

幂等的意思是:同一个任务重复执行多次,结果最好一致,或者至少不会造成严重副作用。

因为这些情况都可能导致重复执行:

  • Worker 崩溃
  • 消息重复投递
  • 任务重试
  • 手动补偿

例如发通知、写数据库、扣费等场景,幂等性非常重要。

15.3 不要在 API 进程里执行重活

FastAPI 本身应该尽量保持:

  • 响应快
  • 逻辑薄
  • 只做参数校验、鉴权、任务分发

重计算、长耗时 IO、批处理,尽量放 Worker。

15.4 结果不是必须永久保存

很多项目会犯一个问题:任务结果一直存 Redis,时间一久就堆积很多垃圾数据。

可以设置:

  • result_expires
  • 任务完成后主动清理
  • 只保留必要结果,不保留大对象

如果结果很大,不要直接把大块内容塞进 Redis,建议落库或存文件,再返回引用地址。

15.5 错误处理不要只靠日志

建议至少做到:

  • 任务失败后能看到 FAILURE
  • 能根据 task_id 查到失败原因
  • 关键任务开启自动重试
  • 必要时记录业务日志或落库

15.6 CPU / GPU / IO 任务分开

这点在 AI 项目里尤其重要。

建议:

  • 文件下载、上传、请求第三方 API 放一个队列
  • CPU 预处理放一个队列
  • GPU 推理放一个队列

这样不会出现“一个长推理任务把所有普通任务都堵住”的问题。

15.7 Worker 并发要按任务类型调

Celery 的并发不是越大越好,要看任务类型。

一般经验:

  • IO 密集型任务可以适当提高并发
  • CPU 密集型任务并发通常不要超过 CPU 核心数太多
  • GPU 推理任务通常一个 Worker 进程绑定一张卡,甚至一个队列只开 --concurrency=1

例如 GPU Worker 常见写法:

1
celery -A tasks worker -Q gpu --concurrency=1 -l info

如果推理模型本身已经很吃显存,再盲目开高并发,反而容易把机器打爆。

16. 常见坑

16.1 任务提交成功,但一直不执行

通常排查:

  • Redis 是否启动
  • Worker 是否启动
  • -A 指向的模块是否正确
  • 任务有没有注册到当前 Worker
  • 是否路由到了某个没人消费的队列

16.2 能查到任务 ID,但拿不到结果

通常是:

  • 没配置 backend
  • task_id 查错了
  • 结果过期被清掉了

16.3 任务函数里引用了 Web 层对象

比如:

  • 直接把 Request 传进任务
  • 把 SQLAlchemy Session 传进任务
  • 把上传文件对象传进任务

这类设计一般都会有问题。任务应该依赖“可序列化参数”,不要依赖当前请求上下文。

16.4 把 Celery 当成同步 RPC 用

有些人虽然用了 Celery,但提交完任务后立刻 .get() 等待结果,这样就又退回同步调用了。

如果接口本身必须立刻拿到结果,那就要重新评估:

  • 这个任务是否真的适合放到 Celery
  • 是否应该走同步推理接口
  • 是否应该拆成“提交任务 + 轮询结果”

17. 一套比较实用的学习顺序

建议按这个顺序掌握:

  1. 先跑通最小示例
  2. 理解 delay()AsyncResult
  3. 学会查任务状态
  4. 学会重试
  5. 学会队列与路由
  6. 再接入 FastAPI
  7. 最后再做定时任务、监控、扩容

18. 总结

Celery 的本质就是:

  • 把耗时任务从主线程 / 主服务里剥离出去
  • 通过消息队列异步执行
  • 通过 Worker 扩展执行能力
  • 通过结果后端查询状态和结果

对于 FastAPI 项目,可以把它理解成一组固定分工:

  • FastAPI:对外提供 API
  • Redis / RabbitMQ:负责排队
  • Celery Worker:真正执行任务
  • Result Backend:保存状态和结果

如果项目里已经出现“接口慢、任务重、需要重试、需要排队、需要异步处理”的问题,那 Celery 基本就是很自然的选择。

19. 参考

Ray 学习笔记

截至 2026-03-30,这份笔记主要参考:

  • Ray 官方文档 2.54.x
  • Celery 官方稳定版文档 5.6.x

如果当前目标是做 AI 模型开发,可以先抓住这个区分:

Celery 更像“任务队列和后台调度器”,Ray 更像“分布式计算和 GPU 资源调度层”。


1. Ray 是什么

Ray 是一个 Python 分布式计算框架,用来把普通 Python 代码扩展到:

  • 多进程
  • 多机
  • 多 GPU
  • 分布式训练
  • 批量推理
  • 在线推理服务

它的核心不是“消息队列”,而是“把 Python 函数和类,变成可调度的分布式任务”。

Ray 真正提供的是一套调度能力:普通的 Python 函数和类,加上 @ray.remote 之后,就能被当成分布式任务或远程 actor 来运行。


2. Ray 适合什么场景

Ray 典型适合这些场景:

  • 批量 embedding 生成
  • 多卡模型推理
  • 大规模数据预处理
  • 分布式训练
  • 超参搜索
  • 需要把模型常驻在 GPU 上的服务
  • 复杂 Python 工作流并行化

如果只是这些场景,通常不一定要用 Ray:

  • 发送短信、邮件、通知
  • 定时任务
  • 普通后台异步任务
  • 只需要重试、延迟执行、任务链

这些更偏 Celery 的强项。


3. Ray 的核心概念

Ray 入门先掌握 4 个东西:

3.1 Task

Task 就是“远程函数”。

特点:

  • 无状态
  • 适合并行执行独立任务
  • @ray.remote 标记

3.2 Actor

Actor 就是“远程对象”。

特点:

  • 有状态
  • 适合模型常驻内存 / GPU
  • 适合连接池、模型池、缓存、参数服务

3.3 ObjectRef

Ray 不会马上把结果返回给你,而是先返回一个“对象引用”。

  • xxx.remote() 返回的通常是 ObjectRef
  • ray.get() 才真正取回结果

这点很像 Future

3.4 Object Store

Ray 有自己的对象存储。

  • task / actor 的返回值会放到对象存储里
  • 大对象可以先 ray.put()
  • 后续多个任务共享这个引用,避免重复拷贝

4. 最小入门示例

4.1 安装

1
pip install -U "ray[default]"

如果你还要用训练或数据处理:

1
pip install -U "ray[train]" "ray[data]" torch

4.2 启动本地 Ray

最简单的方式:

1
2
3
import ray

ray.init()

说明:

  • ray.init() 会连接已有本地 Ray,或者启动一个新的本地实例
  • 如果你是 Celery worker 或生产环境进程,通常更建议显式连接已有集群:ray.init(address="auto")

4.3 第一个 Task

1
2
3
4
5
6
7
8
9
10
11
import ray

ray.init()

@ray.remote
def square(x: int) -> int:
return x * x

refs = [square.remote(i) for i in range(8)]
results = ray.get(refs)
print(results)

你要关注的是:

  • @ray.remote:把普通函数变成远程函数
  • square.remote(i):不是本地调用,而是提交任务
  • ray.get(refs):拿回结果

4.4 第一个 Actor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import ray

ray.init()

@ray.remote
class Counter:
def __init__(self):
self.value = 0

def incr(self, n: int = 1):
self.value += n
return self.value

def get(self):
return self.value

counter = Counter.remote()

ray.get(counter.incr.remote())
ray.get(counter.incr.remote(10))
print(ray.get(counter.get.remote()))

什么时候该用 Actor:

  • 模型加载很慢,不想每次任务都重新加载
  • 需要在内存里保留状态
  • 要做 GPU 常驻推理

4.5 共享大对象

1
2
3
4
5
6
7
8
9
10
11
12
import numpy as np
import ray

ray.init()

@ray.remote
def compute_sum(arr):
return arr.sum()

big_array_ref = ray.put(np.ones((10000, 1000)))
result = ray.get(compute_sum.remote(big_array_ref))
print(result)

经验:

  • 小参数直接传
  • 大对象尽量 ray.put()

5. Ray 常用 API

5.1 ray.init()

用于启动或连接 Ray。

常见写法:

1
2
3
ray.init()
ray.init(address="auto")
ray.init(address="ray://<head-ip>:10001")

含义:

  • ray.init():本地开发最方便
  • address="auto":只连接已有集群,如果没集群会报错,适合生产环境
  • ray://...:Ray Client 方式连接远端集群

5.2 ray.get()

取结果。

1
2
value = ray.get(ref)
values = ray.get(refs)

5.3 ray.wait()

适合任务很多时,先消费一部分结果,避免一次性堆太多任务或结果。

1
2
ready, pending = ray.wait(refs, num_returns=1)
result = ray.get(ready[0])

5.4 ray.put()

把大对象放进 Ray 对象存储。

1
obj_ref = ray.put(big_object)

6. Task 和 Actor 怎么选

场景 更适合
每个任务彼此独立、无状态 Task
模型要常驻内存 / GPU Actor
批量推理 worker 池 Actor
简单 map 并行 Task
连接池 / 缓存 / 参数服务 Actor

最常见的 AI 场景经验是:

  • CPU 并行清洗数据:Task
  • 模型推理:Actor
  • 分布式训练:优先用 Ray Train,而不是自己手搓 Ray Core

7. 从本地到集群

7.1 单机开发

直接:

1
ray.init()

7.2 手动启动单机 / 多机集群

头节点:

1
ray start --head --port=6379

工作节点:

1
ray start --address=<head-node-ip>:6379

Python 里连接:

1
2
3
import ray

ray.init(address="auto")

7.3 GPU 机器上限制可见显卡

如果你只想让 Ray 看到部分 GPU:

1
CUDA_VISIBLE_DEVICES=1,3 ray start --head --num-gpus=2

这在一台机器上做资源隔离时很有用。


8. 多张显卡怎么用

这部分是 Ray 很强的一块。

你先记住结论:

  • 单个任务 / Actor 用几张卡,就声明几张
  • 多个任务并行吃多卡,让 Ray 调度
  • 训练任务优先用 Ray Train
  • 批量推理优先用 Actor 池或 Ray Data

8.1 一张卡对应一个 Actor

这是最常见的“多卡推理”写法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import os
import ray

ray.init(num_gpus=4)

@ray.remote(num_gpus=1)
class GPUWorker:
def info(self):
return {
"gpu_ids": ray.get_runtime_context().get_accelerator_ids()["GPU"],
"cuda_visible_devices": os.environ.get("CUDA_VISIBLE_DEVICES"),
}

workers = [GPUWorker.remote() for _ in range(4)]
print(ray.get([worker.info.remote() for worker in workers]))

效果:

  • 每个 Actor 占 1 张 GPU
  • Ray 会把不同 Actor 调度到不同卡
  • 大多数深度学习框架会自动遵守 CUDA_VISIBLE_DEVICES

8.2 单个任务使用多张卡

如果一个任务本身就要吃多卡:

1
2
3
4
5
6
7
8
9
10
import ray

ray.init(num_gpus=4)

@ray.remote(num_gpus=2)
def train_one_job():
import os
return os.environ.get("CUDA_VISIBLE_DEVICES")

print(ray.get(train_one_job.remote()))

这适合:

  • 单个训练任务需要 2 卡 / 4 卡
  • 单个推理进程要占多卡

8.3 分数 GPU

如果模型很小,也可以共享 GPU:

1
2
3
4
5
6
7
8
9
10
11
import ray
import time

ray.init(num_gpus=1)

@ray.remote(num_gpus=0.25)
def small_infer(x):
time.sleep(1)
return x

print(ray.get([small_infer.remote(i) for i in range(4)]))

注意:

  • 分数 GPU 只是“调度上的份额”
  • 显存控制还是你自己负责
  • 小模型、embedding、小批量推理更适合这样做

8.4 多卡训练优先用 Ray Train

如果你在做训练,不建议直接自己拼很多 @ray.remote

更推荐:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer, get_device

def train_loop(config):
import torch

device = get_device()
model = torch.nn.Linear(10, 1).to(device)
x = torch.randn(32, 10, device=device)
y = model(x)
loss = y.mean()
loss.backward()

trainer = TorchTrainer(
train_loop,
scaling_config=ScalingConfig(
num_workers=4,
use_gpu=True,
),
)

trainer.fit()

这表示:

  • 4 个 worker
  • 每个 worker 1 张 GPU
  • 总共 4 张 GPU

8.5 一个 worker 吃多张卡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer, get_devices

def train_loop(config):
import torch

devices = get_devices()
print(devices)

trainer = TorchTrainer(
train_loop,
scaling_config=ScalingConfig(
num_workers=2,
use_gpu=True,
resources_per_worker={"GPU": 2},
),
)

trainer.fit()

这表示:

  • 2 个 worker
  • 每个 worker 2 张 GPU
  • 总共 4 张 GPU

适合:

  • 每个 worker 本身要做张量并行
  • 每个训练进程要 2 卡或更多

8.6 需要成组预留资源时,用 Placement Group

有些任务需要“成组占住资源”,比如:

  • 2 个 actor 必须同时启动
  • 一个训练 job 要一次性拿到 2 CPU + 2 GPU
  • 希望资源尽量在同一台机器

这时用 Placement Group。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(num_gpus=2, num_cpus=8)

pg = placement_group(
[{"CPU": 2, "GPU": 1}, {"CPU": 2, "GPU": 1}],
strategy="STRICT_PACK",
)
ray.get(pg.ready())

@ray.remote(num_cpus=2, num_gpus=1)
class TrainerWorker:
def ready(self):
return "ok"

workers = [
TrainerWorker.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group_bundle_index=i,
)
).remote()
for i in range(2)
]

print(ray.get([w.ready.remote() for w in workers]))

STRICT_PACK 的意思是:

  • 所有 bundle 必须放在同一台节点上
  • 如果放不下,就直接失败

这对多卡训练很常见。


9. AI 场景下更实用的写法

9.1 模型常驻 GPU 的 Actor

如果你做 embedding、rerank、分类、OCR、ASR,常见需求是:

  • 模型初始化很慢
  • 不想每个请求都重新加载模型
  • 想让不同 worker 常驻不同 GPU

这时最实用的方案往往是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
class EmbeddingWorker:
def __init__(self, model_name: str):
from sentence_transformers import SentenceTransformer

self.model = SentenceTransformer(model_name, device="cuda")

def encode(self, texts: list[str]):
vectors = self.model.encode(texts, normalize_embeddings=True)
return vectors.tolist()

workers = [
EmbeddingWorker.remote("BAAI/bge-small-zh-v1.5")
for _ in range(2)
]

优势:

  • 模型只加载一次
  • Ray 自动把 worker 分到不同 GPU
  • 适合批量任务和服务化场景

9.2 批量推理更推荐 Ray Data

如果是大批量离线推理,比如:

  • 给 100 万条文本做 embedding
  • 大量图片跑分类 / OCR
  • 多卡批量推理

Ray Data 一般比你自己手写 for + remote() 更省心。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import pandas as pd
import ray

ray.init()

class EmbeddingPredictor:
def __init__(self, model_name: str):
from sentence_transformers import SentenceTransformer

self.model = SentenceTransformer(model_name, device="cuda")

def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
batch = batch.copy()
batch["embedding"] = self.model.encode(batch["text"].tolist()).tolist()
return batch

ds = ray.data.from_items(
[{"text": f"sample {i}"} for i in range(1000)]
)

result_ds = ds.map_batches(
EmbeddingPredictor,
fn_constructor_args=("BAAI/bge-small-zh-v1.5",),
batch_size=64,
batch_format="pandas",
compute=ray.data.ActorPoolStrategy(size=4),
num_gpus=1,
)

print(result_ds.take(2))

什么时候优先考虑 Ray Data:

  • 数据量大
  • 希望自动做并行读取和批处理
  • 想用 actor pool 跑多卡离线推理

10. Ray 和 Celery 的区别

很多人第一次会问:

我已经有 Celery 了,还要不要 Ray?

答案通常是:

不是替代关系,而是职责不同。

10.1 一张表看区别

维度 Celery Ray
核心定位 分布式任务队列 分布式计算框架
擅长 异步任务、重试、延迟、队列路由 并行计算、状态 Actor、GPU 调度
GPU 感知 很弱 很强
模型常驻 不擅长 很擅长
大对象共享 一般 很强
训练 / 多卡 / 集群算力 不擅长 强项
业务工作流编排 一般

10.2 什么时候只用 Celery

  • 发通知
  • 发邮件
  • 定时任务
  • 任务重试
  • 后台轻量业务任务

10.3 什么时候只用 Ray

  • 分布式训练
  • 多 GPU 推理
  • 批量 embedding
  • 大规模离线数据处理
  • 需要模型常驻内存 / GPU

10.4 什么时候 Ray + Celery 联合

这是很常见、也很推荐的工程做法:

  • Celery 负责“接住业务异步任务”
  • Ray 负责“真正吃 CPU / GPU 算力”

最典型架构:

1
2
3
4
5
6
FastAPI / Django
-> 提交 Celery 任务
-> Celery worker 收到任务
-> Celery worker 把重计算任务提交给 Ray
-> Ray task / actor 在 CPU / GPU 上执行
-> Celery 记录状态、重试、结果

这时职责很清晰:

  • Celery:任务排队、重试、路由、延迟调度、业务集成
  • Ray:并行计算、多机、多卡、模型常驻

11. Ray + Celery 推荐落地方式

这一节不是某个“官方一键集成组件”的说明,而是基于 Ray 官方能力和 Celery 官方能力整理出来的工程实践。

11.1 推荐原则

推荐这样设计:

  • 不要让 Celery 自己直接“管理 GPU”
  • 不要每个 Celery 任务里都临时启动一个 Ray 集群
  • Celery worker 只负责连接已有 Ray 集群并下发任务
  • 模型尽量放到 Ray Actor 里常驻

一句话:

Celery 做任务层,Ray 做算力层。

11.2 最实用的方案:Celery 任务内部调用 Ray Actor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from celery import Celery
import ray

celery_app = Celery(
"demo",
broker="redis://127.0.0.1:6379/1",
backend="redis://127.0.0.1:6379/2",
)

def ensure_ray():
if not ray.is_initialized():
ray.init(address="auto", namespace="ml")

@ray.remote(num_gpus=1)
class EmbeddingWorker:
def __init__(self, model_name: str):
from sentence_transformers import SentenceTransformer

self.model = SentenceTransformer(model_name, device="cuda")

def encode(self, texts: list[str]):
return self.model.encode(texts).tolist()

def get_embedding_worker():
ensure_ray()
try:
return ray.get_actor("embedding_worker")
except ValueError:
return EmbeddingWorker.options(
name="embedding_worker",
lifetime="detached",
get_if_exists=True,
).remote("BAAI/bge-small-zh-v1.5")

@celery_app.task(
bind=True,
autoretry_for=(Exception,),
retry_backoff=True,
max_retries=3,
)
def build_embeddings(self, texts: list[str]):
worker = get_embedding_worker()
ref = worker.encode.remote(texts)
vectors = ray.get(ref)
return {
"count": len(vectors),
"vectors": vectors,
}

这套写法的好处:

  • Celery 有重试和任务状态
  • Ray Actor 让模型只加载一次
  • GPU 真正由 Ray 调度
  • Celery 不直接碰 CUDA_VISIBLE_DEVICES

11.3 为什么这里要用 Named Actor

Named Actor 很适合下面这种需求:

  • 多个 Celery 任务都要访问同一个模型 worker
  • 模型很大,不想重复加载
  • 希望 worker 独立于单次任务生命周期存在

这里用到的 3 个点都很重要:

  • name="embedding_worker":给 actor 命名
  • get_if_exists=True:如果已有就复用
  • lifetime="detached":Celery 任务结束后 actor 依然活着

补充:

  • Detached Actor 不会自动回收
  • 不再需要时要手动 ray.kill(actor_handle),否则会一直占资源

11.4 如果要做多卡模型池

把一个 Named Actor 扩展成多个即可:

1
2
3
4
5
6
7
8
workers = [
EmbeddingWorker.options(
name=f"embedding_worker_{i}",
lifetime="detached",
get_if_exists=True,
).remote("BAAI/bge-small-zh-v1.5")
for i in range(4)
]

然后 Celery 里:

  • 轮询选一个
  • 按队列分流
  • 或者再包一个调度 Actor

11.5 Celery 队列怎么配合

重任务建议单独队列,比如:

1
build_embeddings.apply_async(args=[texts], queue="gpu")

对应 worker:

1
celery -A app worker -l info -Q gpu --concurrency=4

这样做的意义:

  • 把 GPU 相关任务跟普通业务任务隔离
  • Celery 侧也能做业务级限流

注意:

  • Celery 的 --concurrency 不等于 GPU 数
  • 真正的 GPU 分配还是 Ray 决定

12. 一个更合理的职责分层建议

如果你的项目同时有 Web、异步任务、模型推理,我更建议这样分:

推荐工具
Web 接口层 FastAPI / Django
任务编排层 Celery
分布式算力层 Ray
在线模型服务层 Ray Serve

这意味着:

  • 后台异步批处理:Celery + Ray
  • 在线低延迟模型服务:优先 Ray Serve

如果你把“在线推理 API”也硬塞进 Celery,一般不是最优解。


13. 常见坑

13.1 不要在循环里一边提交一边 ray.get()

错误思路:

1
2
for x in items:
result = ray.get(task.remote(x))

这样会把并行写成串行。

更合理:

1
2
refs = [task.remote(x) for x in items]
results = ray.get(refs)

13.2 任务别切得太细

如果单个任务非常轻,Ray 的调度开销会吃掉收益。

经验:

  • 小任务尽量 batch 化
  • 推理尽量做批处理

13.3 大对象别重复按值传

错误思路:

  • 每个 task 都传一份很大的模型参数

更合理:

  • ray.put()
  • 或者放进 Actor

13.4 GPU task 默认可能不复用 worker

Ray 文档提到:

  • 某些 GPU 任务可能会残留显存占用
  • 为避免资源泄漏,Ray 默认不会复用执行过 GPU task 的 worker 进程

如果你遇到 GPU 调度开销偏大,需要再回头看 worker 复用策略。

如果你确认自己的任务不会泄漏 GPU 资源,可以显式开启复用:

1
2
3
@ray.remote(num_gpus=1, max_calls=0)
def gpu_task():
...

13.5 Celery worker 不要偷偷起本地 Ray

在生产环境里,Celery 进程里更推荐:

1
ray.init(address="auto")

而不是:

1
ray.init()

原因:

  • ray.init() 可能在没有集群时直接起一个本地 Ray
  • 这样每台 Celery 机器都可能误起自己的小集群
  • 排查起来很麻烦

14. 一个现实中的选型建议

如果你的任务是下面这些,我建议这样选:

14.1 批量 embedding / rerank / OCR / ASR

优先:

  • Celery + Ray Actor
  • 数据量特别大时用 Ray Data

14.2 分布式训练

优先:

  • Ray Train

14.3 在线模型 API

优先:

  • Ray Serve

14.4 只是后台异步业务

优先:

  • Celery

15. 我建议你的学习顺序

如果你现在刚入门 Ray,可以按这个顺序学:

  1. ray.init()@ray.remoteray.get()
  2. Task 和 Actor 的区别
  3. ray.put()ray.wait()
  4. GPU 调度:num_gpus
  5. Named Actor / Detached Actor
  6. Placement Group
  7. Ray Data 批量推理
  8. Ray Train 多卡训练
  9. 再考虑和 Celery 联合

16. 官方文档和官方博客

下面这些我认为最值得先看。

16.1 Ray 官方文档

16.2 Celery 官方文档

16.3 官方博客 / 官方活动


17. 最后总结

你可以先把 Ray 理解成:

一个能把 Python 函数、对象、GPU、集群资源统一调度起来的分布式计算框架。

如果放到工程里:

  • Celery 解决“任务怎么排队、重试、延迟、编排”
  • Ray 解决“任务怎么真正并行地吃 CPU / GPU”

如果你后面是做 AI 系统,尤其是:

  • 多卡推理
  • 批量 embedding
  • 大模型训练 / 推理

那 Ray 很值得认真学。


18. FastAPI + Celery + Ray + 4 张 GPU 最小可运行项目

下面给你一份“单机 4 张 GPU”的最小结构。

目标:

  • FastAPI 负责接请求
  • Celery 负责异步任务和状态查询
  • Ray 负责 4 张 GPU 的模型调度
  • 每张 GPU 常驻 1 个 embedding worker

18.1 适用场景

这个结构很适合:

  • 批量生成 embedding
  • OCR / ASR / rerank 这类 GPU 推理任务
  • Web 请求不想阻塞
  • 想把“业务异步”和“GPU 算力”分层

不太适合:

  • 极低延迟在线推理

如果你追求在线低延迟,优先考虑 Ray Serve

18.2 目录结构

1
2
3
4
5
6
7
8
9
ray_embedding_demo/
├── app/
│ ├── __init__.py
│ ├── celery_app.py
│ ├── main.py
│ ├── ray_actors.py
│ ├── ray_runtime.py
│ └── tasks.py
└── requirements.txt

18.3 requirements.txt

1
2
3
4
5
6
7
8
fastapi
uvicorn[standard]
celery
redis
ray[default]
sentence-transformers
torch
numpy

18.4 app/celery_app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from celery import Celery

celery_app = Celery(
"ray_embedding_demo",
broker="redis://127.0.0.1:6379/1",
backend="redis://127.0.0.1:6379/2",
)

celery_app.conf.update(
imports=("app.tasks",),
task_track_started=True,
task_default_queue="gpu",
timezone="Asia/Shanghai",
)

18.5 app/ray_runtime.py

1
2
3
4
5
6
7
import ray

RAY_NAMESPACE = "embedding-demo"

def ensure_ray():
if not ray.is_initialized():
ray.init(address="auto", namespace=RAY_NAMESPACE)

18.6 app/ray_actors.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from __future__ import annotations

import os

import ray

from app.ray_runtime import ensure_ray

MODEL_NAME = "BAAI/bge-small-zh-v1.5"
WORKER_COUNT = 4
WORKER_PREFIX = "embedding_worker"
ROUTER_NAME = "embedding_router"

def worker_name(worker_id: int) -> str:
return f"{WORKER_PREFIX}_{worker_id}"

@ray.remote(num_gpus=1)
class EmbeddingWorker:
def __init__(self, model_name: str, worker_id: int):
from sentence_transformers import SentenceTransformer

self.worker_id = worker_id
self.model = SentenceTransformer(model_name, device="cuda")

def encode(self, texts: list[str]) -> list[list[float]]:
vectors = self.model.encode(
texts,
batch_size=32,
normalize_embeddings=True,
)
return vectors.tolist()

def info(self) -> dict:
return {
"worker_id": self.worker_id,
"gpu_ids": ray.get_runtime_context().get_accelerator_ids()["GPU"],
"cuda_visible_devices": os.environ.get("CUDA_VISIBLE_DEVICES"),
}

@ray.remote
class EmbeddingRouter:
def __init__(self, worker_names: list[str]):
self.worker_names = worker_names
self.index = 0

def encode(self, texts: list[str]) -> list[list[float]]:
current_name = self.worker_names[self.index]
self.index = (self.index + 1) % len(self.worker_names)
worker = ray.get_actor(current_name)
return ray.get(worker.encode.remote(texts))

def state(self) -> dict:
return {
"worker_names": self.worker_names,
"next_index": self.index,
}

def ensure_embedding_cluster():
ensure_ray()

worker_names = []
for worker_id in range(WORKER_COUNT):
name = worker_name(worker_id)
EmbeddingWorker.options(
name=name,
lifetime="detached",
get_if_exists=True,
).remote(MODEL_NAME, worker_id)
worker_names.append(name)

EmbeddingRouter.options(
name=ROUTER_NAME,
lifetime="detached",
get_if_exists=True,
).remote(worker_names)

def get_router():
ensure_embedding_cluster()
return ray.get_actor(ROUTER_NAME)

def list_workers() -> list[dict]:
ensure_embedding_cluster()
handles = [ray.get_actor(worker_name(i)) for i in range(WORKER_COUNT)]
return ray.get([handle.info.remote() for handle in handles])

说明:

  • EmbeddingWorker:每个 actor 绑定 1 张 GPU
  • EmbeddingRouter:做一个最简单的轮询分发
  • lifetime="detached":worker 不跟着单个任务退出
  • get_if_exists=True:多进程启动时避免重复创建

18.7 app/tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import ray

from app.celery_app import celery_app
from app.ray_actors import get_router

@celery_app.task(
bind=True,
autoretry_for=(Exception,),
retry_backoff=True,
retry_jitter=True,
max_retries=3,
)
def build_embeddings(self, texts: list[str]) -> dict:
router = get_router()
vectors = ray.get(router.encode.remote(texts))

return {
"count": len(vectors),
"dim": len(vectors[0]) if vectors else 0,
"vectors": vectors,
}

18.8 app/main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from contextlib import asynccontextmanager

from celery.result import AsyncResult
from fastapi import FastAPI
from pydantic import BaseModel, Field

from app.celery_app import celery_app
from app.ray_actors import ensure_embedding_cluster, list_workers
from app.tasks import build_embeddings

@asynccontextmanager
async def lifespan(app: FastAPI):
ensure_embedding_cluster()
yield

app = FastAPI(lifespan=lifespan)

class EmbeddingRequest(BaseModel):
texts: list[str] = Field(min_length=1)

@app.post("/embeddings/submit")
def submit_embeddings(payload: EmbeddingRequest):
task = build_embeddings.apply_async(args=[payload.texts], queue="gpu")
return {
"task_id": task.id,
"count": len(payload.texts),
"status": "submitted",
}

@app.get("/tasks/{task_id}")
def get_task_status(task_id: str):
result = AsyncResult(task_id, app=celery_app)
data = {
"task_id": task_id,
"status": result.status,
}

if result.successful():
data["result"] = result.result
elif result.failed():
data["error"] = str(result.result)

return data

@app.get("/ray/workers")
def get_ray_workers():
return list_workers()

18.9 启动顺序

先安装依赖:

1
pip install -r requirements.txt

启动 Redis:

1
redis-server

启动 Ray 集群。

注意:

  • Redis 默认也用 6379
  • Ray head 默认端口也常见写成 6379
  • 同机演示时最好显式给 Ray 换一个端口,避免冲突
1
2
ray stop
ray start --head --port=9339 --dashboard-port=8265 --num-gpus=4

启动 Celery worker:

1
celery -A app.celery_app:celery_app worker -l info -Q gpu --concurrency=4

启动 FastAPI:

1
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload

18.10 试一下

提交任务:

1
2
3
4
5
curl -X POST "http://127.0.0.1:8000/embeddings/submit" \
-H "Content-Type: application/json" \
-d '{
"texts": ["你好", "Ray 和 Celery 可以联合使用", "多卡 embedding 服务"]
}'

返回类似:

1
2
3
4
5
{
"task_id": "c9e2b7c1-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"count": 3,
"status": "submitted"
}

查状态:

1
curl "http://127.0.0.1:8000/tasks/c9e2b7c1-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

查看 4 个 GPU worker:

1
curl "http://127.0.0.1:8000/ray/workers"

你应该能看到类似结果:

  • worker_id=0 对应 1 张卡
  • worker_id=1 对应 1 张卡
  • worker_id=2 对应 1 张卡
  • worker_id=3 对应 1 张卡

19. “4 张显卡做 embedding 服务” 应该怎么理解

上面的示例,实际做的是这件事:

1
2
3
4
5
6
7
客户端请求
-> FastAPI
-> Celery 异步提交
-> Celery 调用 Ray Router
-> Router 把请求轮询发给 4 个 GPU Actor
-> 对应 Actor 在各自 GPU 上推理
-> Celery 返回任务结果

这意味着:

  • 4 张卡不是给 Celery 的
  • 4 张卡是给 Ray Actor 池的
  • Celery 只是任务入口,不负责 GPU 资源分配

19.1 为什么是一张卡一个 Actor

这是最容易稳定运行的方式。

优点:

  • 模型只加载一次
  • 每张卡有自己的常驻进程
  • 不容易互相抢显存
  • 问题定位更清晰

适合:

  • embedding
  • rerank
  • OCR
  • 语音识别
  • 图像分类

19.2 吞吐量怎么提高

如果你觉得吞吐还不够,优先调这几个地方:

  1. 增大 self.model.encode(..., batch_size=32)batch_size
  2. 一次提交多条文本,而不是每次只送 1 条
  3. 把路由器改成按“负载”调度,而不是简单轮询
  4. 小模型场景下尝试分数 GPU,比如 num_gpus=0.5

最有效的通常不是“开更多 Celery 进程”,而是:

  • 增大单次批量
  • 让 GPU 更满

19.3 什么时候改成 Ray Data

如果你的场景从“在线异步任务”变成:

  • 一次处理几十万条
  • 离线批量跑 embedding
  • 数据已经在文件、表、对象存储里

那通常不该继续走 FastAPI -> Celery 这条链路,而应该直接上:

  • Ray Data
  • map_batches
  • actor pool

19.4 什么时候改成 Ray Serve

如果你的目标是:

  • 每个请求都想低延迟返回
  • 希望直接暴露模型 API
  • 想做副本扩缩容
  • 想配合在线服务治理

那比起 FastAPI + Celery + Ray,更适合直接用:

  • Ray Serve

一句话:

  • 批处理 / 后台任务:Celery + Ray
  • 在线模型服务:Ray Serve

19.5 这个示例有哪些简化

为了易懂,这里故意省略了一些生产细节:

  • 没有把结果写数据库
  • 没有做请求级限流
  • 没有做任务取消
  • 没有做 GPU worker 健康检查和自动重建
  • 没有做模型热更新

但核心链路已经是对的:

  • Celery 编排任务
  • Ray 管 GPU
  • 模型常驻 Actor

19.6 真上生产时我建议再补 5 个点

  1. 结果不要直接塞进 Celery backend,尤其是 embedding 向量很大时
  2. 结果存数据库、对象存储或向量库,Celery 只回传元信息
  3. 给不同模型建不同的 Ray Actor 池,不要混在一个池里
  4. 给 GPU 任务单独 Celery 队列,避免和普通业务任务互相影响
  5. 增加一个“模型路由层”,按模型名、batch 大小、GPU 余量去分配

19.7 如果从单机 4 卡扩到多机

这套代码基本不用大改。

你主要改的是部署:

头节点:

1
ray start --head --port=9339 --dashboard-port=8265

工作节点:

1
ray start --address=<head-ip>:9339 --num-gpus=4

Python 代码仍然可以继续用:

1
ray.init(address="auto")

这也是 Ray 很实用的一点:

  • 先单机写通
  • 再平移到多机

20. 一份更现实的落地建议

如果你当前项目是 RAG 或 AI 应用后台,我建议优先这么落:

  1. Web 请求进入 FastAPI
  2. 文件解析、embedding、rerank 任务交给 Celery
  3. Celery 只做任务编排和状态更新
  4. 具体模型推理交给 Ray Actor 池
  5. 大批量离线任务再单独引入 Ray Data

这个拆法的优点是:

  • 业务代码和算力代码边界清晰
  • GPU 不会被 Celery 乱占
  • 后续扩展到多机、多模型也更稳

Embedding 入门笔记

1. 什么是 Embedding

Embedding 做的事情可以先这样看:

  • 把文本、图片、代码等内容,转换成一串数字
  • 这串数字不是随便生成的,而是尽量保留原内容的“语义信息”
  • 语义越接近的内容,转成的向量通常越接近

更直接一点说,Embedding 是把人能理解的内容,变成机器更容易比较和计算的向量表示。


2. 为什么需要 Embedding

计算机不直接理解“意思”,它更擅长处理数字。

例如这两句话:

  • 今天天气很好
  • 今天阳光不错

如果只看关键词,它们并不完全一样;
但如果转成 embedding,它们在向量空间里通常会比较近,因为语义接近。

所以 embedding 的核心价值是:

  • 不只看字面是否一样
  • 更关注内容“意思像不像”

3. Embedding 的直观类比

可以把 embedding 想成“给每段内容找一个坐标”。

  • 语义相近的内容,坐标位置更近
  • 语义差别大的内容,坐标位置更远

例如:

  • “苹果手机”
  • “iPhone”

它们可能在向量空间里很接近。

而:

  • “苹果手机”
  • “高等数学”

它们通常会相距很远。


4. Embedding 的输出长什么样

Embedding 的结果一般是一个高维向量,例如:

1
[0.021, -0.184, 0.337, ..., 0.092]

特点:

  • 从形式上看,就是一串浮点数组成的列表
  • 维度可能是几百到几千
  • 人看不懂具体数值,但模型和检索系统可以利用它做计算

你不需要记住每个数值,只需要理解:

  • 向量本身没有直观意义
  • 向量之间的“距离/相似度”有意义

5. Embedding 主要解决什么问题

5.1 语义检索

最常见用途。

用户搜索:

  • “怎么提高学习效率”

文档里写的是:

  • “如何建立高效的学习习惯”

虽然字面不完全一致,但 embedding 检索可能仍然能找到这段内容。

5.2 推荐系统

  • 给用户推荐相似文章
  • 推荐相似商品
  • 推荐相似视频

5.3 聚类

把意思接近的内容自动分到一组。

5.4 分类

把新文本和已有标签进行相似度比较,辅助分类。

5.5 RAG

这是现在最常见的 AI 应用场景之一。

流程通常是:

  1. 先把知识库文本切块
  2. 每个块生成 embedding
  3. 用户提问时,也生成问题的 embedding
  4. 用相似度检索最相关的文本块
  5. 把检索结果交给大模型生成回答

6. Embedding 的基本工作流程

以文本为例:

第一步:准备文本

原始文档通常不会整篇直接做 embedding,而是先切成多个小块(chunk)。

原因:

  • 太长会超过模型限制
  • 粒度太大不利于精确检索
  • 分块后更适合做 RAG

第二步:调用 embedding 模型

把每个文本块输入 embedding 模型,得到一个向量。

第三步:存储向量

常见存储方式:

  • 向量数据库
  • FAISS
  • pgvector
  • Milvus
  • Weaviate

第四步:用户提问

把用户问题也转成 embedding。

第五步:计算相似度

把“问题向量”和“文本块向量”做相似度计算,找到最相近的几个结果。

第六步:返回结果或交给大模型

  • 可以直接返回检索结果
  • 也可以把结果交给 LLM 生成更完整的回答

7. 常见的相似度计算方式

Embedding 常见比较方式有:

  • 余弦相似度(Cosine Similarity)
  • 点积(Dot Product)
  • 欧氏距离(Euclidean Distance)

初学阶段重点理解一个结论:

  • 两个向量越相似,通常说明两段内容语义越接近

实际工程里,很多系统常用余弦相似度。


8. Embedding 和关键词搜索的区别

关键词搜索

优点:

  • 简单直接
  • 精确匹配效果好

缺点:

  • 容易受措辞影响
  • 同义词、近义表达处理较弱

Embedding 搜索

优点:

  • 更关注语义
  • 能处理“说法不同但意思接近”的情况

缺点:

  • 成本更高
  • 结果不一定完全可解释
  • 有时会召回“看起来像,但其实不对”的内容

结论:

  • 关键词搜索适合精确匹配
  • embedding 搜索适合语义检索
  • 很多实际系统会把两者结合起来

9. Embedding 在 RAG 里的位置

RAG 可以粗略理解成:

  • Embedding 负责“找资料”
  • 大模型负责“组织答案”

也就是说:

  • Embedding 决定你能不能找到相关上下文
  • LLM 决定你能不能把上下文回答得清楚

如果 embedding 检索不到有用内容,那么后面的生成效果通常也会变差。

所以在 RAG 里,embedding 不是配角,而是基础能力。


10. 初学者最容易混淆的几个点

10.1 Embedding 不是生成答案

Embedding 主要做“表示”和“检索”,不是直接生成自然语言回答。

10.2 Embedding 不等于关键词匹配

它更偏向语义空间中的相似性,而不是简单的词面重合。

10.3 向量维度不是越大越好

维度更高不一定代表效果一定更强,还要考虑:

  • 模型质量
  • 成本
  • 存储空间
  • 检索速度

10.4 文本切块很重要

很多检索效果差,不一定是 embedding 模型不行,可能是:

  • 切块太大
  • 切块太碎
  • 重叠设置不合理
  • 清洗文本质量差

11. 一个最小例子

假设知识库里有三段文本:

  1. Python 是一种编程语言
  2. 篮球是一项团队运动
  3. 机器学习是人工智能的重要分支

用户问题是:

什么是 AI 的一个重要领域?

系统流程:

  1. 先把这三段文本转成 embedding
  2. 再把用户问题转成 embedding
  3. 比较问题与三段文本的相似度
  4. 第 3 段大概率最相近
  5. 系统返回第 3 段,或者交给大模型生成最终答案

这就是 embedding 在检索中的基本逻辑。


12. 学习 Embedding 时建议先掌握的关键词

  • 向量(vector)
  • 维度(dimension)
  • 语义相似度(semantic similarity)
  • 文本切块(chunking)
  • 向量检索(vector search)
  • 召回(recall)
  • 重排(rerank)
  • RAG

13. 初学者学习路线

第一阶段:先懂概念

重点搞清楚:

  • embedding 是什么
  • 为什么能做语义检索
  • 它和关键词搜索有什么区别

第二阶段:能跑一个小 demo

例如:

  • 准备几段文本
  • 生成 embedding
  • 输入一个问题
  • 找出最相似的文本

第三阶段:结合 RAG 理解

重点理解:

  • 文档如何切块
  • 向量如何存储
  • 如何检索 top-k
  • 为什么还需要 rerank

第四阶段:再看工程优化

例如:

  • 检索召回率
  • 多语言效果
  • 成本控制
  • 延迟优化

14. 一句话总结

Embedding 的本质,是把内容变成可以计算“语义距离”的向量表示。
它是语义检索、推荐系统、聚类分析和 RAG 的基础能力之一。


15. Embedding 模型怎么选

初学者选 embedding 模型时,重点不要只盯着“排行榜”,而要先看是否适合自己的场景。

可以先看 4 个维度:

15.1 语言

  • 主要是中文,就优先选中文效果好的模型
  • 中英混合,就选多语言模型
  • 如果业务是代码、医学、法律等领域,最好找更贴近领域语料的模型

15.2 成本和速度

  • 在线 API:接入快,但按量付费
  • 本地开源模型:可控性高,但需要本地算力和部署成本

15.3 向量维度

  • 维度更高,不代表一定更好
  • 维度越高,通常存储成本和检索开销也会更高

15.4 是否适合检索任务

有些模型更偏“通用表征”,有些模型更偏“检索优化”。

做 RAG 时,优先选择明确用于 embedding / retrieval 的模型,而不是随便拿一个生成模型来代替。

15.5 初学者建议

如果你只是入门,建议先用一个成熟模型把流程跑通,不要一开始就在模型上反复切换。

可以按这个顺序来理解:

  • 第一阶段先验证流程能跑通
  • 第二阶段再比较不同模型的效果

16. 文本切块怎么做

Embedding 效果好不好,很多时候不只是模型问题,切块策略同样重要。

16.1 为什么要切块

因为原始文档通常太长,不适合直接拿整篇去做检索。

切块的目标是:

  • 每块内容尽量表达一个相对完整的意思
  • 不能太长,否则检索不精准
  • 也不能太短,否则上下文不足

16.2 常见切块方式

固定长度切块

比如每 300 字或每 500 tokens 切一块。

优点:

  • 简单
  • 容易实现

缺点:

  • 可能把完整段落切断

按段落 / 标题切块

根据自然段、标题、小节来切。

优点:

  • 语义更完整

缺点:

  • 长度不稳定

混合切块

先按标题和段落切,再对过长部分按长度继续细分。

这是实际项目里很常见的方式。

16.3 什么是 overlap

overlap 就是相邻 chunk 之间保留一部分重复内容。

作用:

  • 避免关键信息刚好被切断
  • 提高上下文连续性

例如:

  • chunk1: 第 1~300 字
  • chunk2: 第 260~560 字

这里就有 40 字重叠。

16.4 初学者的默认思路

可以先从下面这个简单策略开始:

  • 中文笔记或普通文档:每块 300~500 字
  • overlap:10%~20%
  • 优先按标题和段落切,长度超了再二次切分

这不是固定标准,只是适合作为第一版 baseline。


17. 向量数据库和 FAISS 是什么关系

很多初学者会把 embedding、FAISS、向量数据库混在一起,其实它们不是一回事。

17.1 Embedding 模型负责什么

负责把文本转成向量。

17.2 FAISS 负责什么

FAISS 是一个向量检索库,重点是:

  • 存向量
  • 建索引
  • 做近似或精确检索

它更像“本地检索引擎”,适合:

  • 学习
  • 单机实验
  • 本地部署

17.3 向量数据库负责什么

向量数据库除了向量检索,还经常提供:

  • 元数据过滤
  • 持久化
  • 集群能力
  • 权限和服务化接口

17.4 怎么选

  • 只是学习原理:先用 FAISS
  • 已经有 PostgreSQL:可以考虑 pgvector
  • 需要完整服务化能力:再考虑 Milvus、Weaviate、Pinecone 一类方案

职责可以这样拆开看:embedding 负责生成向量,FAISS 或向量数据库负责保存、索引和检索这些向量。


18. 为什么检索后还需要 Rerank

很多 RAG 系统不是“检索完直接交给大模型”,而是会多一步 rerank。

18.1 检索和 rerank 的区别

第一阶段检索:

  • 用 embedding 先快速找出 top-k 候选片段
  • 目标是“尽量别漏掉相关内容”

第二阶段 rerank:

  • 对候选片段重新打分排序
  • 目标是“把最相关的排到最前面”

18.2 为什么要这样做

因为 embedding 检索更像“粗筛”,速度快,但有时不够精细。
rerank 更像“精筛”,速度慢一些,但相关性通常更好。

常见做法是:

  1. embedding 先召回 top 10 或 top 20
  2. rerank 再选出 top 3 或 top 5
  3. 把这几个最相关片段交给 LLM

18.3 初学者什么时候需要 rerank

如果你的知识库还比较小,可以先不加 rerank。
当你发现“明明检索到了相关内容,但排序不理想”时,再加 rerank 会更有感觉。


19. RAG 的完整工作流

把 embedding 放到 RAG 里,可以拆成两条链路来看。

19.1 入库链路

  1. 读取原始文档
  2. 清洗文本
  3. 按规则切块
  4. 为每个 chunk 生成 embedding
  5. 保存 chunk 文本、元数据和向量
  6. 建立向量索引

这里通常还会一起保存:

  • doc_id
  • chunk_id
  • source
  • title
  • section
  • text

这样后面才能做引用返回和结果定位。

19.2 查询链路

  1. 用户输入问题
  2. 把问题转成 embedding
  3. 到向量库里召回 top-k chunks
  4. 如果有 rerank,再重排一次
  5. 把相关 chunks 拼进 prompt
  6. 交给 LLM 生成答案
  7. 返回答案和引用来源

19.3 在这个流程里谁最关键

  • chunking 决定“切得好不好”
  • embedding 决定“能不能找对”
  • rerank 决定“排得准不准”
  • LLM 决定“答得顺不顺”

所以 RAG 的质量不是只靠一个大模型决定的。


20. 一个最小可运行示例

下面这个例子演示:

  • 先把几段文本转成 embedding
  • 用 FAISS 建索引
  • 再用问题去检索最相近的内容

安装:

1
pip install sentence-transformers faiss-cpu numpy

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer

docs = [
"Python 是一种通用编程语言。",
"Celery 用来处理异步任务。",
"Embedding 可以把文本表示成向量。",
]

query = "什么技术可以把文本变成向量?"

# 这里只是示例,模型名可以按你的语言和场景替换
model = SentenceTransformer("BAAI/bge-small-zh-v1.5")

# normalize_embeddings=True 时,常配合内积检索
doc_vectors = model.encode(docs, normalize_embeddings=True)
doc_vectors = np.asarray(doc_vectors, dtype="float32")

index = faiss.IndexFlatIP(doc_vectors.shape[1])
index.add(doc_vectors)

query_vector = model.encode([query], normalize_embeddings=True)
query_vector = np.asarray(query_vector, dtype="float32")

scores, ids = index.search(query_vector, k=2)

for score, idx in zip(scores[0], ids[0]):
print(f"score={score:.4f} text={docs[idx]}")

如果结果里把:

  • Embedding 可以把文本表示成向量。

排在最前面,就说明最基本的语义检索已经跑通了。


21. 检索效果不好时怎么排查

很多时候不是“模型不行”,而是流程某一环出了问题。

建议按这个顺序排查:

21.1 先看 chunk

  • 是否切得太碎
  • 是否一块里塞了太多主题
  • 标题和正文是否被拆散

21.2 再看文本清洗

  • 是否有大量乱码
  • 是否混入无关模板文字
  • 是否把表格、代码块、换行结构破坏了

21.3 再看相似度设置

  • 向量是否做了归一化
  • 模型推荐的是 cosine、dot product 还是 L2
  • 索引方式是否和模型假设一致

21.4 再看召回数量

  • top_k 太小可能漏召回
  • top_k 太大又可能引入太多噪声

21.5 最后再考虑 rerank 和模型替换

先把 chunking、清洗、召回这些基础问题处理好,再换模型,效率会更高。


22. 参考博客 / 文档

适合初学者先看的几篇:

继续深入可以看:


23. 一句话总结

Embedding 不是单独存在的技术点,而是语义检索、向量数据库、rerank 和 RAG 整条链路中的基础环节。
真正做项目时,要把它放进“切块 -> 向量化 -> 检索 -> 重排 -> 生成”的完整流程里理解。

MySQL 学习笔记

1. MySQL 是什么

MySQL 是一个关系型数据库,用来存储:

  • 用户信息
  • 订单数据
  • 文档元数据
  • 任务记录
  • 聊天记录
  • 各种需要长期保存的结构化数据

先抓住它和 Redis 的分工差异:

MySQL 更适合存“长期、结构化、可查询的数据”,Redis 更适合存“高频、临时、快速访问的数据”。

2. 为什么项目里经常会有 MySQL

因为很多业务数据都需要:

  • 长期保存
  • 有明确字段结构
  • 支持条件查询
  • 支持关联查询
  • 保证数据一致性

例如:

  • 一个用户有哪些会话
  • 一个文档属于哪个用户
  • 一条任务记录当前是什么状态
  • 某段时间创建了多少条消息

这些都很适合用 MySQL 来存。

3. 在 AI 项目里 MySQL 常见用法

在 AI 模型开发场景里,MySQL 常见用途有:

  • 存用户表
  • 存会话表
  • 存消息表
  • 存文档表
  • 存任务记录表
  • 存模型调用日志
  • 存权限和配置

放回整套系统里看,各组件的分工大致是:

  • FastAPI / Drogon:接请求
  • Redis:做缓存、状态层、中间层
  • Celery:处理后台任务
  • MySQL:存长期结构化数据

4. MySQL 和 Redis 的区别

维度 MySQL Redis
数据模型 关系型表结构 键值型
持久化 强,适合长期存储 可持久化,但常用于短期数据
查询能力 强,支持 SQL 主要按 key 操作
适合场景 用户、订单、文档、消息 缓存、计数器、会话、队列
事务 支持 支持有限的原子操作,不是传统关系事务

简单理解:

  • 用户资料、文档记录、消息历史更适合 MySQL
  • 缓存结果、限流计数、短期任务状态更适合 Redis

5. 安装和启动

如果本地已经装好了 MySQL,可以直接启动服务。

如果只是临时学习,Docker 会更省事:

1
2
3
4
5
6
docker run -d \
--name mysql8 \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=123456 \
-e MYSQL_DATABASE=demo \
mysql:8

进入 MySQL:

1
mysql -h 127.0.0.1 -P 3306 -u root -p

查看数据库:

1
SHOW DATABASES;

选择数据库:

1
USE demo;

6. Python 连接 MySQL

Python 项目里很常见的是 pymysql

安装:

1
pip install pymysql

最基础的连接方式:

1
2
3
4
5
6
7
8
9
10
11
import pymysql

conn = pymysql.connect(
host="127.0.0.1",
port=3306,
user="root",
password="123456",
database="demo",
charset="utf8mb4",
autocommit=False,
)

几个常见参数:

  • host:数据库地址
  • port:端口,MySQL 默认通常是 3306
  • user / password:用户名密码
  • database:要连接的库
  • charset="utf8mb4":推荐写上,避免中文和 emoji 编码问题
  • autocommit=False:默认手动提交事务

7. 连接和游标 cursor

你原来笔记里提到“游标对象的具体作用是什么”,可以这样理解:

连接 conn 负责连数据库,游标 cursor 负责执行 SQL 和取结果。

最常见写法:

1
cursor = conn.cursor()

更推荐:

1
2
3
with conn.cursor() as cursor:
cursor.execute("SELECT 1")
result = cursor.fetchone()

cursor 主要负责:

  • 执行 SQL
  • 传递参数
  • 获取查询结果
  • 提供 lastrowidrowcount 等信息

常见方法:

  • cursor.execute():执行一条 SQL
  • cursor.executemany():批量执行
  • cursor.fetchone():取一条
  • cursor.fetchmany(n):取前 n
  • cursor.fetchall():取全部

如果你希望查询结果按字典返回,而不是元组,可以这样写:

1
2
3
4
5
6
7
8
9
10
import pymysql.cursors

conn = pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="demo",
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
)

这样查出来的数据会更接近:

1
{"id": 1, "name": "tom"}

8. 建表和常见字段类型

MySQL 最基础的 SQL 之一就是建表。

示例:

1
2
3
4
5
6
7
CREATE TABLE users (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(100) NOT NULL UNIQUE,
email VARCHAR(200) DEFAULT NULL,
age INT DEFAULT 0,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);

常见字段类型:

  • INT:整数
  • BIGINT:更大的整数
  • VARCHAR(n):变长字符串
  • TEXT:长文本
  • DATETIME:日期时间
  • DATE:日期
  • DECIMAL(10,2):精确小数
  • BOOLEAN:布尔,底层通常可看成 TINYINT

常见约束:

  • PRIMARY KEY:主键
  • AUTO_INCREMENT:自增
  • NOT NULL:不能为空
  • UNIQUE:唯一
  • DEFAULT:默认值
  • FOREIGN KEY:外键

9. 表结构相关 SQL

这部分在实际开发里很常用。

9.1 创建表

1
2
3
4
CREATE TABLE users (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(100) NOT NULL
);

9.2 如果表已存在就跳过

你前面提到的“表已存在怎么办”,对应的就是这个场景。

如果担心“已经有同名表,再创建会报错”,可以写:

1
2
3
4
CREATE TABLE IF NOT EXISTS users (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(100) NOT NULL
);

这样当表已存在时,会直接跳过,不会因为同名表报错。

9.3 修改表

1
ALTER TABLE users ADD COLUMN email VARCHAR(200);

9.4 删除表

1
DROP TABLE users;

如果担心表不存在报错:

1
DROP TABLE IF EXISTS users;

9.5 查看建表语句

1
SHOW CREATE TABLE users;

这个命令非常实用,因为它能直接看到当前表的真实结构、索引、字符集等信息。

10. 插入数据 INSERT

最基础的插入写法:

1
2
INSERT INTO users (username, email, age)
VALUES ('tom', 'tom@example.com', 18);

在 Python 里更推荐参数化写法:

1
2
3
4
5
6
7
with conn.cursor() as cursor:
sql = """
INSERT INTO users (username, email, age)
VALUES (%s, %s, %s)
"""
cursor.execute(sql, ("tom", "tom@example.com", 18))
conn.commit()

插入后拿自增主键:

1
user_id = cursor.lastrowid

11. 查询数据 SELECT

11.1 查全部字段

1
SELECT * FROM users;

11.2 只查部分字段

1
SELECT id, username FROM users;

11.3 带条件查询

1
2
3
SELECT id, username
FROM users
WHERE age >= 18;

11.4 排序

1
2
3
SELECT id, username
FROM users
ORDER BY created_at DESC;

11.5 分页

1
2
3
4
SELECT id, username
FROM users
ORDER BY id DESC
LIMIT 10 OFFSET 0;

也常写成:

1
LIMIT 0, 10;

12. 更新数据 UPDATE

1
2
3
UPDATE users
SET email = 'new@example.com'
WHERE id = 1;

一定要注意:

UPDATE 不写 WHERE,会更新整张表。

Python 示例:

1
2
3
4
5
with conn.cursor() as cursor:
sql = "UPDATE users SET email = %s WHERE id = %s"
affected_rows = cursor.execute(sql, ("new@example.com", 1))
conn.commit()
print(affected_rows)

13. 删除数据 DELETE

1
2
DELETE FROM users
WHERE id = 1;

同样要注意:

DELETE 不写 WHERE,会删整张表的数据。

如果你是想删整张表但保留结构,也可以:

1
TRUNCATE TABLE users;

14. 条件查询、排序、分组

这些是 SQL 里非常高频的能力。

14.1 WHERE

1
SELECT * FROM users WHERE age >= 18;

14.2 ORDER BY

1
SELECT * FROM users ORDER BY id DESC;

14.3 LIMIT

1
SELECT * FROM users LIMIT 20;

14.4 GROUP BY

1
2
3
SELECT status, COUNT(*) AS total
FROM tasks
GROUP BY status;

14.5 HAVING

HAVING 通常配合分组结果使用:

1
2
3
4
SELECT status, COUNT(*) AS total
FROM tasks
GROUP BY status
HAVING total > 10;

15. JOIN 关联查询

这是关系型数据库最重要的优势之一。

假设:

  • users 表存用户
  • documents 表存文档

查询“每篇文档属于哪个用户”:

1
2
3
SELECT d.id, d.title, u.username
FROM documents d
JOIN users u ON d.user_id = u.id;

常见 JOIN:

  • JOIN / INNER JOIN:两边都能匹配到才返回
  • LEFT JOIN:左表全部保留,右表匹配不到则为 NULL

例如:

1
2
3
SELECT u.id, u.username, d.title
FROM users u
LEFT JOIN documents d ON u.id = d.user_id;

16. 索引 INDEX

索引的作用可以粗暴理解成:

让查询更快,但会增加写入成本和占用空间。

例如给用户名加索引:

1
CREATE INDEX idx_users_username ON users(username);

适合加索引的字段通常有:

  • 经常出现在 WHERE 里的字段
  • 经常用于排序的字段
  • 经常用于 JOIN 的字段
  • 唯一约束字段

常见误区:

  • 不是索引越多越好
  • 很小的表不一定需要索引
  • 更新频繁的字段加太多索引会拖慢写入

17. 外键 FOREIGN KEY

外键用于表达表和表之间的引用关系。

例如:

1
2
3
4
5
6
7
CREATE TABLE documents (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL,
title VARCHAR(200) NOT NULL,
CONSTRAINT fk_documents_user
FOREIGN KEY (user_id) REFERENCES users(id)
);

它的作用是:

  • 保证引用关系合法
  • 避免出现“文档指向一个不存在用户”的情况

但很多实际项目里,也会选择在业务层保证关系,而不是强依赖数据库外键。

18. 事务 commitrollback

你原来的笔记里提到“事务提交”,这部分非常重要。

事务适合用于:

  • 多条 SQL 必须一起成功
  • 中间某一步失败时要整体撤回

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
try:
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO accounts (name, balance) VALUES (%s, %s)",
("alice", 1000),
)
cursor.execute(
"INSERT INTO accounts (name, balance) VALUES (%s, %s)",
("bob", 500),
)
conn.commit()
except Exception:
conn.rollback()
raise

常见规则:

  • 成功后 conn.commit()
  • 失败时 conn.rollback()

如果连接时配置:

1
autocommit=True

就表示每条 SQL 默认自动提交。

学习阶段可以用,但项目里要清楚:

一旦涉及多步写操作,通常还是要认真控制事务边界。

19. 查询结果获取

这是你原来笔记里已经提到的内容,这里整理成更清晰的版本。

19.1 fetchone()

只取一条:

1
row = cursor.fetchone()

19.2 fetchmany(n)

取前 n 条:

1
rows = cursor.fetchmany(10)

19.3 fetchall()

取全部:

1
rows = cursor.fetchall()

19.4 lastrowid

拿最后插入的自增 ID:

1
new_id = cursor.lastrowid

19.5 rowcount

看本次 SQL 影响了多少行:

1
count = cursor.rowcount

20. execute()executemany()

20.1 execute()

执行一条 SQL:

1
2
3
4
cursor.execute(
"INSERT INTO users (username, age) VALUES (%s, %s)",
("tom", 18),
)

20.2 executemany()

批量执行:

1
2
3
4
5
6
7
8
9
10
data = [
("tom", 18),
("alice", 20),
("bob", 22),
]

cursor.executemany(
"INSERT INTO users (username, age) VALUES (%s, %s)",
data,
)

适合:

  • 批量插入
  • 批量更新同类数据

21. 一段比较完整的 PyMySQL 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import pymysql
import pymysql.cursors

conn = pymysql.connect(
host="127.0.0.1",
port=3306,
user="root",
password="123456",
database="demo",
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
autocommit=False,
)

try:
with conn.cursor() as cursor:
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(100) NOT NULL UNIQUE,
age INT NOT NULL DEFAULT 0,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""
)

cursor.execute(
"INSERT INTO users (username, age) VALUES (%s, %s)",
("tom", 18),
)

new_id = cursor.lastrowid

cursor.execute(
"SELECT id, username, age FROM users WHERE id = %s",
(new_id,),
)
row = cursor.fetchone()
print(row)

conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()

22. FastAPI 里怎么用 MySQL

在 FastAPI 项目里,MySQL 常见用途有:

  • 存用户
  • 存任务记录
  • 存会话和消息
  • 存文档元数据

最简单的同步示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import pymysql
from fastapi import FastAPI

app = FastAPI()

def get_conn():
return pymysql.connect(
host="127.0.0.1",
user="root",
password="123456",
database="demo",
charset="utf8mb4",
)

@app.get("/users/{user_id}")
def get_user(user_id: int):
conn = get_conn()
try:
with conn.cursor() as cursor:
cursor.execute(
"SELECT id, username FROM users WHERE id = %s",
(user_id,),
)
row = cursor.fetchone()
return {"data": row}
finally:
conn.close()

不过实际项目里通常会再往前走一步:

  • 使用连接池
  • 抽 Repository / Service 层
  • 使用 SQLAlchemy 等更完整的数据库层

23. AI 项目里比较常见的表设计

如果你在做文档问答、RAG、聊天系统,常见会有这些表:

  • users
  • sessions
  • messages
  • documents
  • document_chunks
  • tasks

例如:

  • documents:存文档标题、路径、状态、所属用户
  • tasks:存异步任务 ID、状态、错误信息
  • messages:存对话消息和角色

也就是说:

MySQL 更适合存“事实记录”和“长期业务数据”,不适合拿来做高频缓存层。

24. 常见坑

24.1 用字符串拼 SQL

错误示例:

1
sql = f"SELECT * FROM users WHERE username = '{username}'"

这会有 SQL 注入风险。

正确做法:

1
2
3
4
cursor.execute(
"SELECT * FROM users WHERE username = %s",
(username,),
)

24.2 忘记提交事务

执行了 INSERT / UPDATE / DELETE 之后,如果没有 commit(),数据可能不会真正写入。

24.3 UPDATE / DELETE 不带 WHERE

这会影响整张表。

24.4 滥用 SELECT *

开发初期问题不大,但项目里更推荐明确字段,避免:

  • 取了不需要的数据
  • 表结构变动后影响接口
  • 传输和解析成本增加

24.5 不建索引或乱建索引

都不对。

应该根据查询模式来设计索引。

24.6 字符集没统一

推荐统一使用:

  • 数据库字符集:utf8mb4
  • Python 连接:charset="utf8mb4"

24.7 把 MySQL 当缓存用

这会让数据库承担不适合它的高频压力。

25. 一套比较实用的学习顺序

建议这样学:

  1. 先学会创建库、建表、插入、查询、更新、删除
  2. 学会 WHEREORDER BYLIMIT
  3. 学会 GROUP BYJOIN
  4. 学会索引和事务
  5. 学会用 PyMySQL 连接和执行参数化 SQL
  6. 最后再接入 FastAPI、Celery、Redis

26. 总结

MySQL 的核心价值在于:

  • 存长期结构化数据
  • 支持 SQL 查询
  • 支持关联查询
  • 支持事务
  • 适合承载业务主数据

对于 AI 项目,可以把它理解成:

  • 用户数据存储层
  • 会话和消息持久化层
  • 文档元数据层
  • 任务记录层

如果把你前面几份笔记连起来看,可以把它们理解成一条链:

  • FastAPI / Drogon:接请求
  • Redis:缓存 / 状态 / 中间层
  • Celery:后台异步任务
  • MySQL:长期持久化数据

27. 参考

Redis 学习笔记

1. Redis 是什么

Redis 是一个高性能的内存型键值数据库,常被用来做:

  • 缓存
  • 会话存储
  • 计数器
  • 排行榜
  • 消息队列
  • 分布式锁
  • Celery 的 Broker / Result Backend

先把分工记住:MySQL 更适合放长期、结构化、需要稳定查询的数据,Redis 更适合放高频访问、生命周期短、对速度敏感的数据。

2. 为什么项目里经常会有 Redis

很多系统里都会遇到这些问题:

  • 某些数据访问特别频繁
  • 接口需要很快返回
  • 需要临时状态,不想每次都查数据库
  • 需要做限流、验证码、会话缓存
  • 需要一个轻量消息通道

这时候 Redis 很适合。

因为它的特点是:

  • 读写快
  • 支持多种数据结构
  • 支持过期时间
  • 支持原子操作
  • 既能做缓存,也能做简单消息系统

3. 在 AI 项目里 Redis 常见用法

在 AI 模型开发场景里,Redis 很常见:

  • 缓存热点查询结果
  • 缓存用户会话上下文
  • 给 Celery 当 Broker
  • 给 Celery 当任务结果存储
  • 存任务状态、进度、短期中间结果
  • 做限流
  • 用 Streams 传递流式输出 token

和你前面整理的两份笔记连起来理解:

  • FastAPI:接请求
  • Redis:缓存 / 排队 / 状态层
  • Celery:异步处理后台任务

4. Redis 和 MySQL 的区别

可以这样对比理解:

维度 Redis MySQL
数据类型 键值型,支持多种结构 关系型表结构
访问速度 很快,主要基于内存 相对慢一些,主要基于磁盘
适合场景 缓存、短期状态、计数、队列 长期存储、强结构化数据
查询方式 按 key 取值为主 SQL 查询
持久化重点 可选持久化,但常用于高频临时数据 天生适合长期持久化

简单理解:

  • 用户资料、订单、文档元数据更适合 MySQL
  • 验证码、登录态、缓存结果、限流计数更适合 Redis

5. 安装和启动

如果本地已经装好了 Redis,可以直接启动服务。

常见方式:

1
redis-server

如果只是临时学习,也可以用 Docker:

1
docker run -d --name redis -p 6379:6379 redis:7

测试服务是否正常:

1
redis-cli ping

如果返回:

1
PONG

说明 Redis 正常启动。

6. Python 连接 Redis

Python 项目里最常用的是 redis-py

安装:

1
pip install redis

最基础的连接方式:

1
2
3
4
5
6
7
8
9
10
11
import redis

r = redis.Redis(
host="127.0.0.1",
port=6379,
db=0,
decode_responses=True,
)

r.set("name", "tom")
print(r.get("name"))

说明:

  • db=0 表示使用第 0 个逻辑库
  • decode_responses=True 表示自动把结果解码为字符串

如果不加 decode_responses=True,很多结果会是 bytes

7. Redis 的基本概念

Redis 里最核心的概念就是:

  • key
  • value

所有数据都以 key -> value 的形式存储。

例如:

1
2
3
user:1:name -> tom
task:123:status -> SUCCESS
cache:article:100 -> {...}

实际项目里非常重要的一点是:

key 命名要有规则。

常见命名方式:

  • user:1001
  • session:token:abc123
  • task:ingest:001
  • cache:query:hash_xxx

这样后续排查和维护会清晰很多。

7.1 逻辑库 db

Redis 默认会提供多个逻辑库,常见写法里的:

  • db=0
  • redis://127.0.0.1:6379/1
  • redis://127.0.0.1:6379/2

这里最后的数字就是逻辑库编号。

它常用于做基础隔离,例如:

  • 0:普通缓存
  • 1:Celery broker
  • 2:Celery backend

不过要注意:

逻辑库只是“轻隔离”,不是严格意义上的多实例隔离。

如果项目越来越复杂,真正需要资源隔离、权限隔离、性能隔离时,通常还是会考虑拆 Redis 实例。

8. 常用通用命令

8.1 设置和获取

1
2
SET name tom
GET name

8.2 判断是否存在

1
EXISTS name

8.3 删除

1
DEL name

8.4 查看过期时间

1
TTL name

8.5 给 key 设置过期

1
EXPIRE name 60

表示 60 秒后过期。

8.6 直接带过期时间写入

1
SET code 123456 EX 300

表示写入验证码,并在 300 秒后自动过期。

8.7 查看库里的 key

学习阶段常见命令:

1
KEYS *

但要注意:

KEYS * 适合本地调试,不适合在线上大库里频繁使用。

更稳妥的方式通常是:

1
SCAN 0 MATCH user:* COUNT 100

SCAN 更适合在数据量较大时逐步遍历。

9. Redis 的核心数据结构

Redis 不只是简单字符串,它支持很多数据结构。这个是 Redis 很有价值的地方。

最常用的有:

  • String
  • Hash
  • List
  • Set
  • Sorted Set
  • Stream

10. String

这是 Redis 最基础、最常用的数据类型。

适合:

  • 普通缓存
  • 验证码
  • token
  • JSON 字符串
  • 计数器

10.1 基本命令

1
2
SET username alice
GET username

10.2 自增计数

1
2
3
SET page_views 0
INCR page_views
INCRBY page_views 5

这类操作很适合:

  • 接口访问次数统计
  • 点赞数
  • 限流计数

10.3 Python 示例

1
2
3
4
5
6
7
8
9
import redis

r = redis.Redis(host="127.0.0.1", port=6379, db=0, decode_responses=True)

r.set("article:1:title", "Redis Intro")
title = r.get("article:1:title")

r.set("counter:views", 0)
r.incr("counter:views")

11. Hash

Hash 就是一个 key 下挂多组 field-value。

适合:

  • 用户对象
  • 配置项
  • 文档元信息
  • 任务状态对象

11.1 基本命令

1
2
3
HSET user:1 name tom age 18 city shanghai
HGET user:1 name
HGETALL user:1

11.2 为什么 Hash 很常用

因为很多业务对象天然就是字段结构,比如:

1
2
3
4
user:1
name -> tom
age -> 18
city -> shanghai

相比把整个对象都塞进一个 JSON 字符串里,Hash 在局部更新字段时更方便。

11.3 Python 示例

1
2
r.hset("user:1", mapping={"name": "tom", "age": 18, "city": "shanghai"})
user = r.hgetall("user:1")

12. List

List 是有序列表。

适合:

  • 简单消息队列
  • 待处理任务列表
  • 最近操作记录

12.1 基本命令

1
2
3
LPUSH queue:tasks task1
LPUSH queue:tasks task2
RPOP queue:tasks

直观一点看,就是:

  • 左边入队
  • 右边出队

12.2 阻塞读取

1
BLPOP queue:tasks 0

表示如果队列为空就阻塞等待。

这可以做一个很简单的消费者模型,但它更适合轻量场景,不适合复杂消费确认机制。

12.3 使用建议

List 可以做简单队列,但如果项目要:

  • 消费组
  • 确认机制
  • 多消费者协作
  • 更强的消息语义

通常会考虑:

  • Redis Streams
  • RabbitMQ
  • Kafka

13. Set

Set 是无序且元素唯一的集合。

适合:

  • 去重
  • 标签集合
  • 记录某用户已点赞哪些内容
  • 共同好友 / 共同标签等集合计算

13.1 基本命令

1
2
3
SADD tags:article:1 redis python database
SMEMBERS tags:article:1
SISMEMBER tags:article:1 redis

13.2 集合运算

1
2
3
4
5
SADD set1 a b c
SADD set2 b c d
SINTER set1 set2
SUNION set1 set2
SDIFF set1 set2

这对“标签交集、权限集合、去重集合”这类场景非常方便。

14. Sorted Set

Sorted Set 是带分数、可排序的集合。

适合:

  • 排行榜
  • 按时间排序的数据
  • 按热度排序的数据

14.1 基本命令

1
2
3
4
ZADD ranking 100 tom 95 alice 88 bob
ZRANGE ranking 0 -1
ZREVRANGE ranking 0 -1
ZSCORE ranking tom

这里每个成员都有一个 score。

14.2 场景理解

例如文章热度榜:

  • article:100 -> 200
  • article:101 -> 150
  • article:102 -> 320

就可以按 score 排序快速拿到前 N 名。

15. Stream

Streams 是 Redis 较强的一类消息结构。

适合:

  • 事件流
  • 多消费者消费
  • 流式 token 传递
  • 日志型消息

在 AI 项目里,它很适合做:

  • Worker 持续写出 token
  • Gateway / FastAPI 持续消费 token
  • 流式返回给前端

15.1 基本命令

写入:

1
XADD chat:stream:1 * token hello

读取:

1
XREAD COUNT 10 STREAMS chat:stream:1 0

15.2 消费组

如果是多个消费者协作,可以用消费组:

1
2
XGROUP CREATE chat:stream:1 group1 0 MKSTREAM
XREADGROUP GROUP group1 consumer1 COUNT 10 STREAMS chat:stream:1 >

15.3 为什么 Streams 比 List 更适合流式场景

因为它支持:

  • 消息 ID
  • 消费组
  • 待确认消息
  • 更像日志流的结构

如果后面你做:

  • SSE
  • token 流输出
  • 异步处理链路

Streams 会比简单 List 更稳一些。

16. 过期时间和 TTL

Redis 很核心的一点就是支持 key 过期。

这在缓存场景里非常重要。

16.1 设置过期

1
SET session:token:abc user_1 EX 3600

表示 1 小时后自动过期。

16.2 查看剩余时间

1
TTL session:token:abc

16.3 为什么过期时间很重要

因为很多数据本来就不该永久保存,例如:

  • 登录态
  • 验证码
  • 缓存结果
  • 限流计数
  • 任务中间状态

如果不设置过期时间,Redis 很容易越堆越大。

17. 持久化

Redis 虽然主要是内存数据库,但也支持持久化。

常见方式:

  • RDB
  • AOF

17.1 RDB

它更像是按时间点保存一份内存快照。

特点:

  • 文件紧凑
  • 恢复快
  • 更适合定期备份

17.2 AOF

它的思路是把写命令按顺序追加记录下来。

特点:

  • 数据恢复通常更完整
  • 文件可能更大
  • 更强调操作日志式恢复

17.3 怎么理解

如果 Redis 里只是放缓存,丢了还能重建,那持久化要求没那么高。

如果 Redis 里存了:

  • 任务状态
  • 流式消息
  • 某些关键临时业务状态

那就要更认真考虑持久化策略。

18. Pub/Sub

Redis 还支持发布订阅。

基本命令:

1
2
SUBSCRIBE news
PUBLISH news "hello"

适合:

  • 简单实时通知
  • 临时消息广播

但它有一个明显特点:

如果订阅者不在线,消息一般不会帮你补回来。

所以在很多需要“可回放、可追踪”的场景里,Streams 通常比 Pub/Sub 更合适。

19. 事务和 Lua

Redis 也支持事务和 Lua 脚本。

19.1 事务

基本写法:

1
2
3
4
MULTI
SET key1 value1
INCR counter
EXEC

适合把一组命令放在一起执行。

19.2 Lua 脚本

Lua 常用于:

  • 多命令原子操作
  • 分布式锁安全释放
  • 把多个步骤封装成一个服务端操作

在实际项目里,Lua 经常是为了解决“先判断再删除”这种竞态问题。

20. 分布式锁

Redis 常被用来做分布式锁。

最常见写法是:

1
SET lock:task_1 request_abc NX EX 30

含义:

  • NX:只有 key 不存在时才设置成功
  • EX 30:30 秒后自动过期

如果返回成功,就表示拿到了锁。

20.1 为什么需要过期时间

因为如果加锁后进程挂了,没有过期时间的话,锁就可能一直不释放。

20.2 为什么不能直接 DEL

因为可能出现这种情况:

  1. 线程 A 拿到锁
  2. 锁过期
  3. 线程 B 拿到同名新锁
  4. 线程 A 这时候再执行 DEL
  5. 把线程 B 的锁删掉了

所以更安全的做法是:

  • 加锁时写入唯一值
  • 解锁时先判断 value 是否还是自己的
  • 判断和删除要放在一个原子操作里,通常用 Lua

20.3 Python 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import uuid

import redis

r = redis.Redis(host="127.0.0.1", port=6379, db=0, decode_responses=True)

lock_key = "lock:task:1"
lock_value = str(uuid.uuid4())

locked = r.set(lock_key, lock_value, nx=True, ex=30)

if locked:
try:
print("do something")
finally:
unlock_script = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
"""
r.eval(unlock_script, 1, lock_key, lock_value)

21. 缓存是 Redis 最常见的用途

21.1 最简单的缓存思路

先查 Redis:

  • 有数据就直接返回
  • 没数据再查数据库或调用模型服务
  • 查到结果后再写回 Redis

这就是最常见的缓存模式。

21.2 Python 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import json

import redis

r = redis.Redis(host="127.0.0.1", port=6379, db=0, decode_responses=True)

def get_article(article_id: int):
cache_key = f"article:{article_id}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)

data = {"id": article_id, "title": "Redis Intro"}
r.set(cache_key, json.dumps(data), ex=300)
return data

21.3 缓存设计里要注意什么

  • 一定要设计 TTL
  • 不要无脑缓存超大对象
  • key 命名要稳定
  • 更新数据库时要考虑缓存同步

22. 缓存穿透、击穿、雪崩

这是 Redis 学习里很常见的三个词。

22.1 缓存穿透

请求的数据本来就不存在:

  • Redis 没有
  • 数据库也没有
  • 每次都打到后端

常见处理:

  • 参数校验
  • 对空结果也做短期缓存
  • 布隆过滤器

22.2 缓存击穿

某个热点 key 失效瞬间,大量请求同时打到后端。

常见处理:

  • 热点 key 不要同时过期
  • 加锁重建缓存
  • 提前刷新

22.3 缓存雪崩

大量 key 在同一时间过期,导致后端压力暴涨。

常见处理:

  • TTL 加随机值
  • 分批过期
  • 多级缓存

23. 限流

Redis 很适合做接口限流,因为:

  • INCR 是原子操作
  • 配合 TTL 很方便

23.1 最简单的固定窗口限流

思路:

  1. 某个用户请求一次就 INCR
  2. 第一次请求时设置过期时间
  3. 超过阈值就拒绝

Python 示例:

1
2
3
4
5
6
7
8
9
10
import redis

r = redis.Redis(host="127.0.0.1", port=6379, db=0, decode_responses=True)

def allow_request(user_id: str, limit: int = 10, window: int = 60) -> bool:
key = f"rate_limit:{user_id}"
current = r.incr(key)
if current == 1:
r.expire(key, window)
return current <= limit

这个模式在:

  • 登录接口
  • 短信发送
  • AI 推理调用

都很常见。

24. Session / 登录态

Redis 也经常用来存登录态或短期会话。

例如:

1
session:token:abc123 -> user_id=1001

然后设置:

1
EXPIRE session:token:abc123 3600

这样用户 1 小时不活跃就自动失效。

25. FastAPI 里怎么用 Redis

FastAPI 项目里,Redis 常见用途有:

  • 缓存接口结果
  • 存验证码
  • 存 session
  • 做限流
  • 存短期任务状态

25.1 一个简单缓存例子

这里用 redis.asyncio 更符合 FastAPI 的异步风格。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import json

from fastapi import FastAPI
from redis.asyncio import Redis

app = FastAPI()

redis_client = Redis(
host="127.0.0.1",
port=6379,
db=0,
decode_responses=True,
)

@app.get("/articles/{article_id}")
async def get_article(article_id: int):
cache_key = f"article:{article_id}"
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached)

data = {"id": article_id, "title": "Redis Intro"}
await redis_client.set(cache_key, json.dumps(data), ex=300)
return data

补充:

  • redis.asyncio 更适合异步接口
  • 正式项目里通常会统一封装 Redis 客户端
  • 服务关闭时最好把客户端连接关闭掉

25.2 一个简单限流例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from fastapi import FastAPI, HTTPException, Request
from redis.asyncio import Redis

app = FastAPI()
redis_client = Redis(host="127.0.0.1", port=6379, db=0, decode_responses=True)

@app.get("/infer")
async def infer(request: Request):
client_ip = request.client.host
key = f"rate_limit:{client_ip}"

current = await redis_client.incr(key)
if current == 1:
await redis_client.expire(key, 60)

if current > 20:
raise HTTPException(status_code=429, detail="too many requests")

return {"message": "ok"}

25.3 生命周期里管理 Redis 客户端

如果项目规模再大一点,通常会把 Redis 放到 FastAPI 生命周期里初始化和关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from contextlib import asynccontextmanager

from fastapi import FastAPI
from redis.asyncio import Redis

@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.redis = Redis(
host="127.0.0.1",
port=6379,
db=0,
decode_responses=True,
)
yield
await app.state.redis.aclose()

app = FastAPI(lifespan=lifespan)

这样做的好处:

  • 连接初始化位置统一
  • 更方便复用
  • 关闭服务时能正常释放资源

26. Celery 为什么经常配 Redis

你在 Celery.md 里已经接触到了这一点。

Redis 在 Celery 里通常可以做两件事:

  • Broker:存待执行任务消息
  • Result Backend:存任务状态和结果

示例:

1
2
3
4
5
6
7
from celery import Celery

app = Celery(
"demo",
broker="redis://127.0.0.1:6379/1",
backend="redis://127.0.0.1:6379/2",
)

这里通常会把逻辑库分开:

  • /1:Broker
  • /2:Backend

这样排查和清理都更方便。

26.1 为什么很多人喜欢 Redis + Celery

因为它足够简单:

  • 环境轻
  • 容易本地启动
  • Python 项目接入快

但也要注意:

  • 不要无限保存任务结果
  • 大结果不要直接堆 Redis
  • 任务状态和业务长期数据要区分

27. AI / RAG 项目里 Redis 的几个实用位置

如果你做的是 RAG 或模型服务,Redis 常放在这些位置:

27.1 热点缓存

例如:

  • 相同 query 的检索结果缓存
  • embedding 结果缓存
  • 频繁访问的文档元数据缓存

27.2 会话上下文缓存

例如:

  • 最近 N 轮聊天记录
  • 临时上下文摘要
  • 会话状态

27.3 任务状态层

例如:

  • task:123:status
  • task:123:progress
  • task:123:result_ref

27.4 流式输出通道

例如:

  • Worker 生成 token 后写入 chat:stream:{task_id}
  • Gateway 或 API 服务持续消费
  • 对外通过 SSE 返回

这个设计非常适合“生成过程持续输出”的场景。

28. Redis Streams 在流式输出里的思路

一个简化流程可以这样理解:

  1. FastAPI 收到对话请求
  2. 把任务提交给 Celery
  3. Worker 调模型生成 token
  4. 每生成一段 token 就 XADD 到某个 Stream
  5. Gateway 或 API 层 XREAD / XREADGROUP
  6. 持续把内容推给前端

例如 Stream key:

1
chat:stream:task_1001

这个模式的好处:

  • 推理和对外接口解耦
  • 可以观察中间过程
  • 可以做断线恢复或补读

29. key 设计建议

Redis 真正到了项目里,key 设计非常重要。

建议:

  • 带业务前缀
  • 带对象类型
  • 带唯一 ID
  • 必要时带版本

例如:

  • user:1001
  • session:token:abc123
  • cache:article:100
  • task:ingest:001:status
  • chat:stream:task_1001

不要这样写:

  • a
  • test
  • temp1

这种 key 后期几乎不可维护。

30. Redis 内存管理要有意识

Redis 很快,但它是以内存为核心的,所以内存管理很重要。

需要注意:

  • 大 key 会拖慢操作
  • 无过期时间的缓存容易无限增长
  • Stream、List、Set 如果不清理会不断膨胀
  • 不要把超大模型结果直接塞进去

对 AI 项目尤其重要:

  • 大文本结果建议存数据库或文件
  • Redis 里尽量存引用、状态、索引、小块结果

30.1 排查时少用危险大命令

实际项目里还要注意一些“能用,但别乱用”的命令。

例如:

  • KEYS *
  • FLUSHDB
  • FLUSHALL

其中:

  • KEYS * 在大库里可能带来明显扫描压力
  • FLUSHDB 会清空当前库
  • FLUSHALL 会清空整个 Redis 所有库

本地学习可以用,但线上环境要非常谨慎。

31. 常见坑

31.1 把 Redis 当永久数据库

Redis 可以持久化,但它更常见的定位还是:

  • 缓存
  • 状态层
  • 高速临时存储

长期核心业务数据还是更适合数据库。

31.2 忘记设置 TTL

这是最常见的问题之一。

很多缓存、验证码、限流 key 如果没有 TTL,会一直堆着不清。

31.3 一个 Redis 库里什么都混着放

例如:

  • 缓存
  • Celery broker
  • Celery backend
  • 会话
  • 任务流

全部都混在同一个逻辑库里,会让排查和维护变得很乱。

至少应该在命名和逻辑库上做一定隔离。

31.4 把超大对象直接塞进 Redis

例如:

  • 整篇长文档
  • 大块模型输出
  • 大量 embedding

这些会明显增加内存压力,也会拖慢网络传输。

31.5 分布式锁写得不安全

只会 SETNX 或只会 DEL,但没有考虑:

  • 锁超时
  • 唯一 value
  • 原子释放

这种写法在并发场景里容易出问题。

31.6 用 Pub/Sub 做必须可靠消费的任务

Pub/Sub 更像广播,不是强消息队列。

如果你要求:

  • 消息可追踪
  • 消费可确认
  • 掉线后能补读

Streams 通常更合适。

32. 一套比较实用的学习顺序

建议按这个顺序掌握:

  1. 先学会连接 Redis 和 redis-cli
  2. 掌握 String、Hash、List、Set、Sorted Set
  3. 学会 TTL 和过期机制
  4. 学会缓存和限流
  5. 学会分布式锁的基本思路
  6. 再看 Pub/Sub 和 Streams
  7. 最后再把 Redis 接到 FastAPI 和 Celery

33. 总结

Redis 的核心价值可以概括成几句话:

  • 它很快,适合高频读写
  • 它支持多种数据结构,不只是一个简单缓存
  • 它天然适合存“短期状态”和“可过期数据”
  • 它很适合做 FastAPI 和 Celery 之间的中间层

在你这套 AI 模型开发学习链路里,可以这样理解它的位置:

  • FastAPI:提供接口
  • Redis:缓存、状态、消息中间层
  • Celery:执行后台异步任务
  • MySQL:长期结构化数据存储

如果后面你继续往“RAG 系统、推理服务、异步任务平台”方向走,Redis 几乎一定会成为一个基础组件。

34. 参考

内存泄漏检测与管理

时间:2026/04/16

关键词:RAII、unique_ptrshared_ptr 循环引用、Sanitizer、Valgrind、资源封装
核心目标:把“谁释放、何时释放、怎么定位泄漏”变成工程上可检查的规则。


1. 什么才叫内存泄漏

最典型的内存泄漏是:

  • 一块堆内存已经没有任何有效路径再访问它
  • 但它也永远不会被释放

例如:

1
2
3
void bad() {
int* p = new int(42);
}

函数结束后,p 没了,这块内存也没人能再 delete
这就是最标准的泄漏。

但工程里还要区分另一类问题:

  • 对象严格来说还“可达”
  • 但长期不释放,内存占用持续上涨

这未必是严格意义上的 leak,但同样会把服务拖垮。


2. 最常见的泄漏来源

2.1 裸 new / delete 配对失败

最常见的问题不是“不会 delete”,而是:

  • 提前 return
  • 中途 throw
  • 多分支路径漏掉释放

2.2 容器里放 owning raw pointer

1
2
std::vector<Foo*> items;
items.push_back(new Foo());

这种写法会把“谁来删”变成记忆题。

2.3 shared_ptr 循环引用

两个对象互相持有 shared_ptr,引用计数永远不会归零。

2.4 C 风格资源没及时封装

比如:

  • FILE*
  • malloc/free
  • socket / fd
  • 第三方库句柄

如果它们在业务代码里裸奔,后面很容易漏掉释放。


3. 第一原则:先别写出会泄漏的代码

现代 C++ 管理泄漏,重点不是“人工记得回收”,而是默认采用不容易泄漏的结构。

优先顺序通常是:

  1. 能值语义就值语义
  2. 能栈对象就栈对象
  3. 必须动态分配时优先 std::unique_ptr
  4. 确实需要共享拥有时才用 std::shared_ptr
  5. 裸指针和引用默认只表达观察,不表达拥有

这背后的核心思想就是 RAII:

  • 对象析构时自动释放资源

只要生命周期跟对象绑在一起,泄漏风险会大幅下降。


4. 一个典型泄漏例子

错误写法:

1
2
3
4
5
6
7
8
9
#include <memory>

struct Widget {};

Widget* create_widget(bool failed) {
Widget* p = new Widget();
if (failed) return nullptr; // 泄漏
return p;
}

问题不在 new 本身,而在:

  • 释放依赖调用路径是否完整

更稳妥的写法:

1
2
3
4
5
6
7
8
9
#include <memory>

struct Widget {};

std::unique_ptr<Widget> create_widget(bool failed) {
auto p = std::make_unique<Widget>();
if (failed) return nullptr;
return p;
}

这样即使中途提前返回,局部对象也会自动清理。


5. 智能指针也不是绝对安全

unique_ptr 很少造成泄漏,真正容易出问题的是 shared_ptr

1
2
3
4
5
6
#include <memory>

struct Node {
std::shared_ptr<Node> next;
std::shared_ptr<Node> prev; // 错:可能形成环
};

如果两个节点互相持有,引用计数就会卡住。

更常见的修正方式是:

1
2
3
4
5
6
#include <memory>

struct Node {
std::shared_ptr<Node> next;
std::weak_ptr<Node> prev; // 观察,不拥有
};

经验上:

  • 形成树状、图状、双向关系时,先主动检查是否存在环
  • “回指”通常更适合 weak_ptr

6. 泄漏和“内存一直涨”不是一回事

下面这些情况不一定是严格意义上的 leak:

  • 全局缓存只增不减
  • std::vector / std::string 容量长期不回收
  • 任务队列积压
  • 对象池尺寸只扩不缩

它们的问题是:

  • 生命周期策略不合理
  • 上限控制缺失

所以排查内存问题时,要先分清两类:

  1. 对象已经不可达,但没释放
  2. 对象还可达,但系统把它留得太久

前者更像 bug,后者更像管理问题,但两者都要处理。


7. 怎么检测泄漏

7.1 先上 Sanitizer

本地开发最常用的办法通常是编译时打开 Sanitizer:

1
2
clang++ -std=c++20 -g -O1 -fno-omit-frame-pointer -fsanitize=address main.cpp -o app
ASAN_OPTIONS=detect_leaks=1 ./app

如果你的工具链把 leak 检测拆开,也可以按需使用:

  • -fsanitize=leak

它的优点是:

  • 定位快
  • 栈回溯清楚
  • 很适合集成到测试里

7.2 再用 Valgrind 看存量问题

1
valgrind --leak-check=full --show-leak-kinds=all ./app

它更慢,但对一些历史代码排查仍然很有价值。

7.3 别只测“正常退出”

很多泄漏只在这些场景出现:

  • 异常路径
  • 超时取消
  • 重试逻辑
  • 长时间运行
  • 高并发压力

所以测试不能只跑正常路径。


8. 工程里的泄漏管理清单

真正有效的治理,通常不是靠某一个工具,而是靠几条长期规则:

  • 新代码默认不写“拥有语义的裸指针”
  • 业务代码里尽量不直接出现 new / delete
  • 工厂接口优先返回 std::unique_ptr
  • 容器里优先放对象或智能指针,不放“需要人工回收”的裸指针
  • 第三方资源在进入系统边界时立即封装成 RAII 类型
  • shared_ptr 关系图需要专门检查循环引用
  • 测试或 CI 中固定开启 Sanitizer 版本
  • 对缓存、对象池、队列设置上限,而不是默认无限增长

这才叫“管理”,不是出了问题再临时抓日志。


9. 一页总结

最值得记住的五条:

  1. 泄漏治理的核心不是“记得释放”,而是明确所有权
  2. 默认优先值语义、栈对象和 RAII
  3. 动态分配优先 unique_ptr,不是裸指针
  4. shared_ptr 最大的风险是循环引用
  5. 用 Sanitizer 和 Valgrind 查问题,不要靠肉眼猜

如果只记一句:

预防内存泄漏最有效的方法,不是手写更多 delete,而是让代码结构默认不需要手写 delete

现代 C++ 实践导读

时间:2026/04/09

这组笔记按“对象语义 -> 内存与所有权 -> 并发 -> 设计模式 -> 网络与协程 -> 常用工具”的顺序整理。
不建议按最早写作顺序读,建议优先按下面的编号顺序走。


1. 推荐阅读顺序

  1. 对象生命周期、特殊成员函数与移动语义
  2. 智能指针与所有权
  3. allocator、自定义内存分配与 pmr
  4. 生产者-消费者模式与阻塞队列
  5. 线程同步消息队列与线程池
  6. 工厂模式、多态与接口设计
  7. 游戏常见设计模式
  8. 对象布局、栈堆与未定义行为
  9. 网络服务基础:TCP 粘包、线程模型与 HTTP(S)
  10. C++20 协程入门与实践
  11. 现代 C++ 常用工具类型
  12. ranges 与 views
  13. 错误处理与 expected、异常设计
  14. 内存泄漏检测与管理

2. 这次整理做了什么

这次整理不只是改编号,还做了四件事:

  1. 把错名、重复和草稿笔记重构成清晰主题
  2. 把空白或过短内容补成可复习版本
  3. 新增一篇缺失但非常常用的工具类型笔记
  4. 补上了 ranges/views、错误处理设计,以及内存泄漏检测与管理这三块现代 C++ 高频主题

3. 如果时间有限

优先看这 5 篇:

  1. 生命周期、特殊成员函数与移动语义
  2. 智能指针与所有权
  3. 生产者-消费者模式与阻塞队列
  4. 线程池与消息队列
  5. 协程入门与实践

这几篇最直接影响现代 C++ 的工程写法。

Linux 高性能服务器编程

时间:2026/04/10

这组笔记当前已经覆盖 UDP、libevent 入门,以及一篇偏工程化的 Linux 服务端进阶专题。
阅读时可以先分清“传输语义”“事件驱动模型”“协议与线程分层”这三条线,再把它们串成完整服务端链路。


目录

1. UDP 通信

内容重点:

  • UDP 与 TCP 的语义差异
  • Linux 下的常用 UDP 接口
  • UDP 服务端的基本工作流
  • 在 UDP 上实现类似 TCP 的可靠传输思路

适合先看,因为它能把:

  • 报文边界
  • 丢包 / 乱序 / 重复
  • 可靠性到底由谁负责

这些底层问题先讲清楚。

2. [1]libevent.md

内容重点:

  • evconnlistener
  • TCP 监听与连接接入
  • bufferevent 的基础概念
  • 一个 echo server 的最小示例

更偏服务端事件驱动入门。

3. [2]libevent.md

内容重点:

  • bufferevent_socket_new
  • bufferevent_socket_connect
  • 回调、水位、超时
  • 一个最小客户端连接示例

更偏“如何基于 libevent 发起连接并管理 I/O”。

4. 高性能服务器编程笔记一

内容重点:

  • epoll 的 LT / ET 与 Reactor
  • 非阻塞 socket 的常见错误处理
  • timerfd / eventfd / signalfd
  • 线程池与 I/O 线程分层
  • HTTP 解析与应用层协议设计
  • Linux 网络调优参数与排障方法

更偏“把一个事件驱动服务端补成可落地工程模型”。


建议阅读顺序

  1. UDP.md
  2. [1]libevent.md
  3. [2]libevent.md
  4. 高性能服务器编程笔记一.md

原因很简单:

  • 先理解传输层语义
  • 再看事件驱动服务端如何接连接
  • 最后看 bufferevent 客户端和缓冲管理
  • 再补 epoll、线程模型、协议与调优这些工程问题

已补充的进阶主题

  • epoll 的 LT / ET 模式与 Reactor
  • 非阻塞 socket 的错误处理
  • timerfd / eventfd / signalfd
  • 线程池与 I/O 线程分层
  • HTTP 解析与应用层协议设计
  • Linux 网络调优参数与排障方法

后续仍可补的主题

  • sendfile / splice / mmap 与零拷贝
  • SO_REUSEPORT、RSS / RPS / XPS 与多队列
  • io_uring 与传统 Reactor 的差异
  • 数据库 / 缓存 / RPC 接入后的背压治理