楚天

惟楚有材,于斯为盛

FastAPI 学习笔记

1. FastAPI 是什么

FastAPI 是一个基于 Python 类型注解构建的 Web API 框架,适合用来开发:

  • 后端接口服务
  • AI 模型推理接口
  • 文件上传和处理接口
  • 内部微服务
  • 需要自动生成接口文档的项目

它的几个核心特点:

  • 开发效率高,写法接近普通 Python 函数
  • 自动参数校验
  • 自动生成 OpenAPI 文档
  • 支持异步
  • 和 Pydantic 配合很好,适合结构化数据处理

FastAPI 最实用的地方在于:你写的是普通 Python 函数,但它会顺手帮你把 HTTP 请求、参数校验和接口文档这些事情一起处理掉。

2. 为什么很多 AI 项目喜欢用 FastAPI

在 AI 项目里,FastAPI 很常见,因为它很适合做这些事情:

  • 暴露模型推理接口
  • 上传文档、图片、音频
  • 提供任务提交和状态查询接口
  • 给前端或其他服务提供统一 API
  • 和 Celery、Redis、数据库组合成一套服务

尤其是下面这种场景:

  1. 前端上传文件
  2. FastAPI 接收请求
  3. 把耗时任务交给 Celery
  4. 再通过 FastAPI 提供任务状态查询接口

这类架构在 RAG、推理平台、数据处理平台里非常常见。

3. 安装

最基础的安装:

1
pip install fastapi uvicorn

如果要做文件上传,还需要:

1
pip install python-multipart

说明:

  • fastapi:框架本身
  • uvicorn:ASGI Server,用来启动服务
  • python-multipart:处理表单和文件上传

4. 最小可运行示例

文件:main.py

1
2
3
4
5
6
7
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
return {"message": "hello fastapi"}

启动命令:

1
uvicorn main:app --reload

含义:

  • main:Python 文件名 main.py
  • app:文件中的 FastAPI 实例
  • --reload:代码变更后自动重启,适合开发环境

启动后可以访问:

  • http://127.0.0.1:8000/
  • http://127.0.0.1:8000/docs
  • http://127.0.0.1:8000/redoc

其中:

  • /docs:Swagger UI
  • /redoc:ReDoc 文档页面

5. FastAPI 的基本工作方式

可以把一个接口理解成:

  1. 定义一个路径
  2. 指定请求方法,比如 GETPOST
  3. 声明这个接口需要哪些参数
  4. FastAPI 自动帮你解析参数并做校验
  5. 函数返回值自动转换为 JSON 响应

示例:

1
2
3
4
5
6
7
from fastapi import FastAPI

app = FastAPI()

@app.get("/hello")
def hello(name: str):
return {"message": f"hello {name}"}

访问:

1
/hello?name=tom

返回:

1
{"message": "hello tom"}

这里的 name: str 会被 FastAPI 识别为查询参数。

6. 路由基础

FastAPI 支持常见的 HTTP 方法:

  • @app.get()
  • @app.post()
  • @app.put()
  • @app.delete()
  • @app.patch()

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from fastapi import FastAPI

app = FastAPI()

@app.get("/users")
def get_users():
return {"action": "list users"}

@app.post("/users")
def create_user():
return {"action": "create user"}

@app.get("/users/{user_id}")
def get_user(user_id: int):
return {"user_id": user_id}

说明:

  • /users 通常表示资源集合
  • /users/{user_id} 表示某个具体资源
  • user_id: int 会自动做类型校验

如果路径参数类型不对,FastAPI 会直接返回校验错误。

7. 路径参数、查询参数、请求体

这是 FastAPI 最重要的基础之一。

7.1 路径参数

路径里写在 {} 中的内容就是路径参数。

1
2
3
@app.get("/items/{item_id}")
def get_item(item_id: int):
return {"item_id": item_id}

例如:

1
/items/100

这里的 item_id 就是路径参数。

7.2 查询参数

函数里那些不在路径中的普通基础类型参数,通常会被当成查询参数。

1
2
3
@app.get("/items")
def list_items(page: int = 1, size: int = 10):
return {"page": page, "size": size}

例如:

1
/items?page=2&size=20

7.3 请求体

如果参数是 Pydantic 模型,FastAPI 会把它当成请求体。

1
2
3
4
5
6
7
8
9
from pydantic import BaseModel

class UserCreate(BaseModel):
name: str
age: int

@app.post("/users")
def create_user(user: UserCreate):
return user

请求:

1
2
3
4
{
"name": "alice",
"age": 18
}

结论:

  • 路径里的参数是路径参数
  • 简单类型参数通常是查询参数
  • Pydantic 模型通常是请求体

8. Pydantic 模型

FastAPI 的数据校验非常依赖 Pydantic。

最常见的用途:

  • 定义请求体结构
  • 定义返回体结构
  • 做字段校验
  • 约束字段类型

8.1 基本示例

1
2
3
4
5
6
from pydantic import BaseModel, Field

class ArticleCreate(BaseModel):
title: str = Field(min_length=1, max_length=100)
content: str
author: str

这里的好处是:

  • 字段缺失会报错
  • 类型不对会报错
  • 长度不满足约束会报错

8.2 作为返回模型

1
2
3
4
5
6
7
8
9
10
11
12
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class UserOut(BaseModel):
id: int
name: str

@app.get("/users/{user_id}", response_model=UserOut)
def get_user(user_id: int):
return {"id": user_id, "name": "tom", "password": "secret"}

返回给客户端时,password 这类不在 response_model 中的字段会被过滤掉。

这非常适合做:

  • 隐藏敏感字段
  • 固定接口返回结构
  • 保证前后端契约一致

9. defasync def 怎么选

这是 FastAPI 初学时很容易混淆的点。

9.1 用 async def 的情况

适合:

  • 调用异步数据库客户端
  • 调用异步 HTTP 客户端
  • WebSocket
  • 其他真正支持异步的 IO 场景

9.2 用普通 def 的情况

适合:

  • 普通同步逻辑
  • CPU 密集型计算
  • 调用同步库
  • 本来就不是异步的代码

注意:

async def 不是“更快”的意思,它只是更适合异步 IO。

如果你在 async def 里直接跑很重的同步任务,照样会阻塞。

对于重任务,通常应该交给:

  • Celery
  • 任务队列
  • 独立 Worker

10. 请求参数的更明确写法

FastAPI 虽然能自动推断参数来源,但实际项目里更推荐写明确一些。

可以使用:

  • Path
  • Query
  • Body

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from fastapi import Body, FastAPI, Path, Query

app = FastAPI()

@app.post("/items/{item_id}")
def update_item(
item_id: int = Path(..., ge=1),
keyword: str | None = Query(default=None, min_length=1),
data: dict = Body(...),
):
return {
"item_id": item_id,
"keyword": keyword,
"data": data,
}

这里:

  • Path(..., ge=1) 表示路径参数必须大于等于 1
  • Query(...) 用来描述查询参数校验
  • Body(...) 明确说明这个参数来自请求体

这种写法更清晰,也更适合团队协作。

11. APIRouter 路由拆分

项目一大,通常不会把所有接口都写在一个 main.py 里。

FastAPI 推荐使用 APIRouter 拆模块。

11.1 基本示例

文件:routers/health.py

1
2
3
4
5
6
7
from fastapi import APIRouter

router = APIRouter(prefix="/health", tags=["health"])

@router.get("")
def health_check():
return {"status": "ok"}

主文件:main.py

1
2
3
4
5
6
from fastapi import FastAPI

from routers.health import router as health_router

app = FastAPI()
app.include_router(health_router)

访问路径:

1
/health

11.2 为什么要拆 Router

好处:

  • 按业务模块组织代码
  • 主入口文件更简洁
  • 适合多人协作
  • 后续做版本管理更方便

常见拆分方式:

  • routers/user.py
  • routers/auth.py
  • routers/files.py
  • routers/tasks.py

12. 路由前缀和 API 版本管理

你原来的笔记重点写的是这部分,这里整理成更清晰的版本。

12.1 统一加前缀

如果整个项目都希望走 /api/v1 前缀,可以在 include_router() 时统一加上。

1
2
3
4
5
6
7
8
from fastapi import FastAPI

from routers.health import router as health_router

app = FastAPI()

root_prefix = "/api/v1"
app.include_router(health_router, prefix=root_prefix)

如果 health_router 本身是:

1
router = APIRouter(prefix="/health", tags=["health"])

那么最终路径就是:

1
/api/v1/health

12.2 在 Router 上直接带业务前缀

例如:

1
router = APIRouter(prefix="/users", tags=["users"])

再在主应用里挂上版本前缀:

1
app.include_router(user_router, prefix="/api/v1")

最后效果就是:

1
/api/v1/users

12.3 多版本应用挂载

如果你想同时保留多个版本,也可以挂多个 FastAPI 子应用。

1
2
3
4
5
6
7
8
9
from fastapi import FastAPI

app = FastAPI()

app_v1 = FastAPI(title="API V1")
app_v2 = FastAPI(title="API V2")

app.mount("/api/v1", app_v1)
app.mount("/api/v2", app_v2)

这种方式适合:

  • 两个版本差异非常大
  • 想彻底隔离文档和路由
  • 历史接口需要长期兼容

但如果只是小规模版本演进,通常直接用:

  • include_router(..., prefix="/api/v1")
  • include_router(..., prefix="/api/v2")

就够了。

12.4 版本管理建议

实际项目里一般这样做:

  • 先按模块拆 Router
  • 再统一加版本前缀
  • 不要一开始就把版本设计得过于复杂

多数项目初期使用:

1
/api/v1/xxx

已经足够。

13. 依赖注入 Depends

Depends 是 FastAPI 很重要的能力。

它的核心作用是:

  • 把公共逻辑抽出来复用
  • 给接口注入数据库连接、用户信息、分页参数等
  • 让路由函数更干净

13.1 基本示例

1
2
3
4
5
6
7
8
9
10
from fastapi import Depends, FastAPI

app = FastAPI()

def common_pagination(page: int = 1, size: int = 10):
return {"page": page, "size": size}

@app.get("/articles")
def list_articles(pagination=Depends(common_pagination)):
return pagination

说明:

  • pagesize 可以在多个接口中复用
  • 路由函数里只拿最终整理好的结果

13.2 常见依赖场景

  • 获取当前登录用户
  • 获取数据库 Session
  • 提取分页参数
  • 权限校验
  • 校验请求头

例如认证场景:

1
2
3
4
5
6
7
8
9
10
11
12
from fastapi import Depends, FastAPI, Header, HTTPException

app = FastAPI()

def verify_token(x_token: str = Header(...)):
if x_token != "demo-token":
raise HTTPException(status_code=401, detail="invalid token")
return x_token

@app.get("/secure")
def secure_api(token: str = Depends(verify_token)):
return {"token": token}

14. 异常处理

FastAPI 常用 HTTPException 抛业务错误。

1
2
3
4
5
6
7
8
9
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/users/{user_id}")
def get_user(user_id: int):
if user_id != 1:
raise HTTPException(status_code=404, detail="user not found")
return {"id": 1, "name": "tom"}

适合用在:

  • 资源不存在
  • 权限不足
  • 参数不合法
  • 业务状态不满足要求

补充理解:

  • 参数校验错误通常由 FastAPI 自动处理
  • 业务逻辑错误通常由你主动抛 HTTPException

15. 文件上传

这在 AI 项目里很常见,比如:

  • 上传 PDF
  • 上传图片
  • 上传音频
  • 上传知识库文档

15.1 单文件上传

1
2
3
4
5
6
7
8
9
10
from fastapi import FastAPI, File, UploadFile

app = FastAPI()

@app.post("/upload")
def upload_file(file: UploadFile = File(...)):
return {
"filename": file.filename,
"content_type": file.content_type,
}

15.2 保存到本地

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pathlib import Path
import shutil

from fastapi import FastAPI, File, UploadFile

app = FastAPI()

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

@app.post("/upload")
def upload_file(file: UploadFile = File(...)):
save_path = UPLOAD_DIR / file.filename

with save_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)

return {"path": str(save_path)}

15.3 实战建议

  • 不要直接相信上传文件名
  • 最好自己生成唯一文件名
  • 大文件处理不要阻塞接口太久
  • 上传成功后可以把处理任务交给 Celery

16. 表单与文件混合提交

有时接口既要文件,也要普通字段,比如:

  • 文件 + 文档类型
  • 图片 + 用户 ID
  • 音频 + 语言参数

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
from fastapi import FastAPI, File, Form, UploadFile

app = FastAPI()

@app.post("/documents")
def create_document(
file: UploadFile = File(...),
doc_type: str = Form(...),
):
return {
"filename": file.filename,
"doc_type": doc_type,
}

17. 中间件和 CORS

17.1 中间件是什么

中间件是在请求进入路由前、响应返回客户端前统一插入的一层处理逻辑。

常见用途:

  • 记录请求日志
  • 统计耗时
  • 增加追踪 ID
  • 统一鉴权

17.2 CORS

如果前后端分离,浏览器经常会遇到跨域问题。

FastAPI 通常这样加:

1
2
3
4
5
6
7
8
9
10
11
12
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

开发环境常见,生产环境一般要更严格地限制来源。

18. 生命周期事件

很多项目需要在服务启动时做初始化,比如:

  • 建立连接池
  • 加载配置
  • 初始化日志
  • 加载模型

FastAPI 支持应用生命周期管理。

1
2
3
4
5
6
7
8
9
10
11
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
print("service starting")
yield
print("service stopping")

app = FastAPI(lifespan=lifespan)

如果项目里有大模型、向量库客户端、数据库连接池,这一层会比较重要。

19. BackgroundTasks 和 Celery 的区别

这点和你刚整理的 Celery.md 是连着的。

19.1 BackgroundTasks

适合:

  • 很轻量的后台操作
  • 和当前 Web 进程生命周期绑定
  • 不需要重试
  • 不需要独立扩容

示例:

1
2
3
4
5
6
7
8
9
10
11
12
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def write_log(message: str):
with open("app.log", "a", encoding="utf-8") as f:
f.write(message + "\n")

@app.post("/notify")
def notify(background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, "new request")
return {"message": "accepted"}

19.2 Celery

更适合:

  • 长耗时任务
  • 文件处理
  • 模型推理
  • 批量任务
  • 需要失败重试
  • 需要多机扩展

一句话:

轻任务用 BackgroundTasks,重任务用 Celery。

20. FastAPI 和 Celery 的典型配合方式

在 AI 项目里,比较常见的结构是:

  1. FastAPI 负责接收请求
  2. 接口做参数校验和鉴权
  3. 把耗时任务提交给 Celery
  4. 返回 task_id
  5. 前端轮询任务状态

例如:

1
2
3
4
5
6
7
8
9
10
from fastapi import FastAPI

from tasks import process_document

app = FastAPI()

@app.post("/tasks/documents")
def create_document_task(file_path: str):
task = process_document.delay(file_path)
return {"task_id": task.id}

这个思路尤其适合:

  • 文档解析
  • embedding 计算
  • 批量摘要
  • 图片处理
  • 音频转写

21. 响应模型和状态码

建议在正式项目里尽量把返回结构固定下来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from fastapi import FastAPI, status
from pydantic import BaseModel

app = FastAPI()

class UserCreate(BaseModel):
name: str

class UserOut(BaseModel):
id: int
name: str

@app.post("/users", response_model=UserOut, status_code=status.HTTP_201_CREATED)
def create_user(user: UserCreate):
return {"id": 1, "name": user.name}

这样做的好处:

  • 文档更清晰
  • 前端更好联调
  • 返回结构更稳定

22. 测试

FastAPI 支持用 TestClient 做接口测试。

1
2
3
4
5
6
7
8
9
10
from fastapi.testclient import TestClient

from main import app

client = TestClient(app)

def test_read_root():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "hello fastapi"}

这个对接口开发很有用,特别是:

  • 改接口时防止回归
  • 验证参数校验
  • 验证权限逻辑

23. FastAPI 项目的工程结构

当项目从 demo 变成一个后端服务时,最重要的不是目录多,而是每一层职责清楚。

FastAPI 官方文档里会用 app/main.pyapp/dependencies.pyapp/routers/ 这种结构说明大型应用拆分。实际项目里可以在这个基础上继续细分成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
project/
├── app/
│ ├── main.py
│ ├── api/
│ │ └── v1/
│ │ ├── router.py
│ │ └── endpoints/
│ │ ├── health.py
│ │ ├── documents.py
│ │ ├── chat.py
│ │ └── tasks.py
│ ├── core/
│ │ ├── config.py
│ │ ├── logging.py
│ │ ├── security.py
│ │ └── exceptions.py
│ ├── schemas/
│ │ ├── document.py
│ │ ├── chat.py
│ │ └── task.py
│ ├── models/
│ │ ├── document.py
│ │ ├── chunk.py
│ │ └── message.py
│ ├── crud/
│ │ ├── document.py
│ │ └── task.py
│ ├── services/
│ │ ├── document_service.py
│ │ ├── rag_service.py
│ │ ├── embedding_service.py
│ │ └── llm_service.py
│ ├── clients/
│ │ ├── redis_client.py
│ │ ├── vector_store.py
│ │ └── llm_client.py
│ ├── db/
│ │ ├── session.py
│ │ └── base.py
│ └── workers/
│ ├── celery_app.py
│ └── tasks.py
├── tests/
│ ├── test_documents.py
│ └── test_chat.py
└── requirements.txt

不一定每个项目都要这么全。目录结构应该跟项目复杂度匹配,小项目可以先从 main.py + routers + schemas + services 开始。

23.1 各目录负责什么

  • main.py:应用入口,只负责创建 FastAPI 实例、注册中间件、注册路由、声明生命周期逻辑。
  • api/v1/router.py:汇总某个 API 版本下的所有 router,例如 /documents/chat/tasks
  • api/v1/endpoints/:HTTP 接口层,只处理请求参数、依赖注入、状态码和响应模型,不直接堆复杂业务。
  • schemas/:Pydantic 模型,定义请求体、返回体和接口契约。
  • models/:数据库 ORM 模型,对应 MySQL 表结构。
  • crud/:数据库读写封装,只做增删改查,不写复杂业务流程。
  • services/:业务逻辑层,负责组织多个 crudclient、算法模块完成一个完整业务动作。
  • clients/:外部服务或基础设施客户端,例如 Redis、向量库、LLM 服务、对象存储。
  • core/:全局配置、日志、安全、异常处理、公共常量。
  • workers/:Celery 应用和后台任务,处理文档解析、embedding、索引构建等耗时流程。
  • tests/:接口测试、业务测试、E2E 测试。

最关键的原则:

Router 不写重业务,Service 不关心 HTTP,CRUD 不关心业务流程。

23.2 一次请求在结构里的流动

以创建文档任务为例:

1
2
3
4
5
6
7
8
9
Client
-> FastAPI middleware
-> api/v1/endpoints/documents.py
-> Depends 获取配置、数据库 Session、用户信息
-> schemas/document.py 校验请求体
-> services/document_service.py 组织业务
-> crud/document.py 写入文档记录
-> workers/tasks.py 提交 Celery 任务
-> 返回 task_id

