如何通过自定义步骤实现扩展开发?

摘要:layout: default title: "第十八章:自定义步骤与扩展开发" 第十八章:自定义步骤与扩展开发 GeoPipeAgent 的步骤系统完全开放,通过 @step 装饰器可以
第十八章:自定义步骤与扩展开发 GeoPipeAgent 的步骤系统完全开放,通过 @step 装饰器可以轻松注册自定义步骤,扩展框架的 GIS 分析能力。本章介绍自定义步骤的开发、注册和使用方法。 18.1 步骤注册系统 @step 装饰器 所有步骤(内置和自定义)都通过 @step 装饰器注册到全局注册表(registry._steps): from geopipe_agent.steps.registry import step from geopipe_agent.engine.context import StepContext from geopipe_agent.models.result import StepResult @step( id="my_category.my_action", # 步骤注册 ID(必填) name="我的自定义步骤", # 显示名称 description="步骤功能描述", # 步骤描述 category="my_category", # 类别(影响 list-steps 分组) params={ # 参数规范字典 "input": { "type": "geodataframe", "required": True, "description": "输入矢量数据", }, "my_param": { "type": "number", "required": False, "default": 100, "description": "自定义参数", }, }, outputs={ # 输出规范 "output": {"type": "geodataframe", "description": "处理结果"}, }, backends=["native_python"], # 支持的后端(可省略) examples=[ # 示例(用于 Skill 文档) { "description": "基础用法", "params": {"input": "$load.output", "my_param": 200}, }, ], ) def my_custom_step(ctx: StepContext) -> StepResult: """步骤实现函数,接收 StepContext,返回 StepResult""" gdf = ctx.input("input") # 获取 input 参数(GeoDataFrame) my_param = ctx.param("my_param", 100) # 获取可选参数 # 执行处理逻辑 result_gdf = gdf.copy() result_gdf["my_field"] = my_param return StepResult( output=result_gdf, stats={"feature_count": len(result_gdf)}, ) 注册 ID 规范 步骤 ID 使用点号分隔的 类别.动作 格式: 格式 示例 说明 category.action vector.buffer 推荐格式 org.category.action acme.vector.polygon_merge 带组织前缀,避免冲突 custom.action custom.fix_topology 通用自定义步骤 18.2 StepContext 用法 步骤函数通过 StepContext 获取所有已解析(变量和引用已替换)的参数: def my_step(ctx: StepContext) -> StepResult: # 获取参数(带默认值) gdf = ctx.param("input") # 等价于 ctx.input("input") distance = ctx.param("distance") # 必填参数,None 如不提供 cap = ctx.param("cap_style", "round") # 带默认值 # 简写:ctx.input() 等价于 ctx.param("input") gdf = ctx.input("input") # 访问后端(仅当 backends 列表非空时) backend = ctx.backend # GeoBackend 对象或 None # 访问完整参数字典 all_params = ctx.params 18.3 完整示例:开发空间连接步骤 以下是一个完整的自定义步骤实现:空间连接(Spatial Join)。 # my_steps/spatial_join.py from __future__ import annotations from geopipe_agent.steps.registry import step from geopipe_agent.engine.context import StepContext from geopipe_agent.models.result import StepResult @step( id="custom.spatial_join", name="空间连接", description="将右表的属性通过空间关系连接到左表", category="custom", params={ "left": { "type": "geodataframe", "required": True, "description": "左表(要素保留数量与左表一致)", }, "right": { "type": "geodataframe", "required": True, "description": "右表(属性来源)", }, "how": { "type": "string", "required": False, "default": "left", "enum": ["left", "right", "inner"], "description": "连接方式", }, "op": { "type": "string", "required": False, "default": "intersects", "enum": ["intersects", "contains", "within"], "description": "空间关系谓词", }, "suffix_left": { "type": "string", "required": False, "default": "_left", "description": "左表重名字段后缀", }, "suffix_right": { "type": "string", "required": False, "default": "_right", "description": "右表重名字段后缀", }, }, outputs={ "output": {"type": "geodataframe", "description": "空间连接结果"}, }, examples=[ { "description": "点-面空间连接(将行政区属性连接到 POI 点)", "params": { "left": "$load-poi.output", "right": "$load-districts.output", "how": "left", "op": "within", }, }, ], ) def custom_spatial_join(ctx: StepContext) -> StepResult: import geopandas as gpd left_gdf = ctx.param("left") right_gdf = ctx.param("right") how = ctx.param("how", "left") op = ctx.param("op", "intersects") suffix_left = ctx.param("suffix_left", "_left") suffix_right = ctx.param("suffix_right", "_right") # 执行空间连接 result_gdf = gpd.sjoin( left_gdf, right_gdf, how=how, predicate=op, lsuffix=suffix_left.lstrip("_"), rsuffix=suffix_right.lstrip("_"), ) stats = { "feature_count": len(result_gdf), "left_count": len(left_gdf), "right_count": len(right_gdf), "join_method": how, "spatial_predicate": op, } return StepResult(output=result_gdf, stats=stats) 18.4 自动发现机制 GeoPipeAgent 使用 pkgutil.walk_packages 在包初始化时自动发现并加载内置步骤: # src/geopipe_agent/__init__.py import pkgutil import importlib import geopipe_agent.steps as steps_pkg for finder, name, ispkg in pkgutil.walk_packages( steps_pkg.__path__, prefix=steps_pkg.__name__ + "." ): if not name.endswith("._") and not any(n.startswith("_") for n in name.split(".")): importlib.import_module(name) import geopipe_agent 时,所有 steps/ 子目录下的模块都被自动导入,@step 装饰器在模块导入时即注册步骤。 如何让自定义步骤也被自动发现: 方案一:直接 import(最简单) 在使用自定义步骤的 Python 脚本中,在 import geopipe_agent 之后导入自定义步骤模块: import geopipe_agent # 加载内置步骤 # 加载自定义步骤(触发 @step 注册) import my_steps.spatial_join import my_steps.polygon_merge from geopipe_agent.engine.parser import parse_yaml from geopipe_agent.engine.executor import execute_pipeline pipeline = parse_yaml("pipeline.yaml") report = execute_pipeline(pipeline) 方案二:创建插件包(可分发) 创建一个 Python 包,在其 __init__.py 中导入所有自定义步骤,通过 pip install 安装后自动注册: my-geopipe-steps/ ├── pyproject.toml └── src/ └── my_geopipe_steps/ ├── __init__.py # 导入所有步骤模块 ├── spatial_join.py └── polygon_merge.py __init__.py: # 导入所有步骤(触发 @step 注册) from . import spatial_join from . import polygon_merge 安装后,在 Python 脚本开头导入: import geopipe_agent import my_geopipe_steps # 注册自定义步骤 方案三:修改内置步骤(不推荐) 直接将自定义步骤放入 src/geopipe_agent/steps/custom/ 目录,自动发现机制会加载它们。仅适合本地修改,不适合分发。 18.5 开发规范与最佳实践 5.1 错误处理 步骤内的异常会被 executor 捕获并包装为 StepExecutionError,不需要在步骤中手动包装。但可以提供清晰的错误消息: def my_step(ctx: StepContext) -> StepResult: gdf = ctx.input("input") field = ctx.param("field") if field not in gdf.columns: raise ValueError( f"Field '{field}' not found in input data. " f"Available fields: {list(gdf.columns)}" ) # ... 处理逻辑 5.2 延迟导入重依赖 将可选依赖放在函数内部导入,避免在步骤注册时(模块 import 时)就引入依赖报错: @step(id="custom.ml_classify", ...) def ml_classify(ctx: StepContext) -> StepResult: # 延迟导入,只在步骤实际执行时才尝试导入 try: from sklearn.ensemble import RandomForestClassifier except ImportError: raise ImportError( "scikit-learn is required for 'custom.ml_classify'. " "Install it with: pip install scikit-learn" ) # ... 使用 sklearn 5.3 stats 字段命名 stats 字典的键会通过 $step.key 语法暴露给后续步骤,使用清晰的名称: return StepResult( output=result_gdf, stats={ "feature_count": len(result_gdf), # 标准命名,与内置步骤一致 "join_count": len(joined_gdf), # 自定义统计 "skip_count": len(skipped), # 跳过数量 } ) 5.4 QC 步骤的特殊模式 自定义 QC 步骤应遵循"检查并透传"模式: from geopipe_agent.models.qc import QcIssue from geopipe_agent.steps.qc._helpers import build_issues_gdf @step(id="custom.qc_area_check", ...) def custom_area_check(ctx: StepContext) -> StepResult: gdf = ctx.input("input") min_area = ctx.param("min_area", 1.0) issues = [] for idx, row in gdf.iterrows(): area = row.geometry.area if area < min_area: issues.append(QcIssue( rule_id="area_check", severity="warning", feature_index=idx, message=f"Feature {idx}: area {area:.4f} < min {min_area}", details={"area": area, "min_area": min_area}, )) return StepResult( output=gdf, # 透传输入数据 stats={ "issues_count": len(issues), "valid_count": len(gdf) - len(issues), }, metadata={"issues_gdf": build_issues_gdf(gdf, issues)}, issues=issues, ) 18.6 在 YAML 中使用自定义步骤 自定义步骤注册后,可直接在 YAML 中使用: pipeline: name: "使用自定义步骤的流水线" steps: - id: load-poi use: io.read_vector params: { path: "data/poi.shp" } - id: load-districts use: io.read_vector params: { path: "data/districts.shp" } # 使用自定义空间连接步骤 - id: join-district-attrs use: custom.spatial_join params: left: "$load-poi" right: "$load-districts" how: "left" op: "within" - id: save-result use: io.write_vector params: input: "$join-district-attrs" path: "output/poi_with_district.geojson" 18.7 本章小结 本章介绍了自定义步骤的开发与集成: @step 装饰器:注册步骤,定义参数规范、输出、后端和示例 StepContext:ctx.param()、ctx.input()、ctx.backend 获取参数和后端 StepResult:output、stats、metadata、issues 四个字段 自动发现:import geopipe_agent 自动加载内置步骤,自定义步骤需手动 import 或打包 最佳实践:清晰错误消息、延迟导入依赖、QC 步骤透传数据 导航:← 第十七章:Skill 文件与 AI 集成 | 第十九章:Cookbook 示例精讲 →