FastAPI和PostgreSQL实战,如何给应用同时装上缓存和日志翅膀?

摘要:本文分享了在FastAPI项目中集成Redis做缓存、Elasticsearch做日志存储的实战经验,包含完整代码示例和踩坑总结,帮你轻松提升API性能和可观测性。
一次搞定性能瓶颈与故障排查难题,让API飞起来,日志不再乱。 👉 咱们今天聊这些: 🚨 痛点直击 ⚙️ 原理速通 💻 代码实战 🔥 踩坑血泪 🚀 进阶姿势 1. 你的API还好吗? 🎯 有个朋友跟我吐槽,说他们的FastAPI服务上线第三天,数据库连接直接爆满,接口响应从50ms飙升到3秒。我问他:“你们用缓存了吗?” 他一脸懵:“数据库不是挺快的吗?” 你看,这不就是典型的“裸奔上阵”吗?PostgreSQL再强,也扛不住高频重复查询的“暴击”。而且一旦出问题,查日志全靠grep和awk,累死个人。 今天我们就来解决这两个痛点:用Redis给数据库加个“小本本”缓存热点数据,用Elasticsearch让日志变成“可搜索的数据库”。全程实操代码,都是我亲自踩过的坑,包你少走弯路。 2. 核心原理:收银员与小本本 把PostgreSQL想象成超市的总仓库,每次查询都得跑大老远去取货。Redis就是收银员随身带的小本本,记下最常卖的商品(热点数据)。下次顾客要,直接从小本本查,秒级响应。 Elasticsearch呢?它像个档案管理员,把所有日志分门别类建索引,你可以用关键词秒搜到任何请求的细节,再也不用登录服务器翻文件了。 3. 开干!FastAPI + Redis + ES 集成 假设你已经有个FastAPI项目,连了PostgreSQL。咱们一步步加料。 🔧 3.1 安装依赖 pip install redis elasticsearch[async] fastapi-cache2[redis] 这里用了fastapi-cache库,封装了缓存装饰器,省得自己写重复代码。当然你也可以直接用aioredis,看个人喜好。 📝 3.2 Redis缓存装饰器实战 创建一个缓存工具模块: # app/cache.py from fastapi_cache import FastAPICache from fastapi_cache.backends.redis import RedisBackend from fastapi_cache.decorator import cache import redis.asyncio as redis async def init_cache(redis_url: str = "redis://localhost:6379"): redis_client = redis.from_url(redis_url, encoding="utf-8", decode_responses=True) FastAPICache.init(RedisBackend(redis_client), prefix="fastapi-cache") 然后在启动事件中调用: # main.py from app.cache import init_cache @app.on_event("startup") async def startup(): await init_cache() 重点来了:在需要缓存的接口上加@cache()装饰器: @app.get("/items/{item_id}") @cache(expire=60) # 缓存60秒 async def get_item(item_id: int, db: Session = Depends(get_db)): # 这里是数据库查询 item = db.query(Item).filter(Item.id == item_id).first() return item 就这么简单!同样的请求60秒内直接走Redis,数据库连看都不看一眼。 ⚠️ 这里我踩过一个坑: 如果接口参数里有db session这种不可哈希的对象,fastapi-cache会报错。解决办法是把依赖项移到装饰器外面,或者用cache(…, key_builder=…)自定义键。 📜 3.3 Elasticsearch日志中间件 日志不能只打控制台,要统一送ES。我们写一个中间件,记录每次请求的方法、路径、状态码、耗时等。 # app/log_middleware.py from elasticsearch import AsyncElasticsearch import time import json es = AsyncElasticsearch(["http://localhost:9200"]) @app.middleware("http") async def log_to_es(request: Request, call_next): start = time.time() response = await call_next(request) duration = time.time() - start log_data = { "method": request.method, "url": str(request.url), "status_code": response.status_code, "duration": round(duration, 4), "client_ip": request.client.host, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } # 异步发送日志,别阻塞请求 await es.index(index="fastapi-logs", document=log_data) return response 注意: 生产环境千万别每个请求都同步发ES,得用批量发送+缓冲,或者用background tasks。上面只是demo,实际要优化。 4. 那些年我踩过的坑(必看) 集成完了?别急,下面这几个坑我挨个帮你排雷。 🔥 坑1:缓存穿透 — 查询一个不存在的id,每次都会穿透到数据库。解决方案:缓存空值或布隆过滤器。 🔥 坑2:ES索引爆炸 — 如果不管理索引生命周期,日志会把磁盘撑爆。一定要用ILM(索引生命周期管理)或定时删除旧索引。 🔥 坑3:异步Redis连接未关闭 — 服务停止时忘记关闭Redis连接,导致警告。在shutdown事件里加await FastAPICache.clear()和redis_client.close()。 🔥 坑4:ES连接失败导致请求阻塞 — 日志中间件里要加try-except,否则ES挂掉整个API也挂了。 是不是以为这样就完了?No no no,还有进阶操作: ✅ 用缓存预热,在启动时加载热门商品到Redis。 ✅ 给ES日志加上APM trace ID,配合链路追踪。 ✅ 把缓存装饰器封装成统一@custom_cache,自动处理异常和降级。
好了,今天就聊到这儿。如果你也在FastAPI里集成过Redis或ES,欢迎留言分享你的骚操作或者踩坑经历,咱们一起进步! 💡 觉得有用?点个赞+关注,下次写“FastAPI+celery异步任务实战”时第一个通知你! —— 爱写代码也爱吐槽的一名程序媛