接口函数应该尽量薄,例如:

1
2
3
4
5
6
7
8
9
10
11
12
from fastapi import APIRouter, Depends, UploadFile

from app.db.session import get_db
from app.schemas.document import DocumentUploadResponse
from app.services.document_service import DocumentService

router = APIRouter(prefix="/documents", tags=["documents"])

@router.post("", response_model=DocumentUploadResponse)
def upload_document(file: UploadFile, db=Depends(get_db)):
service = DocumentService(db)
return service.create_ingest_task(file)

这个接口只做三件事:

  • 接收 HTTP 请求
  • 通过 Depends 拿到依赖
  • 调用 service 返回结果

文件保存、数据库写入、任务提交、错误处理都应该下沉到 service 或更底层。

23.3 main.py 应该保持很薄

main.py 更像“装配入口”,不要把业务逻辑都塞进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.api.v1.router import api_router
from app.core.config import settings

def create_app() -> FastAPI:
app = FastAPI(title=settings.app_name)

app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

app.include_router(api_router, prefix="/api/v1")

return app

app = create_app()

好处:

  • 应用创建逻辑清楚
  • 测试时可以复用 create_app()
  • 业务模块不会和框架入口互相缠住

23.4 API Router 汇总方式

可以用一个 api/v1/router.py 统一挂载各业务模块:

1
2
3
4
5
6
7
8
9
from fastapi import APIRouter

from app.api.v1.endpoints import chat, documents, health, tasks

api_router = APIRouter()
api_router.include_router(health.router)
api_router.include_router(documents.router)
api_router.include_router(tasks.router)
api_router.include_router(chat.router)

这样 main.py 只需要引入一个 api_router,不会随着接口数量变多而越来越乱。

23.5 配置层 core/config.py

正式项目不要把数据库地址、Redis 地址、模型地址直接写死在业务代码里。更好的做法是用配置对象统一管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env")

app_name: str = "rag-api"
mysql_url: str
redis_url: str
llm_base_url: str
embedding_model: str = "KaLM-Embedding-0.5B"
cors_origins: list[str] = ["http://localhost:3000"]

settings = Settings()

这样可以做到:

  • 本地、测试、生产环境分开配置
  • 敏感信息不写进代码
  • 配置类型可以被 Pydantic 校验

23.6 结合 RAG 项目的推荐结构

如果是你的 RAG 文档检索项目,可以这样对应:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
接口层:
- documents.py:上传文档、查询文档列表、删除文档
- tasks.py:查询 Celery 任务状态和进度
- chat.py:创建会话、提交问题、返回回答和 citations
- health.py:健康检查和依赖状态检查

业务层:
- document_service.py:保存文件、创建文档记录、提交 ingest 任务
- rag_service.py:Top-K 检索、上下文拼接、调用 LLM、保存 message/citation
- embedding_service.py:文本向量化、批量 embedding
- index_service.py:FAISS 索引构建、加载、保存

基础设施层:
- db/session.py:MySQL Session
- clients/redis_client.py:Redis 连接
- clients/llm_client.py:OpenAI-compatible LLM 调用
- clients/vector_store.py:FAISS 检索封装
- workers/tasks.py:文档解析、chunk、embedding、建索引

RAG 链路可以这样记:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
上传接口
-> document_service 创建文档和任务
-> Celery worker 解析文档
-> chunk 切片
-> embedding_service 生成向量
-> index_service 写入 FAISS
-> task 状态更新

问答接口
-> rag_service 接收问题
-> vector_store Top-K 检索
-> 拼接上下文和 Prompt
-> llm_client 调用模型
-> 保存 answer 和 citation
-> 返回给前端

23.7 什么时候需要拆得更细

如果项目只有几个接口:

1
2
3
4
main.py
routers/
schemas/
services/

就已经够用。

当出现下面情况时,再继续拆:

  • 接口超过十几个
  • 多个接口复用同一批业务逻辑
  • 数据库表开始变多
  • 出现 Celery、Redis、LLM、向量库等外部依赖
  • 测试变得难写
  • main.py 或某个 router 文件超过几百行

不要为了“看起来专业”一开始就堆很多目录。目录结构的目的不是复杂,而是让业务边界清楚。

24. 常见坑

24.1 把所有代码都写在一个文件里

初学时能跑,项目一大就很难维护。

建议尽快学会:

  • APIRouter
  • 模块拆分
  • 业务逻辑下沉到 service 层

24.2 在 async def 里跑重同步任务

这会阻塞事件循环。

如果是:

  • 长时间计算
  • 文件处理
  • 模型推理

通常不应该直接堆在接口函数里。

24.3 请求体验证和返回结构不统一

如果不定义 Pydantic 模型,接口会越来越乱。

建议:

  • 请求体尽量用模型
  • 返回值尽量用 response_model

24.4 忽略文件上传安全问题

例如:

  • 直接使用用户传上来的文件名
  • 不限制文件类型
  • 不限制文件大小

这些在正式项目里都容易出问题。

24.5 把 FastAPI 当成“万能后台线程框架”

FastAPI 主要是 Web API 框架,不是专门的任务调度系统。

如果任务已经明显是:

  • 重任务
  • 批处理
  • 需要排队
  • 需要重试

就应该交给 Celery 之类的任务系统。

25. 一套比较实用的学习顺序

建议按这个顺序掌握:

  1. 先跑通最小示例
  2. 理解路由、路径参数、查询参数
  3. 学会用 Pydantic 定义请求体和返回体
  4. 学会拆 APIRouter
  5. 学会 Depends
  6. 学会文件上传
  7. 学会 BackgroundTasks
  8. 最后再接入 Celery、数据库、鉴权

26. 总结

FastAPI 的核心价值在于:

  • 用很少的代码快速暴露 API
  • 自动完成参数解析和校验
  • 自动生成接口文档
  • 很适合和 Pydantic、Celery、Redis、数据库组合

对于 AI 项目,可以把它理解成:

  • 对外接口入口
  • 请求校验层
  • 文件上传入口
  • 任务分发入口
  • 状态查询入口

如果你后面要继续整理 AI 模型开发相关知识点,FastAPI、Redis、Celery 这三份笔记其实可以看成一条链:

  • FastAPI:接请求
  • Redis:做缓存 / 消息中间件
  • Celery:处理后台任务

27. 参考

Qwen 模型演进笔记

1. 先说明“最新模型”指的是谁

截至 2026-04-20,如果说“Qwen 最新模型”,其实要分两类看:

  • 最新通用 API 旗舰模型:Qwen3.6-Plus
    发布时间:2026-04-02
  • 最新开源主力模型:Qwen3.6-35B-A3B
    发布时间:2026-04-17

如果你更关心:

  • 云端调用、Agent、编程能力:重点看 Qwen3.6-Plus
  • 本地部署、开源权重、性价比:重点看 Qwen3.6-35B-A3B

2. Qwen2.5 是什么

Qwen2.5 是 Qwen 在 2024-09-19 发布的一代重要开源模型系列。

这一代的定位可以理解成:

把“通用聊天模型”真正做到了更稳、更全、更适合工程落地。

Qwen2.5 这一代包含:

  • 通用语言模型:Qwen2.5
  • 代码模型:Qwen2.5-Coder
  • 数学模型:Qwen2.5-Math

开源尺寸覆盖比较广:

  • 0.5B
  • 1.5B
  • 3B
  • 7B
  • 14B
  • 32B
  • 72B

3. Qwen2.5 的主要改进点

根据官方介绍,Qwen2.5 相比 Qwen2 的核心提升主要在下面几块:

3.1 知识、代码、数学能力更强

官方提到 Qwen2.5 基于最多 18T tokens 的大规模数据训练,相比 Qwen2:

  • 知识能力更强
  • 代码能力提升明显
  • 数学能力提升明显

这说明 Qwen2.5 不只是“能聊天”,而是开始更像一个泛用底座模型。

3.2 指令跟随更稳

Qwen2.5 一个很实用的提升是:

  • 更能听懂要求
  • 更能按要求输出
  • 对不同 system prompt 更稳

这在工程里很重要,因为很多模型不是“不会做”,而是:

  • 格式不稳
  • 角色设定容易跑偏
  • 提示词一换风格就漂

Qwen2.5 在这方面明显更适合做产品。

3.3 结构化输出更好

Qwen2.5 官方特别强调了这几点:

  • 更理解结构化数据
  • 更擅长表格理解
  • 更擅长输出结构化结果
  • 特别是 JSON 输出更可靠

这个优点对工程很关键,因为很多业务不是只要一段自然语言,而是要:

  • JSON
  • 表单字段
  • 工具调用参数
  • SQL / API 请求体

3.4 长文本生成更实用

Qwen2.5 支持:

  • 最长 128K 上下文
  • 最多 8K 生成长度

对长文总结、长对话、多轮问答、文档问答更友好。

3.5 多语言能力更强

Qwen2.5 支持 29+ 种语言,继续保持中英双强,同时兼顾多语种能力。

3.6 专项模型更成熟

这一代不只是通用模型升级,专项模型也更成体系:

  • Qwen2.5-Coder:代码相关数据更多,适合编程助手
  • Qwen2.5-Math:支持中英文数学问题,并引入 CoTPoTTIR

这意味着:

Qwen2.5 开始不是“一个模型打天下”,而是形成了通用底座 + 专项模型的完整产品线。


4. Qwen2.5 的优点

如果从“开发者和部署者”的角度看,Qwen2.5 的优点主要有这些:

  • 尺寸覆盖广,小到 0.5B,大到 72B,方便按机器选型
  • 作为 dense model,结构相对直观,部署门槛低
  • 128K 上下文对很多 RAG 和长文场景已经够用
  • 指令跟随和 JSON 输出更稳,适合业务接入
  • Qwen2.5-CoderQwen2.5-Math 让专项任务更好选型
  • 已经很好适配 TransformersvLLMOllama

一句话概括:

Qwen2.5 的强项是“稳、全、好落地”,非常适合作为通用开源基座。


5. 从 Qwen2.5 到最新模型,发生了什么变化

如果把后续几代串起来看,Qwen 的升级路线很清楚:

Qwen2.5

  • 重点是把通用基础能力做强
  • 强化结构化输出、长文本、代码、数学

Qwen3

  • 重点是把“推理模式 + 快速模式”融合
  • 开始明显强化 agent 和 reasoning 路线

Qwen3.5

  • 重点转向“原生多模态 Agent”
  • 不再只是文本模型,而是朝着文字、图像、音频、视频统一理解发展

Qwen3.6

  • 重点是把 Agent 真正往实战推进
  • 明显强化编程 Agent、工具调用、长链路任务执行、多模态推理

一句话看演进:

Qwen2.5 更像一个强通用模型,Qwen3.x 更像朝“能思考、能看、能调用工具、能完成任务”的 Agent 底座在走。


6. Qwen3:相对 Qwen2.5 的关键升级

Qwen3 发布于 2025-04-29,它是 Qwen 从“强聊天模型”走向“推理模型 + Agent 模型”的关键一代。

6.1 Hybrid Thinking Modes

Qwen3 引入了一个很重要的设计:

  • Thinking Mode
  • Non-Thinking Mode

也就是:

  • 复杂问题时,可以多想一会
  • 简单问题时,可以直接快速回答

这个设计的意义很大,因为它开始把“推理预算”显式化。

6.2 训练数据规模显著提升

Qwen3 官方提到:

  • Qwen2.5:约 18T tokens
  • Qwen3:约 36T tokens

也就是差不多翻倍。

6.3 多语言支持明显扩展

Qwen3 支持 119 种语言和方言,相比 Qwen2.5 的 29+ 更进一步。

6.4 Agent 和编码能力更强

官方明确提到:

  • 强化了 coding 和 agentic capabilities
  • 增强了对 MCP 的支持

这说明 Qwen3 不只是问答模型,而是开始面向:

  • 工具调用
  • 环境交互
  • 多步任务执行

6.5 MoE 带来更好的效率

Qwen3 这一代不仅有 dense 模型,也有 MoE 模型:

  • Qwen3-30B-A3B
  • Qwen3-235B-A22B

MoE 的意义是:

总参数很大,但每次推理只激活一部分参数,从而兼顾能力和成本。

官方还提到:

  • Qwen3-4B 可以逼近 Qwen2.5-72B-Instruct
  • Qwen3 的 MoE 模型可以用更少激活参数达到接近 Qwen2.5 dense 模型的效果

7. Qwen3.5:相对 Qwen2.5 / Qwen3 的关键升级

Qwen3.5 发布于 2026-02-17

这一代最重要的变化不是单点指标,而是方向变了:

从文本大模型,进一步走向“原生多模态 Agent”。

7.1 原生多模态

官方将 Qwen3.5-397B-A17B 定义为:

  • native vision-language model

也就是说它不是简单地“LLM 外接视觉模块”,而是更强调模型本身对多模态信息的统一处理。

7.2 架构更先进

Qwen3.5 采用:

  • linear attention
  • Gated Delta Networks
  • sparse MoE

官方给出的核心信息是:

  • 总参数 397B
  • 每次前向仅激活 17B

这类架构的目标很明确:

  • 降低推理成本
  • 提高长上下文和多模态处理效率

7.3 语言覆盖更大

Qwen3.5 把语言和方言支持从 119 扩展到 201

这意味着它更适合:

  • 全球化产品
  • 多语检索
  • 多语客服
  • 多语 Agent

7.4 Hosted 版本更适合做 Agent

官方提到 Qwen3.5-Plus 具备:

  • 默认 1M context window
  • 官方内置工具
  • adaptive tool use

这和 Qwen2.5 的差别已经非常明显了。

Qwen2.5 还是:

  • 更偏“问答 / 聊天 / 通用推理”

而 Qwen3.5-Plus 开始变成:

  • 更偏“能理解环境、能调用工具、能在长上下文里持续工作”

8. Qwen3.6-Plus:截至 2026-04-20 的最新 API 旗舰

Qwen3.6-Plus 发布于 2026-04-02

如果你问“现在最强的 Qwen 云端 Agent 模型是谁”,答案基本就是它。

8.1 相比 Qwen3.5-Plus 的主要改进

官方给出的核心升级点非常明确:

  • 编程 Agent 能力显著增强
  • 多模态感知和推理更强
  • 默认 1M 上下文
  • 更稳定、更可靠,更面向真实开发者场景

8.2 它最强的地方是什么

Qwen3.6-Plus 最大的变化不是“普通聊天更会聊”,而是:

  • 更会写代码
  • 更会改代码
  • 更会处理仓库级任务
  • 更会终端操作和工具调用
  • 更适合长链路、多步骤任务

官方原话对应的方向大致可以概括为:

  • reasoning
  • memory
  • execution

这三个能力被整合得更紧了。

8.3 它的优点

从实际使用角度看,Qwen3.6-Plus 的优点主要是:

  • 更强的 Agentic Coding
  • 更适合复杂仓库理解
  • 更适合前端开发和自动化任务
  • 更适合长任务规划
  • 更适合工具调用和多轮任务执行
  • 更适合多模态输入场景

如果你要做的是:

  • AI 编程助手
  • 自动化开发 Agent
  • Repo 级代码分析
  • 多步工作流 Agent

那 Qwen3.6-Plus 的定位已经明显高于 Qwen2.5。


9. Qwen3.6-35B-A3B:截至 2026-04-20 的最新开源主力

Qwen3.6-35B-A3B 发布于 2026-04-17

这是一款很值得关注的模型,因为它很能代表现在开源模型的方向:

用更少的激活参数,做出更强的 Agent 和编码能力。

9.1 它的核心特点

  • 35B 总参数
  • 3B 激活参数
  • MoE 架构
  • 原生多模态
  • 兼顾 thinkingnon-thinking

9.2 相比上一代的改进

官方强调它:

  • 大幅超过 Qwen3.5-35B-A3B
  • 在 agentic coding 上提升明显
  • 能与更大的 dense 模型竞争

尤其适合:

  • 本地部署
  • 开源私有化
  • 成本敏感场景
  • 想要较强代码 Agent 能力,但又不想上超大模型

9.3 它的优点

  • 开源,适合自部署
  • 激活参数低,推理成本相对更友好
  • 代码 Agent 能力强
  • 多模态能力也不错
  • 能对接 OpenClawClaude CodeQwen Code
  • 支持 preserve_thinking,更适合连续 Agent 任务

一句话概括:

如果 Qwen3.6-Plus 是云端旗舰,那么 Qwen3.6-35B-A3B 更像是“开源部署里很有竞争力的一把刀”。


10. Qwen2.5 和最新模型怎么对比

维度 Qwen2.5 Qwen3.6-Plus / Qwen3.6-35B-A3B
主要定位 通用基础模型 Agent 化、编程化、多模态化
核心优势 稳定、通用、结构化输出好 会思考、会调用工具、会执行复杂任务
上下文 最高 128K Hosted 版默认可到 1M
输出风格 更适合问答、结构化响应 更适合多步任务、复杂工作流
代码能力 强,尤其 Qwen2.5-Coder 更强调 Agentic Coding 和仓库级任务
多模态 不是这一代的核心重点 已经是核心方向之一
部署选择 适合通用本地部署 最新开源版更适合高级 Agent 部署
成本效率 dense 结构,直观易懂 MoE 更强调“能力 / 成本”比

11. 怎么选

如果你现在要选型,可以这样理解:

选 Qwen2.5 的场景

  • 想找成熟稳定的通用开源模型
  • 更关注文本问答、RAG、结构化输出
  • 本地部署机器一般,希望模型链路简单
  • 需要一套通用模型 + coder/math 专项模型组合

选 Qwen3.6-35B-A3B 的场景

  • 想自部署最新开源模型
  • 重点是代码 Agent、任务执行、复杂工作流
  • 希望性能强,但推理成本不要太夸张
  • 希望兼顾多模态能力

选 Qwen3.6-Plus 的场景

  • 不执着于开源权重
  • 更看重当前最强云端能力
  • 想做编程 Agent、工具调用 Agent、长链路任务规划
  • 需要默认 1M 上下文和更强的实际执行能力

12. 高频结论

  • Qwen2.5 的核心价值是“稳、全、好落地”。
  • Qwen3 开始,Qwen 明显往 reasoning + agent 方向转。
  • Qwen3.5 是“原生多模态 Agent”方向的重要节点。
  • Qwen3.6-Plus 是截至 2026-04-20 的最新 API 旗舰,更偏真实世界 Agent。
  • Qwen3.6-35B-A3B 是截至 2026-04-20 的最新开源主力,更适合自部署和高性价比 Agent 场景。

13. 推荐阅读

建议按下面顺序看:

  1. Qwen2.5:先理解它为什么是一个成熟通用底座
  2. Qwen3:看它如何把 thinking / non-thinking 合到一起
  3. Qwen3.5:看它怎么走向原生多模态 Agent
  4. Qwen3.6-Plus:看最新 Agent 旗舰在“编码 + 工具 + 多模态”上怎么继续升级
  5. Qwen3.6-35B-A3B:看开源版本如何在成本和能力之间做平衡

官方文章:


14. 一句话总结

