Celery 学习笔记
Celery 学习笔记
1. Celery 是什么
Celery 是一个基于 Python 的分布式任务队列,用来把“耗时任务”从主业务流程里拆出去异步执行。
典型场景:
- 发送邮件、短信、消息通知
- 文件转码、图片处理、PDF 解析
- 大模型推理、批量数据处理
- 定时任务、周期性任务
- 需要失败重试的后台任务
先记住最实用的分工:FastAPI 负责接请求和返回结果,Celery 负责把耗时任务放到后台慢慢执行。
2. 为什么要用 Celery
如果一个接口内部要做很久的事情,比如:
- 调第三方接口要 10 秒
- 处理一批文件要 2 分钟
- 模型推理要占满 CPU / GPU
那就不适合一直阻塞 HTTP 请求。更合理的做法是:
- FastAPI 收到请求
- 把任务丢给 Celery
- 立刻返回一个
task_id - 前端或调用方再通过
task_id查询执行结果
这样接口响应会更快,系统也更容易扩展。
3. Celery 核心架构
Celery 主要由 3 个角色组成:
3.1 Broker
消息中间件,负责存放“待执行任务”。
常见选择:
- Redis
- RabbitMQ
它就是任务排队和等待消费的地方。
3.2 Worker
执行任务的工作进程。
它会不断从 Broker 中取任务,然后真正执行 Python 函数。
3.3 Result Backend
结果存储,用来保存任务状态和返回值。
常见选择:
- Redis
- Database
- RPC
如果你只关心“任务有没有执行”,不关心返回结果,也可以不配置结果后端;但实际项目里通常还是会配置,方便查状态。
4. 工作流程
Celery 的完整链路可以这样理解:
- 客户端或 FastAPI 调用一个 Celery 任务
- 任务消息被发送到 Broker
- Worker 从 Broker 取出任务
- Worker 执行任务函数
- 执行结果和状态写入 Result Backend
- 业务侧通过
task_id查询任务状态和结果
对应的状态通常有:
PENDING:任务还没开始,或者还查不到STARTED:任务已开始执行SUCCESS:执行成功FAILURE:执行失败RETRY:正在重试REVOKED:任务被撤销
说明:
- 默认不一定会看到
STARTED - 如果想更明确地看到运行中状态,可以开启
task_track_started=True
5. 安装
1 | pip install celery redis |
如果使用 Redis 作为 Broker 和 Result Backend,需要先启动 Redis。
6. 最小可运行示例
6.1 编写 Celery 应用
文件:celery_app.py
1 | from celery import Celery |
说明:
/1表示 Redis 的第 1 个逻辑库,用来当 Broker/2表示 Redis 的第 2 个逻辑库,用来存任务结果- Broker 和 Backend 分开写更清晰,排查问题也更方便
6.2 定义任务
文件:tasks.py
1 | import time |
这里的 add 虽然是普通 Python 函数,但加了 @app.task 之后,它就成了 Celery 任务。
6.3 启动 Worker
1 | celery -A tasks worker -l info |
含义:
-A tasks:表示 Celery 应用和任务定义在tasks.pyworker:启动 worker 进程-l info:日志级别为info
6.4 提交任务
1 | from tasks import add |
注意:
delay()只是“提交任务”- 它不会在当前进程里同步执行
- 返回的是
AsyncResult - 真正执行任务的是独立的 Worker
6.5 查询任务结果
1 | from celery.result import AsyncResult |
常用属性和方法:
task.id:任务 IDtask.status:任务状态task.successful():是否执行成功task.failed():是否执行失败task.ready():任务是否结束task.get():获取结果,可能会阻塞
7. delay() 和 apply_async() 的区别
7.1 delay()
最简单的调用方式,其实就是 apply_async() 的简写。
1 | add.delay(3, 5) |
适合:
- 普通异步提交
- 不需要额外参数时
7.2 apply_async()
更灵活,支持倒计时、定时执行、路由到指定队列等。
1 | add.apply_async(args=[3, 5], countdown=10) |
常见参数:
args/kwargs:任务参数countdown=10:10 秒后执行eta=...:指定某个时间点执行expires=60:60 秒后任务过期queue="email":指定进入哪个队列routing_key="task.email":指定路由键
结论:
- 简单任务用
delay() - 有调度需求时用
apply_async()
8. 任务重试
很多后台任务依赖外部资源,比如:
- 第三方 API
- 数据库
- 对象存储
- 模型服务
这些服务不稳定时,直接失败往往不合理,更适合自动重试。
1 | from celery_app import app |
说明:
bind=True后,任务第一个参数会变成selfself.retry(...)可以重新入队max_retries=3表示最多重试 3 次default_retry_delay=5表示默认每次间隔 5 秒
适合有“短暂失败可能恢复”的任务,不适合参数本身就错误的任务。
9. 队列与路由
当任务类型越来越多时,通常不会把所有任务都丢到一个队列里。
常见拆法:
default:普通任务email:发邮件file:文件处理gpu:模型推理
这样可以让不同 Worker 只处理自己擅长的任务。
1 | from celery import Celery |
启动指定队列的 Worker:
1 | celery -A tasks worker -Q file -l info |
这样做的好处:
- 避免耗时任务阻塞普通任务
- 可以按机器能力拆分 Worker
- GPU 任务和 CPU 任务可以隔离
10. 定时任务
Celery 可以配合 celery beat 做周期任务。
例如:
- 每天凌晨清理临时文件
- 每小时刷新缓存
- 每 10 分钟同步一次数据
1 | from celery.schedules import crontab |
启动:
1 | celery -A tasks beat -l info |
如果要实际执行任务,除了 beat 还要有 worker 在跑。
11. FastAPI 和 Celery 怎么配合
最常见的配合方式是:
- FastAPI 暴露 HTTP 接口
- 接口中调用 Celery 提交任务
- Celery Worker 在后台执行
- 再通过一个查询接口返回任务状态/结果
11.1 什么时候用 BackgroundTasks,什么时候用 Celery
FastAPI 自带 BackgroundTasks,但它和 Celery 不是一类东西。
BackgroundTasks 更适合:
- 很轻量的后台操作
- 跟当前服务进程强绑定
- 不要求失败重试
- 不要求多机器扩展
Celery 更适合:
- 任务耗时长
- 任务量大
- 需要重试
- 需要队列隔离
- 需要多个 Worker 扩展
- 需要独立于 Web 进程运行
一句话:
轻任务用
BackgroundTasks,重任务用 Celery。
12. FastAPI + Celery 案例 1:提交任务并查询状态
这是最常见的模式。
12.1 目录结构
1 | project/ |
12.2 celery_app.py
1 | from celery import Celery |
12.3 tasks.py
1 | import time |
12.4 main.py
1 | from fastapi import FastAPI |
12.5 启动方式
启动 FastAPI:
1 | uvicorn main:app --reload |
启动 Worker:
1 | celery -A tasks worker -l info |
请求流程:
- 调
POST /tasks/add?a=3&b=5 - 接口立即返回
task_id - 再调
GET /tasks/{task_id} - 看任务是
PENDING、STARTED还是SUCCESS
这个模式非常适合前后端分离项目。
13. FastAPI + Celery 案例 2:文件处理任务
这个场景很常见,比如:
- 上传 PDF 后做文本提取
- 上传图片后做缩略图
- 上传音频后做转写
13.1 设计原则
不要把 UploadFile、数据库连接、请求对象直接传给 Celery。
原因:
- Celery 任务参数通常需要能序列化
- 这些对象往往不能直接被序列化
- 跨进程后对象本身也失效了
正确做法:
- FastAPI 先把文件保存到磁盘或对象存储
- 把
file_path或file_url传给 Celery - Worker 根据路径再去处理文件
13.2 示例
1 | from pathlib import Path |
对应的 Celery 任务:
1 | from celery_app import celery_app |
这个思路很适合 AI 项目里的文档解析链路。
14. FastAPI + Celery 案例 3:模型推理任务
对于 AI 模型开发,这个场景更有代表性。
例如:
- 文本摘要
- 图片分类
- 批量 embedding
- 长文本抽取
14.1 为什么适合 Celery
因为模型推理通常具备这些特点:
- 耗时不稳定
- 资源消耗高
- 可能需要排队
- 可能要分 CPU / GPU Worker
14.2 一个简化版例子
1 | from fastapi import FastAPI |
对应任务:
1 | import time |
这里的重点不在模型代码本身,而在架构思路:
- API 层只负责接收请求
- 推理层在 Worker 中执行
- GPU 任务路由到专门队列
- 可以单独扩容推理 Worker
如果将来一张 GPU 卡只跑一个 Worker,也很好管理。
15. 实战里很重要的细节
15.1 任务参数尽量简单
建议只传这些:
- 字符串
- 数字
- 布尔值
- 列表 / 字典
- 文件路径
- 数据库主键 ID
不建议直接传:
- 数据库 Session
- 请求对象
Request UploadFile- 大模型实例
- 打开的文件句柄
原则:
任务参数最好是“可序列化、可重建、可追踪”的。
15.2 任务尽量幂等
幂等的意思是:同一个任务重复执行多次,结果最好一致,或者至少不会造成严重副作用。
因为这些情况都可能导致重复执行:
- Worker 崩溃
- 消息重复投递
- 任务重试
- 手动补偿
例如发通知、写数据库、扣费等场景,幂等性非常重要。
15.3 不要在 API 进程里执行重活
FastAPI 本身应该尽量保持:
- 响应快
- 逻辑薄
- 只做参数校验、鉴权、任务分发
重计算、长耗时 IO、批处理,尽量放 Worker。
15.4 结果不是必须永久保存
很多项目会犯一个问题:任务结果一直存 Redis,时间一久就堆积很多垃圾数据。
可以设置:
result_expires- 任务完成后主动清理
- 只保留必要结果,不保留大对象
如果结果很大,不要直接把大块内容塞进 Redis,建议落库或存文件,再返回引用地址。
15.5 错误处理不要只靠日志
建议至少做到:
- 任务失败后能看到
FAILURE - 能根据
task_id查到失败原因 - 关键任务开启自动重试
- 必要时记录业务日志或落库
15.6 CPU / GPU / IO 任务分开
这点在 AI 项目里尤其重要。
建议:
- 文件下载、上传、请求第三方 API 放一个队列
- CPU 预处理放一个队列
- GPU 推理放一个队列
这样不会出现“一个长推理任务把所有普通任务都堵住”的问题。
15.7 Worker 并发要按任务类型调
Celery 的并发不是越大越好,要看任务类型。
一般经验:
- IO 密集型任务可以适当提高并发
- CPU 密集型任务并发通常不要超过 CPU 核心数太多
- GPU 推理任务通常一个 Worker 进程绑定一张卡,甚至一个队列只开
--concurrency=1
例如 GPU Worker 常见写法:
1 | celery -A tasks worker -Q gpu --concurrency=1 -l info |
如果推理模型本身已经很吃显存,再盲目开高并发,反而容易把机器打爆。
16. 常见坑
16.1 任务提交成功,但一直不执行
通常排查:
- Redis 是否启动
- Worker 是否启动
-A指向的模块是否正确- 任务有没有注册到当前 Worker
- 是否路由到了某个没人消费的队列
16.2 能查到任务 ID,但拿不到结果
通常是:
- 没配置
backend task_id查错了- 结果过期被清掉了
16.3 任务函数里引用了 Web 层对象
比如:
- 直接把
Request传进任务 - 把 SQLAlchemy Session 传进任务
- 把上传文件对象传进任务
这类设计一般都会有问题。任务应该依赖“可序列化参数”,不要依赖当前请求上下文。
16.4 把 Celery 当成同步 RPC 用
有些人虽然用了 Celery,但提交完任务后立刻 .get() 等待结果,这样就又退回同步调用了。
如果接口本身必须立刻拿到结果,那就要重新评估:
- 这个任务是否真的适合放到 Celery
- 是否应该走同步推理接口
- 是否应该拆成“提交任务 + 轮询结果”
17. 一套比较实用的学习顺序
建议按这个顺序掌握:
- 先跑通最小示例
- 理解
delay()、AsyncResult - 学会查任务状态
- 学会重试
- 学会队列与路由
- 再接入 FastAPI
- 最后再做定时任务、监控、扩容
18. 总结
Celery 的本质就是:
- 把耗时任务从主线程 / 主服务里剥离出去
- 通过消息队列异步执行
- 通过 Worker 扩展执行能力
- 通过结果后端查询状态和结果
对于 FastAPI 项目,可以把它理解成一组固定分工:
- FastAPI:对外提供 API
- Redis / RabbitMQ:负责排队
- Celery Worker:真正执行任务
- Result Backend:保存状态和结果
如果项目里已经出现“接口慢、任务重、需要重试、需要排队、需要异步处理”的问题,那 Celery 基本就是很自然的选择。
19. 参考
- Celery First Steps: https://docs.celeryq.dev/en/main/getting-started/first-steps-with-celery.html
- Celery Calling Tasks: https://docs.celeryq.dev/en/main/userguide/calling.html
- Celery Tasks Guide: https://docs.celeryq.dev/en/main/userguide/tasks.html
- Celery Routing Tasks: https://docs.celeryq.dev/en/main/userguide/routing.html
- Celery Configuration: https://docs.celeryq.dev/en/main/userguide/configuration.html
- FastAPI Background Tasks: https://fastapi.tiangolo.com/tutorial/background-tasks/