第十三章流水线引擎深度解析,有哪些关键点值得探究?
摘要:layout: default title: "第十三章:流水线引擎深度解析" 第十三章:流水线引擎深度解析 GeoPipeAgent 的流水线引擎由 engine 目录下 5 个模块组
第十三章:流水线引擎深度解析
GeoPipeAgent 的流水线引擎由 engine/ 目录下 5 个模块组成,本章深入解析每个模块的内部机制,帮助开发者理解框架工作原理、调试问题和进行二次开发。
13.1 引擎整体流程
执行一个 YAML 流水线涉及四个阶段:
YAML 文件
↓ parser.py
PipelineDefinition(内存模型)
↓ validator.py
校验通过(或抛出 PipelineValidationError)
↓ executor.py + context.py
执行所有步骤,收集结果
↓ reporter.py
JSON 执行报告
13.2 parser.py:YAML 解析
职责:将 YAML 文件(或字符串)解析为 PipelineDefinition 数据模型。
核心函数
def parse_yaml(source: str | Path) -> PipelineDefinition:
raw = _load_yaml(source) # 读取文件或字符串 → dict
return _build_pipeline(raw) # 构建 PipelineDefinition
关键解析规则
顶层 pipeline: 键必须存在:若不存在,抛出 PipelineParseError("Missing 'pipeline' key at the top level.")
steps 必须是非空列表:pipeline.steps 为空时报错
每个步骤必须有 id 和 use:缺少时报错并指明步骤索引
on_error 默认值为 "fail":解析时自动填充
解析的数据结构
@dataclass
class PipelineDefinition:
name: str
steps: list[StepDefinition]
description: str = ""
crs: str | None = None
variables: dict = {}
outputs: dict = {}
@dataclass
class StepDefinition:
id: str
use: str
params: dict = {}
when: str | None = None
on_error: str = "fail" # "fail" | "skip" | "retry"
backend: str | None = None
13.3 validator.py:流水线校验
职责:在执行之前检查流水线的语义正确性,尽早发现问题。
校验规则
步骤 ID 格式:匹配 [a-z0-9_-],点号(.)不允许
步骤 ID 唯一性:同一流水线不能有重复 ID
步骤类型存在:use 指定的步骤必须已注册(通过 registry.has())
参数引用合法性:$step-id 引用的步骤必须在当前步骤之前定义;${var} 引用的变量必须在 variables 中定义
on_error 合法值:必须是 fail/skip/retry 之一
outputs 中的引用:引用的步骤必须存在
区分校验与执行时错误
错误类型
触发时机
异常类型
顶层 pipeline: 缺失
解析时
PipelineParseError
步骤 ID 不合法
校验时
PipelineValidationError
引用未定义步骤
校验时
PipelineValidationError
步骤执行异常
执行时
StepExecutionError
变量解析失败
执行时
VariableResolutionError
geopipe-agent validate 命令只运行解析和校验两个阶段,不执行步骤。
13.4 context.py:上下文与引用解析
职责:维护流水线执行状态,提供变量替换和步骤引用解析。