Qwen2.5 的关键词是“强通用底座”,而最新的 Qwen3.6 系列关键词已经变成了:

原生多模态、Agent、编程执行、长上下文、高效率 MoE。

LLM 部署学习笔记

1. 先抓住部署的核心矛盾

部署大模型,本质上一直在解决 3 个问题:

  • 模型权重怎么加载进来
  • 有限的 GPU / CPU / 磁盘内存怎么分配
  • 多张卡之间怎么协调,才能兼顾“能跑、跑得稳、跑得快”

如果把它再压缩成一句话:

大模型部署,本质就是“权重放哪、缓存放哪、计算在哪做、卡和卡之间怎么通信”。


2. 模型加载:模型到底是怎么被搬进来的

2.1 传统加载流程

普通 PyTorch 模型通常这样加载:

  1. 先创建模型结构
  2. 再把权重文件读进内存
  3. 再把权重拷贝进模型
  4. 再把模型搬到 GPU

小模型这样没问题,但大模型会有明显问题:

  • 创建模型时,会先生成一份“空模型参数”
  • 读取 state_dict 时,又会在 CPU 内存里出现第二份权重
  • 最后再拷到 GPU,还会产生一次设备搬运

所以大模型加载时,最容易遇到的第一个问题不是算不动,而是:

还没开始推理,CPU 内存或显存就先爆了。


2.2 为什么大模型加载难

大模型加载难,主要难在下面几件事:

  • 参数量太大,权重本身就已经很占内存
  • 传统加载流程会出现“双份内存峰值”
  • 单卡显存不一定放得下完整模型
  • 就算 GPU 放不下,CPU 内存也不一定足够
  • 真正上线时还要给 KV Cache、临时 buffer、CUDA runtime 留空间

例如只看权重本体:

  • 7B 参数,FP16/BF16 大约需要 14GB
  • 70B 参数,FP16/BF16 大约需要 140GB

这还只是权重,不包含:

  • KV Cache
  • 中间激活
  • CUDA allocator 预留空间
  • 框架工作区和临时 buffer

2.3 更合理的大模型加载流程

大模型部署里,更常见的是这种思路:

  1. 先只读取配置,不真实分配完整参数
  2. 先构建一个“空壳模型”
  3. 提前规划每一层该放到哪块设备
  4. 按分片逐步加载权重
  5. 加载完后再做 warmup 和运行时缓存初始化

这就是很多框架里常见的:

  • meta device
  • init_empty_weights
  • device_map
  • load_checkpoint_and_dispatch
  • low_cpu_mem_usage=True

一句话理解:

先规划,再搬运;先知道每层去哪,再真正把权重加载进去。


2.4 典型加载手段

1)分片权重

大模型通常不会保存成一个超大的单文件,而是拆成多个 shard。

这样做的好处:

  • 单次读入的文件更小
  • CPU 峰值内存更容易控制
  • 可以边读边放到目标设备

常见格式有:

  • pytorch_model.bin
  • model-00001-of-000xx.safetensors

现在更推荐 safetensors,因为它更适合做安全加载和高效读取。

2)空壳初始化

先在 meta 设备上把模型结构搭出来,只保留:

  • shape
  • dtype
  • 层结构信息

此时不真正占用完整参数内存。

这一步的意义很大,因为只有先得到“模型骨架”,后面才能:

  • 估算每层参数量
  • 计算 device map
  • 决定哪些层放 GPU、哪些层放 CPU、哪些层甚至落盘

3)自动或手动 device_map

加载前先决定每一层该去哪。

典型策略:

  • device_map="auto":框架自动做层到设备的映射
  • 手动指定:自己定义哪些层放 cuda:0cuda:1cpu

这一步解决的是:

不是把整个模型整体搬到一个设备上,而是按层切开后分别放。

4)CPU / NVMe Offload

如果 GPU 还是装不下,就把一部分权重放到:

  • CPU 内存
  • NVMe 磁盘

推理时再按层搬到 GPU。

这类方案的本质是:

用更便宜、更大的存储层,换取“能跑起来”的可能性。

代价也很明显:

  • 延迟会上升
  • PCIe 传输会成为瓶颈
  • 更适合吞吐优先,不一定适合极低延迟

2.5 一个常见加载示例

如果是 Hugging Face Transformers,最常见的写法会是:

1
2
3
4
5
6
7
8
9
10
11
12
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

model_id = "meta-llama/Meta-Llama-3-8B-Instruct"

tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
model_id,
torch_dtype=torch.bfloat16,
device_map="auto",
low_cpu_mem_usage=True,
)

这里几项的意义分别是:

  • torch_dtype=torch.bfloat16:减小权重占用
  • device_map="auto":自动把层分配到多个设备
  • low_cpu_mem_usage=True:尽量降低 CPU 加载峰值

如果模型更大,还可能配合:

  • load_checkpoint_and_dispatch
  • max_memory
  • offload_folder

2.6 服务真正启动时发生了什么

部署系统真正启动时,通常不只是“把模型读进来”,还会顺手做这些事:

  • 初始化 CUDA context
  • 创建 tokenizer
  • 加载权重并分配设备
  • 预留 runtime workspace
  • 预分配或按需分配 KV Cache
  • 执行一次 warmup,让 kernel、graph、memory pool 进入稳定状态

所以线上冷启动慢,往往不只是“模型文件大”,而是整套运行时都在初始化。


3. 内存分配:显存到底都花到哪里了

3.1 推理时内存的主要组成

推理阶段最重要的几部分内存通常是:

内存项 作用 特点
权重 weights 存模型参数 静态为主,启动时确定
激活 activations 当前 forward 的中间结果 瞬时占用,prefill 更明显
KV Cache 保存历史 token 的 key/value 会随上下文和并发增长
Workspace / 临时 buffer kernel、attention、通信缓冲 容易被忽略
CUDA allocator 保留区 运行时内存池 看起来“没用完”,实际已被预留

部署里最常见的误区之一:

以为显存主要都花在权重上,但很多时候真正把服务顶爆的是 KV Cache。


3.2 权重占用怎么估算

最基础的估算公式:

1
权重内存 ≈ 参数量 × 每个参数字节数

常见精度:

精度 每参数字节数
FP32 4 Bytes
FP16 2 Bytes
BF16 2 Bytes
INT8 1 Byte
4-bit 约 0.5 Byte,外加量化元数据

所以只看权重大致可以这么记:

  • 7B @ FP16/BF1614GB
  • 13B @ FP16/BF1626GB
  • 70B @ FP16/BF16140GB

注意这只是“裸权重体积”,实际部署还要留余量。


3.3 KV Cache 为什么这么重要

自回归推理不是每次都从头算完整上下文,而是把历史 token 的注意力结果缓存起来。

这个缓存就是 KV Cache。

它的近似规模可以理解为:

1
KV Cache ≈ batch × seq_len × layers × num_kv_heads × head_dim × 2 × bytes

这里的 2 是因为要存:

  • Key
  • Value

几个关键结论:

  • 上下文越长,KV Cache 越大
  • 并发请求越多,KV Cache 越大
  • 层数越多,KV Cache 越大
  • GQA / MQA 会减少 num_kv_heads,因此能降低 KV Cache 压力

所以部署时经常会出现这种现象:

  • 模型能加载成功
  • 一跑高并发或长上下文就 OOM

这通常不是权重问题,而是 KV Cache 膨胀了。


3.4 Prefill 和 Decode 的内存特征不同

大模型推理通常可以分成两个阶段:

Prefill

也就是先把整段输入 prompt 跑一遍。

特点:

  • 计算量大
  • 激活更明显
  • 更偏“算力瓶颈”

Decode

也就是一 token 一 token 地生成。

特点:

  • 每步计算量没那么大
  • 但 KV Cache 要一直留着
  • 更容易受显存和内存带宽影响

一句话记忆:

Prefill 更像“重算一遍输入”,Decode 更像“带着缓存持续往前推”。


3.5 为什么会有显存碎片

在真实服务里,请求长度通常不一样:

  • 有的 prompt 很短
  • 有的输出很长
  • 有的请求很快结束
  • 有的请求会长期占着上下文

如果 KV Cache 按大块连续内存预留,就容易出现:

  • 过度预分配
  • 空洞
  • 碎片
  • 大量“看起来空着,但其实不好再利用”的显存

这也是为什么 vLLM 的 PagedAttention 很重要:

它把 KV Cache 按 block 管理,像操作系统分页一样按需分配,而不是死板地整段连续申请。


3.6 常见的内存优化手段

1)降低权重精度

常见选择:

  • BF16 / FP16
  • INT8
  • 4-bit

作用:

  • 直接压缩权重内存
  • 往往也能降低带宽压力

但要注意:

权重量化通常主要解决“模型本体太大”,不一定直接解决 KV Cache 过大。

2)KV Cache 分块管理

这类思路常见于:

  • PagedAttention
  • paged KV cache
  • block-based cache allocator

作用:

  • 降低碎片
  • 提高 cache 复用率
  • 提升 batch 能力

3)Continuous Batching / In-flight Batching

不是等整个 batch 全结束才接新请求,而是:

  • 谁先结束就先释放
  • 空出来的位置立即塞新请求

这样可以让 GPU 持续保持高利用率。

4)Offload

把一部分数据放到:

  • CPU
  • NVMe

适合:

  • 显存不够
  • 可以接受额外延迟

5)控制上下文和并发

线上很多时候最有效的办法非常朴素:

  • 限制 max_model_len
  • 限制 max_num_seqs
  • 限制 max_new_tokens

因为这些参数直接决定了 KV Cache 的上限。


4. 多卡协调:多张 GPU 是怎么配合的

4.1 为什么需要多卡

需要多卡,通常有 3 个原因:

  • 单卡放不下模型
  • 单卡吞吐不够
  • 希望用更多卡降低单请求延迟或提升整体并发

但“多卡”不是一个方案,而是一组不同的并行方式。


4.2 数据并行 Data Parallel

最容易理解的一种方式:

  • 每张 GPU 放一整份完整模型
  • 不同请求分发到不同 GPU

优点:

  • 最简单
  • 卡间通信最少
  • 很适合模型能完整放入单卡时做水平扩展

缺点:

  • 每张卡都要复制完整权重
  • 模型太大时根本放不下

适合场景:

  • 单卡能装下模型
  • 主要目标是提高 QPS

4.3 张量并行 Tensor Parallel

张量并行的思路是:

  • 不复制整层
  • 而是把一层里的大矩阵切到多张卡上

例如一个大的线性层,可以按列切、按行切,让多张 GPU 同时算一层。

优点:

  • 单卡装不下时还能跑
  • 对超大模型很常见

代价:

  • 几乎每一层都会发生通信
  • 对 GPU 间互联非常敏感

典型通信包括:

  • AllReduce
  • AllGather
  • ReduceScatter

一句话理解:

Tensor Parallel 是“同一层一起算”,所以卡和卡之间必须频繁交换部分结果。


4.4 流水线并行 Pipeline Parallel

流水线并行是把模型按层切成几段:

  • 前几层在 GPU0
  • 中间层在 GPU1
  • 后几层在 GPU2

数据像流水线一样一段段往后传。

优点:

  • 每张卡只放一部分层
  • 适合层数很多、模型特别深的情况

缺点:

  • 会有 pipeline bubble
  • 微批次设计更复杂
  • 延迟不一定最优

一句话理解:

Pipeline Parallel 是“不同卡负责不同楼层”,输入像坐电梯一样逐层往上走。


4.5 FSDP / ZeRO

这类方案的核心不是“把一层切开一起算”,而是:

把参数、梯度、优化器状态分片,谁需要时再临时聚合。

最典型的是训练或微调场景。

FSDP 背后的核心动作可以这样理解:

  • 平时每张卡只保留自己那一份 shard
  • 前向计算到某层时,先 all-gather 出完整参数
  • 算完后,把不需要的部分释放
  • 反向时再做 reduce-scatter

它解决的是:

  • 大模型训练时显存冗余过高
  • DDP 每卡都存一整份参数太浪费

在部署里要注意:

  • 纯在线低延迟推理,主流更常见的是 Tensor Parallel
  • FSDP / ZeRO 更常用于训练、SFT、继续预训练
  • 但 ZeRO-Inference 这类方案会把“分片和 offload”思路带到推理场景

4.6 多卡通信时到底在传什么

多卡协同时,常见通信原语如下:

通信原语 常见用途
Broadcast 把参数或控制信息同步给所有卡
AllReduce 汇总每张卡的部分结果并让所有卡都拿到
AllGather 把各卡持有的分片拼回完整结果
ReduceScatter 先归约再分片返回
Send / Recv 流水线并行中前后 stage 传递激活

底层常见通信库:

  • NCCL
  • RCCL

如果是部署视角,需要记住一个关键点:

多卡不只是“卡多了”,而是“通信也多了”。如果通信成本压不住,卡越多不一定越快。


4.7 什么时候选哪种多卡方案

场景 更常见的选择
模型能放进单卡,只想提 QPS Data Parallel
模型单卡放不下,但在同机多卡内运行 Tensor Parallel
模型很深,想把层按 stage 切开 Pipeline Parallel
训练 / SFT / 继续预训练,显存压力极大 FSDP / ZeRO
显存不够,但可以接受更高延迟 CPU / NVMe Offload

很多真实系统是混合方案,例如:

  • TP + PP
  • TP + Data Parallel
  • ZeRO + Offload

4.8 多卡部署里最真实的瓶颈

线上真正会卡住的地方,通常不是论文里那一句“支持多卡”,而是这些细节:

  • GPU 之间是 NVLink 还是只有 PCIe
  • 通信拓扑是否对称
  • 小 batch 下通信时间会不会盖过计算时间
  • scheduler 是否能把动态长度请求组织好
  • KV Cache 是否会在高并发下失控
  • 某一张卡是否变成热点卡

所以多卡性能调优,本质是在看:

  • 算得快不快
  • 传得快不快
  • 分得均不均

5. 把三件事串起来:一条完整的部署链路

从服务启动到返回 token,大致会经历这条路径:

  1. 读取模型配置和 tokenizer
  2. 创建空壳模型,估算权重规模
  3. 计算 device_map 或并行策略
  4. 分片加载权重到 GPU / CPU / NVMe
  5. 初始化 runtime buffer 和 KV Cache 管理器
  6. 收到请求后,scheduler 做 batch 编排
  7. 先做 prefill
  8. 再进入 decode,持续使用 KV Cache
  9. 如果是多卡,则在层间或 stage 间做通信
  10. 请求结束后释放对应 KV block

这条链路里:

  • 模型加载决定“能不能启动”
  • 内存分配决定“能不能稳定跑”
  • 多卡协调决定“吞吐和延迟能不能达标”

6. 高频结论

  • 大模型加载难,首先难在内存峰值,而不是算力。
  • 推理阶段最核心的两块内存通常是“权重 + KV Cache”。
  • 权重决定你能否把模型装进系统,KV Cache 决定你能支撑多少上下文和并发。
  • 4-bit / 8-bit 量化主要缓解权重压力,不自动等于 KV Cache 压力消失。
  • 模型能跑起来,不代表服务能稳定支撑高并发。
  • 多卡不总是更快;如果互联差、通信重,延迟可能更差。
  • 部署里最常见的几类手段是:分片加载、精度压缩、KV Cache 优化、动态 batching、多卡并行。

7. 延伸阅读:推荐几篇博客

下面这几篇我按“加载 -> 内存 -> 多卡”这条线推荐,基本都是官方博客或官方文档,适合继续往下读。

1)模型加载

2)内存分配 / KV Cache

3)FSDP 和分片思想

4)Offload 和异构内存

5)多卡通信


8. 建议的阅读顺序

如果你想按一条比较顺的路径学习,我建议:

  1. 先看 Hugging Face 的大模型加载文章
  2. 再看 vLLM 的 PagedAttention
  3. 再看 DeepSpeed 的 ZeRO-Inference
  4. 再看 NVIDIA 那篇多卡通信博客
  5. 最后补 FSDP,把“分片和通信”的训练视角补齐

9. 一句话总结

大模型部署的核心,不是“把模型跑起来”这么简单,而是:

用尽量少的显存和通信开销,把权重、缓存和计算稳定地组织起来。

AI 模型开发进度记录

整理时间:2026-04-20
说明:当前文件基于现有笔记和方案文档整理,不额外虚构未发生的开发进展。

1. 项目目标

目标是在本地完成一个可部署、可演示、可维护的科研知识增强生成式 AI 系统,核心能力包括:

  • 文档上传与管理
  • 异步解析、切块、索引构建
  • 基于 RAG 的问答
  • 引用返回
  • 多轮会话与上下文管理
  • 流式输出

当前确定的主技术路线:

  • C++ 网关:Drogon
  • Python 服务:FastAPI
  • 异步任务:Celery
  • 数据存储:MySQL
  • 缓存与状态层:Redis
  • 检索能力:Embedding + Chunk + FAISS / rerank

参考总方案:

2. 当前阶段判断

截至 2026-04-20,从当前目录可见材料判断,项目目前主要处于:

方案设计完成 + 技术储备基本成型 + 进入工程落地前/落地初期

也就是说,方向和技术路线已经比较清楚,但还缺少连续的实现记录、联调结果和里程碑沉淀。这次整理的重点,就是把已有材料收拢成一份后面能持续更新的主记录。

3. 已完成进展

3.1 方案与路线已经明确

已完成的高价值产出:

  • 已产出整体方案文档,明确系统目标、架构分层、8 周开发路线和 MVP 范围
  • 已确定采用 C++ Gateway + Python RAG Service 的双服务方案
  • 已明确 MySQL、Redis、Celery、Embedding、流式输出等关键模块的位置和职责

对应文档:

3.2 后端基础框架学习已完成一轮

你已经完成一轮和主线高度相关的基础框架学习,且内容覆盖比较完整:

模块 状态 说明 对应笔记
Drogon 已完成基础学习 对 C++ Web 网关、路由、控制器、文件上传、与内部服务协作有了基础认知 Drogon.md
FastAPI 已完成基础学习 已覆盖 API、参数、请求体、异步、上传、任务入口等内容 FasthAPI.md
Celery 已完成基础学习 已掌握任务队列、Worker、状态、重试、任务查询、与 FastAPI 配合 Celery.md
Redis 已完成基础学习 已覆盖缓存、任务状态、中间结果、计数、TTL 等常见使用场景 Redis.md
MySQL 已完成基础学习 已覆盖连接、建表、字段设计、任务记录和业务数据存储思路 MySQL.md
Ray 已完成基础学习 已对分布式计算/GPU 调度有初步储备,适合作为后续扩展项 Ray.md
Embedding 已完成基础学习 已理解向量表示在语义检索/RAG 中的作用 embedding.md

3.3 检索与模型相关知识已开始进入专项阶段

2026-04-19 这组笔记说明你的学习已经从“通用后端栈”切到“RAG/检索/训练专项”:

