FastAPI玩转Redis数据库,异步任务如何不哭?

摘要:还在为FastAPI异步任务里管理Redis连接和数据库会话而头疼?本文将从一个真实踩坑案例出发,带你深入浅出地理解如何优雅地在后台任务中操作外部服务。我们会手写一个生产级别的代码组织方案,帮你彻底告别连接泄露和资源耗尽的问题,让你的异步任
📝 摘要:还在为FastAPI异步任务里管理Redis连接和数据库会话而头疼?本文将从一个真实踩坑案例出发,带你深入浅出地理解如何优雅地在后台任务中操作外部服务。我们会手写一个生产级别的代码组织方案,帮你彻底告别连接泄露和资源耗尽的问题,让你的异步任务跑得又快又稳。 今天咱们聊的这个话题,可以说是每个用FastAPI做生产项目的同学,几乎都会遇到的一道坎——异步任务里怎么安全、高效地调用Redis和数据库? 你可能会问:“这有啥难的?直接丢到 BackgroundTasks 里,然后正常调用不就行了?”哎,如果真这么简单,我这篇文章就不会诞生了。听我给你讲个故事。 🎯 1. 一个让人抓狂的“小问题” 之前有个项目,需要在上传用户头像后,异步生成几种不同尺寸的缩略图,并把处理结果和状态存到MySQL,同时把用户ID和任务ID塞到Redis里做状态追踪。一切看起来都很美好,代码也跑通了。 但上线后,噩梦开始了!应用跑了一两天,就开始随机报错,有的说MySQL连接已关闭,有的说Redis连接数超限。 最离谱的是,有时候任务执行到一半,数据库连接突然断开了,导致部分数据写入失败,状态成了“薛定谔的完成”。 后来debug了好几天,才发现问题的根源:我把数据库和Redis的会话(Session/Connection)直接传递到了异步任务函数里,但生命周期完全错乱了! 🔍 2. 问题到底出在哪? FastAPI的BackgroundTasks虽然用起来简单,但它本质上是在响应返回之后,在同一个进程中“偷偷”执行的一个函数。 问题是,你在请求生命周期内创建的数据库会话(比如通过依赖项注入的db: Session),在请求结束后,通常会被框架自动关闭。但你的后台任务还在用这个已经被关闭的会话,不出错才怪! Redis连接也是类似,如果你把连接池里“借”出来的连接直接传进去,一旦主请求结束,连接被归还或关闭,后台任务再用的时候,就会直接GG。 这里我要特别强调一点:千万不要在异步任务里,直接复用请求生命周期内的资源对象!这是新手最容易踩的坑,也是我当初血泪教训的核心。 ⚙️ 3. 核心原理:各管各的,生命周期要分离 那正确的姿势是什么呢?核心思想就是“谁用谁创建,用完自己关”。 异步任务函数内部,不应该依赖外部传递进来的“活”连接,而是应该拥有自己独立的资源管理逻辑。 具体来说,我们需要在异步任务函数内部,重新创建所需的资源(比如新的数据库Session,新的Redis连接),并在任务执行完毕后,确保这些资源被正确关闭或归还。 这听起来像是个体力活,但其实我们可以通过一些好的代码组织模式,让它变得优雅且可维护。 🛠️ 4. 实战:生产级别的组织方案 好,理论说完了,咱们直接上代码。 📁 第一步:目录结构 project/ ├── app/ │ ├── api/ # 路由层 │ ├── core/ # 核心配置(数据库、Redis等) │ │ ├── database.py │ │ └── redis_client.py │ ├── models/ # 数据库模型 │ ├── schemas/ # Pydantic模型 │ └── tasks/ # 异步任务模块!✨ │ ├── __init__.py │ ├── user_tasks.py │ └── worker.py # 任务入口 └── ... 💾 第二步:核心资源管理(重点!) 在tasks/worker.py里,我们定义一个基类或辅助函数,专门负责在每个任务中,初始化和管理这些资源。 # tasks/worker.py from sqlalchemy.orm import sessionmaker from app.core.database import engine from app.core.redis_client import get_redis_client from contextlib import contextmanager # 注意:这里是在模块级别创建SessionLocal,它是一个工厂,不是具体的会话 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) @contextmanager def get_db_session(): """每个任务自己独立创建一个数据库会话,用完即关""" db = SessionLocal() try: yield db db.commit() except Exception: db.rollback() raise finally: db.close() @contextmanager def get_redis_conn(): """每个任务自己独立创建一个Redis连接,用完即关""" # 这里假设你的redis_client是连接池,这个函数返回一个连接实例 redis_client = get_redis_client() try: yield redis_client finally: # 注意:如果用的是连接池,归还连接的操作通常是在你调close()时内部处理的 # 这里只是示意,具体看你的Redis库实现 redis_client.close() 看到没?这里的关键就是 get_db_session 和 get_redis_conn 这两个上下文管理器。它们确保了每一个独立的异步任务,都拥有一个属于自己的、生命周期完整的资源。 🚀 第三步:编写具体的异步任务 现在,我们可以在tasks/user_tasks.py里编写具体的业务逻辑了。 # tasks/user_tasks.py from celery import Task from app.tasks.worker import get_db_session, get_redis_conn from app.models.user import User import asyncio # 如果任务里需要异步IO class ProcessAvatarTask(Task): name = "process_avatar" def run(self, user_id: int, image_path: str): # 这里我们自己来创建和管理资源 with get_db_session() as db: user = db.query(User).filter(User.id == user_id).first() # 1. 更新数据库状态 user.avatar_status = "processing" db.commit() # 2. 处理图片(耗时操作,可以是同步的,也可以跑在异步线程池) # 这里为了简单,用同步模拟 # ... # 3. 再次更新状态,并写入Redis with get_redis_conn() as redis: redis.set(f"user:{user_id}:avatar:status", "completed") with get_db_session() as db: user.avatar_status = "completed" db.commit() return {"status": "success", "user_id": user_id} 📞 第四步:在API路由中调用 # api/user.py from fastapi import APIRouter, BackgroundTasks from app.tasks.user_tasks import ProcessAvatarTask router = APIRouter() @router.post("/upload_avatar") async def upload_avatar(user_id: int, background_tasks: BackgroundTasks): # 这里不要传数据库session或redis连接进去! # 只传必要的业务数据,比如user_id和文件路径 background_tasks.add_task(ProcessAvatarTask().run, user_id, "/tmp/avatar.jpg") return {"msg": "任务已添加"} 💡 5. 进阶思考与踩坑预警 - 关于连接池大小: 别以为资源独立了就万事大吉。如果任务并发太高,每个任务都独立创建一个数据库连接,很容易把连接池撑爆。 所以,你的数据库连接池(比如SQLAlchemy的pool_size)要设置得合理一些,比如pool_size=20, max_overflow=10,然后根据你的任务并发量去调整。 - 关于Redis连接池: 上面的get_redis_conn最好是返回一个从连接池中获取的连接,而不是每次都新建一个TCP连接。这点很重要! - 关于错误重试: 如果你的任务依赖外部服务(比如Redis、数据库),一定要考虑网络抖动带来的暂时性失败。推荐使用Celery的autoretry_for机制,或者自己在任务里写重试逻辑。
好了,今天的内容就到这里。其实异步任务里的资源管理,核心就是一个“职责分离”的原则。API层只负责接收请求,投递任务,绝不越俎代庖去管理任务内部的生命周期。而任务层,则要像一个独立的小程序,自己负责所有资源的生杀大权。 希望这篇掏心窝子的分享,能让你在FastAPI的异步之路上少踩几个坑。如果你也有什么独家秘籍,或者被我文章里的某个点戳中了,欢迎在评论区留言,咱们一起交流,一起进步! 老规矩,觉得有用的话,点赞、关注、转发走一波,让更多小伙伴看到。毕竟,大家好,才是真的好嘛!😉