如何手搓代码沙箱与FastAPI-MCP结合实战?

摘要:前两章我们讨论了JupyterAgent,当时用的是E2B的代码沙箱。这次我决定自己动手,用字节的TRAE从头构建一个Python代码沙箱,并加入MCP支持。完整代码已经开源在github.comDSXiangLisimple_sand
最近Vibe Code在各种技术社区刷屏,不过说实话,在日常工作中,我更多是用LLM来生成文档、批量修改代码或者排查问题。毕竟业务需求嘛,很少有能一次性描述清楚的(懂的都懂哈哈~)。但在看了最新的SWE-Bench Pro评测后,我决定尝试一下端到端的AI编程体验。 前两章我们讨论了JupyterAgent,当时用的是E2B的代码沙箱。这次我决定自己动手,用字节的TRAE从头构建一个Python代码沙箱,并加入MCP支持。完整代码已经开源在simple_sandbox,Star is Welcomed! 个人体感TRAE在国内Coding IDE里算Very Good,比开源的Cline,Kilo等在Token使用上效率更高,但和cursor还有距离。如何最大化Token使用效率和效果是个系统工程问题~ Vibe Coding实战:从零构建代码沙箱 本来想完整分享整个Vibe Coding过程的,结果TRAE升级把历史对话记录清空了(哭)。那就跟大家分享一下我的操作思路和最终成果吧! 我在使用AI IDE时通常有两种策略: 模型主导:让模型先做整体规划,我人工调整后让模型执行,依赖模型的测试文件和执行报错进行迭代优化 人工主导:我自己拆解任务,让模型逐步实现,每一步都进行人工校验 如何选择这两种模式完全取决于我对于整个项目是否有很强的先验思考,哈哈就是我知道怎么做的我带着模型做,我不知道的模型带着我做。 这次的任务因为有E2B的SDK可以参考,我选择了人工主导的方式,将任务拆解成几个明确的步骤: 沙箱核心功能:沙箱创建、环境初始化、代码执行 功能完善:结构化输出、预装环境、工作目录隔离、中文字体支持 扩展功能:文件上传、文件获取、超时关闭 服务化:搭建FastAPI服务接口 客户端Demo:创建请求示例 文档撰写:完善的README 最终模型实现的沙箱核心类如下,完整代码请移步Github,在整个编码过程中,我只在功能设计层面做了调整,没有发现任何编码错误: # 沙箱类 class Sandbox: def __init__(self, sandbox_id: str, work_dir: str, venv_dir: str): self.sandbox_id = sandbox_id self.work_dir = work_dir self.venv_dir = venv_dir # 配置KernelManager使用虚拟环境中的Python解释器 self.kernel_manager = KernelManager( kernel_name='python3', kernel_spec_manager=self._create_custom_kernel_spec_manager() ) # 设置环境变量 env = os.environ.copy() env['VIRTUAL_ENV'] = venv_dir # 设置内核使用虚拟环境中的Python self.kernel_manager.start_kernel( cwd=work_dir, env=env, ) self.kernel_client = self.kernel_manager.client() self.kernel_client.start_channels() self.kernel_client.wait_for_ready() self.last_execute_id = 0 # 复制字体文件到沙箱工作目录 font_source_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'SimHei.ttf') font_dest_path = os.path.join(work_dir, 'SimHei.ttf') if os.path.exists(font_source_path): try: shutil.copy2(font_source_path, font_dest_path) print(f"字体文件已复制到: {font_dest_path}") except Exception as e: print(f"复制字体文件失败: {e}") else: print(f"未找到字体文件: {font_source_path}") # 安装基本包 self._install_basic_packages() # 执行字体注册代码 self.execute_code(font_code) def _create_custom_kernel_spec_manager(self): # 创建自定义的内核规范管理器,确保使用正确的Python环境 from jupyter_client.kernelspec import KernelSpecManager ksm = KernelSpecManager() return ksm def _install_basic_packages(self): # 在虚拟环境中安装基本包 # 如果是从镜像复制的环境,可能已经包含了基本包,这里可以跳过或仅检查 try: # 直接使用Linux路径设置 pip_path = os.path.join(self.venv_dir, 'bin', 'pip') python_exe = os.path.join(self.venv_dir, 'bin', 'python') # 检查ipykernel是否已安装 check_result = subprocess.run( [python_exe, '-c', 'import ipykernel'], capture_output=True, text=True ) # 如果ipykernel未安装,则安装 if check_result.returncode != 0: print("ipykernel未安装,开始安装...") subprocess.check_call([pip_path, 'install', 'ipykernel']) except Exception as e: print(f"安装基础包失败: {e}") def execute_code(self, code: str) -> Dict: # 生成执行ID self.last_execute_id += 1 # 执行代码 msg_id = self.kernel_client.execute(code) stdout = [] stderr = [] error = None results = [] # 收集执行结果 while True: try: msg = self.kernel_client.get_iopub_msg(timeout=3600) msg_type = msg['header']['msg_type'] if msg['parent_header'].get('msg_id') != msg_id: continue elif msg_type == 'stream': content = msg['content'] if content['name'] == 'stdout': stdout.append(content['text']) elif content['name'] == 'stderr': stderr.append(content['text']) elif msg_type == 'error': content = msg['content'] # 合并error字段,包含ename、evalue和traceback error = { 'name':ansi_escape.sub('', content['ename']), 'value': ansi_escape.sub('',content['evalue']), 'traceback': [ansi_escape.sub('', i) for i in content['traceback']] } elif msg_type == 'execute_result': # 处理执行结果,按照{type,data}格式存储 data = msg['content']['data'] for data_type, data_value in data.items(): results.append({"type": data_type, "data": data_value}) elif msg_type == 'display_data': # 处理显示数据,按照{type,data}格式存储 data = msg['content']['data'] for data_type, data_value in data.items(): results.append({"type": data_type, "data": data_value}) elif msg_type == 'execute_reply': break elif msg_type == 'status': if msg['content']['execution_state'] == 'idle': break except Exception: break return { 'stdout': [ansi_escape.sub('', i) for i in stdout], 'stderr': [ansi_escape.sub('', i) for i in stderr], 'error': error, 'results': results } def upload_file(self, file: UploadFile, file_path: Optional[str] = None) -> str: # 确定文件保存路径:默认为工作目录最简化code if file_path: save_path = os.path.join(self.work_dir, file_path) else: save_path = os.path.join(self.work_dir, file.filename) # 确保目标目录存在 os.makedirs(os.path.dirname(save_path), exist_ok=True) # 保存文件 with open(save_path, "wb") as f: shutil.copyfileobj(file.file, f) return save_path def get_files(self) -> List[Dict[str, str]]: files = [] for root, _, filenames in os.walk(self.work_dir): for filename in filenames: file_path = os.path.join(root, filename) relative_path = os.path.relpath(file_path, self.work_dir) files.append({ 'path': relative_path, 'size': os.path.getsize(file_path) }) return files def get_file_path(self, file_path: str) -> str: full_path = os.path.abspath(os.path.join(self.work_dir, file_path)) # 安全检查,确保文件在工作目录内 if not full_path.startswith(os.path.abspath(self.work_dir)): raise HTTPException(status_code=403, detail="File access denied") return full_path def shutdown(self): try: self.kernel_client.stop_channels() self.kernel_manager.shutdown_kernel() # 清理工作目录 shutil.rmtree(self.work_dir, ignore_errors=True) # 清理虚拟环境目录 shutil.rmtree(self.venv_dir, ignore_errors=True) except Exception: pass def install_package(self, package_name: str) -> Dict: """在沙箱的虚拟环境中安装Python包""" try: # 获取虚拟环境中的pip路径(Linux环境) pip_path = os.path.join(self.venv_dir, 'bin', 'pip') # 执行pip安装命令 result = subprocess.run( [pip_path, 'install', package_name], capture_output=True, text=True, timeout=60 # 设置超时时间 ) # 检查安装是否成功 if result.returncode == 0: return { 'success': True, 'stdout': result.stdout, 'stderr': result.stderr, 'message': f"成功安装包: {package_name}" } else: return { 'success': False, 'stdout': result.stdout, 'stderr': result.stderr, 'message': f"安装包失败: {package_name}" } except Exception as e: return { 'success': False, 'stdout': '', 'stderr': str(e), 'message': f"安装过程出错: {str(e)}" } 用FastAPI-MCP打造标准化MCP服务 沙箱服务跑通后,我决定将其打包成标准的MCP服务。这时候就轮到FastAPI-MCP出场了!就像我们在解密prompt系列58. MCP - 工具演变 & MCP基础中提到的MCP本身并不是工具,它只是Adapter,而FastAPI-MCP库完美体现了这一特性——它可以将现有的FastAPI工具直接转换成标准MCP服务。 但这里遇到了一个常见问题:大模型对新的library支持不够好。解决方案是使用上下文管理模块,将API接口文档加入上下文。这种方法特别适用于: 这两年的新Library:MCP etc. 大版本更新的Lirabry: 例如ES 7.XX -> Elastic Search 8.XX 这样我们就可以引用对应的API文档让TRAE帮我们进一步把服务借助FastAPI-MCP包装成MCP服务,并使用FastMCP给出请求Demo。而FastAPI-MCP的使用也非常简单,只需要添加三行代码就可以完成MCP服务的适配 from fastapi_mcp import FastApiMCP # 创建并挂载MCP服务器 - 移到所有端点定义之后 mcp = FastApiMCP(app) mcp.mount_http() 接下来,我把FastMCP的接口文档加入上下文,让模型生成MCP Client来验证服务。但运行时发现MCP服务可以请求成功,list_tool却显示为空。通过观察输入输出来定位问题,TRAE成功找到了问题所在并进行了修复。 重要提示:使用FastMCP-API时需要先添加端点再挂载MCP! 不过下一个问题难住了TRAE:调用后发现FastMCP-API默认使用端点名称+函数名称作为工具名称,导致list_tool显示的工具名很奇怪。我想优化工具命名,但TRAE没有在接口文档中找到对应内容。 其实解决方案很简单,只需要添加operation_id参数就能提供工具别名: # Explicit operation_id (tool will be named "get_user_info") @app.get("/users/{user_id}", operation_id="get_user_info") async def read_user(user_id: int): return {"user_id": user_id} 结合上下文和明确的任务拆解,LLM在Coding任务上确实是当前AI能发挥最大价值的领域可能没有之一。笔者已经很明显的感觉在工作中能否最大程度发挥工具的能力已经能很显著地拉开大家工作效率的差距。哈哈这里肯定有人吐槽当牛马你要这么高的效率干什么,但咱就说省出来的时间咱鼓捣点新鲜好玩的不香么?下次咱结合开源的Cline或Kilo一边看上下文的工程设计,一边看下模型自主对复杂任务的拆解效果。