OpenHands事件系统如何为?
摘要:AI Agent 框架探秘:拆解 OpenHands(6) 事件系统 目录AI Agent 框架探秘:拆解 OpenHands(6) 事件系统0x00 概要0x01 EventStream1.1 功能1.2 实现1.3 订阅1.3.1 订阅
AI Agent 框架探秘:拆解 OpenHands(6)--- 事件系统
目录AI Agent 框架探秘:拆解 OpenHands(6)--- 事件系统0x00 概要0x01 EventStream1.1 功能1.2 实现1.3 订阅1.3.1 订阅者1.3.2 分发1.3.3 资源管理0x02 Event2.1 Event2.1.1 定义2.1.2 分类2.2 Action2.2.1 类型2.2.2 流程2.3 Observation2.3.1 类型2.3.2 流程2.3.3 细节2.4 Environment2.5 AgentThinkActionAnthropic Think Tool何时使用“思考”工具最佳实践代码0xFF 参考
0x00 概要
如果说 ReAct 范式是代理的 “大脑思维模式”,那么事件驱动架构就是Agent系统的 “神经网络”,它使用了发布-订阅模式,以去中心化的方式协调各组件高效运作,允许组件之间的松耦合通信。整个系统的核心并非僵硬的同步调用,而是一条承载所有关键活动的 “事件流”,系统中的各类核心操作都会被抽象为标准化的 “事件”。
OpenHands 中,EventStream 负责管理session中的触发的事件,以及事件注册函数的回调项目中注册事件回调函数,比如:
Runtime注册:只接收Action的事件与runtime进行交互
AgentController注册,根据事件更新Agent状态
main:命令行方式执行agent效果评估时使用,接收agent状态变更事件
因为本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。
0x01 EventStream
要掌握整个程序的运作,首先要理解事件系统的,它就像是程序中的信息高速公路,各个部分通过它来交换信息。OpenHands 事件系统的核心是EventStream类(事件流系统),这是一个专门处理事件的系统,它的核心作用是维护了事件队列并支持事件的发布和订阅,管理和分发事件。这种事件驱动的架构使OpenHands能够处理异步操作,并支持多Agent之间的协作。
OpenHands 的代码实现相对简单,关键在于理解其工作原理。主要步骤包括:
启动一个循环运行的线程,这个线程负责从事件队列中读取事件,并将它们逐一发送到各个订阅模块的处理队列中。
当模块需要订阅事件时,它会调用一个订阅函数subscribe,这样事件流就会为该模块维护一个线程池,所有发送到该模块的事件都会被相应的回调函数处理。
任何地方需要向事件流中添加事件时,都会调用一个添加事件的函数add_event。
事件流的逻辑虽然简单,但确保了程序各部分之间的独立性和通信的一致性。这里的消息都是事件,分为两种类型:
Action:指需要执行的任务。
Observation:指环境对任务执行结果的回应。
下图展示了 OpenHands 中 Agent 与环境交互的核心机制:Agent 通过执行动作影响环境,环境通过观察结果反馈给 Agent,Agent 基于这些反馈做出下一步决策。:
Agent 决定执行一个动作,通过AgentController和EventStream传到了Runtime。
Runtime 在环境中执行该动作
环境产生一个观察结果作为执行结果
Runtime 捕获这个观察结果并发送到 EventStream(事件流)
EventStream 存储观察结果,然后通过AgentController通知Agent
Agent 获取更新后的历史记录(包括新的观察结果)
Agent 使用观察结果做出下一个决策
循环重复进行
1.1 功能
EventStream 的功能如下:
事件订阅与通知机制。
多订阅者支持:通过EventStreamSubscriber枚举定义了多种订阅者类型。
灵活订阅机制:使用 subscribe 和 unscribe 方法管理订阅关系。
多回调支持:每个订阅者可以注册多个回调函数,通过callback_id进行区分。
事件处理与分发
异步队列处理:使用 queue.Queue和独立线程处理事件队列。
线程池执行:为每个订阅者的回调函数创建独立的线程池,避免阻塞。
顺序分发:按照订阅者ID的排序顺序将事件分发给订阅者。
事件存储与持久化
事件ID管理:为每个事件分配唯一ID并维护递增计数器。
时间戳记录:自动为事件添加时间戳。
文件存储:将事件以JSON格式持久化到文件系统。
缓存页面机制:使用页面缓存提高大量事件的读写性能。
工作流程:
组件通过subscribe方法注册为事件订阅者。
当有事件发生时,通过add_event方法添加到事件流。
