如何通过Nanobot源码学习OpenClaw的架构?

摘要:【OpenClaw】通过Nanobot源码学习架构 (1)总体 目录【OpenClaw】通过Nanobot源码学习架构 (1)总体0x00 概要0x01 OpenClaw 基础1.1 Harness1.2 OpenClaw1.3 OpenC
【OpenClaw】通过Nanobot源码学习架构---(1)总体 目录【OpenClaw】通过Nanobot源码学习架构---(1)总体0x00 概要0x01 OpenClaw 基础1.1 Harness1.2 OpenClaw1.3 OpenClaw 架构1.3.1 OpenClaw 精简架构 11.3.2 OpenClaw 精简架构 21.4 关键组件0x02 Nanobot 基础2.1 核心功能 / 特色2.2 🛠️ 技术栈2.3 📁 主要目录结构2.4🌐 支持的平台0x03 Nanobot 总体架构3.1 架构特点3.2 架构图0x04 Nanobot 消息分发机制详解4.1 消息处理流程图4.2 消息流入和流出完整流程4.2.1 用户消息入站阶段4.2.2 构建入站消息对象4.2.3 发布到消息总线4.2.4 AgentLoop 消费入站消息4.2.5 消息分发处理4.2.6 发布出站消息4.2.7 ChannelManager 分发出站消息4.2.8 渠道发送消息到用户4.3 消息如何分发给不同 Session4.3.1 SessionKey 生成机制4.3.2 SessionManager 持久化机制4.3.3 Session 对象结构4.3.4 获取或创建会话流程4.3.5 会话完全隔离机制4.3.6 /stop 命令的会话级取消4.4 消息如何从 Channel 发送到其他模块4.4.1 CronService 消息发送机制4.4.2 AgentLoop 处理系统消息4.4.3 HeartbeatService 消息发送机制4.4.4 MessageTool 跨渠道消息发送4.4.5 模块间消息分发总结0xFF 参考 0x00 概要 OpenClaw 应该有几十万行代码,阅读理解起来难度过大,因此,本系列通过Nanobot来学习 OpenClaw 的特色。 Nanobot是由香港大学数据科学实验室(HKUDS)开源的超轻量级个人 AI 助手框架,定位为"Ultra-Lightweight OpenClaw"。其核心定位如下,非常适合学习Agent架构: 超轻量:核心代码仅约 4,000 行,比 Clawdbot 的 43 万行代码少 99% 设计哲学:微内核架构 + 极致可读性 功能定位:个人AI助手,支持多平台接入 研究友好:代码清晰易读,易于理解、修改和扩展 快速启动:最小化占用意味着更快的启动速度和更低的资源消耗 即开即用:一键部署即可使用 注:本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。 0x01 OpenClaw 基础 我们首先来看看 OpenClaw 的基础概念,能让我们在后续利用 Nanobot 学习更加顺利。 OpenClaw 是 Harness,是面向个人与本地场景的、开箱即用的 Agent Harness 框架。它不生产模型,而是把模型 “套上马具”,让模型能稳定、安全、自主地在本地执行真实任务。或者说,OpenClaw 是 Agent 中"不是 AI 的部分",而 Agent 的实际"聪明程度"完全取决于背后接入的语言模型。 1.1 Harness 智能体 = 模型 + 控制壳(Harness)。 Harness 是包裹在 LLM 之外、负责让 Agent 稳定、可控、可落地执行任务的全套基础设施层,模型提供智能,控制壳让智能变得可用。或者说,Harness 是 Agent 在特定领域工作所需要的一切:Harness = (推理·上下文·记忆·状态) + (工具·编排·闭环) + Knowledge + Observation + Action Interfaces + Permissions,即:智能管理层 + 执行调度层 + 领域知识层 + 反馈观测层 + 安全权限层: Tools: 文件读写、Shell、网络、数据库、浏览器 Knowledge: 产品文档、领域资料、API 规范、风格指南 Observation: git diff、错误日志、浏览器状态、传感器数据 Action: CLI 命令、API 调用、UI 交互 Permissions: 沙箱隔离、审批流程、信任边界、安全护栏、权限控制、错误处理 1.2 OpenClaw OpenClaw 的架构与能力完全符合 Harness 的定义,是 Harness 在个人场景的落地: OpenClaw 是 Agent 中"不是 AI 的部分"。它负责记忆管理、任务调度、工具执行、信道路由,而 Agent 的实际"聪明程度"完全取决于背后接入的LLM。 模型做决策。OpenClaw 执行。模型做推理。OpenClaw 提供上下文。模型是驾驶者。OpenClaw 是载具。 Harness 核心能力 OpenClaw 对应实现 模型无关的执行层 不绑定模型,统一接口对接各类 LLM 任务编排与执行闭环 自然语言→拆解→工具调用→反馈→持久化 工具 / 系统调用管控 本地 Shell、文件、浏览器、API 调用网关 上下文与记忆管理 会话记忆、长期偏好、跨会话状态持久化 安全与护栏 权限控制、操作审计、本地数据隔离 可扩展生态 ClawHub 技能市场、插件化扩展 1.3 OpenClaw 架构 OpenClaw 的架构可以概括为一个以Gateway(网关)为核心的控制平面的分布式系统,OpenClaw 的核心不是模型,而是网关(Gateway)。 OpenClaw 本质上是一个围绕集中式控制平面构建的、事件驱动的、会话隔离的单写入状态机,其整体架构是以网关为中心的星型拓扑: Gateway 是流量调度器、唯一事实来源和控制平面,负责接收来自四面八方的各类事件并进行处理路由、排队、状态管理,然后才去调用 LLM。 智能体运行时(agent runtime)是负责“思考与执行”的工作单元,能够执行“轮次操作”:调用大语言模型、使用工具、写入状态,并回复。 下面两个图可以展示其架构。 1.3.1 OpenClaw 精简架构 1 1.3.2 OpenClaw 精简架构 2 1.4 关键组件 OpenClaw 的关键组件如下: Channels(频道 / 用户接入层): OpenClaw 不构建自己的 UI,而是把现有通讯渠道(WhatsApp、Telegram、Slack、Discord)作为交互层。每个渠道有不同的能力和约束,Agent 的行为要适应渠道特性。 OpenClaw 把每个平台(Telegram/Discord/Signal 等)都抽象成同一套 ChannelPlugin 适配器合同。核心系统不需要知道“某个平台的私有细节”,只要按统一接口调用即可。账号生命周期由 ChannelManager 统一管理。 Channels 负责多渠道统一接入,消息格式转换,核心功能是:监听各渠道消息,统一消息格式,用户身份识别,消息路由分发。消息从各个Channel进入时不会被同步阻塞,而是通过回调把事件推送进系统。Channel接入层只负责把“新消息、连接变化、错误”等信号变成统一的异步事件,再交给后续处理链路,从源头保证高并发和低耦合。 Channels是OpenClaw进行社交生态连接最重要的设计,它将AI能力真正注入到了用户的社交与工作动线中。这对产品设计的影响是深远的:AI 功能的入口,将越来越多地迁移到用户已经存在的工作流里,而不是要求用户打开一个新应用。 "让 AI 功能在用户已在的地方运作"将成为设计决策的起点,而不是"让用户来找 AI"。 Gateway(控制平面 / 信息调度中心): OpenClaw 运行着一个持续在线的网关守护进程,负责维持所有连接并协调整个系统,这是坐在用户指令和模型调用之间的控制层。 OpenClaw 能够支持多种界面(CLI、Web UI、桌面应用、移动节点)的一个重要原因是,它将网关视为一个真正的控制平面。Gateway 连接各种聊天平台和控制界面,把收到的消息派发给 Agent 运行器处理。 Gateway 关键设计思想是: 把消息通信、接口层和AI 怎么思考和执行(Agent)彻底分开。它采用调度中心架构,所有消息都经过一个中央塔台进行分发。Gateway 是"总机" → Agent 是"接线员+执行者"。 Gateway 关注: 谁发来的?发到哪? Agent 关注: 什么意思?怎么做?做完回什么? Gateway 核心就是一个HTTP和WebSocket服务。其启动时与注册的Channel(比如Telegram机器人)建立WebSocket连接,随时准备接收消息。 Gateway 是持久运行的控制平面,负责保持与所有消息渠道的长连接、所有组件的调度与交互,管理会话状态、响应客户端请求、处理定时任务,以及Agent调度。同时还要负责监控各Channel和Node的联通状况(health check)。 智能体运行时(agent runtime / 思考核心): 智能体运行时具体负责:多模型统一接口,工具调用和执行,技能系统管理,会话上下文维护,记忆系统(短期+长期); 一旦网关决定了由哪个 agent 和哪个会话来处理输入,智能体运行时会执行这样一个常规循环:1)加载上下文(会话历史 + 工作区上下文);2)调用模型;3)执行工具调用(浏览器、文件系统、shell、节点、插件);4)持久化更新;5)响应(或故意保持沉默)。 OpenClaw并没有从0构造Agent核心,而是使用开源的Pi-Agent框架。Pi Agent构成整个系统执行的大脑思考核心,是处理逻辑和生成回复的核心引擎。系统中所有的运行逻辑都由推理循环架构来控制,也就是AgenticLoop。 AgentLoop 是 OpenClaw 最关键的执行循环。每次收到用户消息,都会进入这个循环。OpenClaw的推理循环是一个事件驱动的架构:主循环 (run.ts) 负责错误处理、重试、profile轮换;尝试层 (attempt.ts) 负责单次LLM调用的完整生命周期;事件订阅 (subscribe.ts) 处理流式响应和工具调用。 Nodes & Apps: 通过将不同设备定义为“节点”,OpenClaw 实现了跨设备的硬件控制; Channel是基于不同IM的开放平台能力,让OpenClaw与不同的IM的开放平台服务通信。可以说Channel是针对不同的IM程序的适配器。相对的,Node则是针对iOS、Android、macOS这种操作系统的适配器。 Node是一种实际运行在iOS、Android和macOS上的程序,并与运行在用户电脑上的OpenClaw主程序进行远程通信。用户将自己的设备的权限开放给Node,这样OpenClaw就可以通过Node来远程控制用户的设备,如执行任务,打开摄像头,屏幕截图,获取地理位置等。 0x02 Nanobot 基础 Nanobot遵循两条核心原则: "当有疑问时,留白" :拒绝功能膨胀、保持核心精简、按需扩展 "代理逻辑不应埋藏在层层抽象之下":代码可读性优先、单一职责原则、模块边界清晰 因此,Nanobot的核心特色如下: 2.1 核心功能 / 特色 轻量级架构:无冗余设计,仅保留 Agent 核心能力。 异步非阻塞:全异步设计,支持并发任务处理,避免单任务阻塞整个引擎 多通道适配:支持 CLI、系统消息、自定义通道等多场景消息处理 消息驱动的 Agent 主循环:接收消息 → 构建上下文 → 调用 LLM → 执行工具 → 返回结果 完整的工具生态:支持文件操作、命令执行、网页搜索 / 爬取、子 Agent 生成、定时任务、消息发送等核心工具 可扩展:插件化工具注册机制,支持自定义工具 / MCP 扩展,适配不同场景 会话管理与记忆 Consolidation:自动管理会话历史,自动 Consolidate 长会话记忆,支持大窗口记忆压缩,避免上下文溢出,平衡上下文长度与记忆完整性 安全隔离:可限制 Agent 操作范围到指定工作区,防止越权访问 可中断性:支持 /stop 指令终止当前任务,保证 Agent 响应性 2.2 🛠️ 技术栈 以下是 Nanobot 的技术栈。 组件 技术/库 核心语言 Python ≥ 3.11 CLI 工具 Typer LLM 接入 LiteLLM (支持多个提供商) 配置管理 Pydantic 2.x 日志 Loguru WebSocket websockets, websocket-client HTTP 客户端 httpx OAuth oauth-cli-kit Telegram python-telegram-bot Discord Discord.py (通过 Slack SDK) 飞书 lark-oapi 钉钉 dingtalk-stream Slack slack-sdk QQ 机器人 qq-botpy 定时任务 croniter WhatsApp Bridge Node.js + TypeScript + Baileys MCP 支持 mcp (Model Context Protocol) Markdown 渲染 rich 代码规范 Ruff 2.3 📁 主要目录结构 以下是 Nanobot 的主要目录结构。 nanobot-main/ ├── nanobot/ # 核心包目录 │ ├── agent/ # 🧠 核心代理逻辑 │ │ ├── loop.py # 代理循环 (LLM ↔ 工具执行) │ │ ├── context.py # 提示构建器 │ │ ├── memory.py # 持久化记忆 │ │ ├── skills.py # 技能加载器 │ │ ├── subagent.py # 后台任务执行 │ │ └── tools/ # 内置工具 │ ├── skills/ # 🎯 捆绑技能 (github, weather, tmux...) │ │ ├── clawhub/ │ │ ├── cron/ │ │ ├── github/ │ │ ├── memory/ │ │ ├── skill-creator/ │ │ ├── summarize/ │ │ ├── tmux/ │ │ └── weather/ │ ├── channels/ # 📱 聊天渠道集成 │ ├── bus/ # 🚌 消息路由 │ ├── cron/ # ⏰ 定时任务 │ ├── heartbeat/ # 💓 主动唤醒 │ ├── providers/ # 🤖 LLM 提供商配置 │ ├── session/ # 💬 会话管理 │ ├── config/ # ⚙️ 配置处理 │ └── cli/ # 🖥️ 命令行界面 ├── bridge/ # Node.js WhatsApp 桥接器 │ ├── package.json │ └── tsconfig.json ├── tests/ # 测试目录 ├── case/ # 案例展示 (GIF) ├── pyproject.toml # Python 项目配置 ├── Dockerfile # Docker 镜像配置 ├── docker-compose.yml # Docker Compose 配置 ├── README.md # 项目文档 └── SECURITY.md # 安全文档 2.4🌐 支持的平台 以下是 Nanobot 支持的平台。 聊天渠道:Telegram、Discord、WhatsApp、飞书、Mochat、钉钉、Slack、Email、QQ、Matrix LLM 提供商:OpenRouter、Anthropic (Claude)、OpenAI (GPT)、DeepSeek、Groq、Gemini、MiniMax、AiHubMix、SiliconFlow、VolcEngine、通义千问 (Dashscope)、Moonshot (Kimi)、智谱 (Zhipu)、vLLM、OpenAI Codex、GitHub Copilot 0x03 Nanobot 总体架构 3.1 架构特点 Nanobot 的架构特点如下: 消息驱动架构:通过 MessageBus 实现渠道与 Agent 的解耦 核心引擎:AgentLoop是核心处理引擎,负责 LLM 与工具执行的循环 多渠道支持:通过 ChannelManager 统一管理 10+ 种聊天平台 可扩展性: 工具通过 ToolRegistry 注册 LLM 提供商通过 Provider Registry 统一管理 支持技能插件系统和 MCP 协议 持久化:SessionManager和 MemoryStore 负责会话和记忆的持久化 后台任务:CronService 和 Heartbeat提供定时任务和主动唤醒功能 3.2 架构图 Nanobot 的架构图如下: 对应各组件职责为: Gateway: 系统入口,协调各组件启动和运行 启动MessageBus、AgentLoop、ChannelManager 协调 CronService 和 HeartbeatService Channel(如QQchannel): 接收外部消息 将消息发布到.MessageBus 发送响应消息回外部平台 AgentLoop: 从MessageBus消费消息 执行 AI 推理和工具调用 将响应发布到 MessageBu MessageBus: 解耦 Channel 和 AgentLoop 提供异步消息队列机制 对应具体代码逻辑为 0x04 Nanobot 消息分发机制详解 我们梳理下Nanobot 消息分发机制,后续文章会针对各个环节进行解析学习。 4.1 消息处理流程图 nanobot 采用 异步消息总线 架构,实现消息的解耦分发: MessageBus:异步队列,连接渠道和 Agent InboundQueue:入站消息(渠道 → Agent) OutboundQueue:出站消息(Agent → 渠道) SessionKey:会话标识符,用于区分不同用户/会话 nanobot 的消息处理流程图如下: Gateway启动所有服务:启动 AgentLoop、所有 Channels、CronService 和 HeartbeatService 协调组件间通信:通过MessageBus 实现各组件间的解耦 维护整体状态:管理整个系统生命周期 Channels 与具体平台(QQ、Telegram等)对接,将消息标准化后发送到MessageBus MessageBus 解耦Channels和Agent,实现消息传递 AgentLoop 统一处理来自所有渠道的消息,执行核心逻辑 4.2 消息流入和流出完整流程 以 QQ 用户发送消息为例的完整流程如下: 4.2.1 用户消息入站阶段 当 QQ 用户向 nanobot 发送消息(如"帮我分析这段代码")时,消息首先被 QQ 平台的服务器接收,然后通过 WebSocket 连接传递给 nanobot 的 QQ 机器人实例。 QQChannel 类通过继承 botpy.Client 并实现事件处理方法来接收消息: on_c2c_message_create() - 处理 C2C(用户对机器人)消息 on_direct_message_create() - 处理直接消息 当这两个事件被触发时,QQChannel 会调用内部方法 _on_message()。这个方法: 首先,进行消息去重处理,使用一个最大长度为 1000 的双端队列 _processed_ids 来记录已处理的消息 ID,避免重复处理相同消息。然后提取用户信息,包括发送者 ID(author.id 或 author.user_openid)和消息内容。如果内容为空则直接返回。 接着,QQChannel 调用基类 BaseChannel 的 _handle_message() 方法进行权限检查和。这个方法首先调用 is_allowed(sender_id) 检查用户是否在白名单中。白名单通过配置文件的 allowFrom 字段设置,如果未配置白名单则允许所有用户访问。如果用户在白名单外,会记录警告日志并返回,拒绝处理此消息。 4.2.2 构建入站消息对象 通过权限检查后,BaseChannel 会构建一个 InboundMessage 数据类对象: channel: 渠道名称,如 "qq" sender_id: 发送者 ID,如 "123456789" chat_id: 聊天 ID,QQ 私聊时等于 sender_id content: 消息文本内容 timestamp: 消息时间戳(自动生成) media: 媒体文件列表(如图片 URL),默认为空 metadata: 渠道特定元数据,如 QQ 的 message_id session_key_override: 会话键覆盖,用于线程作用域会话 InboundMessage 有一个 session_key 属性,自动生成会话键:如果设置了 session_key_override 则使用它,否则使用 f"{channel}:{chat_id}" 格式。这样 QQ 用户的会话键就是 "qq:123456789"。 4.2.3 发布到消息总线 BaseChannel 调用 await self.bus.publish_inbound(msg) 将入站消息发布到消息总线。 MessageBus 维护两个异步队列: inbound: asyncio.Queue[InboundMessage] - 入站消息队列(渠道 → Agent) outbound: asyncio.Queue[OutboundMessage] - 出站消息队列(Agent → 渠道) publish_inbound() 方法使用 await self.inbound.put(msg) 将消息放入入站队列。这是一个非阻塞操作,如果队列满了会自动等待。 4.2.4 AgentLoop 消费入站消息 AgentLoop 的 run() 方法是主循环,持续从消息总线消费入站消息: while self._running: try: msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) except asyncio.TimeoutError: continue consume_inbound() 使用 await self.inbound.get() 从队列获取消息,这是一个阻塞操作,会等待直到有消息可用。这里设置了 1 秒超时,用于定期检查 _running 状态以便优雅停止。 获取到消息后,AgentLoop 会检查是否是特殊命令 /stop,如果是则调用 _handle_stop(msg) 取消该会话的所有活跃任务和子代理。否则,创建一个异步任务来处理这条消息: task = asyncio.create_task(self._dispatch(msg)) self._active_tasks.setdefault(msg.session_key, []).append(task) AgentLoop 将任务添加到 _active_tasks 字典中,键是 session_key,值是该会话的任务列表。这样 /stop 命令可以取消特定会话的所有任务。 4.2.5 消息分发处理 _dispatch(msg) 方法是消息分发的核心,它使用全局处理锁 _processing_lock 确保消息串行化处理,避免并发问题: async with self._processing_lock: try: response = await self._process_message(msg) if response is not None: await self.bus.publish_outbound(response) _process_message(msg) 是完整的 Agent 处理流程,包括获取或创建会话、构建上下文、运行 Agent 迭代循环(LLM 与工具交互)、保存会话等。最终返回一个 OutboundMessage 对象。 4.2.6 发布出站消息 _process_message() 返回的 OutboundMessage 包含: channel: 目标渠道名,如 "qq" chat_id: 目标聊天 ID,如 "123456789" content: Agent 的响应文本 reply_to: 可选的回复消息 ID media: 可选的媒体文件列表 metadata: 可选的元数据,如进度标记 AgentLoop 调用 await self.bus.publish_outbound(response) 将响应发布到出站队列。publish_outbound() 使用 await self.outbound.put(msg) 将消息放入出站队列。 4.2.7 ChannelManager 分发出站消息 ChannelManager 运行一个独立的协程 _dispatch_outbound() 来分发出站消息: while True: try: msg = await asyncio.wait_for(self.bus.consume_outbound(), timeout=1.0) # 过滤进度消息(根据配置) if msg.metadata.get("_progress"): if msg.metadata.get("_tool_hint") and not self.config.channels.send_tool_hints: continue if not msg.metadata.get("_tool_hint") and not self.config.channels.send_progress: continue # 获取目标渠道 channel = self.channels.get(msg.channel) if channel: await channel.send(msg) 这个循环持续从出站队列消费消息,支持根据配置过滤工具提示和进度消息。然后通过 self.channels.get(msg.channel) 获取目标渠道实例。channels 是一个字典,存储了所有启用的渠道,如 {"qq": QQChannel实例, "telegram": TelegramChannel实例}。 4.2.8 渠道发送消息到用户 获取到目标渠道实例后,调用其 send(msg) 方法。对于 QQChannel 具体如下: async def send(self, msg: OutboundMessage) -> None: if not self._client: return try: await self._client.api.post_c2c_message( openid=msg.chat_id, msg_type=0, content=msg.content, ) except Exception as e: logger.error("Error sending QQ message: {}", e) QQChannel 使用 botpy SDK 的 API post_c2c_message() 发送 C2C 私聊消息,msg_type=0 表示文本消息。 最终,QQ 用户在客户端收到 nanobot 的响应消息,完成了完整的消息流入和流出流程。 4.3 消息如何分发给不同 Session Session 用以区分不同用户/会话的机制: session_key = "{channel}:{chat_id}" - 唯一标识会话 SessionManager 用字典缓存 Session 对象 ({session_key: Session}) 每个 Session 独立存储到 session_key.jsonl 文件 不同会话的消息历史完全隔离, 互不影响 /stop 命令只取消当前 session_key 的任务 4.3.1 SessionKey 生成机制 nanobot 通过 session_key 来区分不同的会话。每个 InboundMessage 都有会话键属性: @property def session_key(self) -> str: return self.session_key_override or f"{self.channel}:{self.chat_id}" 默认情况下,会话键使用 {渠道名}:{聊天ID} 格式生成: Telegram 用户:"telegram:123456789" QQ 用户:"qq:987654321" Discord 群组:"discord:456789123" CLI 会话:"cli:direct" 如果设置了 session_key_override,则使用覆盖值。这用于特殊场景,如线程作用域会话、系统任务专用会话等。 4.3.2 SessionManager 持久化机制 SessionManager 负责会话的持久化管理: def __init__(self, workspace: Path): self.workspace = workspace self.sessions_dir = ensure_dir(self.workspace / "sessions") self._cache: dict[str, Session] = {} _cache 是内存缓存字典,键是 session_key,值是 Session 对象。这避免频繁读取磁盘。 会话文件存储在 workspace/sessions/ 目录,每个会话一个 JSONL 文件,文件名是安全的会话键(将 : 替换为 _)。例如:telegram_123456789.jsonl。 4.3.3 Session 对象结构 Session 数据类存储会话的所有消息: @dataclass class Session: key: str # 会话键 messages: list[dict[str, Any]] # 消息列表 created_at: datetime # 创建时间 updated_at: datetime # 更新时间 metadata: dict[str, Any] # 元数据 last_consolidated: int # 已归档的消息数 messages 是 append-only 的消息列表,存储完整的对话历史。last_consolidated 字段记录已经归档到 MEMORY.md 的消息数量,get_history() 方法只返回 messages[last_consolidated:] 的未归档消息,这样 LLM 不会看到重复的历史。 4.3.4 获取或创建会话流程 当 AgentLoop 处理消息时,调用 session = sessions.get_or_create(msg.session_key): def get_or_create(self, key: str) -> Session: if key in self._cache: return self._cache[key] session = self._load(key) if session is None: session = Session(key=key) self._cache[key] = session return session 首先检查内存缓存,如果存在直接返回。否则尝试从磁盘加载会话文件,如果文件不存在或加载失败则创建新会话。最后将会话加入缓存并返回。 4.3.5 会话完全隔离机制 不同用户/会话的消息通过 session_key 完全隔离: 不同 session_key 映射到不同的 Session 对象 每个 Session 独立存储到 {safe_key}.jsonl 文件 Session.get_history() 只返回该会话的消息历史 AgentLoop 处理每条消息时使用对应的 Session 子代理可继承主会话键或使用独立会话键 这种设计确保用户 A 的对话历史不会影响用户 B 的会话,Telegram 群组的消息不会泄露给 Discord 私聊。 4.3.6 /stop 命令的会话级取消 /stop 命令只取消特定 session_key 的任务: async def _handle_stop(self, msg: InboundMessage) -> None: tasks = self._active_tasks.pop(msg.session_key, []) cancelled = sum(1 for t in tasks if not t.done() and t.cancel()) sub_cancelled = await self.subagents.cancel_by_session(msg.session_key) 这确保停止一个会话不会影响其他会话的运行任务。 4.4 消息如何从 Channel 发送到其他模块 4.4.1 CronService 消息发送机制 CronService 是定时任务服务,在 Gateway 启动时通过回调与消息系统连接: async def on_cron_job(job: CronJob) -> str | None: channel, chat_id = _pick_cron_target(job) if job.payload.deliver: await bus.publish_inbound(InboundMessage( channel="system", # 使用 system 标识,不是具体渠道 sender_id="cron", chat_id=f"{channel}:{chat_id}", # 目标渠道和 ID(特殊格式) content=job.payload.message, session_key_override=f"cron{job.id}", # 专用会话键 )) return "Job delivered" else: # 不发送到渠道,直接调用 Agent 处理 return await agent.process_direct( content=job.payload.message, session_key=f"cron{job.id}", ) 当定时任务到期时,CronService 调用 Gateway 设置的 on_job 回调。如果任务配置了 deliver=true,回调通过 MessageBus 发布一个 system 消息,这个消息的特点是: channel="system":标识这是系统消息,不是直接来自聊天平台 sender_id="cron":标识消息来源是 cron 定时任务 chat_id=f"{channel}:{chat_id}":特殊格式,包含实际的目标渠道和聊天 ID session_key_override=f"cron{job.id}":使用专用会话键 这个消息进入 inbound 队列后被 AgentLoop 接收。 4.4.2 AgentLoop 处理系统消息 AgentLoop 的 _process_message() 方法会识别并处理 system 消息: if msg.channel == "system": if ":" in msg.chat_id: target_channel, target_chat_id = msg.chat_id.split(":", 1) else: target_channel = msg.channel target_chat_id = msg.chat_id 对于 system 消息,从 chat_id 解析出实际的目标渠道和聊天 ID。例如,chat_id="telegram:123456789" 会被解析为 target_channel="telegram" 和 target_chat_id="123456789"。 AgentLoop 使用解析的目标渠道和聊天 ID 构建出站消息,确保定时任务的响应发送到正确的用户。 4.4.3 HeartbeatService 消息发送机制 HeartbeatService 是心跳服务,用于定期检查和执行 HEARTBEAT.md 中的任务。它有两个回调: on_execute 回调:执行心跳任务 async def on_heartbeat_execute(tasks: str) -> str: channel, chat_id = _pick_heartbeat_target() return await agent.process_direct( content=tasks, session_key="heartbeat", channel=channel, chat_id=chat_id, on_progress=_silent, ) 这个回调直接调用 agent.process_direct(),不经过 MessageBus。process_direct() 内部处理消息但不自动发送结果到渠道。 on_notify 回调:通知用户心跳任务结果 async def on_heartbeat_notify(response: str) -> None: channel, chat_id = _pick_heartbeat_target() if channel == "cli": return # CLI 模式无法发送 await bus.publish_outbound(OutboundMessage( channel=channel, chat_id=chat_id, content=response, )) 这个回调在任务完成后通过 MessageBus 发布出站消息,ChannelManager 会将其分发给目标渠道。 4.4.4 MessageTool 跨渠道消息发送 MessageTool 允许 LLM 主动向任意启用的渠道发送消息: async def execute(self, content, channel=None, chat_id=None, **kwargs) -> str: channel = channel or self._default_channel chat_id = chat_id or self._default_chat_id msg = OutboundMessage( channel=channel, chat_id=chat_id, content=content, ) if self._send_callback: await self._send_callback(msg) # -> bus.publish_outbound(msg) AgentLoop 在处理每条消息时设置 MessageTool 的上下文: self.tools.message_tool.set_context(msg.channel, msg.chat_id) 这样 LLM 可以在响应 Telegram 用户时主动发送消息到 Discord 群组,实现跨渠道通知。 4.4.5 模块间消息分发总结 CronService:通过 on_job 回调 → MessageBus.publish_inbound() → AgentLoop 处理 → MessageBus.publish_outbound() → ChannelManager 分发 HeartbeatService:通过 on_execute 回调 → AgentLoop.process_direct()(内部处理),然后通过 on_notify 回调 → MessageBus.publish_outbound() → ChannelManager 分发 MessageTool:LLM 调用工具 → MessageTool.execute() → _send_callback(msg) → MessageBus.publish_outbound() → ChannelManager 分发到指定渠道 所有模块间的通信都通过 MessageBus 的异步队列实现,确保了系统的解耦和可扩展性。 0xFF 参考 3500 行代码打造轻量级AI Agent:Nanobot 架构深度解析 Kimi Agent产品很厉害,然后呢? OpenClaw真完整解说:架构与智能体内核 https://github.com/shareAI-lab/learn-claude-code 深入理解OpenClaw技术架构与实现原理(上) 深度解析:一张图拆解OpenClaw的Agent核心设计 OpenClaw小龙虾架构全面解析 OpenClaw架构-Agent Runtime 运行时深度拆解 OpenClaw 架构详解 · 第一部分:控制平面、会话管理与事件循环 从回答问题到替你做事,AI Agent 为什么突然火了?