专题 状态 当前意义 对应笔记
Chunk 已完成入门与策略理解 为文档切分和召回效果打基础 Chunk学习笔记.md
句子嵌入模型 已完成系统性梳理 已开始从业务目标、数据、训练、部署全链路理解 embedding 句子嵌入模型.md
数据处理与评估 已完成一轮整理 已开始关注数据清洗、评估集、badcase 和指标 数据处理与数据集评估.md
LoRA 微调训练 已完成基础梳理 说明你已经开始考虑后续模型训练/调优路线 LoRA微调训练.md

3.4 当前真正“落地完成”的部分

基于现有文件,可以确认已经完成的是:

  • 项目目标与交付路线的规划
  • 主技术栈的选型和学习
  • 检索/RAG 关键知识点的初步铺垫
  • 一份适合继续追踪的主进度文档框架

目前还不能从现有目录中确认已经完成的内容:

  • 实际工程仓库初始化
  • Docker / docker-compose 启动记录
  • Drogon / FastAPI / Celery 服务跑通记录
  • 数据库表结构落地
  • 上传、切块、索引、问答联调结果
  • 测试、验收、bug 排查日志

4. 时间线整理

按现有文件时间整理出的开发脉络如下:

日期 进展 说明
2026-03-16 完成总体方案研究 产出 deep-research-report.md,明确 8 周路线和总体架构
2026-03-24 完成 FastAPI / Celery / Redis 第一轮学习 开始打通 Python 服务、异步任务、状态层相关知识
2026-03-28 完成 Drogon / MySQL 第一轮学习 开始补齐 C++ 网关与关系型数据层
2026-03-29 完成 Embedding 学习 检索和向量表示方向开始成型
2026-03-30 完成 Ray 学习 对后续并行计算 / GPU 调度有预研储备
2026-04-19 完成 Chunk / 句子嵌入 / 数据评估 / LoRA 专题整理 学习重点从框架切到检索与模型侧
2026-04-20 建立正式进度记录 将已有资料整理成统一的开发记录入口

5. 阶段拆分与当前状态

阶段一:方向确认与技术选型

状态:已完成

已完成内容:

  • 明确项目目标和 MVP 范围
  • 明确双服务架构
  • 明确数据层、任务层、检索层、流式层的职责分工

阶段二:基础知识补齐

状态:已完成第一轮

已完成内容:

  • Web 服务框架学习
  • 存储与缓存学习
  • 异步任务机制学习
  • 向量检索和训练相关基础学习

当前缺口:

  • 还需要把“概念理解”转成“最小可运行系统”
  • 还缺少工程级联调记录和可验证产物

阶段三:最小系统落地

状态:待开始或未记录

建议按下面顺序推进:

  1. 建立工程目录与 docker-compose
  2. 跑通 MySQL、Redis
  3. 跑通 Drogon health 接口
  4. 跑通 FastAPI health 接口
  5. 跑通 Celery demo task 和任务状态查询
  6. 打通“上传文件 -> 创建任务 -> 查询状态”的最小闭环

阶段四:RAG 核心链路

状态:待开始或未记录

核心目标:

  1. 文档解析与切块
  2. Embedding 生成
  3. 向量索引构建
  4. 检索与重排
  5. 问答生成
  6. 引用返回

阶段五:工程化与体验优化

状态:待开始或未记录

后续重点:

  1. 流式输出
  2. 会话记忆
  3. 配置管理
  4. 限流与鉴权
  5. 测试与部署

6. 当前最值得推进的任务

如果你接下来要进入真正开发,我建议优先记录和推进下面这些 P0 事项:

优先级 任务 目标产出
P0 初始化工程骨架 有明确目录结构、启动脚本、配置文件
P0 跑通 MySQL / Redis 基础依赖可稳定启动
P0 建立 Drogon 最小服务 提供 /health,可启动可访问
P0 建立 FastAPI + Celery 最小服务 能提交 demo task 并查询状态
P0 设计数据表 至少有 documentstaskssessionsmessages
P0 打通上传闭环 上传后生成 doc_idtask_id
P0 补充实际开发日志 每次开发记录输入、输出、问题、下一步

7. 当前记录存在的问题

目前你的资料质量其实不低,但“开发记录”层面有几个明显问题:

  • 学习笔记很多,但实际开发日志没有集中沉淀
  • 方案很完整,但还没有对应的阶段性验收记录
  • 缺少“今天做了什么、结果如何、卡在哪里、下一步做什么”的连续记录
  • 缺少最小闭环的里程碑标记,后续很容易出现学习很多但难以判断工程进展的情况

8. 后续建议的记录方式

后续建议把这份文件当成总入口,只保留高价值信息:

  • 当前阶段
  • 最近里程碑
  • 已完成事项
  • 当前阻塞点
  • 下一步

具体每天或每次开发的细节,可以按下面模板追加。

9. 开发日志模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
## YYYY-MM-DD

### 今日目标
-

### 已完成
-

### 产出
- 文档:
- 代码:
- 接口:
- 数据表:

### 遇到的问题
-

### 解决方式
-

### 当前结论
-

### 下一步
-

10. 一句话总结当前进度

截至 2026-04-20,你的 AI 模型开发项目已经完成了方案设计和核心技术储备,正在从“学明白”转向“做出来”的临界点;下一阶段的关键不是继续扩大学习范围,而是尽快产出第一个可运行的最小闭环,并持续补充真实开发记录。

Chunk 学习笔记

1. 什么是 Chunk

在 embedding、检索和 RAG 场景里,chunk 指的是把原始文档切成一段一段可独立编码、可独立检索的小文本单元。

对应的过程通常叫:

  • chunking
  • 文本切块
  • 文档切分

可以先把 chunk 看成检索流程里的最小工作单元。它不是随便切一刀,而是既要保留语义完整性,也要方便后续编码和检索。


2. 为什么要做 Chunk

原始文档通常不适合整篇直接送去做 embedding 或检索,原因一般有三个:

  • 太长,超过模型输入限制
  • 粒度太粗,检索结果不精准
  • 一篇文档往往包含多个主题,整篇编码会让语义混在一起

切块的目的,是让每一块:

  • 尽量只表达一个相对完整的意思
  • 长度不要太长,避免语义被稀释
  • 长度也不要太短,避免上下文不足

3. Chunk 对检索效果为什么这么关键

检索效果不好时,很多人的第一反应是换 embedding 模型,但工程里更常见的问题其实出在 chunk 策略。

实际项目里,chunk 策略会直接影响:

  • 向量质量
  • 召回命中率
  • rerank 效果
  • 最终 LLM 回答质量

常见问题包括:

  • chunk 太大,相关信息被埋在长文本里,召回不准
  • chunk 太碎,单块语义不完整,召回上来后也不好用
  • 切断了标题和正文的关系,导致上下文丢失
  • overlap 设置不合理,重复太多或连续性不够

所以可以把它理解成:

embedding 决定“怎么表示”,chunking 决定“切得对不对”。


4. 一个好 Chunk 的目标

好的 chunk 通常要满足这几个条件:

  • 语义尽量完整
  • 长度尽量稳定
  • 有明确边界
  • 便于加标题、来源、页码等元数据
  • 便于后续召回、去重、引用和展示

如果一个 chunk 满足下面任意一种情况,通常都说明切得不够好:

  • 只有一句残缺的话
  • 标题和正文被拆开
  • 一个 chunk 同时塞了多个不相关主题
  • 文本大部分是模板、导航、页脚、噪声内容

5. 常见的 Chunk 策略

5.1 固定长度切块

例如每 300 字、每 500 tokens 切一块。

优点:

  • 简单
  • 稳定
  • 容易批量处理

缺点:

  • 可能把完整段落或完整论点切断

适合:

  • 初版 baseline
  • 结构不稳定的原始文本

5.2 按段落切块

按照自然段来切。

优点:

  • 语义边界自然
  • 可读性通常更好

缺点:

  • 长度不稳定
  • 某些段落可能过长或过短

适合:

  • 普通说明文档
  • 笔记
  • 博客文章

5.3 按标题和小节切块

按照文档层级来切,例如:

  • 一级标题
  • 二级标题
  • 小节内容

优点:

  • 结构清晰
  • 更适合知识库和技术文档

缺点:

  • 某些小节可能太长,还需要二次细分

适合:

  • API 文档
  • 产品文档
  • 教程型内容

5.4 混合切块

先按标题或段落切,再对过长部分按固定长度继续细分。

这是实际项目里很常见的一种方案,因为它兼顾了:

  • 语义完整性
  • 长度可控
  • 工程实现难度

6. 什么是 Overlap

overlap 是相邻 chunk 之间保留的一部分重复内容。

作用:

  • 避免关键信息刚好被切断
  • 提高上下文连续性
  • 减少边界句子丢信息的问题

例如:

  • chunk1: 第 1~300
  • chunk2: 第 260~560

这里就有 40 字重叠。

但 overlap 也不是越大越好。

如果 overlap 太大,会带来:

  • 存储冗余
  • 检索结果重复
  • top-k 被相似 chunk 挤占

因此,overlap 要处理的是下面这组平衡:

  • 上下文连续性
  • 检索去重成本

7. Chunk Size 怎么定

chunk 大小没有全场景通用的固定答案,应该结合下面几件事一起看:

  • 文档类型
  • 模型最大输入长度
  • 用户 query 的粒度
  • 是否有 rerank
  • LLM 最终要吃多少上下文

7.1 中文场景一个常见起点

如果是中文知识库、普通笔记或教程类文本,可以先从下面这个 baseline 开始:

  • 每块 300~500
  • overlap 取 10%~20%
  • 优先按标题和段落切,超长再二次切分

这不是固定标准,但很适合作为第一版默认策略。

7.2 用 token 还是字数

更严格的工程实现通常按 token 控制,因为模型真正受限的是 token 数。

但在中文笔记场景里:

  • 用字数做初版规则更直观
  • 用 token 做最终线上控制更稳妥

8. 不同文档类型怎么切

8.1 FAQ

优先按一问一答切。

原因:

  • 每条 FAQ 本身就是天然语义单元
  • 非常适合检索

8.2 API 文档

优先按接口、参数、返回值、错误码分块。

原因:

  • 用户检索时往往是精确问题
  • 结构化切分更利于命中

8.3 教程和博客

优先按标题和段落切,再对过长部分二次细分。

原因:

  • 这类文档有明显章节结构
  • 语义常围绕小节展开

8.4 法规、合同、制度文档

优先按条款、章节切,再保留章节编号。

原因:

  • 编号本身很重要
  • 后续引用和回溯都依赖这些元数据

9. Chunk 不只是文本,还要带元数据

实际工程里,向量库里存的往往不只是 chunk 文本本身,还包括一组元数据。

常见字段:

  • chunk_id
  • doc_id
  • title
  • section
  • source
  • page
  • url
  • create_time

这些元数据很重要,因为它们会影响:

  • 过滤
  • 去重
  • 排序
  • 页面展示
  • 来源追踪

一句话说:

没有元数据的 chunk,很难在真实系统里长期维护。


10. Chunk 怎么评估好不好

chunk 没有一个单独的万能评分,通常要放到检索链路里一起评估。

最常见的评估方式有:

  • 看 Recall@K 是否提升
  • 看 badcase 是否减少
  • 看召回结果是否更少重复
  • 看 LLM 最终回答是否更稳定

排查时,通常先看这几个问题:

  1. 召回不到,是不是 chunk 太大
  2. 召回到了但答不好,是不是 chunk 太碎
  3. top-k 很重复,是不是 overlap 太大
  4. 标题命中了但正文没进来,是不是标题和内容被拆开了

11. 常见坑

11.1 只按固定长度切,不看语义边界

这样最容易把一句完整的话切断。

11.2 标题和正文分离

很多技术文档里,标题本身就是关键检索信号,和正文拆开后效果通常会明显变差。

11.3 chunk 太碎

如果一个 chunk 只剩一句短句,虽然向量容易算,但语义往往不完整。

11.4 chunk 太大

如果一块里塞进多个主题,检索时很容易语义发散。

11.5 overlap 太大

会导致召回结果里出现很多几乎一样的片段,影响 top-k 质量。

11.6 没做清洗就直接切

页眉、页脚、导航栏、版权信息、空白符噪声,都会污染 chunk 质量。


12. 一个实用的默认方案

如果你是第一次给中文知识库做 chunk,可以先这样做:

  1. 先清洗无关噪声
  2. 按标题和段落切
  3. 过长段落按 300~500 字继续切
  4. overlap 先取 10%~15%
  5. 保留 doc_idtitlesectionsource
  6. 离线抽样检查 top-k 检索效果
  7. 根据 badcase 再回调 chunk size 和 overlap

这套方案不是最优,但通常足够当第一版生产 baseline。


13. Chunk、Embedding、RAG 的关系

可以把三者简单理解成:

  • chunking 决定“怎么切”
  • embedding 决定“怎么表示”
  • retrieval / rerank 决定“怎么找”
  • LLM 决定“怎么答”

如果 chunk 切得不对,后面的 embedding、rerank、LLM 往往都只能被动补救。


14. 一句话总结

Chunk 的核心不是“把文档切小”,而是:

把文档切成既适合向量表示、又适合检索召回、还能支撑最终回答的语义单元。

LoRA 微调训练笔记

1. 什么是 LoRA

LoRA(Low-Rank Adaptation)是一种参数高效微调方法。

它的核心思路不是更新整个基座模型,而是:

  • 冻结原始预训练权重
  • 只在部分线性层旁边插入可训练的低秩矩阵
  • 训练时只更新这部分新增参数

对句子嵌入模型来说,LoRA 特别适合下面几类场景:

  • 显存有限,无法稳定做全量微调
  • 想快速做领域适配,例如法律、医疗、电商、代码检索
  • 希望保留同一个基座模型,同时维护多个领域 adapter
  • 想减少模型发布体积,只保存 adapter 权重

可以先把 LoRA 看成一种轻量改造:不大动原来的基座模型,只额外训练一小部分增量参数。和全量微调相比,它更省显存,也更适合快速做领域适配。


2. LoRA 的算法原理

假设原始线性层权重是 W,全量微调要学的是一个完整的权重更新量 ΔW

W' = W + ΔW

LoRA 认为这个更新量通常可以被一个低秩分解近似表示:

ΔW ≈ sBA

其中:

  • A 的形状通常可写作 r x d_in
  • B 的形状通常可写作 d_out x r
  • r 是秩,远小于 d_ind_out
  • s 是缩放因子,常见写法是 alpha / r

因此 LoRA 后的线性层可写成:

W' = W + sBA

训练时:

  • W 冻结不动
  • 只训练 AB

这样做的直接收益是:

  • 可训练参数从 d_out x d_in 降到 r x (d_in + d_out)
  • 反向传播只覆盖 LoRA 参数,显存占用更低
  • 一个基座模型可以挂多个 adapter,便于多任务和多领域切换

常见初始化方式还会让 B 初始为 0,这样刚插入 LoRA 时输出和原模型基本一致,训练过程就相当于逐步学出一个增量更新。


3. LoRA 在句子嵌入训练中的位置

LoRA 只改变“哪些参数参与训练”,但不会改变 embedding 任务本身。

通常不变的东西有:

  • 训练数据格式
  • loss 设计
  • pooling 方式
  • 向量归一化策略
  • 评估指标

真正变化的是:

  • 原来训练全部 encoder 参数
  • 现在只训练插入到 attention 或线性层里的 LoRA 参数

所以如果你原本是这样训练 embedding:

  • query-positive 对配合对比学习
  • query-positive-negative 三元组配合 triplet loss
  • STS 打分数据配合回归 loss

换成 LoRA 后,训练主流程通常不用重写,主要改的是参数更新范围。


4. 一个适合 embedding 的 LoRA 训练流程

检索类句子嵌入任务里,比较实用的一条路线通常是:

  1. 选一个已有 embedding 基座,例如 BGE、E5、GTE、MiniLM、MPNet
  2. 保持原有文本预处理、prompt 前缀、pooling 逻辑不变
  3. 给编码器的部分线性层插入 LoRA
  4. query-positivequery-positive-negative 数据做对比学习
  5. 优先使用 batch 内负样本,再配合 hard negative
  6. 用 Recall@K、MRR、NDCG 做离线评估
  7. 只保存 adapter,部署时加载到相同基座模型上
  8. 如有需要再把 LoRA 权重 merge 回基座权重

关键点要记住:

  • LoRA 解决的是“训练成本”问题
  • 数据质量、难例挖掘、线上预处理一致性,依然决定最终效果

5. LoRA 训练时重点关注哪些超参数

5.1 r(rank)

  • 决定低秩矩阵容量
  • 常见起点:8 / 16 / 32 / 64
  • r 越大,表达能力越强,但参数量和显存也会上升

5.2 lora_alpha

  • 决定 LoRA 更新量的缩放强度
  • 常见会设置成 r 的 2 倍或 4 倍

5.3 lora_dropout

  • 常见取值:0.0 ~ 0.1
  • 数据量较小或容易过拟合时,可以适当加一点

5.4 target_modules

  • BERT / MiniLM 一类 encoder,常见目标层是 queryvalue
  • Llama / Qwen 一类 Transformer,常见目标层是 q_projv_proj
  • 如果不想按架构手写模块名,也可以考虑 all-linear

5.5 learning rate

  • LoRA 通常可以比全量微调用更大的学习率
  • 工程上常从 1e-42e-4 开始试,再根据验证集回调

此外,embedding 任务原本重要的参数依旧重要:

  • batch size
  • max length
  • hard negative 数量
  • warmup ratio
  • 训练轮数

6. 一个常见实现方式:Sentence Transformers + PEFT

如果本来就在做句子嵌入,最直接的工程组合通常是:

  • Sentence Transformers 负责 embedding 训练流程
  • PEFT 负责 LoRA adapter 注入与保存

一个常见写法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from datasets import load_dataset
from peft import LoraConfig, TaskType
from sentence_transformers import (
SentenceTransformer,
SentenceTransformerTrainer,
SentenceTransformerTrainingArguments,
)
from sentence_transformers.losses import MultipleNegativesRankingLoss

# 1. 选择基座模型
model = SentenceTransformer("BAAI/bge-small-zh-v1.5")

# 2. 配置 LoRA
peft_config = LoraConfig(
task_type=TaskType.FEATURE_EXTRACTION,
inference_mode=False,
r=32,
lora_alpha=64,
lora_dropout=0.05,
target_modules=["query", "value"],
)
model.add_adapter(peft_config)

# 3. 准备数据
train_dataset = load_dataset("json", data_files="train.jsonl", split="train")
eval_dataset = load_dataset("json", data_files="dev.jsonl", split="train")

# 4. 定义 loss
loss = MultipleNegativesRankingLoss(model)

# 5. 训练参数
args = SentenceTransformerTrainingArguments(
output_dir="outputs/bge-small-zh-lora",
num_train_epochs=1,
per_device_train_batch_size=128,
per_device_eval_batch_size=128,
learning_rate=1e-4,
warmup_ratio=0.1,
fp16=True,
)

# 6. 启动训练
trainer = SentenceTransformerTrainer(
model=model,
args=args,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
loss=loss,
)
trainer.train()

# 7. 保存 adapter
model.save_pretrained("outputs/bge-small-zh-lora/final")

