FastAPI Celery异步任务中,如何高效整合Redis数据库实现生产级组织方案?

摘要:在 FastAPI 项目中使用 Celery 做异步任务时,如何优雅地管理数据库会话和 Redis 连接?本文从一个真实踩坑案例切入,深入剖析生命周期错乱的问题,并给出生产级别的代码组织方案。你将学会如何让 Celery 任务独立管理资源,
前面的文章里使用的BackgroundTasks进行的任务调用,这里使用Celery对整个方案重新构建下,核心逻辑同理!!! Celery 的任务是在独立的工作进程(worker)中执行的,和 FastAPI 的请求进程是分离的。如果你在 API 路由里通过依赖项注入创建了一个数据库会话,然后把这个会话对象作为参数传给 Celery 任务,会发生什么? 首先,数据库会话(比如 SQLAlchemy 的 Session)通常不是可序列化的,根本传不到 worker 那边。就算你用了某些技巧把它序列化了,worker 那边拿到后,这个会话对应的底层数据库连接可能早已在原始进程中关闭,或者根本不存在。结果就是各种稀奇古怪的报错。 Redis 连接同理,如果你把连接池里“借”出来的连接直接传给 Celery 任务,序列化后到了 worker 端,完全无法使用。 我们需要在 Celery 任务函数内部,重新创建所需的资源(比如新的数据库 Session,新的 Redis 连接),并在任务执行完毕后,确保这些资源被正确关闭或归还。 🛠️ 实战:生产级别的组织方案 好,理论前面已经说完了,更多的细节可以看上篇FastAPI里玩转Redis和数据库的正确姿势,别让异步任务把你坑哭了!,这里就直接上重构的代码。 我会展示一个我目前在用的、相对成熟的方案,它基于 Celery 和 SQLAlchemy,并配合 Redis 做状态缓存。 📁 第一步:目录结构 project/ ├── app/ │ ├── api/ # 路由层 │ ├── core/ # 核心配置 │ │ ├── database.py # 数据库引擎、SessionLocal 工厂 │ │ ├── redis_client.py # Redis 连接池 │ │ └── celery_app.py # Celery 应用实例 │ ├── models/ # 数据库模型 │ ├── schemas/ # Pydantic 模型 │ └── tasks/ # Celery 任务模块 ✨ │ ├── __init__.py │ └── user_tasks.py └── ... 💾 第二步:Celery 应用配置 在 core/celery_app.py 中创建 Celery 实例,并配置 broker 和 backend(通常用 Redis)。 # core/celery_app.py from celery import Celery import os celery_app = Celery( "worker", broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"), backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0"), include=["app.tasks.user_tasks"] # 自动发现任务模块 ) celery_app.conf.update( task_serializer="json", accept_content=["json"], result_serializer="json", timezone="Asia/Shanghai", enable_utc=True, task_track_started=True, task_time_limit=30 * 60, # 任务超时时间 task_soft_time_limit=25 * 60, ) 🔌 第三步:资源管理上下文管理器(重点!) 在 core/database.py 和 core/redis_client.py 中定义好工厂和上下文管理器,供 Celery 任务独立使用。
阅读全文