如何编排工作流以构建复杂业务逻辑?

摘要:工作流编排:构建复杂业务逻辑 前言 在前几篇文章中,我们学习了如何创建智能体、如何管理对话状态、如何实现长期记忆。但是,真实的业务场景往往更加复杂:一个完整的业务流程可能涉及多个步骤、需要条件分支、并行处理、错误恢复等高级特性。 例如,一个
工作流编排:构建复杂业务逻辑 前言 在前几篇文章中,我们学习了如何创建智能体、如何管理对话状态、如何实现长期记忆。但是,真实的业务场景往往更加复杂:一个完整的业务流程可能涉及多个步骤、需要条件分支、并行处理、错误恢复等高级特性。 例如,一个订单处理流程可能包含以下步骤:验证用户身份 → 检查商品库存 → 计算价格 → 处理支付 → 生成订单 → 发送通知。每个步骤都可能有失败的情况,需要有相应的重试或回退机制。 这就是工作流编排(Workflow Orchestration)的用武之地。本文将深入探讨如何在Agent Framework中实现复杂业务逻辑的编排,包括基于图的工作流、状态机、并行执行、条件分支等核心概念。 一、工作流编排基础 1.1 什么是工作流编排 工作流编排是指协调多个任务或步骤的执行,以完成复杂业务流程的过程。与简单的线性流程不同,现代工作流通常需要: 顺序执行:按照预定义的顺序执行任务。 条件分支:根据条件选择不同的执行路径。 并行执行:同时执行多个独立的任务以提高效率。 循环迭代:重复执行某些任务直到满足条件。 错误处理:处理执行过程中的异常情况。 状态持久化:保存工作流执行状态,支持中断恢复。 1.2 Agent Framework的工作流支持 Agent Framework提供了强大的工作流编排能力,主要包括: 图引擎(Graph Engine):基于有向无环图(DAG)的工作流定义和执行。 消息路由:根据消息内容将任务路由到不同的处理节点。 并行处理:支持多个任务的并行执行和结果聚合。 中间件管道:支持在请求处理过程中插入自定义逻辑。 二、基于图的工作流引擎 2.1 工作流定义 首先,我们定义工作流的基础结构: // WorkflowDefinition.cs public class WorkflowDefinition { public string Id { get; set; } = Guid.NewGuid().ToString(); public string Name { get; set; } = string.Empty; public string Description { get; set; } = string.Empty; public List<WorkflowNode> Nodes { get; set; } = new(); public List<WorkflowEdge> Edges { get; set; } = new(); public Dictionary<string, object> Metadata { get; set; } = new(); public DateTime CreatedAt { get; set; } = DateTime.UtcNow; public DateTime UpdatedAt { get; set; } = DateTime.UtcNow; } // 工作流节点 public class WorkflowNode { public string Id { get; set; } = Guid.NewGuid().ToString(); public string Name { get; set; } = string.Empty; public WorkflowNodeType Type { get; set; } public Dictionary<string, object> Properties { get; set; } = new(); public List<string> InputBindings { get; set; } = new(); public List<string> OutputBindings { get; set; } = new(); } // 节点类型 public enum WorkflowNodeType { Agent, // 智能体节点 Tool, // 工具节点 Condition, // 条件判断节点 Switch, // 多分支节点 Parallel, // 并行执行节点 Merge, // 结果聚合节点 Loop, // 循环节点 SubWorkflow, // 子流程节点 Start, // 开始节点 End // 结束节点 } // 工作流边 public class WorkflowEdge { public string Id { get; set; } = Guid.NewGuid().ToString(); public string SourceNodeId { get; set; } = string.Empty; public string TargetNodeId { get; set; } = string.Empty; public string? Condition { get; set; } // 触发条件 public Dictionary<string, object> Properties { get; set; } = new(); } 2.2 工作流执行器 现在实现工作流的执行引擎: // WorkflowExecutor.cs public class WorkflowExecutor { private readonly IServiceProvider _serviceProvider; private readonly ILogger<WorkflowExecutor> _logger; private readonly IMetrics _metrics; public WorkflowExecutor( IServiceProvider serviceProvider, ILogger<WorkflowExecutor> logger, IMetrics metrics) { _serviceProvider = serviceProvider; _logger = logger; _metrics = metrics; } public async Task<WorkflowExecutionResult> ExecuteAsync( WorkflowDefinition workflow, WorkflowExecutionContext context) { var execution = new WorkflowExecution { WorkflowId = workflow.Id, WorkflowName = workflow.Name, StartedAt = DateTime.UtcNow, Status = WorkflowExecutionStatus.Running }; try { // 验证工作流 ValidateWorkflow(workflow); // 找到起始节点 var startNode = workflow.Nodes.FirstOrDefault(n => n.Type == WorkflowNodeType.Start); if (startNode == null) { throw new InvalidOperationException("工作流缺少开始节点"); } // 执行工作流 await ExecuteNodeAsync(workflow, startNode, context, execution); execution.Status = WorkflowExecutionStatus.Completed; execution.CompletedAt = DateTime.UtcNow; _logger.LogInformation("工作流执行完成: {WorkflowName}, Duration={Duration}ms", workflow.Name, (execution.CompletedAt - execution.StartedAt).TotalMilliseconds); return new WorkflowExecutionResult { Success = true, Execution = execution, Output = context.Output }; } catch (Exception ex) { execution.Status = WorkflowExecutionStatus.Failed; execution.Error = ex.Message; execution.CompletedAt = DateTime.UtcNow; _logger.LogError(ex, "工作流执行失败: {WorkflowName}", workflow.Name); return new WorkflowExecutionResult { Success = false, Execution = execution, Error = ex.Message }; } } private async Task ExecuteNodeAsync( WorkflowDefinition workflow, WorkflowNode node, WorkflowExecutionContext context, WorkflowExecution execution) { var nodeExecution = new NodeExecution { NodeId = node.Id, NodeName = node.Name, StartedAt = DateTime.UtcNow }; execution.NodeExecutions.Add(nodeExecution); _logger.LogDebug("执行节点: {NodeName}", node.Name); try { var result = node.Type switch { WorkflowNodeType.Start => await ExecuteStartNodeAsync(node, context), WorkflowNodeType.End => await ExecuteEndNodeAsync(node, context), WorkflowNodeType.Agent => await ExecuteAgentNodeAsync(node, context), WorkflowNodeType.Tool => await ExecuteToolNodeAsync(node, context), WorkflowNodeType.Condition => await ExecuteConditionNodeAsync(node, workflow, context), WorkflowNodeType.Switch => await ExecuteSwitchNodeAsync(node, workflow, context), WorkflowNodeType.Parallel => await ExecuteParallelNodeAsync(workflow, node, context), WorkflowNodeType.Loop => await ExecuteLoopNodeAsync(workflow, node, context), _ => throw new NotSupportedException($"不支持的节点类型: {node.Type}") }; nodeExecution.Result = result; nodeExecution.CompletedAt = DateTime.UtcNow; // 根据结果确定下一个节点 var nextNodeId = GetNextNode(workflow, node, result, context); if (!string.IsNullOrEmpty(nextNodeId)) { var nextNode = workflow.Nodes.FirstOrDefault(n => n.Id == nextNodeId); if (nextNode != null) { await ExecuteNodeAsync(workflow, nextNode, context, execution); } } } catch (Exception ex) { nodeExecution.Error = ex.Message; nodeExecution.CompletedAt = DateTime.UtcNow; // 检查是否有错误处理边 var errorEdge = workflow.Edges.FirstOrDefault(e => e.SourceNodeId == node.Id && e.Properties.ContainsKey("onError")); if (errorEdge != null) { var errorHandlerNode = workflow.Nodes.FirstOrDefault(n => n.Id == errorEdge.TargetNodeId); if (errorHandlerNode != null) { context.SetValue("error", ex.Message); await ExecuteNodeAsync(workflow, errorHandlerNode, context, execution); } } else { throw; } } } private async Task<NodeExecutionResult> ExecuteAgentNodeAsync( WorkflowNode node, WorkflowExecutionContext context) { var agentId = node.Properties.GetValueOrDefault("agentId")?.ToString(); var prompt = node.Properties.GetValueOrDefault("prompt")?.ToString(); if (string.IsNullOrEmpty(agentId)) { throw new InvalidOperationException($"节点 {node.Name} 缺少 agentId"); } // 获取智能体 var agent = _serviceProvider.GetService(typeof(IAIAgent)) as IAIAgent ?? throw new InvalidOperationException($"无法获取智能体: {agentId}"); // 准备输入 var input = context.GetValue<string>("input") ?? string.Empty; if (!string.IsNullOrEmpty(prompt)) { input = FormatPrompt(prompt, context.Values); } // 执行智能体 var response = await agent.ProcessAsync(context, input); // 保存输出 context.SetValue($"output_{node.Id}", response); context.SetValue("latest_output", response); return new NodeExecutionResult { Success = true, Output = response, Data = new Dictionary<string, object> { { "response", response } } }; } private async Task<NodeExecutionResult> ExecuteToolNodeAsync( WorkflowNode node, WorkflowExecutionContext context) { var toolName = node.Properties.GetValueOrDefault("toolName")?.ToString(); var toolParameters = node.Properties.GetValueOrDefault("parameters") as Dictionary<string, object>; if (string.IsNullOrEmpty(toolName)) { throw new InvalidOperationException($"节点 {node.Name} 缺少 toolName"); } // 解析参数 var resolvedParams = ResolveParameters(toolParameters, context.Values); // 获取并执行工具 var tool = _serviceProvider.GetService(typeof(ITool)) as ITool ?? throw new InvalidOperationException($"无法获取工具: {toolName}"); var result = await tool.ExecuteAsync(resolvedParams); context.SetValue($"output_{node.Id}", result); return new NodeExecutionResult { Success = true, Output = JsonSerializer.Serialize(result), Data = new Dictionary<string, object> { { "result", result } } }; } private async Task<NodeExecutionResult> ExecuteConditionNodeAsync( WorkflowNode node, WorkflowDefinition workflow, WorkflowExecutionContext context) { var condition = node.Properties.GetValueOrDefault("condition")?.ToString(); // 评估条件 var result = EvaluateCondition(condition, context.Values); _logger.LogDebug("条件评估结果: {Condition} = {Result}", condition, result); return new NodeExecutionResult { Success = true, Output = result.ToString(), Data = new Dictionary<string, object> { { "conditionResult", result } } }; } private async Task<NodeExecutionResult> ExecuteParallelNodeAsync( WorkflowDefinition workflow, WorkflowNode node, WorkflowExecutionContext context) { var parallelNodeIds = node.Properties.GetValueOrDefault("parallelNodes") as List<string> ?? new List<string>(); var parallelNodes = workflow.Nodes .Where(n => parallelNodeIds.Contains(n.Id)) .ToList(); var tasks = parallelNodes.Select(node => ExecuteNodeAsync(workflow, node, context, new WorkflowExecution())); var results = await Task.WhenAll(tasks); // 聚合结果 var aggregated = new Dictionary<string, object>(); for (int i = 0; i < results.Length; i++) { aggregated[parallelNodes[i].Id] = results[i]; } context.SetValue($"output_{node.Id}", aggregated); return new NodeExecutionResult { Success = true, Output = "并行执行完成", Data = aggregated }; } private string GetNextNode( WorkflowDefinition workflow, WorkflowNode currentNode, NodeExecutionResult result, WorkflowExecutionContext context) { var outgoingEdges = workflow.Edges .Where(e => e.SourceNodeId == currentNode.Id) .ToList(); // 如果没有出边,返回空(工作流结束) if (!outgoingEdges.Any()) { return null; } // 查找满足条件的边 foreach (var edge in outgoingEdges) { if (string.IsNullOrEmpty(edge.Condition)) { return edge.TargetNodeId; // 默认边 } if (EvaluateCondition(edge.Condition, context.Values)) { return edge.TargetNodeId; } } return outgoingEdges.FirstOrDefault()?.TargetNodeId; } private bool EvaluateCondition(string? condition, Dictionary<string, object> values) { if (string.IsNullOrEmpty(condition)) { return true; } // 简化实现:支持简单的表达式 // 生产环境应使用表达式解析库 try { if (condition.Contains("==")) { var parts = condition.Split("=="); var left = ResolveValue(parts[0].Trim(), values); var right = ResolveValue(parts[1].Trim(), values); return left?.ToString() == right?.ToString(); } if (condition.Contains("!=")) { var parts = condition.Split("!="); var left = ResolveValue(parts[0].Trim(), values); var right = ResolveValue(parts[1].Trim(), values); return left?.ToString() != right?.ToString(); } // 默认返回true return true; } catch { return false; } } private object? ResolveValue(string expression, Dictionary<string, object> values) { if (expression.StartsWith("{{") && expression.EndsWith("}}")) { var key = expression.Substring(2, expression.Length - 4).Trim(); return values.GetValueOrDefault(key); } // 字面量 if (expression.StartsWith("'") && expression.EndsWith("'")) { return expression.Substring(1, expression.Length - 2); } if (bool.TryParse(expression, out var boolValue)) { return boolValue; } if (int.TryParse(expression, out var intValue)) { return intValue; } return expression; } private Dictionary<string, object> ResolveParameters( Dictionary<string, object>? parameters, Dictionary<string, object> values) { if (parameters == null) { return new Dictionary<string, object>(); } var resolved = new Dictionary<string, object>(); foreach (var kvp in parameters) { resolved[kvp.Key] = ResolveValue(kvp.Value?.ToString() ?? "", values); } return resolved; } private void ValidateWorkflow(WorkflowDefinition workflow) { if (!workflow.Nodes.Any()) { throw new InvalidOperationException("工作流不能为空"); } if (!workflow.Nodes.Any(n => n.Type == WorkflowNodeType.Start)) { throw new InvalidOperationException("工作流必须包含开始节点"); } if (!workflow.Nodes.Any(n => n.Type == WorkflowNodeType.End)) { throw new InvalidOperationException("工作流必须包含结束节点"); } } } 三、订单处理工作流示例 3.1 定义订单处理工作流 让我们通过一个实际的订单处理流程来演示工作流的用法: // OrderProcessingWorkflow.cs public class OrderProcessingWorkflow { public static WorkflowDefinition CreateOrderProcessingWorkflow() { var workflow = new WorkflowDefinition { Name = "订单处理流程", Description = "完整的订单处理流程,包括验证、支付、库存、通知等步骤" }; // 开始节点 var start = new WorkflowNode { Id = "start", Name = "开始", Type = WorkflowNodeType.Start, Properties = new Dictionary<string, object> { { "description", "接收订单请求" } } }; // 验证用户身份 var validateUser = new WorkflowNode { Id = "validate_user", Name = "验证用户", Type = WorkflowNodeType.Tool, Properties = new Dictionary<string, object> { { "toolName", "ValidateUserTool" }, { "parameters", new Dictionary<string, object> { { "userId", "{{userId}}" } } } } }; // 检查用户是否有效 var checkUserValid = new WorkflowNode { Id = "check_user_valid", Name = "检查用户有效性", Type = WorkflowNodeType.Condition, Properties = new Dictionary<string, object> { { "condition", "{{user_valid}} == true" } } }; // 检查商品库存 var checkInventory = new WorkflowNode { Id = "check_inventory", Name = "检查库存", Type = WorkflowNodeType.Tool, Properties = new Dictionary<string, object> { { "toolName", "CheckInventoryTool" }, { "parameters", new Dictionary<string, object> { { "productId", "{{productId}}" }, { "quantity", "{{quantity}}" } } } } }; // 检查库存是否充足 var checkStockAvailable = new WorkflowNode { Id = "check_stock", Name = "检查库存可用", Type = WorkflowNodeType.Condition, Properties = new Dictionary<string, object> { { "condition", "{{stock_available}} == true" } } }; // 计算价格 var calculatePrice = new WorkflowNode { Id = "calculate_price", Name = "计算价格", Type = WorkflowNodeType.Tool, Properties = new Dictionary<string, object> { { "toolName", "CalculatePriceTool" }, { "parameters", new Dictionary<string, object> { { "productId", "{{productId}}" }, { "quantity", "{{quantity}}" }, { "couponCode", "{{couponCode}}" } } } } }; // 处理支付 var processPayment = new WorkflowNode { Id = "process_payment", Name = "处理支付", Type = WorkflowNodeType.Tool, Properties = new Dictionary<string, object> { { "toolName", "ProcessPaymentTool" }, { "parameters", new Dictionary<string, object> { { "userId", "{{userId}}" }, { "amount", "{{total_price}}" }, { "paymentMethod", "{{paymentMethod}}" } } } } }; // 检查支付结果 var checkPaymentSuccess = new WorkflowNode { Id = "check_payment", Name = "检查支付结果", Type = WorkflowNodeType.Condition, Properties = new Dictionary<string, object> { { "condition", "{{payment_success}} == true" } } }; // 锁定库存 var reserveInventory = new WorkflowNode { Id = "reserve_inventory", Name = "锁定库存", Type = WorkflowNodeType.Tool, Properties = new Dictionary<string, object> { { "toolName", "ReserveInventoryTool" }, { "parameters", new Dictionary<string, object> { { "productId", "{{productId}}" }, { "quantity", "{{quantity}}" } } } } }; // 创建订单 var createOrder = new WorkflowNode { Id = "create_order", Name = "创建订单", Type = WorkflowNodeType.Tool, Properties = new Dictionary<string, object> { { "toolName", "CreateOrderTool" }, { "parameters", new Dictionary<string, object> { { "userId", "{{userId}}" }, { "productId", "{{productId}}" }, { "quantity", "{{quantity}}" }, { "totalPrice", "{{total_price}}" } } } } }; // 发送通知 var sendNotification = new WorkflowNode { Id = "send_notification", Name = "发送通知", Type = WorkflowNodeType.Tool, Properties = new Dictionary<string, object> { { "toolName", "SendNotificationTool" }, { "parameters", new Dictionary<string, object> { { "userId", "{{userId}}" }, { "orderId", "{{orderId}}" }, { "type", "order_created" } } } } }; // 订单创建成功 var orderSuccess = new WorkflowNode { Id = "order_success", Name = "订单成功", Type = WorkflowNodeType.End, Properties = new Dictionary<string, object> { { "description", "订单创建成功" } } }; // 库存不足 var outOfStock = new WorkflowNode { Id = "out_of_stock", Name = "库存不足", Type = WorkflowNodeType.End, Properties = new Dictionary<string, object> { { "description", "商品库存不足" } } }; // 用户无效 var userInvalid = new WorkflowNode { Id = "user_invalid", Name = "用户无效", Type = WorkflowNodeType.End, Properties = new Dictionary<string, object> { { "description", "用户验证失败" } } }; // 支付失败 var paymentFailed = new WorkflowNode { Id = "payment_failed", Name = "支付失败", Type = WorkflowNodeType.End, Properties = new Dictionary<string, object> { { "description", "支付处理失败" } } }; // 添加所有节点 workflow.Nodes.AddRange(new[] { start, validateUser, checkUserValid, checkInventory, checkStockAvailable, calculatePrice, processPayment, checkPaymentSuccess, reserveInventory, createOrder, sendNotification, orderSuccess, outOfStock, userInvalid, paymentFailed }); // 添加边(工作流连接) workflow.Edges.AddRange(new[] { // 开始 -> 验证用户 new WorkflowEdge { SourceNodeId = "start", TargetNodeId = "validate_user" }, // 验证用户 -> 检查用户有效性 new WorkflowEdge { SourceNodeId = "validate_user", TargetNodeId = "check_user_valid" }, // 检查用户有效性 -> 库存检查(用户有效) new WorkflowEdge { SourceNodeId = "check_user_valid", TargetNodeId = "check_inventory", Condition = "{{user_valid}} == true" }, // 检查用户有效性 -> 用户无效(用户无效) new WorkflowEdge { SourceNodeId = "check_user_valid", TargetNodeId = "user_invalid", Condition = "{{user_valid}} == false" }, // 库存检查 -> 检查库存可用 new WorkflowEdge { SourceNodeId = "check_inventory", TargetNodeId = "check_stock" }, // 检查库存可用 -> 计算价格(库存充足) new WorkflowEdge { SourceNodeId = "check_stock", TargetNodeId = "calculate_price", Condition = "{{stock_available}} == true" }, // 检查库存可用 -> 库存不足(库存不足) new WorkflowEdge { SourceNodeId = "check_stock", TargetNodeId = "out_of_stock", Condition = "{{stock_available}} == false" }, // 计算价格 -> 处理支付 new WorkflowEdge { SourceNodeId = "calculate_price", TargetNodeId = "process_payment" }, // 处理支付 -> 检查支付结果 new WorkflowEdge { SourceNodeId = "process_payment", TargetNodeId = "check_payment" }, // 检查支付结果 -> 锁定库存(支付成功) new WorkflowEdge { SourceNodeId = "check_payment", TargetNodeId = "reserve_inventory", Condition = "{{payment_success}} == true" }, // 检查支付结果 -> 支付失败(支付失败) new WorkflowEdge { SourceNodeId = "check_payment", TargetNodeId = "payment_failed", Condition = "{{payment_success}} == false" }, // 锁定库存 -> 创建订单 new WorkflowEdge { SourceNodeId = "reserve_inventory", TargetNodeId = "create_order" }, // 创建订单 -> 发送通知 new WorkflowEdge { SourceNodeId = "create_order", TargetNodeId = "send_notification" }, // 发送通知 -> 订单成功 new WorkflowEdge { SourceNodeId = "send_notification", TargetNodeId = "order_success" } }); return workflow; } } 3.2 执行订单处理工作流 // OrderWorkflowService.cs public class OrderWorkflowService { private readonly WorkflowExecutor _executor; private readonly IWorkflowStore _workflowStore; private readonly ILogger<OrderWorkflowService> _logger; public OrderWorkflowService( WorkflowExecutor executor, IWorkflowStore workflowStore, ILogger<OrderWorkflowService> logger) { _executor = executor; _workflowStore = workflowStore; _logger = logger; } public async Task<OrderResult> ProcessOrderAsync(OrderRequest request) { // 获取或创建工作流 var workflow = await _workflowStore.GetWorkflowAsync("order_processing"); if (workflow == null) { workflow = OrderProcessingWorkflow.CreateOrderProcessingWorkflow(); await _workflowStore.SaveWorkflowAsync(workflow); } // 创建执行上下文 var context = new WorkflowExecutionContext { ExecutionId = Guid.NewGuid().ToString(), UserId = request.UserId, Values = new Dictionary<string, object> { { "userId", request.UserId }, { "productId", request.ProductId }, { "quantity", request.Quantity }, { "couponCode", request.CouponCode ?? "" }, { "paymentMethod", request.PaymentMethod }, { "input", $"创建订单: {request.ProductId} x {request.Quantity}" } } }; _logger.LogInformation("开始处理订单: UserId={UserId}, ProductId={ProductId}", request.UserId, request.ProductId); try { var result = await _executor.ExecuteAsync(workflow, context); if (result.Success) { var orderId = context.GetValue<string>("orderId"); _logger.LogInformation("订单处理成功: OrderId={OrderId}", orderId); return new OrderResult { Success = true, OrderId = orderId, Message = "订单创建成功", TotalPrice = context.GetValue<decimal>("total_price") }; } else { _logger.LogWarning("订单处理失败: Error={Error}", result.Error); return new OrderResult { Success = false, Message = result.Error ?? "订单处理失败" }; } } catch (Exception ex) { _logger.LogError(ex, "订单处理异常: UserId={UserId}", request.UserId); return new OrderResult { Success = false, Message = "系统错误,请稍后重试" }; } } } 四、动态工作流构建 4.1 可视化工作流设计器 在生产环境中,通常需要一个可视化的工作流设计器: // WorkflowDesignerService.cs public class WorkflowDesignerService { private readonly IWorkflowStore _workflowStore; private readonly ILogger<WorkflowDesignerService> _logger; public WorkflowDesignerService( IWorkflowStore workflowStore, ILogger<WorkflowDesignerService> logger) { _workflowStore = workflowStore; _logger = logger; } // 创建新工作流 public async Task<WorkflowDefinition> CreateWorkflowAsync(CreateWorkflowRequest request) { var workflow = new WorkflowDefinition { Name = request.Name, Description = request.Description, Metadata = request.Metadata ?? new Dictionary<string, object>() }; // 添加开始节点 workflow.Nodes.Add(new WorkflowNode { Id = "start", Name = "开始", Type = WorkflowNodeType.Start }); // 添加结束节点 workflow.Nodes.Add(new WorkflowNode { Id = "end", Name = "结束", Type = WorkflowNodeType.End }); await _workflowStore.SaveWorkflowAsync(workflow); _logger.LogInformation("创建工作流: {Name}", workflow.Name); return workflow; } // 添加节点 public async Task<WorkflowNode> AddNodeAsync( string workflowId, AddNodeRequest request) { var workflow = await _workflowStore.GetWorkflowAsync(workflowId) ?? throw new InvalidOperationException($"工作流不存在: {workflowId}"); var node = new WorkflowNode { Name = request.Name, Type = request.NodeType, Properties = request.Properties ?? new Dictionary<string, object>() }; workflow.Nodes.Add(node); await _workflowStore.SaveWorkflowAsync(workflow); return node; } // 连接节点 public async Task ConnectNodesAsync( string workflowId, string sourceNodeId, string targetNodeId, string? condition = null) { var workflow = await _workflowStore.GetWorkflowAsync(workflowId) ?? throw new InvalidOperationException($"工作流不存在: {workflowId}"); // 验证节点存在 if (!workflow.Nodes.Any(n => n.Id == sourceNodeId)) { throw new InvalidOperationException($"源节点不存在: {sourceNodeId}"); } if (!workflow.Nodes.Any(n => n.Id == targetNodeId)) { throw new InvalidOperationException($"目标节点不存在: {targetNodeId}"); } // 添加边 var edge = new WorkflowEdge { SourceNodeId = sourceNodeId, TargetNodeId = targetNodeId, Condition = condition }; workflow.Edges.Add(edge); workflow.UpdatedAt = DateTime.UtcNow; await _workflowStore.SaveWorkflowAsync(workflow); _logger.LogInformation("连接节点: {Source} -> {Target}", sourceNodeId, targetNodeId); } // 验证工作流 public async Task<WorkflowValidationResult> ValidateWorkflowAsync(string workflowId) { var workflow = await _workflowStore.GetWorkflowAsync(workflowId) ?? throw new InvalidOperationException($"工作流不存在: {workflowId}"); var result = new WorkflowValidationResult { IsValid = true }; // 检查开始节点 if (!workflow.Nodes.Any(n => n.Type == WorkflowNodeType.Start)) { result.IsValid = false; result.Errors.Add("工作流必须包含开始节点"); } // 检查结束节点 if (!workflow.Nodes.Any(n => n.Type == WorkflowNodeType.End)) { result.IsValid = false; result.Errors.Add("工作流必须包含结束节点"); } // 检查孤立节点 var connectedNodeIds = workflow.Edges .SelectMany(e => new[] { e.SourceNodeId, e.TargetNodeId }) .Distinct() .ToHashSet(); foreach (var node in workflow.Nodes) { if (!connectedNodeIds.Contains(node.Id) && node.Type != WorkflowNodeType.Start && node.Type != WorkflowNodeType.End) { result.Warnings.Add($"节点 {node.Name} 未连接到任何其他节点"); } } // 检查循环引用 if (HasCircularReference(workflow)) { result.IsValid = false; result.Errors.Add("工作流存在循环引用"); } return result; } private bool HasCircularReference(WorkflowDefinition workflow) { // 简化实现:检查是否存在从End回到Start的路径 var visited = new HashSet<string>(); var path = new List<string>(); var startNode = workflow.Nodes.FirstOrDefault(n => n.Type == WorkflowNodeType.Start); if (startNode == null) return false; return CanReachNode(workflow, startNode.Id, "start", visited, path); } private bool CanReachNode( WorkflowDefinition workflow, string currentNodeId, string targetNodeId, HashSet<string> visited, List<string> path) { if (currentNodeId == targetNodeId && path.Count > 0) { return true; } if (visited.Contains(currentNodeId)) { return false; } visited.Add(currentNodeId); path.Add(currentNodeId); var outgoingEdges = workflow.Edges.Where(e => e.SourceNodeId == currentNodeId); foreach (var edge in outgoingEdges) { if (CanReachNode(workflow, edge.TargetNodeId, targetNodeId, visited, path)) { return true; } } path.RemoveAt(path.Count - 1); return false; } } 五、工作流版本管理 5.1 版本控制实现 // WorkflowVersioningService.cs public class WorkflowVersioningService { private readonly IWorkflowStore _workflowStore; private readonly ILogger<WorkflowVersioningService> _logger; public WorkflowVersioningService( IWorkflowStore workflowStore, ILogger<WorkflowVersioningService> logger) { _workflowStore = workflowStore; _logger = logger; } // 创建新版本 public async Task<WorkflowVersion> CreateVersionAsync( string workflowId, string changeDescription) { var workflow = await _workflowStore.GetWorkflowAsync(workflowId) ?? throw new InvalidOperationException($"工作流不存在: {workflowId}"); // 获取当前版本号 var currentVersion = await GetLatestVersionAsync(workflowId); var newVersionNumber = currentVersion + 1; // 创建版本快照 var version = new WorkflowVersion { Id = Guid.NewGuid().ToString(), WorkflowId = workflowId, VersionNumber = newVersionNumber, Definition = JsonSerializer.Serialize(workflow), ChangeDescription = changeDescription, CreatedAt = DateTime.UtcNow, CreatedBy = "system" }; // 保存版本 await _workflowStore.SaveVersionAsync(version); _logger.LogInformation("创建工作流版本: WorkflowId={WorkflowId}, Version={Version}", workflowId, newVersionNumber); return version; } // 回滚到指定版本 public async Task<WorkflowDefinition> RollbackToVersionAsync( string workflowId, int versionNumber) { var version = await _workflowStore.GetVersionAsync(workflowId, versionNumber) ?? throw new InvalidOperationException($"版本不存在: {versionNumber}"); var workflow = JsonSerializer.Deserialize<WorkflowDefinition>(version.Definition) ?? throw new InvalidOperationException("无法解析工作流定义"); // 更新当前工作流 workflow.UpdatedAt = DateTime.UtcNow; await _workflowStore.SaveWorkflowAsync(workflow); _logger.LogInformation("回滚工作流: WorkflowId={WorkflowId}, Version={Version}", workflowId, versionNumber); return workflow; } // 获取版本历史 public async Task<List<WorkflowVersion>> GetVersionHistoryAsync(string workflowId) { return await _workflowStore.GetVersionHistoryAsync(workflowId); } // 获取最新版本号 private async Task<int> GetLatestVersionAsync(string workflowId) { var versions = await _workflowStore.GetVersionHistoryAsync(workflowId); return versions.Any() ? versions.Max(v => v.VersionNumber) : 0; } } 六、工作流监控与日志 6.1 执行监控 // WorkflowMonitor.cs public class WorkflowMonitor { private readonly IMetrics _metrics; private readonly ILogger<WorkflowMonitor> _logger; public WorkflowMonitor(IMetrics metrics, ILogger<WorkflowMonitor> logger) { _metrics = metrics; _logger = logger; } public void RecordExecutionStarted(string workflowName, string executionId) { _metrics.IncrementCounter("workflow.executions.started"); _metrics.IncrementCounter($"workflow.executions.started.{workflowName}"); _logger.LogInformation("工作流开始执行: Workflow={Workflow}, ExecutionId={ExecutionId}", workflowName, executionId); } public void RecordExecutionCompleted( string workflowName, string executionId, TimeSpan duration, bool success) { _metrics.RecordHistogram("workflow.execution.duration", duration.TotalMilliseconds); _metrics.RecordHistogram( $"workflow.execution.duration.{workflowName}", duration.TotalMilliseconds); if (success) { _metrics.IncrementCounter("workflow.executions.completed"); _metrics.IncrementCounter($"workflow.executions.completed.{workflowName}"); } else { _metrics.IncrementCounter("workflow.executions.failed"); _metrics.IncrementCounter($"workflow.executions.failed.{workflowName}"); } _logger.LogInformation( "工作流执行完成: Workflow={Workflow}, ExecutionId={ExecutionId}, Duration={Duration}ms, Success={Success}", workflowName, executionId, duration.TotalMilliseconds, success); } public void RecordNodeExecution( string workflowName, string nodeName, TimeSpan duration, bool success) { _metrics.RecordHistogram( $"workflow.node.execution.duration.{workflowName}.{nodeName}", duration.TotalMilliseconds); if (!success) { _metrics.IncrementCounter( $"workflow.node.execution.failed.{workflowName}.{nodeName}"); } } public void RecordRetry(string workflowName, string nodeName, int attemptNumber) { _metrics.IncrementCounter("workflow.retries"); _metrics.IncrementCounter($"workflow.retries.{workflowName}.{nodeName}"); _logger.LogWarning( "工作流重试: Workflow={Workflow}, Node={Node}, Attempt={Attempt}", workflowName, nodeName, attemptNumber); } } 七、实战:客服工作流 7.1 智能客服工作流 // CustomerServiceWorkflow.cs public class CustomerServiceWorkflow { public static WorkflowDefinition CreateCustomerServiceWorkflow() { var workflow = new WorkflowDefinition { Name = "智能客服工作流", Description = "处理客户咨询、问题分类、转接等流程" }; // 开始 var start = new WorkflowNode { Id = "start", Name = "接收咨询", Type = WorkflowNodeType.Start }; // 理解用户意图 var understandIntent = new WorkflowNode { Id = "understand_intent", Name = "理解意图", Type = WorkflowNodeType.Agent, Properties = new Dictionary<string, object> { { "agentId", "intent_classifier" }, { "prompt", "分析用户意图,分类为:订单查询、退款申请、技术支持、其他" } } }; // 意图分类 var classifyIntent = new WorkflowNode { Id = "classify_intent", Name = "意图分类", Type = WorkflowNodeType.Switch, Properties = new Dictionary<string, object> { { "expression", "{{intent}}" } } }; // 订单查询流程 var handleOrder = new WorkflowNode { Id = "handle_order", Name = "处理订单查询", Type = WorkflowNodeType.SubWorkflow, Properties = new Dictionary<string, object> { { "subWorkflowId", "order_inquiry" } } }; // 退款流程 var handleRefund = new WorkflowNode { Id = "handle_refund", Name = "处理退款", Type = WorkflowNodeType.SubWorkflow, Properties = new Dictionary<string, object> { { "subWorkflowId", "refund_process" } } }; // 技术支持流程 var handleTechSupport = new WorkflowNode { Id = "handle_tech_support", Name = "技术支持", Type = WorkflowNodeType.SubWorkflow, Properties = new Dictionary<string, object> { { "subWorkflowId", "tech_support" } } }; // 其他问题 var handleOther = new WorkflowNode { Id = "handle_other", Name = "处理其他问题", Type = WorkflowNodeType.Agent, Properties = new Dictionary<string, object> { { "agentId", "general_support_agent" } } }; // 满意度调查 var satisfactionSurvey = new WorkflowNode { Id = "satisfaction_survey", Name = "满意度调查", Type = WorkflowNodeType.Tool, Properties = new Dictionary<string, object> { { "toolName", "SatisfactionSurveyTool" } } }; // 结束 var end = new WorkflowNode { Id = "end", Name = "结束", Type = WorkflowNodeType.End }; // 添加节点 workflow.Nodes.AddRange(new[] { start, understandIntent, classifyIntent, handleOrder, handleRefund, handleTechSupport, handleOther, satisfactionSurvey, end }); // 添加边 workflow.Edges.AddRange(new[] { new WorkflowEdge { SourceNodeId = "start", TargetNodeId = "understand_intent" }, new WorkflowEdge { SourceNodeId = "understand_intent", TargetNodeId = "classify_intent" }, // 意图分支 new WorkflowEdge { SourceNodeId = "classify_intent", TargetNodeId = "handle_order", Properties = new Dictionary<string, object> { { "case", "order_inquiry" } } }, new WorkflowEdge { SourceNodeId = "classify_intent", TargetNodeId = "handle_refund", Properties = new Dictionary<string, object> { { "case", "refund" } } }, new WorkflowEdge { SourceNodeId = "classify_intent", TargetNodeId = "handle_tech_support", Properties = new Dictionary<string, object> { { "case", "tech_support" } } }, new WorkflowEdge { SourceNodeId = "classify_intent", TargetNodeId = "handle_other", Properties = new Dictionary<string, object> { { "case", "other" } } }, // 汇合到满意度调查 new WorkflowEdge { SourceNodeId = "handle_order", TargetNodeId = "satisfaction_survey" }, new WorkflowEdge { SourceNodeId = "handle_refund", TargetNodeId = "satisfaction_survey" }, new WorkflowEdge { SourceNodeId = "handle_tech_support", TargetNodeId = "satisfaction_survey" }, new WorkflowEdge { SourceNodeId = "handle_other", TargetNodeId = "satisfaction_survey" }, new WorkflowEdge { SourceNodeId = "satisfaction_survey", TargetNodeId = "end" } }); return workflow; } } 八、总结与展望 通过本文的学习,我们已经掌握了工作流编排的核心技术: ✅ 工作流基础:理解工作流编排的概念和重要性 ✅ 图引擎实现:基于DAG的工作流定义和执行 ✅ 节点类型:Agent、Tool、Condition、Switch、Parallel等节点 ✅ 订单处理示例:完整的订单处理工作流实现 ✅ 动态工作流:可视化工作流设计器 ✅ 版本控制:工作流的版本管理和回滚 ✅ 监控日志:工作流执行的监控和日志记录 关键收获: 工作流编排是构建复杂业务系统的核心能力。通过合理的工作流设计,可以将复杂的业务逻辑分解为可管理的步骤,实现业务流程的自动化和可视化。在实际应用中,需要根据业务复杂度选择合适的工作流方案,并重视版本管理和监控。 下一篇文章预告: 在第七篇文章中,我们将探索自定义工具与中间件开发。我们将学习如何扩展Agent Framework的功能,创建自定义工具和中间件,以满足特定的业务需求。 实践建议: 从简单的工作流开始,逐步增加复杂性 为工作流添加完善的错误处理和重试机制 重视工作流的版本管理,便于问题排查和回滚 实现充分的监控和日志,便于运维和问题分析 相关资源: Agent Framework工作流文档 DAG算法详解 工作流模式参考 "好的工作流设计让复杂的业务逻辑变得清晰可控。"