如果你的数据是检索三元组,也可以把 loss 换成:

  • TripletLoss
  • CachedMultipleNegativesRankingLoss
  • MultipleNegativesRankingLoss

经验上:

  • 检索召回优先,通常先试 MultipleNegativesRankingLoss
  • batch 很大时,可优先考虑 CachedMultipleNegativesRankingLoss
  • 有明确三元组时,可尝试 TripletLoss

7. LoRA 层到底做了什么

如果原来的线性层输出写成:

y = Wx

那么加上 LoRA 后,输出就变成:

y = Wx + sBAx

也就是说:

  • 主干路径还是原始的 Wx
  • LoRA 只是在旁边额外加了一条低秩增量路径

训练完成后,通常有两种部署方式。

7.1 保留 adapter

优点:

  • 体积小
  • 方便多领域切换
  • 基座模型只保留一份

缺点:

  • 部署时需要同时管理 base model 和 adapter

7.2 merge 回基座权重

优点:

  • 部署形态更像普通模型
  • 推理链路更简单

缺点:

  • 不方便在同一个基座上频繁切换多个 adapter

8. LoRA 适合什么,不适合什么

LoRA 更适合:

  • 中小规模领域适配
  • 训练资源紧张
  • 想快速验证方向
  • 同一个基座服务多个垂类任务

LoRA 不一定最优的情况:

  • 域迁移特别大,任务和原始预训练差异非常明显
  • 你追求极限指标,并且训练资源足够
  • 需要同时修改很多非线性模块或特殊组件

这时更适合的思路可能是:

  • 全量微调
  • LoRA + 更强数据构造
  • LoRA 的变体,例如 DoRA、rsLoRA、QLoRA

9. 一份可直接抄的 LoRA 检查清单

  1. 基座模型是否适合你的语种和场景
  2. 训练和推理的文本预处理是否完全一致
  3. pooling 方式是否与原模型保持一致
  4. target_modules 是否匹配当前模型架构
  5. loss 是否与任务目标一致
  6. 是否准备了足够强的 hard negative
  7. 是否有稳定的验证集和 badcase 集
  8. 保存的是 adapter,还是 merge 后模型
  9. 部署时是否保证 base model 版本完全一致
  10. 线上是否监控向量分布、召回率和 badcase

10. 论文、官方文档与博客参考

论文与官方资料:

博客参考:


11. 一句话总结

LoRA 用在句子嵌入模型上,归根到底做的是:

保留原来的 embedding 训练目标,只把全量参数更新改成低秩增量更新。

重点依然是数据质量、难例构造和线上一致性。

句子嵌入模型开发与部署全流程

补充专题笔记

1. 什么是句子嵌入模型

句子嵌入模型(Sentence Embedding Model)做的事情很直接:

  • 把一句话、一个 query、一个标题、一个段落,编码成一个固定长度的向量
  • 让语义相近的句子,在向量空间里距离更近
  • 让语义无关或相反的句子,在向量空间里距离更远

它最常见的用途是:

  • 语义检索
  • 问答召回
  • 相似句匹配
  • 去重与聚类
  • 推荐系统召回
  • RAG 检索

可以把它理解成一种语义编码器:它把句子的意思压缩成后续可比较、可检索、可排序的向量表示。


2. 在开发前,先明确业务目标

做句子嵌入模型,第一步不是训练,而是先确定你到底要解决什么问题。

常见目标有三类:

2.1 语义相似度

例如:

  • “怎么学好 C++”
  • “如何提升 C++ 学习效果”

这类任务关注的是两句话意思是否接近。

适合场景:

  • 相似问题匹配
  • FAQ 匹配
  • 文本去重

2.2 检索召回

例如:

  • 用户输入 query
  • 系统从知识库、商品库、帖子库中召回最相关内容

适合场景:

  • 搜索
  • RAG
  • 推荐系统第一阶段召回

2.3 分类辅助表示

把句子向量作为下游模型输入,做:

  • 分类
  • 聚类
  • 异常检测

这三类任务虽然都用 embedding,但数据组织方式、训练目标、评估指标并不完全相同。

所以第一件事是确定:

  • 输入是什么
  • 输出要服务什么业务
  • 最重要的指标是什么
  • 延迟、成本、吞吐量限制是多少

2.4 为什么做句子嵌入还要懂 BM25

BM25 是经典的稀疏检索算法,核心还是依赖 query 和 document 的词项匹配来打分。

它主要考虑三件事:

  • 某个词在当前文档里出现了多少次
  • 这个词在整个语料库里是否稀有
  • 当前文档是否过长,需要做长度归一化

可以把它简单理解成:

一个词越能代表当前文档、又越不常见,它对相关性分数的贡献就越大。

BM25 的优点是:

  • 不需要训练,冷启动快
  • 对专有名词、品牌名、版本号、报错信息、函数名这类精确词特别敏感
  • 可解释性强,容易分析为什么命中或没命中

BM25 的局限是:

  • 不擅长理解同义表达
  • query 和文档不共享关键词时,很难召回
  • 对深层语义和改写表达支持较弱

而句子嵌入模型更擅长的是:

  • 语义相近但字面不同的表达
  • 口语化 query 和正式文档之间的匹配
  • 跨表述方式的泛化召回

所以在搜索、知识库检索、RAG 里,BM25 和 embedding 往往不是替代关系,而是互补关系:

  • BM25 更擅长“字面相关”
  • embedding 更擅长“语义相关”

多数线上系统里,更常见的不是二选一,而是:

  • BM25 召回一批结果
  • 向量检索再召回一批结果
  • 合并、去重、融合
  • 最后交给 rerank 或大模型进一步处理

3. 一个完整的工程流程

句子嵌入模型从立项到上线,通常可以拆成下面几个阶段:

  1. 定义任务与指标
  2. 准备训练数据与评估集
  3. 选择基座模型
  4. 设计训练目标与样本组织方式
  5. 训练与调参
  6. 离线评估与 badcase 分析
  7. 导出模型并构建推理服务
  8. 构建 BM25、向量索引或在线召回链路
  9. 灰度上线
  10. 监控、回流、持续迭代

如果只记一个主线,可以记成:

先定义业务,再准备数据,再训练,再评估,再部署,再根据线上反馈迭代。


4. 数据准备的详细过程

数据准备通常决定了句子嵌入模型最后 70% 以上的效果上限。

4.1 明确训练样本的基本单位

不同任务,对数据格式的要求不同。

相似度任务

通常需要句子对:

  • sentence_a
  • sentence_b
  • label

其中 label 可以是:

  • 二分类标签:相似 / 不相似
  • 打分标签:0 到 1,或 0 到 5

示例:

1
2
{"sentence_a":"怎么学习操作系统","sentence_b":"如何系统学习操作系统","label":1}
{"sentence_a":"苹果手机怎么截图","sentence_b":"线性代数怎么入门","label":0}

检索任务

通常需要三元组或多元组:

  • query
  • positive
  • negativehard_negative

示例:

1
{"query":"C++ 线程池实现","positive":"一个使用条件变量和任务队列实现线程池的教程","hard_negative":"进程池和线程池的区别说明"}

聚类或无监督任务

这类任务可以只准备原始文本,但效果往往不如有监督数据稳定。

4.2 数据来源

句子嵌入模型的数据,常见来源有:

  • 搜索日志
  • 点击日志
  • 问答系统日志
  • FAQ 数据
  • 商品标题和类目
  • 用户查询与文档标题
  • 人工标注数据
  • 现有分类数据集
  • 开源语义匹配数据集

工程上最常见的做法是:

  • 先用业务日志构造弱监督数据
  • 再配合少量高质量人工标注数据做校正

4.3 正样本如何构造

正样本的核心是:语义上应该接近,并且符合业务目标。

常见构造方式:

来自点击或转化日志

例如:

  • 用户 query -> 点击文档
  • 用户 query -> 点击商品
  • 用户 query -> 最终停留或转化内容

这类数据往往最贴近真实业务。

但要注意:

  • 点击不一定等于相关
  • 位置偏置会污染标签
  • 热门内容可能被误当作高相关内容

所以一般需要加过滤规则,例如:

  • 去掉极短停留
  • 去掉误点击
  • 去掉曝光高但满意度低的样本

来自同义改写

例如把一句话做:

  • 同义改写
  • 问法改写
  • 标题与正文摘要配对

这类方法适合构造句子相似度训练集。

来自结构化业务关系

例如:

  • 问题 -> 标准答案标题
  • 商品标题 -> 标准类目
  • 文档标题 -> 文档摘要
  • 代码问题 -> 解决方案描述

如果业务里本身就存在稳定映射关系,这类数据通常质量较高。

4.4 负样本如何构造

负样本质量直接决定模型区分能力。

负样本通常分三层:

随机负样本

从语料里随机采样无关文本。

优点:

  • 简单
  • 容易构造

缺点:

  • 太容易
  • 对模型提升有限

同域负样本

从相同主题、相同类目、相同场景里采样不匹配内容。

例如:

  • query 是“C++ 线程池”
  • 负样本不是“美食做法”
  • 而是“C++ 协程原理”

这种负样本更接近真实线上干扰项。

困难负样本(Hard Negative)

这是效果提升最关键的一类数据。

所谓困难负样本,就是:

  • 表面很像
  • 关键词很接近
  • 但真实标签不匹配

例如:

  • query:“Python 多线程和多进程区别”
  • positive:“Python 中多线程与多进程的对比说明”
  • hard negative:“Python 线程池使用教程”

困难负样本常见构造方式:

  • 用 BM25 检索出的高分非正样本
  • 用旧 embedding 模型召回出的相似错例
  • 用交叉编码器打分后筛出的高混淆样本
  • 人工从 badcase 中挑选

一句话总结:

句子嵌入模型真正拉开差距,往往靠的不是更多随机负样本,而是更好的困难负样本。

4.5 数据清洗

原始数据通常很脏,必须清洗。

重点包括:

  • 去重
  • 去除乱码
  • 去除过短文本
  • 去除极长文本
  • 去除纯模板文本
  • 去除广告、脏词、无意义符号
  • 统一全角半角
  • 统一大小写
  • 统一空白符

如果是中文场景,还要注意:

  • 简繁统一
  • 标点统一
  • URL、邮箱、手机号是否脱敏
  • 时间、数字、金额是否要做归一化

4.6 数据切分

至少要分成三份:

  • 训练集
  • 验证集
  • 测试集

常见比例:

  • 8:1:1
  • 9:0.5:0.5

重点不是比例,而是避免数据泄漏。

例如:

  • 同一问题的改写版本,不要同时出现在训练和测试里
  • 同一文档的多个片段,不要一部分在训练一部分在测试
  • 同一用户会话,不要跨集合乱分

否则评估结果会虚高。

4.7 标注规范

如果有人工标注,一定要先写标注规则。

至少要明确:

  • 什么叫相似
  • 什么叫部分相关
  • 什么叫不相关
  • 遇到歧义怎么处理
  • 长文本与短 query 如何判断

建议使用三档或四档标签:

  • 2:强相关
  • 1:部分相关
  • 0:不相关

这样后面既可以做分类,也可以映射成回归分数。

4.8 构建评估集

训练集和评估集不要混在一起。

评估集应当满足:

  • 规模不必特别大,但质量一定要高
  • 覆盖核心业务场景
  • 覆盖热门 query 和长尾 query
  • 覆盖容易混淆的 case
  • 尽量有人工校验

一个实用做法是:

  • 先准备一份标准评估集
  • 再维护一份 badcase 集

badcase 集是后续迭代最重要的资产之一。

4.9 数据处理与数据集评估专题

数据质量很大程度上决定模型效果上限,这部分详细内容已拆分到独立笔记:数据处理与数据集评估

这里先记 4 个最重要的结论:

  • 数据处理不只是“清洗脏数据”,还包括样本分布分析、标签质量检查、数据切分和泄漏排查
  • 训练集大不代表有效,重复样本、简单负样本、分布失衡和标签噪声都会拉低训练质量
  • 评估集不是越大越好,而是要覆盖核心场景、长尾 query、困难样本和真实线上分布
  • 如果没有稳定的数据快照、评估集和 badcase 集,模型迭代通常很难定位问题

5. 模型开发的详细过程

5.1 选择模型架构

句子嵌入最常见的是双塔编码结构(Bi-Encoder):

  • query 单独编码
  • document 单独编码
  • 通过向量相似度做匹配

优点:

  • 可提前离线编码文档
  • 适合大规模检索
  • 在线速度快

缺点:

  • 交互建模较弱
  • 极细粒度匹配不如交叉编码器

如果业务是海量召回,优先考虑双塔。

如果业务是小规模精排,可以在召回后增加 Cross-Encoder 做二阶段排序。

5.2 选择基座模型

一般不会从零训练,而是基于现有预训练 Transformer 做微调。

常见选择思路:

  • 中文任务优先选中文或中英双语基座
  • 长文本场景关注最大长度
  • 在线低延迟场景优先小模型
  • 追求效果可以用更强基座,但成本更高

选择时要看:

  • 语种支持
  • 最大输入长度
  • 向量维度
  • 推理速度
  • 显存占用
  • 是否方便导出 ONNX
  • 是否便于量化

5.3 文本预处理策略

训练前要统一训练和推理时的文本处理逻辑。

例如:

  • 是否截断到固定长度
  • 是否保留标点
  • 是否保留换行
  • 标题和正文是否拼接
  • query 前是否加任务前缀

如果训练时用了:

  • query: xxx
  • passage: xxx

那部署时也必须保持一致。

训练和服务预处理不一致,是最常见的效果回退原因之一。

5.4 训练目标如何选择

不同任务常用不同 loss。

对比学习

目标是:

  • 拉近正样本
  • 拉远负样本

常见形式:

  • Contrastive Loss
  • InfoNCE
  • Multiple Negatives Ranking Loss

适合:

  • 检索
  • 匹配
  • 召回

Triplet Loss

输入一般是:

  • anchor
  • positive
  • negative

要求 anchor 更接近 positive,远离 negative。

适合有明确三元组数据的场景。

Cosine Regression / MSE

适合有相似度分数标签的数据,例如:

  • STS 数据
  • 0 到 1 或 0 到 5 的语义打分

蒸馏

用更强 teacher 模型指导 student 模型学习。

常见方式:

  • 用 Cross-Encoder 对句子对打分
  • 用 teacher embedding 作为软标签

蒸馏适合:

  • 小模型加速上线
  • 域内数据不足但有较强 teacher 时

5.5 一个常见训练配方

检索类任务里,常见流程是:

  1. 选择一个已有 embedding 基座
  2. 用 query-positive 对做对比学习
  3. 在 batch 内把其他样本当作负样本
  4. 再额外加入 hard negative
  5. 周期性评估 recall@k、mrr、ndcg
  6. 通过 badcase 继续补数据

这类方法比单纯随机负样本更接近真实检索场景。

5.6 训练时需要关注的超参数

重点关注:

  • batch size
  • learning rate
  • max length
  • warmup steps
  • temperature
  • dropout
  • hard negative 数量
  • 训练轮数

经验上:

  • batch 太小,对比学习效果往往受影响
  • max length 太短,会截断关键信息
  • 训练太久,容易过拟合特定表达方式

5.7 是否需要做难例挖掘

大多数业务场景都需要。

常见做法:

  1. 先训练一个初版模型
  2. 用这个模型去召回相似结果
  3. 从错召回结果中挑选 hard negative
  4. 回灌重新训练

这就是一个标准的闭环:

  • 训练
  • 召回
  • 找错例
  • 再训练

很多效果提升,都是靠这个闭环慢慢做出来的。

5.8 LoRA 微调

LoRA 详细内容已拆分到独立笔记:LoRA微调训练

这里先记 3 个最重要的结论:

  • LoRA 的本质是冻结基座权重,只训练低秩增量参数
  • 对 embedding 任务来说,通常不改数据格式、loss 和 pooling,只改参数更新范围
  • LoRA 解决的是训练成本问题,最终效果依然主要取决于数据质量、hard negative 和训练部署一致性

6. 评估与验收

6.1 离线评估指标

不同任务看不同指标。

相似度任务

常见指标:

  • Spearman
  • Pearson
  • Accuracy
  • F1

检索任务

常见指标:

  • Recall@K
  • Precision@K
  • MRR
  • MAP
  • NDCG

如果模型主要用于线上召回,最应关注的通常是:

  • Recall@10
  • Recall@50
  • MRR@10

6.2 线上指标

离线指标好,不代表线上一定好。

线上更关注:

  • 点击率
  • 转化率
  • 用户停留
  • 问题命中率
  • RAG 回答满意度
  • 首次召回正确率

6.3 Badcase 分析

评估阶段不能只看平均分。

必须看错在哪里。

常见 badcase 类型:

  • 同义表达没召回
  • 关键词相近但语义不符
  • 长文本被截断
  • 否定句理解错误
  • 数字、时间、版本号理解错误
  • 专业术语、缩写、别名处理差

分析 badcase 时要问三个问题:

  1. 是数据问题,还是模型问题
  2. 是预处理问题,还是标签问题
  3. 是召回问题,还是排序问题

7. 部署上线的详细过程

训练完成后,不是直接上线模型文件,而是要把它接入完整推理链路。

7.1 部署前先确定线上形态

常见有三种:

离线编码 + 在线检索

最常见。

流程:

  1. 文档库离线批量生成 embedding
  2. 写入向量库或索引
  3. 用户 query 在线编码
  4. 与向量索引做近邻检索,必要时并行走 BM25
  5. 合并结果后返回 topK

适合:

  • 搜索
  • RAG
  • 推荐召回

在线双边编码

query 和候选内容都在线编码。

适合候选集较小的场景,但成本会高很多。

召回 + 精排双阶段

流程:

  1. 用句子嵌入模型做第一阶段召回
  2. 用更强的排序模型做第二阶段重排

这是实际业务里非常常见的方案。

7.2 模型导出

训练好后通常需要把模型导出成便于部署的格式。

常见形态:

  • PyTorch 原始权重
  • ONNX
  • TensorRT

选择原则:

  • 如果研发环境灵活,直接用 PyTorch 也可以
  • 如果强调推理速度和跨语言部署,ONNX 很常见
  • 如果是 GPU 高吞吐场景,可进一步做 TensorRT 优化

7.3 推理服务设计

一个 embedding 服务通常要解决下面几个问题:

  • 输入文本预处理
  • tokenizer 编码
  • 模型推理
  • pooling
  • 向量归一化
  • 批处理
  • 结果返回

其中 pooling 很关键,常见方式有:

  • CLS pooling
  • Mean pooling
  • Masked mean pooling

如果训练时用的是 mean pooling,部署时也必须保持一致。

7.4 向量归一化

很多检索系统会对向量做 L2 normalization。

这样在使用余弦相似度时更稳定,也更方便用点积近似计算。

注意:

  • 训练时是否归一化
  • 离线建库时是否归一化
  • 在线 query 是否归一化

三者必须一致。

7.5 向量索引与存储

常见选择:

  • FAISS
  • Milvus
  • pgvector
  • Elasticsearch 向量检索
  • Weaviate

