用FastAPI接ollama大模型,asyncio难题让我崩溃了吗?

摘要:很多人在用FastAPI调用外部API时会遇到阻塞、超时甚至崩溃。本文从asyncio原理讲起,通过FastAPI+httpx异步调用本地ollama模型,带你一步步搭建一个对话窗口,并分享我踩过的坑和解决方案。
📝 摘要:很多人在用FastAPI调用外部API时会遇到阻塞、超时甚至崩溃。本文从asyncio原理讲起,通过FastAPI+httpx异步调用本地ollama模型,带你一步步搭建一个对话窗口,并分享我踩过的坑和解决方案。 嘿,朋友们,我是一枚程序媛👩‍💻。今天咱们来聊聊最近让我又爱又恨的 asyncio,尤其是用FastAPI去调用本地ollama大模型时踩的那些坑。你是不是也觉得FastAPI既然是异步框架,调用外部API应该很丝滑?结果一上线,接口卡死、超时、甚至服务直接挂掉?别急,这篇文章就是来帮你排雷的。 🎯 先说个真事儿 前阵子我做一个AI对话服务,用FastAPI接本地的ollama模型。刚开始图省事,直接用 requests 库同步调用,结果并发上来后,CPU直接飙满,请求排长队,最后服务彻底没响应。后来换成 httpx 异步客户端,以为万事大吉,结果又遇到了流式解析错误、超时设置不当的问题……折腾了两天,总算摸清了门道。 今天就把这些经验掰开揉碎讲给你听,保证你能少走弯路。 📌 本文能帮你解决什么 ✅ 搞懂asyncio在FastAPI中到底怎么工作的(用餐厅比喻) ✅ 正确使用httpx异步调用外部API,避免阻塞 ✅ 处理ollama流式响应,实时返回给前端 ✅ 搭建一个简单的对话窗口,可以直接运行 🚨 第一部分:为什么异步调用外部API那么容易挂? 很多新手(包括当年的我)以为用了FastAPI就是异步了,路由函数前面加个 async def 就万事大吉。但真正的坑在于:如果你在异步函数里用了同步的IO操作(比如requests.get),事件循环就会被阻塞,整个服务都会卡住。 就好比你去餐厅吃饭,服务员(线程)就一个人,他帮你点完菜后不去服务其他桌,而是站在厨房门口等你的菜做好。那其他桌的客人就只能干等着。这就是典型的阻塞。 所以,调用外部API必须用异步HTTP客户端,比如 httpx.AsyncClient 或 aiohttp。但光是换库还不够,还得注意超时、连接复用、流式处理等细节。 🧠 第二部分:先懂原理,再动手 🍽️ 用餐厅比喻理解asyncio 想象一个餐厅只有一个服务员(一个线程)。他负责点菜、上菜、结账。如果每个客人点完菜后服务员都站在旁边等,那效率极低。聪明的服务员会: ▪️ 给客人A点完菜后,告诉厨房做菜(发起网络请求) ▪️ 然后立刻去服务客人B(交出控制权,await) ▪️ 等厨房喊“菜好了”(请求返回),再继续给A上菜 这就是asyncio的核心:在等待IO时让出事件循环,去执行其他任务。 所以你的异步代码里必须要有 await 点,否则就会阻塞。 🔧 httpx.AsyncClient 的正确姿势 httpx 是requests的异步兄弟。但有个坑:很多人每次请求都创建新的client,这会导致连接无法复用,性能反而更差。正确的做法是:全局复用一个client,或者用依赖注入确保单例。 另外,ollama的API支持流式返回,我们需要用 client.stream() 方法,并且实时解析JSON行。 ⚡ 第三部分:实战!FastAPI + ollama 对话窗口 假设你已经本地运行了ollama,并且拉取了模型(比如 qwen3:1.7b)。我们来实现一个简单的聊天接口,并提供一个简陋但可用的前端页面。 1️⃣ 项目结构 . ├── main.py # FastAPI应用 ├── static/ # 存放HTML │ └── chat.html └── requirements.txt 2️⃣ 安装依赖 fastapi uvicorn httpx jinja2 # 可选,为了简单我们直接返回HTML 3️⃣ 编写后端 main.py 这里要特别注意:httpx.AsyncClient 要声明为全局单例,并在应用关闭时清理。 from fastapi import FastAPI, Request from contextlib import asynccontextmanager from fastapi.responses import HTMLResponse, StreamingResponse from fastapi.staticfiles import StaticFiles import httpx import json import asyncio # 全局复用httpx客户端 client = httpx.AsyncClient(timeout=30.0) # 超时设为30秒 @asynccontextmanager async def lifespan(app: FastAPI): yield await client.aclose() app = FastAPI(lifespan=lifespan) # 挂载静态文件(为了chat.html) app.mount("/static", StaticFiles(directory="static"), name="static") @app.get("/", response_class=HTMLResponse) async def get_chat_page(): '''提供聊天页面''' with open("static/chat2.html", "r", encoding="utf-8") as f: return HTMLResponse(content=f.read()) @app.post("/chat") async def chat(request: Request): '''流式对话接口''' data = await request.json() prompt = data.get("prompt", "") # 构造ollama请求体(根据你的模型调整) ollama_payload = { "model": "qwen3:1.7b", "prompt": prompt, "stream": True } # 定义一个异步生成器,逐行解析ollama返回的JSON async def event_generator(): try: # 注意:这里用client.stream发起流式请求 async with client.stream("POST", "http://192.168.10.105:11434/api/generate", json=ollama_payload) as response: async for line in response.aiter_lines(): # 每次迭代前检查是否被取消(可选) if await asyncio.sleep(0, result=True): # 这行只是为了 yield 给事件循环检查取消 pass if line.strip(): try: obj = json.loads(line) # ollama流式返回每行包含一个response字段 if "response" in obj: yield f"data: {json.dumps({'text': obj['response']})}\n\n" # 如果是结束标志,可以发送特殊消息 if obj.get("done"): yield f"data: {json.dumps({'done': True})}\n\n" except json.JSONDecodeError: continue # 流结束后关闭 except asyncio.CancelledError: # 客户端断开,清理工作(实际上 async with 已经处理) print("生成器被取消,清理资源") raise # 重新抛出,让框架处理 except httpx.StreamClosed: print("流被意外关闭,尝试重新连接...") except httpx.TimeoutException: yield f"data: {json.dumps({'error': '请求超时'})}\n\n" except Exception as e: yield f"data: {json.dumps({'error': str(e)})}\n\n" return StreamingResponse(event_generator(), media_type="text/event-stream") ⚠️ 重要警告: 千万别在每次请求里创建新的 httpx.AsyncClient,否则会耗尽文件描述符,而且连接无法复用😭。(实际测试发现每次请求都新建AsyncClient,其实也可以接受,因为 httpx 内部维护了连接池,开销不大。但如果追求极致性能,还是把 client作为全局单例,然后在生成器内部只使用async with client.stream(...)(不包AsyncClient的创建)。但要确保生成器退出时,不要关闭client(全局的应该由应用生命周期管理)。) 还有个坑点要注意,就是流的生命周期必须和生成器的生命周期绑定。也就是说,要在生成器内部使用async with,这样只要生成器还在迭代,流就保持打开;生成器结束(或者客户端断开)时,流自动关闭。 4️⃣ 前端页面 static/chat.html 一个极简的HTML,用EventSource或fetch接收流式数据。这里用fetch + reader演示。 <!DOCTYPE html> <html> <head> <meta charset="utf-8"> <title>Chat with Ollama (可停止)</title> <style> body { max-width: 600px; margin: 50px auto; font-family: sans-serif; } #chat { height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px; margin-bottom: 10px; } #input { width: 70%; padding: 8px; } button { padding: 8px 15px; margin-right: 5px; } #stopBtn { background-color: #f44336; color: white; border: none; } #stopBtn:disabled { background-color: #ccc; } </style> </head> <body> <h2>本地大模型聊天(可停止)</h2> <div id="chat"></div> <input type="text" id="input" placeholder="输入你的问题..." /> <button id="sendBtn">发送</button> <button id="stopBtn" disabled>停止</button> <script> const chatDiv = document.getElementById('chat'); const input = document.getElementById('input'); const sendBtn = document.getElementById('sendBtn'); const stopBtn = document.getElementById('stopBtn'); let controller = null; // 用于取消请求 let currentReader = null; function appendMessage(content, isUser = false) { const msg = document.createElement('div'); msg.style.margin = '10px 0'; msg.style.textAlign = isUser ? 'right' : 'left'; msg.innerHTML = `<strong>${isUser ? '你' : 'AI'}:</strong> ${content}`; chatDiv.appendChild(msg); chatDiv.scrollTop = chatDiv.scrollHeight; } // 停止生成 function stopGeneration() { if (controller) { controller.abort(); // 取消fetch controller = null; } stopBtn.disabled = true; sendBtn.disabled = false; // 可选:在界面上提示“已停止” appendMessage('(生成已停止)', false); } sendBtn.addEventListener('click', async () => { const text = input.value.trim(); if (!text) return; appendMessage(text, true); input.value = ''; // 准备取消控制器 controller = new AbortController(); const signal = controller.signal; sendBtn.disabled = true; stopBtn.disabled = false; // 调用后端的流式接口 try { const response = await fetch('/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ prompt: text }), signal: signal // 关联取消信号 }); if (!response.ok) { throw new Error(`HTTP error ${response.status}`); } const reader = response.body.getReader(); const decoder = new TextDecoder(); let aiMessage = ''; let done = false; // 保存reader以便可能的停止(其实abort后read会抛异常) currentReader = reader; while (true) { let chunk; try { const result = await reader.read(); if (result.done) break; chunk = decoder.decode(result.value, { stream: true }); } catch (err) { if (err.name === 'AbortError') { console.log('Fetch aborted'); break; } throw err; } // 解析SSE格式的数据:data: {...} const lines = chunk.split('\n'); for (const line of lines) { if (line.startsWith('data: ')) { try { const jsonData = JSON.parse(line.slice(6)); if (jsonData.error) { appendMessage('错误:' + jsonData.error); done = true; break; } if (jsonData.text) { aiMessage += jsonData.text; // 实时更新最后一条消息(简单做法:先删除再添加) // 这里偷懒直接清除最后一条重新加,生产环境可以用diff更新 // 更新显示(简单处理:移除最后一条AI消息重新添加) if (chatDiv.lastChild && chatDiv.lastChild.innerText.startsWith('AI:')) { chatDiv.removeChild(chatDiv.lastChild); } appendMessage(aiMessage, false); } if (jsonData.done) { done = true; break; } } catch (e) { // 忽略解析错误 } } } if (done) break; } } catch (err) { if (err.name !== 'AbortError') { appendMessage('请求出错:' + err.message); } } finally { // 清理状态 controller = null; currentReader = null; sendBtn.disabled = false; stopBtn.disabled = true; } }); stopBtn.addEventListener('click', stopGeneration); </script> </body> </html> 运行 uvicorn main:app --reload,打开浏览器访问 http://localhost:8000,就能看到一个简陋但能用的聊天窗口了。🎉 🧯 第四部分:还有哪些坑? 🔸 超时设置:ollama生成大段文本可能耗时较长,务必调大timeout,否则会提前断开。 🔸 连接池限制:默认连接池最多10个并发连接,如果你的服务并发高,可以配置limits=httpx.Limits(max_keepalive_connections=20, max_connections=100) 🔸 异常处理:网络波动、ollama重启等都会引发异常,记得try,并给前端返回友好提示。 🔸 压力测试:可以用locust模拟并发,观察事件循环是否健康,连接数是否合理。 再说个进阶的思考:如果多个用户同时请求,且每个请求都要流式输出,那么每个请求都会占用一个到ollama的连接。如果ollama本身不支持高并发,你可能会把ollama搞崩。这时可以考虑请求队列、缓存等策略。
好了,今天分享的这些,都是我亲手踩坑又爬出来的经验。希望能帮你避开那些恼人的asyncio陷阱。如果你也在用FastAPI接大模型,或者遇到了其他奇怪的问题,欢迎在评论区留言交流~ 觉得有用的话,点个赞、收藏一下,下次写代码遇到问题就能快速找到了! 也欢迎转发给可能需要的朋友,咱们一起优雅地写异步代码。😄