第十三章流水线引擎深度解析,有哪些关键点值得探究?

摘要:layout: default title: "第十三章:流水线引擎深度解析" 第十三章:流水线引擎深度解析 GeoPipeAgent 的流水线引擎由 engine 目录下 5 个模块组
第十三章:流水线引擎深度解析 GeoPipeAgent 的流水线引擎由 engine/ 目录下 5 个模块组成,本章深入解析每个模块的内部机制,帮助开发者理解框架工作原理、调试问题和进行二次开发。 13.1 引擎整体流程 执行一个 YAML 流水线涉及四个阶段: YAML 文件 ↓ parser.py PipelineDefinition(内存模型) ↓ validator.py 校验通过(或抛出 PipelineValidationError) ↓ executor.py + context.py 执行所有步骤,收集结果 ↓ reporter.py JSON 执行报告 13.2 parser.py:YAML 解析 职责:将 YAML 文件(或字符串)解析为 PipelineDefinition 数据模型。 核心函数 def parse_yaml(source: str | Path) -> PipelineDefinition: raw = _load_yaml(source) # 读取文件或字符串 → dict return _build_pipeline(raw) # 构建 PipelineDefinition 关键解析规则 顶层 pipeline: 键必须存在:若不存在,抛出 PipelineParseError("Missing 'pipeline' key at the top level.") steps 必须是非空列表:pipeline.steps 为空时报错 每个步骤必须有 id 和 use:缺少时报错并指明步骤索引 on_error 默认值为 "fail":解析时自动填充 解析的数据结构 @dataclass class PipelineDefinition: name: str steps: list[StepDefinition] description: str = "" crs: str | None = None variables: dict = {} outputs: dict = {} @dataclass class StepDefinition: id: str use: str params: dict = {} when: str | None = None on_error: str = "fail" # "fail" | "skip" | "retry" backend: str | None = None 13.3 validator.py:流水线校验 职责:在执行之前检查流水线的语义正确性,尽早发现问题。 校验规则 步骤 ID 格式:匹配 [a-z0-9_-],点号(.)不允许 步骤 ID 唯一性:同一流水线不能有重复 ID 步骤类型存在:use 指定的步骤必须已注册(通过 registry.has()) 参数引用合法性:$step-id 引用的步骤必须在当前步骤之前定义;${var} 引用的变量必须在 variables 中定义 on_error 合法值:必须是 fail/skip/retry 之一 outputs 中的引用:引用的步骤必须存在 区分校验与执行时错误 错误类型 触发时机 异常类型 顶层 pipeline: 缺失 解析时 PipelineParseError 步骤 ID 不合法 校验时 PipelineValidationError 引用未定义步骤 校验时 PipelineValidationError 步骤执行异常 执行时 StepExecutionError 变量解析失败 执行时 VariableResolutionError geopipe-agent validate 命令只运行解析和校验两个阶段,不执行步骤。 13.4 context.py:上下文与引用解析 职责:维护流水线执行状态,提供变量替换和步骤引用解析。 PipelineContext 类 class PipelineContext: variables: dict # 流水线变量(含 --var 覆盖) _step_outputs: dict[str, StepResult] # 已完成步骤的输出 def set_output(step_id, result) # 步骤完成后存储结果 def get_output(step_id) # 获取步骤结果 def resolve(value) # 解析值(变量替换/步骤引用) def resolve_params(params) # 解析整个 params 字典 引用解析算法 resolve(value) 的处理逻辑: def resolve(self, value): if not isinstance(value, str): return value # 非字符串直接返回 if value.startswith("$") and not value.startswith("${"): return self._resolve_step_ref(value) # 步骤引用 if "${" in value: return self._substitute_variables(value) # 变量替换 return value # 普通字符串 def _resolve_step_ref(self, ref): ref_body = ref[1:] if "." not in ref_body: step_id, attr = ref_body, "output" # $step → $step.output else: step_id, attr = ref_body.split(".", 1) result = self._step_outputs[step_id] return getattr(result, attr) # 访问 StepResult 属性 getattr(result, attr) 的工作原理:StepResult 实现了 __getattr__,查找顺序为: 内置属性(output、stats、metadata、issues) stats 字典键(如 feature_count、issues_count) metadata 字典键(如 issues_gdf、transform) 以上都找不到 → 抛出 AttributeError,转为 VariableResolutionError StepContext 类 每个步骤执行时接收 StepContext,提供参数访问: class StepContext: def param(self, name, default=None) # 获取指定参数值 def input(self, name="input") # input("input") 的快捷方式 @property params # 完整参数字典 backend # 后端对象(可能为 None) 13.5 executor.py:步骤执行调度 职责:按顺序执行所有步骤,处理条件跳过、错误策略、重试逻辑。 主执行流程 def execute_pipeline(pipeline) -> dict: context = PipelineContext(variables=pipeline.variables) backend_manager = BackendManager.default() for step_def in pipeline.steps: # 1. 条件判断 if step_def.when and not _evaluate_condition(step_def.when, context): context.set_output(step_def.id, StepResult()) # → 记录 status=skipped,继续 # 2. 执行步骤(含重试) result = _execute_step(step_def, context, backend_manager) context.set_output(step_def.id, result) # 3. 解析 outputs resolved_outputs = {k: context.resolve(v) for k, v in pipeline.outputs.items()} # 4. 生成 JSON 报告 return build_report(...) when 条件安全求值 _evaluate_condition 函数使用 AST 白名单确保安全: 用正则替换 ${var} 和 $step.attr 占位符为 Python 字面值 解析为 AST(ast.parse(mode="eval")) 通过 validate_condition_ast 检查 AST 节点类型(白名单) 使用空 __builtins__ 求值:eval(tree, {"__builtins__": {}}, {}) 若求值失败,返回 False(步骤被跳过),不中断流水线。 重试机制 def _with_retry(fn, max_attempts, step_id): for attempt in range(1, max_attempts + 1): try: return fn() except StepExecutionError: raise # 已封装的执行错误不重试 except Exception as e: if attempt < max_attempts: time.sleep(0.5 * attempt) # 递增等待:0.5s, 1s, 1.5s raise last_error 智能错误建议 当步骤失败时,_suggest_fix 函数分析错误信息并提供建议: patterns = [ ("crs" in msg and "degree" in msg → 建议添加 reproject 步骤), ("file not found" → 检查文件路径), ("invalid geometry" → 添加 qc.geometry_validity), ("keyerror" → 检查字段名是否存在), ... ] 这些建议会出现在 JSON 报告的 suggestion 字段中,帮助 AI 或用户快速修复。 13.6 reporter.py:JSON 报告生成 职责:将执行结果汇总为标准 JSON 报告结构。 报告格式 { "pipeline": "流水线名称", "status": "success", // "success" | "error" "duration": 1.234, // 总耗时(秒) "steps": [ { "id": "load-roads", "step": "io.read_vector", "status": "success", // "success" | "skipped" | "error" "duration": 0.089, "output_summary": { // StepResult.summary() 的输出 "feature_count": 100, "crs": "EPSG:4326", "geometry_types": ["LineString"] }, // QC 步骤还会有: "issues_count": 5, "issues": [{"rule_id": "...", "severity": "error", "message": "..."}] }, { "id": "optional-step", "step": "vector.simplify", "status": "skipped", "skip_reason": "condition not met: ${enable_simplify} == true" }, { "id": "failed-step", "step": "vector.clip", "status": "error", "error": "CRS mismatch: ...", "suggestion": "Add a vector.reproject step..." } ], "outputs": { "result": "output/buffer.geojson", "feature_count": 100 } } 13.7 Python API:在代码中使用引擎 除 CLI 外,也可直接在 Python 代码中调用引擎 API: import geopipe_agent # 触发内置步骤的自动注册 from geopipe_agent.engine.parser import parse_yaml from geopipe_agent.engine.validator import validate_pipeline from geopipe_agent.engine.executor import execute_pipeline from geopipe_agent.errors import GeopipeAgentError # 方式一:从文件加载 pipeline = parse_yaml("my_pipeline.yaml") # 方式二:从字符串加载 yaml_str = """ pipeline: name: "inline pipeline" steps: - id: load use: io.read_vector params: { path: "data.shp" } """ pipeline = parse_yaml(yaml_str) # 运行时覆盖变量 pipeline.variables["input_path"] = "data/roads.shp" pipeline.variables["buffer_dist"] = 500 # 校验(返回警告列表,或抛出异常) warnings = validate_pipeline(pipeline) # 执行(返回 JSON 报告字典) try: report = execute_pipeline(pipeline) print(f"Status: {report['status']}") print(f"Duration: {report['duration']}s") except GeopipeAgentError as e: print(f"Error: {e}") 13.8 本章小结 本章深入解析了流水线引擎的 5 个模块: parser.py:YAML → PipelineDefinition,强制要求顶层 pipeline: 键 validator.py:执行前语义校验,检查 ID 格式、步骤存在性、引用合法性 context.py:维护执行上下文,解析 $step.attr 和 ${var} 引用 executor.py:按序执行步骤,处理条件跳过、重试、错误策略,生成建议 reporter.py:汇总生成标准化 JSON 报告 导航:← 第十二章:数据质检步骤 | 第十四章:多后端系统 →