核心不是工具名字,而是要考虑:

  • 数据规模
  • 延迟要求
  • 更新频率
  • 是否需要过滤条件
  • 是否要混合检索

如果数据量很大,通常会用 ANN 检索。

常见索引类型:

  • IVF
  • HNSW
  • PQ

它们本质是在召回速度、内存占用、召回率之间做权衡。

BM25 与混合检索

如果线上只有向量检索,常见漏召回场景包括:

  • 专有名词
  • 产品型号和版本号
  • 报错码、函数名、接口名
  • 语料里很稀有但业务上非常关键的关键词

如果线上只有 BM25,也常见下面的问题:

  • 同义表达召回不出来
  • query 更口语化,文档更书面化
  • 用户问题和文档标题没有共享关键词

所以实际工程里很常见的做法是混合检索:

  1. 用 BM25 召回 topK
  2. 用 embedding 召回 topK
  3. doc_idchunk_id 去重
  4. 用融合策略得到最终候选集
  5. 需要时再做 rerank

常见融合方式有:

  • Reciprocal Rank Fusion(RRF)
  • 加权分数融合
  • 先并集召回,再交给 Cross-Encoder 或 reranker

工程上要注意:

  • BM25 分数和向量相似度分数通常不在同一量纲,不能直接生硬相加
  • chunk 切分策略会同时影响 BM25 和向量召回效果
  • 过滤条件、权限控制、时间范围等规则要在两路召回里保持一致

7.6 离线建库流程

文档侧通常这样做:

  1. 清洗原始文本
  2. 切块或抽取标题摘要
  3. 生成 doc_id 与元数据
  4. 批量编码生成 embedding
  5. 写入向量库
  6. 建索引
  7. 做抽样质检

元数据通常包括:

  • doc_id
  • title
  • source
  • category
  • create_time
  • version

这些字段后面常用于过滤、排序和回溯。

7.7 在线查询流程

一个典型请求链路通常是:

  1. 用户输入 query
  2. 文本清洗与标准化
  3. 生成 query embedding
  4. 并行走 BM25 和向量召回
  5. 合并、去重并取候选 topK
  6. 按业务规则过滤
  7. 如有需要再做重排
  8. 返回结果或交给大模型生成答案

7.8 版本控制

部署时一定要给下面内容做版本号:

  • 模型版本
  • tokenizer 版本
  • 预处理规则版本
  • 索引版本
  • 数据快照版本

否则一旦效果回退,很难定位问题。

7.9 灰度上线

不要全量直接切换。

建议:

  • 先影子流量验证
  • 再小流量灰度
  • 再按业务指标逐步放量

重点观察:

  • 延迟
  • 超时率
  • 召回结果质量
  • 点击和转化变化

8. 上线后的监控与持续迭代

句子嵌入模型不是“训完一次就结束”,而是需要长期维护。

8.1 监控哪些指标

服务指标

  • P50 / P95 / P99 延迟
  • QPS
  • 错误率
  • 超时率
  • GPU / CPU 利用率

召回指标

  • topK 命中率
  • 空结果率
  • 重复结果率
  • query 平均召回分数

业务指标

  • 点击率
  • 转化率
  • 用户满意度
  • RAG 答案采纳率

8.2 监控向量分布

还要关注 embedding 自身是否异常。

例如:

  • 向量范数是否突然漂移
  • 相似度分布是否异常集中
  • 不同版本的向量是否不兼容

如果向量分布明显漂移,通常意味着:

  • 预处理变了
  • tokenizer 变了
  • pooling 变了
  • 模型加载错了

8.3 数据回流

线上数据要不断回流到训练系统。

可回流的内容包括:

  • 新 query
  • 新点击
  • 新 badcase
  • 人工纠错
  • 用户拒绝或不满意样本

这会形成一个持续优化闭环:

  1. 上线模型
  2. 收集线上日志
  3. 发现 badcase
  4. 更新数据集
  5. 重新训练和评估
  6. 再上线

9. 一个最常见的落地方案

如果是做中文知识库检索或 RAG,最常见的工程方案往往是:

第一步:准备文档数据

  • 清洗文档
  • 分段切块
  • 补充标题、来源、标签

第二步:准备 query-doc 训练对

来源可以是:

  • 搜索点击日志
  • FAQ 问题与答案标题
  • 人工构造的问法改写

第三步:构造 hard negative

  • 用 BM25 召回看起来相关但实际不对的文档
  • 用旧模型召回错例

第四步:微调双塔模型

  • query 和 passage 分开编码
  • 用对比学习训练
  • 周期性评估 Recall@K

第五步:离线编码知识库

  • 把每个 chunk 编码成向量
  • 写入向量数据库
  • 同步建立 BM25 倒排索引

第六步:部署在线查询服务

  • 用户 query 实时编码
  • 并行执行 BM25 召回和向量召回
  • 合并去重后取 topK

第七步:加一个 rerank 或 LLM

  • 检索类场景可加 rerank
  • RAG 场景可把 topK chunk 交给大模型

这条路线属于最常见、最实用、最容易先做出效果的一种方案。


10. 常见问题与坑

10.1 只换更大的模型,但不改数据

很多时候效果上不去,不是模型不够大,而是:

  • 正样本质量差
  • 负样本太简单
  • 标注不一致

10.2 训练和部署处理不一致

例如:

  • 训练时有前缀,线上没有
  • 训练时做归一化,线上没做
  • 训练时 mean pooling,线上用了 CLS

这会直接造成效果明显下降。

10.3 只看平均指标,不看 badcase

平均分提升,不代表关键场景真的提升。

很多严重问题只能从 badcase 中看出来。

10.4 离线效果好,线上效果差

常见原因:

  • 评估集不真实
  • 数据分布漂移
  • 线上 query 更短、更口语化
  • 文档切块方式不合理
  • 召回后没有做业务过滤和重排

10.5 文本太长却直接截断

句子嵌入模型虽然叫“句子”,但很多业务里实际输入是:

  • 标题 + 摘要
  • query + 上下文
  • 文档 chunk

如果文本经常超长,必须重新设计:

  • 切块策略
  • 标题拼接策略
  • 最大长度

10.6 误以为 BM25 可以被 embedding 完全替代

很多团队刚做向量检索时,会默认认为 embedding 上线后就不再需要 BM25。

这通常不成立。

在下面这些场景里,BM25 往往仍然很重要:

  • 搜索词里包含产品名、错误码、版本号
  • 用户就是想找某个精确术语
  • 业务要求结果必须可解释
  • 冷启动阶段还没有足够训练数据

更稳妥的思路通常是:

  • 先保留 BM25
  • 再增加向量召回
  • 最后通过融合和重排把两者优势结合起来

11. 一份可执行的开发清单

如果从零开始做一个句子嵌入项目,可以按下面清单推进:

  1. 明确业务任务是相似度、检索还是召回
  2. 确定线上目标指标和延迟预算
  3. 收集原始文本和日志数据
  4. 构造正样本、负样本、hard negative
  5. 清洗数据并切分 train / valid / test
  6. 建立标准评估集和 badcase 集
  7. 选择基座模型并定义预处理规则
  8. 选择 loss,开始训练第一版模型
  9. 做离线评估和 badcase 分析
  10. 导出模型并实现推理服务
  11. 建设 BM25 / 向量索引和离线建库流程
  12. 小流量灰度上线
  13. 监控线上指标并回流数据
  14. 持续做难例挖掘和版本迭代

12. 一句话总结

句子嵌入模型项目要想真正落地,核心不是“把模型跑起来”,而是把下面几件事串成闭环:

  • 用业务目标定义任务
  • 用高质量数据决定效果上限
  • 用 hard negative 提升区分能力
  • 用稳定推理链路保证线上一致性
  • 用监控和数据回流持续迭代

所以从工程角度看:

句子嵌入模型 = 数据构造能力 + 训练能力 + 检索部署能力 + 持续迭代能力。

数据处理与数据集评估笔记

1. 为什么数据决定模型上限

对句子嵌入、检索召回和 RAG 场景来说,模型当然重要,但很多时候真正决定上限的是数据。

常见原因有:

  • 正样本质量决定模型学到什么叫“相关”
  • 负样本质量决定模型能否真正学会区分
  • 样本分布决定模型上线后是否能泛化
  • 标签噪声决定训练信号是否稳定
  • 评估集质量决定你是否能看清真实问题

先记住一个判断:模型是在数据里学规律。如果数据本身混乱、偏斜或失真,训练出来的结果大概率也会带着同样的问题。


2. 数据处理到底包括什么

很多人一提数据处理,只想到“删乱码、去空值”,但在 embedding 项目里,数据处理通常至少包括下面这些部分:

  • 数据采集
  • 样本构造
  • 数据清洗
  • 去重和归一化
  • 标签治理
  • 样本分布分析
  • 训练 / 验证 / 测试切分
  • 评估集构建
  • 数据版本管理

换个更完整的说法:

数据处理 = 数据工程 + 数据质量控制 + 数据评估设计。


3. 先定义样本单位,再谈处理策略

在做数据处理之前,第一件事不是清洗,而是先明确“一个样本到底是什么”。

不同任务里,样本单位并不一样:

3.1 相似度任务

通常是句子对:

  • sentence_a
  • sentence_b
  • label

3.2 检索任务

通常是:

  • query
  • positive
  • negativehard_negative

3.3 文档向量库构建

通常是:

  • doc_id
  • chunk_id
  • text
  • metadata

如果样本单位没定义清楚,后面很多统计都会失真。
比如你以“句子对”为单位去算分布,和以“query”为单位去算分布,结论会完全不同。


4. 数据处理的主线流程

一个比较稳的流程通常是:

  1. 明确任务目标和样本单位
  2. 采集原始数据
  3. 构造正样本、负样本、hard negative
  4. 做文本清洗和标准化
  5. 做去重、去模板、去噪
  6. 分析样本分布和标签分布
  7. 切分 train / valid / test
  8. 构建标准评估集和 badcase 集
  9. 训练第一版模型
  10. 用 badcase 反推数据问题,再回流修正

这条链路里,最容易被忽视的不是清洗,而是:

  • 分布分析
  • 泄漏排查
  • 评估集设计

5. 数据清洗要做什么

5.1 基础清洗

最基础的一层通常包括:

  • 去空文本
  • 去乱码
  • 去无意义符号
  • 去极短文本
  • 去极长文本
  • 统一空白符
  • 统一全角半角
  • 统一大小写

如果是中文场景,还经常要考虑:

  • 简繁统一
  • 标点统一
  • URL、邮箱、手机号脱敏
  • 时间、数字、金额是否归一化

5.2 噪声清洗

很多业务数据不是“脏”,而是“有大量无效结构”。

例如:

  • 页眉页脚
  • 导航栏
  • 广告词
  • 模板开场白
  • 重复免责声明
  • OCR 识别错误文本

这些内容如果不清掉,会让模型学到大量无关模式。

5.3 去重

去重非常重要,因为重复样本会让训练和评估都失真。

常见去重对象:

  • 完全重复文本
  • 近重复文本
  • 同一 query 的重复点击对
  • 同一文档切块后的重复片段

如果不做去重,常见后果有:

  • 模型过度拟合高频表达
  • 评估结果虚高
  • 数据规模看起来很大,但有效信息量并不高

6. 样本分布为什么必须看

很多训练失败,不是模型没学会,而是数据分布本身有问题。

至少要看下面几类分布。

6.1 文本长度分布

要看:

  • query 长度分布
  • document / passage 长度分布
  • chunk 长度分布

如果训练集里大部分文本很短,但线上是长 query 或长段落,模型上线后就容易掉点。

6.2 标签分布

例如:

  • 正负样本比例
  • 2 / 1 / 0 多档标签比例
  • query 是否只有正样本没有负样本

如果标签极度失衡,模型往往会学到偏置策略,而不是学到真正的语义边界。

6.3 场景分布

要看数据是否覆盖不同业务场景,例如:

  • FAQ
  • 搜索
  • 商品检索
  • 知识库问答
  • 代码检索

如果训练样本只覆盖热门场景,长尾场景通常会明显掉效果。

6.4 热门与长尾分布

要区分:

  • 高频 query
  • 长尾 query
  • 高频文档
  • 稀有文档

如果训练集被热门 query 主导,模型可能对高频表达很强,但对长尾表达泛化很差。

6.5 正样本来源分布

正样本如果同时来自多种来源,例如:

  • 点击日志
  • 人工标注
  • FAQ 映射
  • 改写生成

最好分别统计占比,因为不同来源噪声强度不一样。

6.6 负样本难度分布

不能只看负样本数量,还要看难度结构。

通常可以分成:

  • 随机负样本
  • 同域负样本
  • hard negative

如果全是简单负样本,模型离线 loss 可能很好看,但上线区分能力依然不够。


7. 标签质量怎么检查

数据量大不代表标签质量高。

标签问题通常来自:

  • 点击不等于相关
  • 曝光位置偏置
  • 人工标注标准不一致
  • 自动构造规则太粗糙
  • 历史系统的错误被继承进新数据

可以从下面几方面检查:

7.1 抽样人工复核

最直接也最有效。

重点抽样:

  • 高频 query
  • 长尾 query
  • 高相似但错召回样本
  • 模型和标签明显冲突的样本

7.2 看标签一致性

例如同一个 query 是否出现下面这种情况:

  • 同一文档有时标正样本
  • 有时又标成负样本

这种冲突标签会直接污染训练信号。

7.3 看模型反常样本

如果一个样本长期表现为:

  • 标签是正样本,但模型始终给很低分
  • 标签是负样本,但模型和人工都觉得很相关

那就要怀疑数据或标签本身有问题,而不是先怀疑模型。


8. 正样本和负样本要怎么治理

8.1 正样本治理

正样本要优先保证“真相关”。

对于日志数据,常见过滤规则包括:

  • 去掉极短停留
  • 去掉误点击
  • 去掉曝光高但满意度低的内容
  • 去掉跳出率特别高的点击对

如果是结构化映射数据,还要检查:

  • 映射关系是否过期
  • 标题和正文是否错配
  • 类目标签是否长期维护

8.2 负样本治理

负样本的核心不是“越多越好”,而是“越像线上干扰项越好”。

常见做法:

  • 先用随机负样本保证基础区分
  • 再补同域负样本
  • 最后重点挖 hard negative

8.3 Hard Negative 是最值得花时间的部分

hard negative 常见来源:

  • BM25 高分误召回
  • 旧 embedding 模型的错召回
  • Cross-Encoder 高混淆样本
  • 人工 badcase 回流

很多时候,模型的真实提升不来自更多数据,而来自更强的 hard negative。


9. 训练集、验证集、测试集怎么切

切分数据时,重点不是比例,而是避免数据泄漏。

9.1 常见切分比例

  • 8:1:1
  • 9:0.5:0.5

9.2 更重要的是避免泄漏

例如:

  • 同一问题的改写版本,不要分到不同集合
  • 同一文档的多个 chunk,不要乱落到 train 和 test
  • 同一用户会话,不要一部分在训练、一部分在测试
  • 同一商品不同标题变体,不要跨集合乱分

如果泄漏存在,评估结果通常会明显虚高。

9.3 切分方式最好按业务实体来做

例如按下面这些维度切,通常更稳:

  • query_id
  • doc_id
  • 按会话
  • 按用户

而不是简单随机切行。


10. 评估集应该怎么设计

评估集不是简单从训练数据里抽一点出来,而是要刻意设计。

一个好的评估集通常要满足:

  • 覆盖核心业务场景
  • 覆盖热门 query 和长尾 query
  • 覆盖高混淆样本
  • 尽量有人工校验
  • 分布尽量接近线上真实请求

10.1 标准评估集

这是每次训练都要跑的稳定基线。

要求:

  • 版本稳定
  • 标注质量高
  • 用来做横向对比

10.2 Badcase 集

这是最重要的补充集。

来源通常是:

  • 线上错召回
  • 用户投诉
  • 人工复核发现的高价值错误
  • 新业务新术语

badcase 集的作用不是看平均分,而是防止关键错误反复出现。

10.3 分层评估

如果场景复杂,建议把评估集再分层:

  • 热门 query 集
  • 长尾 query 集
  • 专业术语集
  • 否定表达集
  • 数字、时间、版本号敏感集

这样你才能知道模型到底是“整体提升”,还是只在某一类场景提升。


11. 数据集评估到底评什么

除了看模型指标,还应该单独评估数据集本身。

至少可以看下面几类指标。

11.1 规模指标

  • 样本总量
  • 唯一 query 数
  • 唯一 doc 数
  • 平均每个 query 的正样本数
  • 平均每个 query 的负样本数

11.2 分布指标

  • 文本长度分布
  • 标签分布
  • 场景分布
  • 来源分布
  • 热门 / 长尾分布

11.3 质量指标

  • 重复率
  • 近重复率
  • 空文本比例
  • 模板文本比例
  • 乱码比例
  • 标签冲突率

11.4 难度指标

  • hard negative 占比
  • 高混淆样本占比
  • 多跳或长文本样本占比

如果这些指标长期不看,训练往往会变成“看 loss 猜问题”。


12. 一个很实用的数据诊断思路

如果模型效果不好,可以先按下面顺序排查:

  1. 看训练和测试是否泄漏
  2. 看是否有大量重复样本
  3. 看标签是否冲突
  4. 看负样本是不是太简单
  5. 看热门样本是否占比过高
  6. 看长尾 query 是否覆盖不足
  7. 看评估集是否真的接近线上

这条排查顺序很实用,因为很多问题根本不是模型结构导致的。


13. 数据版本管理为什么重要

如果你每次训练都在用“最新数据”,但没有快照和版本号,后面几乎一定会遇到这类问题:

  • 效果变好了,不知道是哪批数据带来的
  • 效果变差了,不知道是哪次清洗规则改坏了
  • badcase 修好了,但下个版本又回来了

所以至少要给下面这些东西做版本管理:

  • 原始数据快照
  • 清洗规则版本
  • 样本构造规则版本
  • hard negative 生成版本
  • 评估集版本
  • badcase 集版本

14. 一个适合落地的默认做法

如果你现在要从零做一套数据处理和数据集评估流程,可以先按这个 baseline 走:

  1. 先定义样本单位和标签规范
  2. 从业务日志和结构化关系里构造正样本
  3. 补随机负样本、同域负样本和 hard negative
  4. 做基础清洗、去重和归一化
  5. 统计长度分布、标签分布、场景分布和来源分布
  6. query_iddoc_id 切 train / valid / test
  7. 建一份小而稳定的标准评估集
  8. 再维护一份持续追加的 badcase 集
  9. 每次训练都固定跑离线评估和分层评估
  10. 把线上 badcase 持续回流

这套流程不花哨,但很稳,也最容易持续迭代。


15. 一份可直接抄的检查清单

  1. 是否明确了样本单位
  2. 是否定义了标注规则
  3. 是否做了基础清洗和噪声清洗
  4. 是否做了完全去重和近重复检查
  5. 是否统计了长度分布和标签分布
  6. 是否区分了热门和长尾样本
  7. 是否统计了正样本和负样本来源
  8. 是否有足够强的 hard negative
  9. 是否避免了 train / test 泄漏
  10. 是否有稳定的评估集和 badcase 集
  11. 是否给数据和规则做了版本管理

16. 一句话总结

数据处理和数据集评估的核心不是“把脏数据洗干净”,而是:

让训练数据更真实、分布更合理、标签更稳定、评估更可信。

对 embedding 项目来说,很多时候真正决定模型上限的,不是你换了多大的模型,而是你把数据治理做到了什么程度。

Celery 学习笔记

1. Celery 是什么

Celery 是一个基于 Python 的分布式任务队列,用来把“耗时任务”从主业务流程里拆出去异步执行。

典型场景:

  • 发送邮件、短信、消息通知
  • 文件转码、图片处理、PDF 解析
  • 大模型推理、批量数据处理
  • 定时任务、周期性任务
  • 需要失败重试的后台任务

先记住最实用的分工:FastAPI 负责接请求和返回结果,Celery 负责把耗时任务放到后台慢慢执行。

2. 为什么要用 Celery

如果一个接口内部要做很久的事情,比如:

  • 调第三方接口要 10 秒
  • 处理一批文件要 2 分钟
  • 模型推理要占满 CPU / GPU

那就不适合一直阻塞 HTTP 请求。更合理的做法是:

  1. FastAPI 收到请求
  2. 把任务丢给 Celery
  3. 立刻返回一个 task_id
  4. 前端或调用方再通过 task_id 查询执行结果

这样接口响应会更快,系统也更容易扩展。

3. Celery 核心架构

Celery 主要由 3 个角色组成:

3.1 Broker

消息中间件,负责存放“待执行任务”。

常见选择:

  • Redis
  • RabbitMQ

它就是任务排队和等待消费的地方。

3.2 Worker

执行任务的工作进程。

它会不断从 Broker 中取任务,然后真正执行 Python 函数。

3.3 Result Backend

结果存储,用来保存任务状态和返回值。

常见选择:

  • Redis
  • Database
  • RPC

如果你只关心“任务有没有执行”,不关心返回结果,也可以不配置结果后端;但实际项目里通常还是会配置,方便查状态。

4. 工作流程

Celery 的完整链路可以这样理解:

  1. 客户端或 FastAPI 调用一个 Celery 任务
  2. 任务消息被发送到 Broker
  3. Worker 从 Broker 取出任务
  4. Worker 执行任务函数
  5. 执行结果和状态写入 Result Backend
  6. 业务侧通过 task_id 查询任务状态和结果

对应的状态通常有:

  • PENDING:任务还没开始,或者还查不到
  • STARTED:任务已开始执行
  • SUCCESS:执行成功
  • FAILURE:执行失败
  • RETRY:正在重试
  • REVOKED:任务被撤销

说明:

  • 默认不一定会看到 STARTED
  • 如果想更明确地看到运行中状态,可以开启 task_track_started=True

5. 安装

1
pip install celery redis

如果使用 Redis 作为 Broker 和 Result Backend,需要先启动 Redis。

6. 最小可运行示例

6.1 编写 Celery 应用

文件:celery_app.py

1
2
3
4
5
6
7
8
9
10
11
12
from celery import Celery

app = Celery(
"demo",
broker="redis://127.0.0.1:6379/1",
backend="redis://127.0.0.1:6379/2",
)

app.conf.update(
task_track_started=True,
timezone="Asia/Shanghai",
)

说明:

  • /1 表示 Redis 的第 1 个逻辑库,用来当 Broker
  • /2 表示 Redis 的第 2 个逻辑库,用来存任务结果
  • Broker 和 Backend 分开写更清晰,排查问题也更方便

6.2 定义任务

文件:tasks.py

1
2
3
4
5
6
7
8
import time

from celery_app import app

@app.task
def add(a: int, b: int) -> int:
time.sleep(3)
return a + b

这里的 add 虽然是普通 Python 函数,但加了 @app.task 之后,它就成了 Celery 任务。

6.3 启动 Worker

1
celery -A tasks worker -l info

含义:

  • -A tasks:表示 Celery 应用和任务定义在 tasks.py
  • worker:启动 worker 进程
  • -l info:日志级别为 info

6.4 提交任务

1
2
3
4
from tasks import add

result = add.delay(3, 5)
print(result.id)

注意:

  • delay() 只是“提交任务”
  • 它不会在当前进程里同步执行
  • 返回的是 AsyncResult
  • 真正执行任务的是独立的 Worker

6.5 查询任务结果

1
2
3
4
5
6
7
8
9
10
11
from celery.result import AsyncResult

from celery_app import app

task = AsyncResult("任务ID", app=app)

print(task.status)
print(task.successful())

if task.successful():
print(task.get())

常用属性和方法:

  • task.id:任务 ID
  • task.status:任务状态
  • task.successful():是否执行成功
  • task.failed():是否执行失败
  • task.ready():任务是否结束
  • task.get():获取结果,可能会阻塞

7. delay()apply_async() 的区别

7.1 delay()

最简单的调用方式,其实就是 apply_async() 的简写。

1
add.delay(3, 5)

适合:

  • 普通异步提交
  • 不需要额外参数时

7.2 apply_async()

更灵活,支持倒计时、定时执行、路由到指定队列等。

1
add.apply_async(args=[3, 5], countdown=10)

常见参数:

  • args / kwargs:任务参数
  • countdown=10:10 秒后执行
  • eta=...:指定某个时间点执行
  • expires=60:60 秒后任务过期
  • queue="email":指定进入哪个队列
  • routing_key="task.email":指定路由键

结论:

  • 简单任务用 delay()
  • 有调度需求时用 apply_async()

8. 任务重试

很多后台任务依赖外部资源,比如:

  • 第三方 API
  • 数据库
  • 对象存储
  • 模型服务

这些服务不稳定时,直接失败往往不合理,更适合自动重试。

1
2
3
4
5
6
7
8
9
from celery_app import app

@app.task(bind=True, max_retries=3, default_retry_delay=5)
def fetch_remote_data(self, url: str):
try:
# 这里假设会调用外部接口
raise RuntimeError("temporary error")
except Exception as exc:
raise self.retry(exc=exc)

说明:

  • bind=True 后,任务第一个参数会变成 self
  • self.retry(...) 可以重新入队
  • max_retries=3 表示最多重试 3 次
  • default_retry_delay=5 表示默认每次间隔 5 秒

适合有“短暂失败可能恢复”的任务,不适合参数本身就错误的任务。

9. 队列与路由

当任务类型越来越多时,通常不会把所有任务都丢到一个队列里。

常见拆法:

  • default:普通任务
  • email:发邮件
  • file:文件处理
  • gpu:模型推理

这样可以让不同 Worker 只处理自己擅长的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from celery import Celery
from kombu import Exchange, Queue

app = Celery("demo")

app.conf.task_queues = (
Queue("default", Exchange("default"), routing_key="default"),
Queue("file", Exchange("file"), routing_key="file.process"),
Queue("gpu", Exchange("gpu"), routing_key="gpu.infer"),
)

app.conf.task_routes = {
"tasks.process_pdf": {
"queue": "file",
"routing_key": "file.process",
},
"tasks.run_inference": {
"queue": "gpu",
"routing_key": "gpu.infer",
},
}

启动指定队列的 Worker:

1
2
celery -A tasks worker -Q file -l info
celery -A tasks worker -Q gpu -l info

这样做的好处:

  • 避免耗时任务阻塞普通任务
  • 可以按机器能力拆分 Worker
  • GPU 任务和 CPU 任务可以隔离

10. 定时任务

Celery 可以配合 celery beat 做周期任务。

例如:

  • 每天凌晨清理临时文件
  • 每小时刷新缓存
  • 每 10 分钟同步一次数据
1
2
3
4
5
6
7
8
from celery.schedules import crontab

app.conf.beat_schedule = {
"cleanup-temp-files": {
"task": "tasks.cleanup_temp_files",
"schedule": crontab(hour=3, minute=0),
},
}

启动:

1
celery -A tasks beat -l info

如果要实际执行任务,除了 beat 还要有 worker 在跑。

11. FastAPI 和 Celery 怎么配合

最常见的配合方式是:

  1. FastAPI 暴露 HTTP 接口
  2. 接口中调用 Celery 提交任务
  3. Celery Worker 在后台执行
  4. 再通过一个查询接口返回任务状态/结果

11.1 什么时候用 BackgroundTasks,什么时候用 Celery

FastAPI 自带 BackgroundTasks,但它和 Celery 不是一类东西。

BackgroundTasks 更适合:

  • 很轻量的后台操作
  • 跟当前服务进程强绑定
  • 不要求失败重试
  • 不要求多机器扩展

Celery 更适合:

  • 任务耗时长
  • 任务量大
  • 需要重试
  • 需要队列隔离
  • 需要多个 Worker 扩展
  • 需要独立于 Web 进程运行

一句话:

轻任务用 BackgroundTasks,重任务用 Celery。

12. FastAPI + Celery 案例 1:提交任务并查询状态

这是最常见的模式。

12.1 目录结构

1
2
3
4
project/
├── main.py
├── celery_app.py
└── tasks.py

12.2 celery_app.py

1
2
3
4
5
6
7
8
9
10
11
12
from celery import Celery

celery_app = Celery(
"fastapi_demo",
broker="redis://127.0.0.1:6379/1",
backend="redis://127.0.0.1:6379/2",
)

celery_app.conf.update(
task_track_started=True,
result_expires=3600,
)

12.3 tasks.py

1
2
3
4
5
6
7
8
import time

from celery_app import celery_app

@celery_app.task(name="tasks.long_time_add")
def long_time_add(a: int, b: int) -> int:
time.sleep(10)
return a + b

12.4 main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from fastapi import FastAPI
from celery.result import AsyncResult

from celery_app import celery_app
from tasks import long_time_add

app = FastAPI()

@app.post("/tasks/add")
def create_add_task(a: int, b: int):
task = long_time_add.delay(a, b)
return {
"message": "task submitted",
"task_id": task.id,
}

@app.get("/tasks/{task_id}")
def get_task_result(task_id: str):
task = AsyncResult(task_id, app=celery_app)

response = {
"task_id": task.id,
"status": task.status,
}

if task.successful():
response["result"] = task.result
elif task.failed():
response["error"] = str(task.result)

return response

12.5 启动方式

启动 FastAPI:

1
uvicorn main:app --reload

启动 Worker:

1
celery -A tasks worker -l info

请求流程:

  1. POST /tasks/add?a=3&b=5
  2. 接口立即返回 task_id
  3. 再调 GET /tasks/{task_id}
  4. 看任务是 PENDINGSTARTED 还是 SUCCESS

这个模式非常适合前后端分离项目。

13. FastAPI + Celery 案例 2:文件处理任务

这个场景很常见,比如:

  • 上传 PDF 后做文本提取
  • 上传图片后做缩略图
  • 上传音频后做转写

13.1 设计原则

不要把 UploadFile、数据库连接、请求对象直接传给 Celery。

原因:

  • Celery 任务参数通常需要能序列化
  • 这些对象往往不能直接被序列化
  • 跨进程后对象本身也失效了

正确做法:

  1. FastAPI 先把文件保存到磁盘或对象存储
  2. file_pathfile_url 传给 Celery
  3. Worker 根据路径再去处理文件

13.2 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from pathlib import Path
import shutil
import uuid

from fastapi import FastAPI, File, UploadFile

from tasks import process_pdf

app = FastAPI()

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

@app.post("/files/pdf")
def upload_pdf(file: UploadFile = File(...)):
file_id = f"{uuid.uuid4()}_{file.filename}"
save_path = UPLOAD_DIR / file_id

with save_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)

task = process_pdf.delay(str(save_path))

return {
"file_path": str(save_path),
"task_id": task.id,
}

对应的 Celery 任务:

1
2
3
4
5
6
7
8
9
from celery_app import celery_app

@celery_app.task(name="tasks.process_pdf")
def process_pdf(file_path: str):
# 这里做 PDF 文本抽取、切分、向量化等操作
return {
"file_path": file_path,
"message": "processed",
}

这个思路很适合 AI 项目里的文档解析链路。

14. FastAPI + Celery 案例 3:模型推理任务

对于 AI 模型开发,这个场景更有代表性。

例如:

  • 文本摘要
  • 图片分类
  • 批量 embedding
  • 长文本抽取

14.1 为什么适合 Celery

因为模型推理通常具备这些特点:

  • 耗时不稳定
  • 资源消耗高
  • 可能需要排队
  • 可能要分 CPU / GPU Worker

14.2 一个简化版例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from fastapi import FastAPI

from tasks import run_inference

app = FastAPI()

@app.post("/infer")
def create_inference_task(text: str):
task = run_inference.apply_async(
kwargs={"text": text},
queue="gpu",
routing_key="gpu.infer",
)
return {"task_id": task.id}

对应任务:

1
2
3
4
5
6
7
8
9
10
11
12
import time

from celery_app import celery_app

@celery_app.task(name="tasks.run_inference")
def run_inference(text: str):
time.sleep(5)
return {
"text": text,
"label": "positive",
"score": 0.98,
}

这里的重点不在模型代码本身,而在架构思路:

  • API 层只负责接收请求
  • 推理层在 Worker 中执行
  • GPU 任务路由到专门队列
  • 可以单独扩容推理 Worker

如果将来一张 GPU 卡只跑一个 Worker,也很好管理。

15. 实战里很重要的细节

15.1 任务参数尽量简单

建议只传这些:

  • 字符串
  • 数字
  • 布尔值
  • 列表 / 字典
  • 文件路径
  • 数据库主键 ID

不建议直接传:

  • 数据库 Session
  • 请求对象 Request
  • UploadFile
  • 大模型实例
  • 打开的文件句柄

原则:

任务参数最好是“可序列化、可重建、可追踪”的。

15.2 任务尽量幂等

幂等的意思是:同一个任务重复执行多次,结果最好一致,或者至少不会造成严重副作用。

因为这些情况都可能导致重复执行:

  • Worker 崩溃
  • 消息重复投递
  • 任务重试
  • 手动补偿

例如发通知、写数据库、扣费等场景,幂等性非常重要。

15.3 不要在 API 进程里执行重活

FastAPI 本身应该尽量保持:

  • 响应快
  • 逻辑薄
  • 只做参数校验、鉴权、任务分发

重计算、长耗时 IO、批处理,尽量放 Worker。

15.4 结果不是必须永久保存

很多项目会犯一个问题:任务结果一直存 Redis,时间一久就堆积很多垃圾数据。

可以设置:

  • result_expires
  • 任务完成后主动清理
  • 只保留必要结果,不保留大对象

如果结果很大,不要直接把大块内容塞进 Redis,建议落库或存文件,再返回引用地址。

15.5 错误处理不要只靠日志

建议至少做到:

  • 任务失败后能看到 FAILURE
  • 能根据 task_id 查到失败原因
  • 关键任务开启自动重试
  • 必要时记录业务日志或落库

15.6 CPU / GPU / IO 任务分开

这点在 AI 项目里尤其重要。

建议:

  • 文件下载、上传、请求第三方 API 放一个队列
  • CPU 预处理放一个队列
  • GPU 推理放一个队列

这样不会出现“一个长推理任务把所有普通任务都堵住”的问题。

15.7 Worker 并发要按任务类型调

Celery 的并发不是越大越好,要看任务类型。

一般经验:

  • IO 密集型任务可以适当提高并发
  • CPU 密集型任务并发通常不要超过 CPU 核心数太多
  • GPU 推理任务通常一个 Worker 进程绑定一张卡,甚至一个队列只开 --concurrency=1

例如 GPU Worker 常见写法:

1
celery -A tasks worker -Q gpu --concurrency=1 -l info

如果推理模型本身已经很吃显存,再盲目开高并发,反而容易把机器打爆。

16. 常见坑

16.1 任务提交成功,但一直不执行

通常排查:

  • Redis 是否启动
  • Worker 是否启动
  • -A 指向的模块是否正确
  • 任务有没有注册到当前 Worker
  • 是否路由到了某个没人消费的队列

16.2 能查到任务 ID,但拿不到结果

通常是:

  • 没配置 backend
  • task_id 查错了
  • 结果过期被清掉了

16.3 任务函数里引用了 Web 层对象

比如:

  • 直接把 Request 传进任务
  • 把 SQLAlchemy Session 传进任务
  • 把上传文件对象传进任务

这类设计一般都会有问题。任务应该依赖“可序列化参数”,不要依赖当前请求上下文。

16.4 把 Celery 当成同步 RPC 用

有些人虽然用了 Celery,但提交完任务后立刻 .get() 等待结果,这样就又退回同步调用了。

如果接口本身必须立刻拿到结果,那就要重新评估:

  • 这个任务是否真的适合放到 Celery
  • 是否应该走同步推理接口
  • 是否应该拆成“提交任务 + 轮询结果”

17. 一套比较实用的学习顺序

建议按这个顺序掌握:

  1. 先跑通最小示例
  2. 理解 delay()AsyncResult
  3. 学会查任务状态
  4. 学会重试
  5. 学会队列与路由
  6. 再接入 FastAPI
  7. 最后再做定时任务、监控、扩容

18. 总结

Celery 的本质就是:

  • 把耗时任务从主线程 / 主服务里剥离出去
  • 通过消息队列异步执行
  • 通过 Worker 扩展执行能力
  • 通过结果后端查询状态和结果

对于 FastAPI 项目,可以把它理解成一组固定分工:

  • FastAPI:对外提供 API
  • Redis / RabbitMQ:负责排队
  • Celery Worker:真正执行任务
  • Result Backend:保存状态和结果

如果项目里已经出现“接口慢、任务重、需要重试、需要排队、需要异步处理”的问题,那 Celery 基本就是很自然的选择。

19. 参考

Drogon 学习笔记

1. Drogon 是什么

Drogon 是一个基于 C++ 的现代 Web 应用框架,适合用来开发:

  • 高性能 HTTP API 服务
  • C++ 网关服务
  • 需要低延迟的业务接口
  • WebSocket 服务
  • 需要接 MySQL / PostgreSQL / Redis 的后端系统

它的几个核心特点:

  • 性能高,底层是非阻塞 IO
  • 支持异步编程
  • 支持路由、控制器、过滤器、中间件
  • 支持数据库、Redis、Session、文件上传
  • 支持 HTTP Client,也能调用其他内部服务
  • 支持 C++20 协程

如果先抓核心区别,可以这样记:FastAPI 更偏 Python 生态里的高效 API 框架,Drogon 则更像适合高性能 C++ 服务的全功能 Web 框架。

2. 为什么很多后端项目会用 Drogon

如果你的系统有这些特点:

  • 接口并发高
  • 对延迟敏感
  • 业务层想用 C++ 实现
  • 需要把网络、数据库、缓存、会话统一放在一个服务里

那 Drogon 会很合适。

尤其是在你现在这类 AI 项目架构里,Drogon 很适合承担:

  • 对外 API 网关
  • 会话和鉴权层
  • 请求转发层
  • MySQL 持久化层
  • Redis 状态层

而 Python 生态仍然更适合承担:

  • 模型推理
  • 文档解析
  • RAG 流程
  • Celery 异步任务

因此,很多混合架构会拆成:

  • Drogon:对外服务、状态治理、性能敏感层
  • FastAPI:内部模型服务、任务接口
  • Celery:后台异步执行

3. 安装和工程初始化

Drogon 的安装方式比 FastAPI 重一些,因为它是 C++ 框架,需要编译环境和依赖。

常见方式有:

  • vcpkg
  • 源码编译
  • Docker

如果你想先快速上手,最常见的是源码安装:

1
2
3
4
5
6
7
8
git clone https://github.com/drogonframework/drogon.git
cd drogon
git submodule update --init
mkdir build
cd build
cmake ..
make -j4
sudo make install

安装完成后,通常还会有一个命令行工具:

1
drogon_ctl

它常用来:

  • 创建项目骨架
  • 创建 Controller
  • 创建 Filter
  • 生成 ORM Model

例如创建一个项目:

1
drogon_ctl create project demo

4. 最小可运行示例

和 FastAPI 的 main.py 一样,Drogon 也可以先从一个最小服务开始。

4.1 CMakeLists.txt

1
2
3
4
5
6
7
8
9
10
cmake_minimum_required(VERSION 3.16)
project(drogon_demo)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

find_package(Drogon CONFIG REQUIRED)

add_executable(${PROJECT_NAME} main.cc)
target_link_libraries(${PROJECT_NAME} PRIVATE Drogon::Drogon)

4.2 main.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <drogon/drogon.h>

using namespace drogon;

int main()
{
app().registerHandler(
"/",
[](const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback) {
Json::Value json;
json["message"] = "hello drogon";
callback(HttpResponse::newHttpJsonResponse(json));
},
{Get});

app().addListener("0.0.0.0", 8080);
app().setThreadNum(1);
app().run();
}

编译和运行:

1
2
3
cmake -S . -B build
cmake --build build
./build/drogon_demo

访问:

  • http://127.0.0.1:8080/

如果对照 FastAPI,可以这样看:

  • app = FastAPI() 对应 app()
  • @app.get("/") 对应 registerHandler("/", ...)
  • return {...} 对应 callback(HttpResponse::newHttpJsonResponse(...))

5. Drogon 的基本工作方式

可以把一次请求理解成下面这条链路:

  1. 客户端发来 HTTP 请求
  2. Drogon 根据路径和方法匹配路由
  3. 先经过中间件和过滤器
  4. 进入 Handler 或 Controller 方法
  5. 业务代码处理请求
  6. 构造 HttpResponse
  7. 通过 callback 返回给客户端

和 FastAPI 很像,但有一个非常重要的区别:

Drogon 不会像 FastAPI 那样自动把 Python 函数返回值转成 JSON,你需要更显式地构造响应对象。

6. 直接注册 Handler

最简单的方式就是直接在 main() 里注册路由。

1
2
3
4
5
6
7
8
9
app().registerHandler(
"/health",
[](const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback) {
Json::Value json;
json["status"] = "ok";
callback(HttpResponse::newHttpJsonResponse(json));
},
{Get});

适合:

  • 学习阶段
  • 很小的 demo
  • 临时测试接口

但项目一大,通常更推荐 Controller。

7. 三种常见 Controller

Drogon 常见的控制器主要有三类:

  • HttpSimpleController
  • HttpController
  • WebSocketController

7.1 HttpSimpleController

适合单个简单接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <drogon/HttpSimpleController.h>

class Ping : public drogon::HttpSimpleController<Ping>
{
public:
PATH_LIST_BEGIN
PATH_ADD("/ping", drogon::Get);
PATH_LIST_END

void asyncHandleHttpRequest(
const drogon::HttpRequestPtr &req,
std::function<void(const drogon::HttpResponsePtr &)> &&callback) override
{
Json::Value json;
json["message"] = "pong";
callback(drogon::HttpResponse::newHttpJsonResponse(json));
}
};

7.2 HttpController

适合一组 REST 风格接口,功能更完整。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <drogon/HttpController.h>

namespace api::v1
{
class User : public drogon::HttpController<User>
{
public:
METHOD_LIST_BEGIN
ADD_METHOD_TO(User::getUser, "/api/v1/users/{id}", drogon::Get);
ADD_METHOD_TO(User::createUser, "/api/v1/users", drogon::Post);
METHOD_LIST_END

void getUser(
const drogon::HttpRequestPtr &req,
std::function<void(const drogon::HttpResponsePtr &)> &&callback,
int id) const
{
Json::Value json;
json["id"] = id;
callback(drogon::HttpResponse::newHttpJsonResponse(json));
}

void createUser(
const drogon::HttpRequestPtr &req,
std::function<void(const drogon::HttpResponsePtr &)> &&callback) const
{
auto jsonPtr = req->getJsonObject();

Json::Value json;
json["name"] = jsonPtr ? (*jsonPtr)["name"].asString() : "";
callback(drogon::HttpResponse::newHttpJsonResponse(json));
}
};
} // namespace api::v1

7.3 WebSocketController

适合:

  • 实时聊天
  • 实时协作
  • 推送类业务

如果你后面要做双向实时通信,可以重点看这一类;如果只是 AI 输出流式结果,很多时候 SSE 就够了,不一定非要 WebSocket。

8. 路径参数、查询参数、JSON 请求体

这部分可以类比 FastAPI 的:

  • 路径参数
  • 查询参数
  • 请求体

8.1 路径参数

在 Drogon 里,路径参数可以直接映射到函数参数。

1
ADD_METHOD_TO(BookController::getBook, "/api/v1/books/{book_id}", drogon::Get);

对应的处理函数:

1
2
3
4
void getBook(
const drogon::HttpRequestPtr &req,
std::function<void(const drogon::HttpResponsePtr &)> &&callback,
int bookId) const;

放到 FastAPI 的语境里,大致对应:

1
2
3
@app.get("/books/{book_id}")
def get_book(book_id: int):
...

8.2 查询参数

查询参数通常从 req 里读取:

1
2
auto keyword = req->getParameter("keyword");
auto page = req->getParameter("page");

例如:

1
/api/v1/search?keyword=llm&page=1

8.3 JSON 请求体

如果客户端发的是 JSON,请求体常见写法是:

1
2
3
4
5
6
7
8
9
10
auto jsonPtr = req->getJsonObject();
if (!jsonPtr)
{
auto resp = drogon::HttpResponse::newHttpResponse();
resp->setStatusCode(drogon::k400BadRequest);
callback(resp);
return;
}

std::string name = (*jsonPtr)["name"].asString();

结论:

  • 路径参数:函数参数映射
  • 查询参数:req->getParameter()
  • JSON 请求体:req->getJsonObject()

9. JSON 响应和状态码

Drogon 里最常见的响应方式是手动构造 JSON。

1
2
3
4
5
6
7
Json::Value json;
json["message"] = "created";
json["id"] = 1001;

auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
resp->setStatusCode(drogon::k201Created);
callback(resp);

这和 FastAPI 的差异在于:

  • FastAPI 更自动
  • Drogon 更显式

显式的好处是:

  • 更清楚知道返回了什么
  • 更方便做细粒度控制
  • 更符合 C++ 项目风格

10. 配置文件 config.json

很多 Drogon 项目不会把所有配置都写死在 main() 里,而是放到配置文件。

最常见的主程序写法是:

1
2
3
4
5
6
7
#include <drogon/drogon.h>

int main()
{
drogon::app().loadConfigFile("config.json");
drogon::app().run();
}

一个简化版配置示意:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
{
"listeners": [
{
"address": "0.0.0.0",
"port": 8080
}
],
"db_clients": [
{
"name": "default",
"rdbms": "mysql",
"host": "127.0.0.1",
"port": 3306,
"dbname": "demo",
"user": "root",
"passwd": "123456",
"is_fast": false,
"number_of_connections": 4
}
],
"redis_clients": [
{
"name": "default",
"host": "127.0.0.1",
"port": 6379,
"number_of_connections": 2
}
],
"app": {
"number_of_threads": 4,
"enable_session": true,
"upload_path": "uploads",
"client_max_body_size": "50M"
}
}

这里最重要的点是:

  • listeners:监听地址和端口
  • db_clients:数据库连接
  • redis_clients:Redis 连接
  • app:线程数、Session、上传路径等全局设置

11. 过滤器和中间件

这部分很像 FastAPI 的依赖、鉴权逻辑、中间件,但 Drogon 拆得更明确。

简单理解:

  • Filter:更像“接口前置校验”
  • Middleware:更像“围绕请求处理链做前后处理”

例如做一个简单鉴权 Filter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <drogon/HttpFilter.h>

class AuthFilter : public drogon::HttpFilter<AuthFilter>
{
public:
void doFilter(const drogon::HttpRequestPtr &req,
FilterCallback &&fcb,
FilterChainCallback &&fccb) override
{
auto token = req->getHeader("x-api-key");
if (token != "secret")
{
Json::Value json;
json["error"] = "unauthorized";

auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
resp->setStatusCode(drogon::k401Unauthorized);
fcb(resp);
return;
}

fccb();
}
};

挂到某个接口上:

1
ADD_METHOD_TO(User::getUser, "/api/v1/users/{id}", drogon::Get, "AuthFilter");

适合抽出来做的逻辑通常有:

  • 登录校验
  • API Key 校验
  • 频率限制
  • 内网访问限制
  • 统一日志

12. Session

Drogon 内置了 Session 支持,但默认不是强制开启的。

它适合做:

  • 登录态
  • 短期会话数据
  • 访问频率控制
  • 一些轻量服务端状态

开启方式可以在配置文件里写:

1
2
3
4
5
{
"app": {
"enable_session": true
}
}

或者代码里显式开启:

1
drogon::app().enableSession(1200);

使用示例:

1
2
3
4
5
6
req->session()->insert("user_id", 1001);

if (req->session()->find("user_id"))
{
auto userId = req->session()->get<int>("user_id");
}

注意:

Session 依赖 Cookie。如果客户端不保存 Cookie,每次请求都可能被当成新会话。

13. 文件上传

在 AI 项目里,文件上传非常常见,比如:

  • 上传 PDF
  • 上传图片
  • 上传音频
  • 上传数据文件

Drogon 常用 MultiPartParser 来处理 multipart 上传。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <drogon/drogon.h>

void upload(const drogon::HttpRequestPtr &req,
std::function<void(const drogon::HttpResponsePtr &)> &&callback)
{
drogon::MultiPartParser parser;
if (parser.parse(req) != 0 || parser.getFiles().empty())
{
Json::Value json;
json["error"] = "no file";

auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
resp->setStatusCode(drogon::k400BadRequest);
callback(resp);
return;
}

auto file = parser.getFiles()[0];
file.save("./uploads");

Json::Value json;
json["filename"] = file.getFileName();
auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
callback(resp);
}

实际项目里要特别注意:

  • 不要直接信任原始文件名
  • 要限制文件大小
  • 要校验扩展名和 MIME 类型
  • 最好自己生成保存文件名

对于 AI 系统,更推荐的流程通常是:

  1. Drogon 接收上传
  2. 保存到磁盘或对象存储
  3. 写一条数据库记录
  4. 调内部 FastAPI / Celery 提交处理任务

14. 数据库访问:DbClient 和 ORM

Drogon 自带数据库支持,常见关系型数据库包括:

  • MySQL
  • PostgreSQL
  • SQLite3

最常见的是通过 DbClient 执行 SQL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
auto client = drogon::app().getDbClient();

client->execSqlAsync(
"select id, name from users where id = ?",
[callback](const drogon::orm::Result &result) {
Json::Value json;

if (result.empty())
{
json["error"] = "not found";
auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
resp->setStatusCode(drogon::k404NotFound);
callback(resp);
return;
}

json["id"] = result[0]["id"].as<int>();
json["name"] = result[0]["name"].as<std::string>();
callback(drogon::HttpResponse::newHttpJsonResponse(json));
},
[callback](const drogon::orm::DrogonDbException &e) {
Json::Value json;
json["error"] = e.base().what();
auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
resp->setStatusCode(drogon::k500InternalServerError);
callback(resp);
},
1001);

要点:

  • MySQL 占位符通常用 ?
  • PostgreSQL 占位符通常用 $1$2
  • 异步接口更符合 Drogon 的整体风格
  • 不要在回调里做很重的阻塞操作

如果你不想手写太多 SQL,还可以用 drogon_ctl 根据表结构生成 ORM Model。

适合:

  • 表结构比较稳定
  • 想减少手写样板代码
  • 想把数据库访问写得更结构化

15. Redis 支持

Drogon 也支持 Redis,而且同样是异步风格。

如果已经在 config.json 里配置了 redis_clients,运行后就可以获取客户端:

1
auto redisClient = drogon::app().getRedisClient();

执行命令示例:

1
2
3
4
5
6
7
8
9
redisClient->execCommandAsync(
[](const drogon::nosql::RedisResult &r) {
LOG_INFO << "redis ok: " << r.asString();
},
[](const std::exception &err) {
LOG_ERROR << "redis error: " << err.what();
},
"get session:%s",
"1001");

它适合做:

  • 缓存
  • 任务状态存储
  • 限流计数器
  • 会话辅助状态
  • 流式输出中间层

如果把你现在的整条链路连起来理解,就是:

  • Drogon:对外接口
  • Redis:缓存 / 状态 / 消息中间层
  • FastAPI / Celery:后台处理

16. C++20 协程 Task<>

如果你用的是较新的编译器,Drogon 支持协程,这可以让异步代码更接近同步写法。

例如数据库查询可以写成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <drogon/drogon.h>

drogon::Task<drogon::HttpResponsePtr> getUserCoro(int userId)
{
auto db = drogon::app().getDbClient();

try
{
auto result = co_await db->execSqlCoro(
"select id, name from users where id = ?",
userId);

Json::Value json;
json["id"] = result[0]["id"].as<int>();
json["name"] = result[0]["name"].as<std::string>();
co_return drogon::HttpResponse::newHttpJsonResponse(json);
}
catch (const drogon::orm::DrogonDbException &e)
{
Json::Value json;
json["error"] = e.base().what();
auto resp = drogon::HttpResponse::newHttpJsonResponse(json);
resp->setStatusCode(drogon::k500InternalServerError);
co_return resp;
}
}

它的好处是:

  • 少写回调嵌套
  • 控制流更清晰
  • 复杂异步逻辑更好维护

但你要注意:

  • 需要合适的编译器和 C++20 支持
  • 协程让代码更好写,不代表可以随便阻塞线程

17. Drogon 和 FastAPI 怎么选

这两个框架不是简单的“谁替代谁”,而是各自适合不同层。

维度 Drogon FastAPI
语言 C++ Python
性能 更高,适合性能敏感层 足够高,开发更快
开发效率 相对慢一些 很高
参数校验 需要自己控制更多细节 自动化更强
生态 系统层、服务层强 AI、数据、模型生态强
适合位置 网关、核心服务、低延迟接口 模型服务、管理后台、内部 API

如果只抓结论,可以这样选:

  • 想更快把模型接口和内部服务跑起来,用 FastAPI
  • 想把网关、状态治理和性能敏感层放在 C++ 里,用 Drogon

18. Drogon + FastAPI + Celery 的典型配合方式

这是你当前项目最值得掌握的一块。

一个典型流程可以这样拆:

  1. 客户端请求先到 Drogon
  2. Drogon 做鉴权、Session、参数检查、落库
  3. Drogon 调用内部 FastAPI 接口提交任务
  4. FastAPI 再把重任务交给 Celery
  5. Celery Worker 真正处理文档、推理、索引构建
  6. 结果写回 MySQL / Redis
  7. Drogon 再把状态或结果返回给前端

你可以把它们理解成固定分工:

  • Drogon:网关和业务编排层
  • FastAPI:Python 服务入口
  • Celery:后台任务执行层
  • Redis:状态、缓存、队列中间层
  • MySQL:长期持久化

这套架构特别适合:

  • 文档上传处理
  • RAG 建库
  • 长耗时模型推理
  • 流式输出聊天系统

19. 一个比较常见的项目结构

如果用 Drogon 做 C++ 网关,一个比较实用的结构可以是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
cpp_gateway/
├── CMakeLists.txt
├── config.json
├── main.cc
├── controllers/
│ ├── HealthController.h
│ ├── HealthController.cc
│ ├── DocumentsController.h
│ └── DocumentsController.cc
├── filters/
│ ├── AuthFilter.h
│ └── AuthFilter.cc
├── services/
│ ├── TaskService.h
│ ├── TaskService.cc
│ ├── PythonClient.h
│ └── PythonClient.cc
├── repositories/
│ ├── DocumentRepo.h
│ └── DocumentRepo.cc
└── models/

建议职责划分:

  • controllers/:只处理 HTTP 层
  • filters/:鉴权、限流、前置校验
  • services/:业务编排、调内部服务
  • repositories/:数据库读写
  • models/:ORM 生成模型或业务数据结构

也就是说:

不要把所有逻辑都堆在 Controller 里。

20. 常见坑

20.1 在 Handler 里做阻塞重活

例如:

  • 大文件解析
  • 模型推理
  • 长时间等待第三方服务

这些都不应该直接堵在 Drogon 的请求处理线程里。

更合理的做法是:

  • Drogon 负责接请求
  • 重活交给 FastAPI / Celery

20.2 误以为性能高就应该把所有东西都写进 C++

不是。

对于 AI 项目:

  • 模型生态、向量库、文档处理库,大部分都在 Python 侧更成熟
  • Drogon 更适合做高性能 API 和状态治理

20.3 忽视文件上传安全

例如:

  • 直接使用原始文件名保存
  • 不限制 body 大小
  • 不校验扩展名

这在上传接口里很危险。

这样会导致服务端不断创建新 Session,白白浪费资源。

20.5 数据库回调里继续做阻塞操作

这会拖慢整个异步链路。

20.6 Controller 过胖

如果 Controller 里同时写:

  • 参数解析
  • SQL
  • Redis
  • 调内部 HTTP
  • 业务编排

后面会非常难维护。

21. 一套比较实用的学习顺序

如果你是为了做 AI 项目里的 C++ 网关,推荐这样学:

  1. 先跑通最小 Drogon 服务
  2. 学会 registerHandler()HttpController
  3. 学会 JSON 请求和响应
  4. 学会 config.json
  5. 学会 Filter 和 Session
  6. 学会文件上传
  7. 学会 DbClient 和 Redis
  8. 最后再接 FastAPI 和 Celery

22. 总结

Drogon 的核心价值在于:

  • 用 C++ 写高性能 Web 服务
  • 把 HTTP、数据库、Redis、Session、文件上传放到一套框架里
  • 适合做网关层、业务层、低延迟接口层

对于 AI 项目,可以把它理解成:

  • 对外 API 入口
  • 会话治理层
  • 状态协调层
  • 数据持久化入口
  • 调用 Python 服务的编排层

如果你把前面几份笔记连起来看,可以把它们理解成一条链:

  • Drogon:对外网关 / C++ 服务层
  • FastAPI:Python API 层
  • Redis:缓存 / 状态 / 中间层
  • Celery:后台异步任务层
  • MySQL:持久化数据层

23. 参考