如何编排工作流以构建复杂业务逻辑?
摘要:工作流编排:构建复杂业务逻辑 前言 在前几篇文章中,我们学习了如何创建智能体、如何管理对话状态、如何实现长期记忆。但是,真实的业务场景往往更加复杂:一个完整的业务流程可能涉及多个步骤、需要条件分支、并行处理、错误恢复等高级特性。 例如,一个
工作流编排:构建复杂业务逻辑
前言
在前几篇文章中,我们学习了如何创建智能体、如何管理对话状态、如何实现长期记忆。但是,真实的业务场景往往更加复杂:一个完整的业务流程可能涉及多个步骤、需要条件分支、并行处理、错误恢复等高级特性。
例如,一个订单处理流程可能包含以下步骤:验证用户身份 → 检查商品库存 → 计算价格 → 处理支付 → 生成订单 → 发送通知。每个步骤都可能有失败的情况,需要有相应的重试或回退机制。
这就是工作流编排(Workflow Orchestration)的用武之地。本文将深入探讨如何在Agent Framework中实现复杂业务逻辑的编排,包括基于图的工作流、状态机、并行执行、条件分支等核心概念。
一、工作流编排基础
1.1 什么是工作流编排
工作流编排是指协调多个任务或步骤的执行,以完成复杂业务流程的过程。与简单的线性流程不同,现代工作流通常需要:
顺序执行:按照预定义的顺序执行任务。
条件分支:根据条件选择不同的执行路径。
并行执行:同时执行多个独立的任务以提高效率。
循环迭代:重复执行某些任务直到满足条件。
错误处理:处理执行过程中的异常情况。
状态持久化:保存工作流执行状态,支持中断恢复。
1.2 Agent Framework的工作流支持
Agent Framework提供了强大的工作流编排能力,主要包括:
图引擎(Graph Engine):基于有向无环图(DAG)的工作流定义和执行。
消息路由:根据消息内容将任务路由到不同的处理节点。
并行处理:支持多个任务的并行执行和结果聚合。
中间件管道:支持在请求处理过程中插入自定义逻辑。
二、基于图的工作流引擎
2.1 工作流定义
首先,我们定义工作流的基础结构:
// WorkflowDefinition.cs
public class WorkflowDefinition
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string Name { get; set; } = string.Empty;
public string Description { get; set; } = string.Empty;
public List<WorkflowNode> Nodes { get; set; } = new();
public List<WorkflowEdge> Edges { get; set; } = new();
public Dictionary<string, object> Metadata { get; set; } = new();
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime UpdatedAt { get; set; } = DateTime.UtcNow;
}
// 工作流节点
public class WorkflowNode
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string Name { get; set; } = string.Empty;
public WorkflowNodeType Type { get; set; }
public Dictionary<string, object> Properties { get; set; } = new();
public List<string> InputBindings { get; set; } = new();
public List<string> OutputBindings { get; set; } = new();
}
// 节点类型
public enum WorkflowNodeType
{
Agent, // 智能体节点
Tool, // 工具节点
Condition, // 条件判断节点
Switch, // 多分支节点
Parallel, // 并行执行节点
Merge, // 结果聚合节点
Loop, // 循环节点
SubWorkflow, // 子流程节点
Start, // 开始节点
End // 结束节点
}
// 工作流边
public class WorkflowEdge
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string SourceNodeId { get; set; } = string.Empty;
public string TargetNodeId { get; set; } = string.Empty;
public string? Condition { get; set; } // 触发条件
public Dictionary<string, object> Properties { get; set; } = new();
}
2.2 工作流执行器
现在实现工作流的执行引擎:
// WorkflowExecutor.cs
public class WorkflowExecutor
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<WorkflowExecutor> _logger;
private readonly IMetrics _metrics;
public WorkflowExecutor(
IServiceProvider serviceProvider,
ILogger<WorkflowExecutor> logger,
IMetrics metrics)
{
_serviceProvider = serviceProvider;
_logger = logger;
_metrics = metrics;
}
public async Task<WorkflowExecutionResult> ExecuteAsync(
WorkflowDefinition workflow,
WorkflowExecutionContext context)
{
var execution = new WorkflowExecution
{
WorkflowId = workflow.Id,
WorkflowName = workflow.Name,
StartedAt = DateTime.UtcNow,
Status = WorkflowExecutionStatus.Running
};
try
{
// 验证工作流
ValidateWorkflow(workflow);
// 找到起始节点
var startNode = workflow.Nodes.FirstOrDefault(n => n.Type == WorkflowNodeType.Start);
if (startNode == null)
{
throw new InvalidOperationException("工作流缺少开始节点");
}
// 执行工作流
await ExecuteNodeAsync(workflow, startNode, context, execution);
execution.Status = WorkflowExecutionStatus.Completed;
execution.CompletedAt = DateTime.UtcNow;
_logger.LogInformation("工作流执行完成: {WorkflowName}, Duration={Duration}ms",
workflow.Name,
(execution.CompletedAt - execution.StartedAt).TotalMilliseconds);
return new WorkflowExecutionResult
{
Success = true,
Execution = execution,
Output = context.Output
};
}
catch (Exception ex)
{
execution.Status = WorkflowExecutionStatus.Failed;
execution.Error = ex.Message;
execution.CompletedAt = DateTime.UtcNow;
_logger.LogError(ex, "工作流执行失败: {WorkflowName}", workflow.Name);
return new WorkflowExecutionResult
{
Success = false,
Execution = execution,
Error = ex.Message
};
}
}
private async Task ExecuteNodeAsync(
WorkflowDefinition workflow,
WorkflowNode node,
WorkflowExecutionContext context,
WorkflowExecution execution)
{
var nodeExecution = new NodeExecution
{
NodeId = node.Id,
NodeName = node.Name,
StartedAt = DateTime.UtcNow
};
execution.NodeExecutions.Add(nodeExecution);
_logger.LogDebug("执行节点: {NodeName}", node.Name);
try
{
var result = node.Type switch
{
WorkflowNodeType.Start => await ExecuteStartNodeAsync(node, context),
WorkflowNodeType.End => await ExecuteEndNodeAsync(node, context),
WorkflowNodeType.Agent => await ExecuteAgentNodeAsync(node, context),
WorkflowNodeType.Tool => await ExecuteToolNodeAsync(node, context),
WorkflowNodeType.Condition => await ExecuteConditionNodeAsync(node, workflow, context),
WorkflowNodeType.Switch => await ExecuteSwitchNodeAsync(node, workflow, context),
WorkflowNodeType.Parallel => await ExecuteParallelNodeAsync(workflow, node, context),
WorkflowNodeType.Loop => await ExecuteLoopNodeAsync(workflow, node, context),
_ => throw new NotSupportedException($"不支持的节点类型: {node.Type}")
};
nodeExecution.Result = result;
nodeExecution.CompletedAt = DateTime.UtcNow;
// 根据结果确定下一个节点
var nextNodeId = GetNextNode(workflow, node, result, context);
if (!string.IsNullOrEmpty(nextNodeId))
{
var nextNode = workflow.Nodes.FirstOrDefault(n => n.Id == nextNodeId);
if (nextNode != null)
{
await ExecuteNodeAsync(workflow, nextNode, context, execution);
}
}
}
catch (Exception ex)
{
nodeExecution.Error = ex.Message;
nodeExecution.CompletedAt = DateTime.UtcNow;
// 检查是否有错误处理边
var errorEdge = workflow.Edges.FirstOrDefault(e =>
e.SourceNodeId == node.Id && e.Properties.ContainsKey("onError"));
if (errorEdge != null)
{
var errorHandlerNode = workflow.Nodes.FirstOrDefault(n =>
n.Id == errorEdge.TargetNodeId);
if (errorHandlerNode != null)
{
context.SetValue("error", ex.Message);
await ExecuteNodeAsync(workflow, errorHandlerNode, context, execution);
}
}
else
{
throw;
}
}
}
private async Task<NodeExecutionResult> ExecuteAgentNodeAsync(
WorkflowNode node,
WorkflowExecutionContext context)
{
var agentId = node.Properties.GetValueOrDefault("agentId")?.ToString();
var prompt = node.Properties.GetValueOrDefault("prompt")?.ToString();
if (string.IsNullOrEmpty(agentId))
{
throw new InvalidOperationException($"节点 {node.Name} 缺少 agentId");
}
// 获取智能体
var agent = _serviceProvider.GetService(typeof(IAIAgent)) as IAIAgent
?? throw new InvalidOperationException($"无法获取智能体: {agentId}");
// 准备输入
var input = context.GetValue<string>("input") ?? string.Empty;
if (!string.IsNullOrEmpty(prompt))
{
input = FormatPrompt(prompt, context.Values);
}
// 执行智能体
var response = await agent.ProcessAsync(context, input);
// 保存输出
context.SetValue($"output_{node.Id}", response);
context.SetValue("latest_output", response);
return new NodeExecutionResult
{
Success = true,
Output = response,
Data = new Dictionary<string, object>
{
{ "response", response }
}
};
}
private async Task<NodeExecutionResult> ExecuteToolNodeAsync(
WorkflowNode node,
WorkflowExecutionContext context)
{
var toolName = node.Properties.GetValueOrDefault("toolName")?.ToString();
var toolParameters = node.Properties.GetValueOrDefault("parameters") as Dictionary<string, object>;
if (string.IsNullOrEmpty(toolName))
{
throw new InvalidOperationException($"节点 {node.Name} 缺少 toolName");
}
// 解析参数
var resolvedParams = ResolveParameters(toolParameters, context.Values);
// 获取并执行工具
var tool = _serviceProvider.GetService(typeof(ITool)) as ITool
?? throw new InvalidOperationException($"无法获取工具: {toolName}");
var result = await tool.ExecuteAsync(resolvedParams);
context.SetValue($"output_{node.Id}", result);
return new NodeExecutionResult
{
Success = true,
Output = JsonSerializer.Serialize(result),
Data = new Dictionary<string, object>
{
{ "result", result }
}
};
}
private async Task<NodeExecutionResult> ExecuteConditionNodeAsync(
WorkflowNode node,
WorkflowDefinition workflow,
WorkflowExecutionContext context)
{
var condition = node.Properties.GetValueOrDefault("condition")?.ToString();
// 评估条件
var result = EvaluateCondition(condition, context.Values);
_logger.LogDebug("条件评估结果: {Condition} = {Result}", condition, result);
return new NodeExecutionResult
{
Success = true,
Output = result.ToString(),
Data = new Dictionary<string, object>
{
{ "conditionResult", result }
}
};
}
private async Task<NodeExecutionResult> ExecuteParallelNodeAsync(
WorkflowDefinition workflow,
WorkflowNode node,
WorkflowExecutionContext context)
{
var parallelNodeIds = node.Properties.GetValueOrDefault("parallelNodes") as List<string>
?? new List<string>();
var parallelNodes = workflow.Nodes
.Where(n => parallelNodeIds.Contains(n.Id))
.ToList();
var tasks = parallelNodes.Select(node => ExecuteNodeAsync(workflow, node, context, new WorkflowExecution()));
var results = await Task.WhenAll(tasks);
// 聚合结果
var aggregated = new Dictionary<string, object>();
for (int i = 0; i < results.Length; i++)
{
aggregated[parallelNodes[i].Id] = results[i];
}
context.SetValue($"output_{node.Id}", aggregated);
return new NodeExecutionResult
{
Success = true,
Output = "并行执行完成",
Data = aggregated
};
}
private string GetNextNode(
WorkflowDefinition workflow,
WorkflowNode currentNode,
NodeExecutionResult result,
WorkflowExecutionContext context)
{
var outgoingEdges = workflow.Edges
.Where(e => e.SourceNodeId == currentNode.Id)
.ToList();
// 如果没有出边,返回空(工作流结束)
if (!outgoingEdges.Any())
{
return null;
}
// 查找满足条件的边
foreach (var edge in outgoingEdges)
{
if (string.IsNullOrEmpty(edge.Condition))
{
return edge.TargetNodeId; // 默认边
}
if (EvaluateCondition(edge.Condition, context.Values))
{
return edge.TargetNodeId;
}
}
return outgoingEdges.FirstOrDefault()?.TargetNodeId;
}
private bool EvaluateCondition(string? condition, Dictionary<string, object> values)
{
if (string.IsNullOrEmpty(condition))
{
return true;
}
// 简化实现:支持简单的表达式
// 生产环境应使用表达式解析库
try
{
if (condition.Contains("=="))
{
var parts = condition.Split("==");
var left = ResolveValue(parts[0].Trim(), values);
var right = ResolveValue(parts[1].Trim(), values);
return left?.ToString() == right?.ToString();
}
if (condition.Contains("!="))
{
var parts = condition.Split("!=");
var left = ResolveValue(parts[0].Trim(), values);
var right = ResolveValue(parts[1].Trim(), values);
return left?.ToString() != right?.ToString();
}
// 默认返回true
return true;
}
catch
{
return false;
}
}
private object? ResolveValue(string expression, Dictionary<string, object> values)
{
if (expression.StartsWith("{{") && expression.EndsWith("}}"))
{
var key = expression.Substring(2, expression.Length - 4).Trim();
return values.GetValueOrDefault(key);
}
// 字面量
if (expression.StartsWith("'") && expression.EndsWith("'"))
{
return expression.Substring(1, expression.Length - 2);
}
if (bool.TryParse(expression, out var boolValue))
{
return boolValue;
}
if (int.TryParse(expression, out var intValue))
{
return intValue;
}
return expression;
}
private Dictionary<string, object> ResolveParameters(
Dictionary<string, object>? parameters,
Dictionary<string, object> values)
{
if (parameters == null)
{
return new Dictionary<string, object>();
}
var resolved = new Dictionary<string, object>();
foreach (var kvp in parameters)
{
resolved[kvp.Key] = ResolveValue(kvp.Value?.ToString() ?? "", values);
}
return resolved;
}
private void ValidateWorkflow(WorkflowDefinition workflow)
{
if (!workflow.Nodes.Any())
{
throw new InvalidOperationException("工作流不能为空");
}
if (!workflow.Nodes.Any(n => n.Type == WorkflowNodeType.Start))
{
throw new InvalidOperationException("工作流必须包含开始节点");
}
if (!workflow.Nodes.Any(n => n.Type == WorkflowNodeType.End))
{
throw new InvalidOperationException("工作流必须包含结束节点");
}
}
}
三、订单处理工作流示例
3.1 定义订单处理工作流
让我们通过一个实际的订单处理流程来演示工作流的用法:
// OrderProcessingWorkflow.cs
public class OrderProcessingWorkflow
{
public static WorkflowDefinition CreateOrderProcessingWorkflow()
{
var workflow = new WorkflowDefinition
{
Name = "订单处理流程",
Description = "完整的订单处理流程,包括验证、支付、库存、通知等步骤"
};
// 开始节点
var start = new WorkflowNode
{
Id = "start",
Name = "开始",
Type = WorkflowNodeType.Start,
Properties = new Dictionary<string, object>
{
{ "description", "接收订单请求" }
}
};
// 验证用户身份
var validateUser = new WorkflowNode
{
Id = "validate_user",
Name = "验证用户",
Type = WorkflowNodeType.Tool,
Properties = new Dictionary<string, object>
{
{ "toolName", "ValidateUserTool" },
{ "parameters", new Dictionary<string, object>
{
{ "userId", "{{userId}}" }
}
}
}
};
// 检查用户是否有效
var checkUserValid = new WorkflowNode
{
Id = "check_user_valid",
Name = "检查用户有效性",
Type = WorkflowNodeType.Condition,
Properties = new Dictionary<string, object>
{
{ "condition", "{{user_valid}} == true" }
}
};
// 检查商品库存
var checkInventory = new WorkflowNode
{
Id = "check_inventory",
Name = "检查库存",
Type = WorkflowNodeType.Tool,
Properties = new Dictionary<string, object>
{
{ "toolName", "CheckInventoryTool" },
{ "parameters", new Dictionary<string, object>
{
{ "productId", "{{productId}}" },
{ "quantity", "{{quantity}}" }
}
}
}
};
// 检查库存是否充足
var checkStockAvailable = new WorkflowNode
{
Id = "check_stock",
Name = "检查库存可用",
Type = WorkflowNodeType.Condition,
Properties = new Dictionary<string, object>
{
{ "condition", "{{stock_available}} == true" }
}
};
// 计算价格
var calculatePrice = new WorkflowNode
{
Id = "calculate_price",
Name = "计算价格",
Type = WorkflowNodeType.Tool,
Properties = new Dictionary<string, object>
{
{ "toolName", "CalculatePriceTool" },
{ "parameters", new Dictionary<string, object>
{
{ "productId", "{{productId}}" },
{ "quantity", "{{quantity}}" },
{ "couponCode", "{{couponCode}}" }
}
}
}
};
// 处理支付
var processPayment = new WorkflowNode
{
Id = "process_payment",
Name = "处理支付",
Type = WorkflowNodeType.Tool,
Properties = new Dictionary<string, object>
{
{ "toolName", "ProcessPaymentTool" },
{ "parameters", new Dictionary<string, object>
{
{ "userId", "{{userId}}" },
{ "amount", "{{total_price}}" },
{ "paymentMethod", "{{paymentMethod}}" }
}
}
}
};
// 检查支付结果
var checkPaymentSuccess = new WorkflowNode
{
Id = "check_payment",
Name = "检查支付结果",
Type = WorkflowNodeType.Condition,
Properties = new Dictionary<string, object>
{
{ "condition", "{{payment_success}} == true" }
}
};
// 锁定库存
var reserveInventory = new WorkflowNode
{
Id = "reserve_inventory",
Name = "锁定库存",
Type = WorkflowNodeType.Tool,
Properties = new Dictionary<string, object>
{
{ "toolName", "ReserveInventoryTool" },
{ "parameters", new Dictionary<string, object>
{
{ "productId", "{{productId}}" },
{ "quantity", "{{quantity}}" }
}
}
}
};
// 创建订单
var createOrder = new WorkflowNode
{
Id = "create_order",
Name = "创建订单",
Type = WorkflowNodeType.Tool,
Properties = new Dictionary<string, object>
{
{ "toolName", "CreateOrderTool" },
{ "parameters", new Dictionary<string, object>
{
{ "userId", "{{userId}}" },
{ "productId", "{{productId}}" },
{ "quantity", "{{quantity}}" },
{ "totalPrice", "{{total_price}}" }
}
}
}
};
// 发送通知
var sendNotification = new WorkflowNode
{
Id = "send_notification",
Name = "发送通知",
Type = WorkflowNodeType.Tool,
Properties = new Dictionary<string, object>
{
{ "toolName", "SendNotificationTool" },
{ "parameters", new Dictionary<string, object>
{
{ "userId", "{{userId}}" },
{ "orderId", "{{orderId}}" },
{ "type", "order_created" }
}
}
}
};
// 订单创建成功
var orderSuccess = new WorkflowNode
{
Id = "order_success",
Name = "订单成功",
Type = WorkflowNodeType.End,
Properties = new Dictionary<string, object>
{
{ "description", "订单创建成功" }
}
};
// 库存不足
var outOfStock = new WorkflowNode
{
Id = "out_of_stock",
Name = "库存不足",
Type = WorkflowNodeType.End,
Properties = new Dictionary<string, object>
{
{ "description", "商品库存不足" }
}
};
// 用户无效
var userInvalid = new WorkflowNode
{
Id = "user_invalid",
Name = "用户无效",
Type = WorkflowNodeType.End,
Properties = new Dictionary<string, object>
{
{ "description", "用户验证失败" }
}
};
// 支付失败
var paymentFailed = new WorkflowNode
{
Id = "payment_failed",
Name = "支付失败",
Type = WorkflowNodeType.End,
Properties = new Dictionary<string, object>
{
{ "description", "支付处理失败" }
}
};
// 添加所有节点
workflow.Nodes.AddRange(new[]
{
start, validateUser, checkUserValid, checkInventory, checkStockAvailable,
calculatePrice, processPayment, checkPaymentSuccess, reserveInventory,
createOrder, sendNotification, orderSuccess, outOfStock, userInvalid, paymentFailed
});
// 添加边(工作流连接)
workflow.Edges.AddRange(new[]
{
// 开始 -> 验证用户
new WorkflowEdge { SourceNodeId = "start", TargetNodeId = "validate_user" },
// 验证用户 -> 检查用户有效性
new WorkflowEdge { SourceNodeId = "validate_user", TargetNodeId = "check_user_valid" },
// 检查用户有效性 -> 库存检查(用户有效)
new WorkflowEdge
{
SourceNodeId = "check_user_valid",
TargetNodeId = "check_inventory",
Condition = "{{user_valid}} == true"
},
// 检查用户有效性 -> 用户无效(用户无效)
new WorkflowEdge
{
SourceNodeId = "check_user_valid",
TargetNodeId = "user_invalid",
Condition = "{{user_valid}} == false"
},
// 库存检查 -> 检查库存可用
new WorkflowEdge { SourceNodeId = "check_inventory", TargetNodeId = "check_stock" },
// 检查库存可用 -> 计算价格(库存充足)
new WorkflowEdge
{
SourceNodeId = "check_stock",
TargetNodeId = "calculate_price",
Condition = "{{stock_available}} == true"
},
// 检查库存可用 -> 库存不足(库存不足)
new WorkflowEdge
{
SourceNodeId = "check_stock",
TargetNodeId = "out_of_stock",
Condition = "{{stock_available}} == false"
},
// 计算价格 -> 处理支付
new WorkflowEdge { SourceNodeId = "calculate_price", TargetNodeId = "process_payment" },
// 处理支付 -> 检查支付结果
new WorkflowEdge { SourceNodeId = "process_payment", TargetNodeId = "check_payment" },
// 检查支付结果 -> 锁定库存(支付成功)
new WorkflowEdge
{
SourceNodeId = "check_payment",
TargetNodeId = "reserve_inventory",
Condition = "{{payment_success}} == true"
},
// 检查支付结果 -> 支付失败(支付失败)
new WorkflowEdge
{
SourceNodeId = "check_payment",
TargetNodeId = "payment_failed",
Condition = "{{payment_success}} == false"
},
// 锁定库存 -> 创建订单
new WorkflowEdge { SourceNodeId = "reserve_inventory", TargetNodeId = "create_order" },
// 创建订单 -> 发送通知
new WorkflowEdge { SourceNodeId = "create_order", TargetNodeId = "send_notification" },
// 发送通知 -> 订单成功
new WorkflowEdge { SourceNodeId = "send_notification", TargetNodeId = "order_success" }
});
return workflow;
}
}
3.2 执行订单处理工作流
// OrderWorkflowService.cs
public class OrderWorkflowService
{
private readonly WorkflowExecutor _executor;
private readonly IWorkflowStore _workflowStore;
private readonly ILogger<OrderWorkflowService> _logger;
public OrderWorkflowService(
WorkflowExecutor executor,
IWorkflowStore workflowStore,
ILogger<OrderWorkflowService> logger)
{
_executor = executor;
_workflowStore = workflowStore;
_logger = logger;
}
public async Task<OrderResult> ProcessOrderAsync(OrderRequest request)
{
// 获取或创建工作流
var workflow = await _workflowStore.GetWorkflowAsync("order_processing");
if (workflow == null)
{
workflow = OrderProcessingWorkflow.CreateOrderProcessingWorkflow();
await _workflowStore.SaveWorkflowAsync(workflow);
}
// 创建执行上下文
var context = new WorkflowExecutionContext
{
ExecutionId = Guid.NewGuid().ToString(),
UserId = request.UserId,
Values = new Dictionary<string, object>
{
{ "userId", request.UserId },
{ "productId", request.ProductId },
{ "quantity", request.Quantity },
{ "couponCode", request.CouponCode ?? "" },
{ "paymentMethod", request.PaymentMethod },
{ "input", $"创建订单: {request.ProductId} x {request.Quantity}" }
}
};
_logger.LogInformation("开始处理订单: UserId={UserId}, ProductId={ProductId}",
request.UserId, request.ProductId);
try
{
var result = await _executor.ExecuteAsync(workflow, context);
if (result.Success)
{
var orderId = context.GetValue<string>("orderId");
_logger.LogInformation("订单处理成功: OrderId={OrderId}", orderId);
return new OrderResult
{
Success = true,
OrderId = orderId,
Message = "订单创建成功",
TotalPrice = context.GetValue<decimal>("total_price")
};
}
else
{
_logger.LogWarning("订单处理失败: Error={Error}", result.Error);
return new OrderResult
{
Success = false,
Message = result.Error ?? "订单处理失败"
};
}
}
catch (Exception ex)
{
_logger.LogError(ex, "订单处理异常: UserId={UserId}", request.UserId);
return new OrderResult
{
Success = false,
Message = "系统错误,请稍后重试"
};
}
}
}
四、动态工作流构建
4.1 可视化工作流设计器
在生产环境中,通常需要一个可视化的工作流设计器:
// WorkflowDesignerService.cs
public class WorkflowDesignerService
{
private readonly IWorkflowStore _workflowStore;
private readonly ILogger<WorkflowDesignerService> _logger;
public WorkflowDesignerService(
IWorkflowStore workflowStore,
ILogger<WorkflowDesignerService> logger)
{
_workflowStore = workflowStore;
_logger = logger;
}
// 创建新工作流
public async Task<WorkflowDefinition> CreateWorkflowAsync(CreateWorkflowRequest request)
{
var workflow = new WorkflowDefinition
{
Name = request.Name,
Description = request.Description,
Metadata = request.Metadata ?? new Dictionary<string, object>()
};
// 添加开始节点
workflow.Nodes.Add(new WorkflowNode
{
Id = "start",
Name = "开始",
Type = WorkflowNodeType.Start
});
// 添加结束节点
workflow.Nodes.Add(new WorkflowNode
{
Id = "end",
Name = "结束",
Type = WorkflowNodeType.End
});
await _workflowStore.SaveWorkflowAsync(workflow);
_logger.LogInformation("创建工作流: {Name}", workflow.Name);
return workflow;
}
// 添加节点
public async Task<WorkflowNode> AddNodeAsync(
string workflowId,
AddNodeRequest request)
{
var workflow = await _workflowStore.GetWorkflowAsync(workflowId)
?? throw new InvalidOperationException($"工作流不存在: {workflowId}");
var node = new WorkflowNode
{
Name = request.Name,
Type = request.NodeType,
Properties = request.Properties ?? new Dictionary<string, object>()
};
workflow.Nodes.Add(node);
await _workflowStore.SaveWorkflowAsync(workflow);
return node;
}
// 连接节点
public async Task ConnectNodesAsync(
string workflowId,
string sourceNodeId,
string targetNodeId,
string? condition = null)
{
var workflow = await _workflowStore.GetWorkflowAsync(workflowId)
?? throw new InvalidOperationException($"工作流不存在: {workflowId}");
// 验证节点存在
if (!workflow.Nodes.Any(n => n.Id == sourceNodeId))
{
throw new InvalidOperationException($"源节点不存在: {sourceNodeId}");
}
if (!workflow.Nodes.Any(n => n.Id == targetNodeId))
{
throw new InvalidOperationException($"目标节点不存在: {targetNodeId}");
}
// 添加边
var edge = new WorkflowEdge
{
SourceNodeId = sourceNodeId,
TargetNodeId = targetNodeId,
Condition = condition
};
workflow.Edges.Add(edge);
workflow.UpdatedAt = DateTime.UtcNow;
await _workflowStore.SaveWorkflowAsync(workflow);
_logger.LogInformation("连接节点: {Source} -> {Target}", sourceNodeId, targetNodeId);
}
// 验证工作流
public async Task<WorkflowValidationResult> ValidateWorkflowAsync(string workflowId)
{
var workflow = await _workflowStore.GetWorkflowAsync(workflowId)
?? throw new InvalidOperationException($"工作流不存在: {workflowId}");
var result = new WorkflowValidationResult { IsValid = true };
// 检查开始节点
if (!workflow.Nodes.Any(n => n.Type == WorkflowNodeType.Start))
{
result.IsValid = false;
result.Errors.Add("工作流必须包含开始节点");
}
// 检查结束节点
if (!workflow.Nodes.Any(n => n.Type == WorkflowNodeType.End))
{
result.IsValid = false;
result.Errors.Add("工作流必须包含结束节点");
}
// 检查孤立节点
var connectedNodeIds = workflow.Edges
.SelectMany(e => new[] { e.SourceNodeId, e.TargetNodeId })
.Distinct()
.ToHashSet();
foreach (var node in workflow.Nodes)
{
if (!connectedNodeIds.Contains(node.Id) &&
node.Type != WorkflowNodeType.Start &&
node.Type != WorkflowNodeType.End)
{
result.Warnings.Add($"节点 {node.Name} 未连接到任何其他节点");
}
}
// 检查循环引用
if (HasCircularReference(workflow))
{
result.IsValid = false;
result.Errors.Add("工作流存在循环引用");
}
return result;
}
private bool HasCircularReference(WorkflowDefinition workflow)
{
// 简化实现:检查是否存在从End回到Start的路径
var visited = new HashSet<string>();
var path = new List<string>();
var startNode = workflow.Nodes.FirstOrDefault(n => n.Type == WorkflowNodeType.Start);
if (startNode == null) return false;
return CanReachNode(workflow, startNode.Id, "start", visited, path);
}
private bool CanReachNode(
WorkflowDefinition workflow,
string currentNodeId,
string targetNodeId,
HashSet<string> visited,
List<string> path)
{
if (currentNodeId == targetNodeId && path.Count > 0)
{
return true;
}
if (visited.Contains(currentNodeId))
{
return false;
}
visited.Add(currentNodeId);
path.Add(currentNodeId);
var outgoingEdges = workflow.Edges.Where(e => e.SourceNodeId == currentNodeId);
foreach (var edge in outgoingEdges)
{
if (CanReachNode(workflow, edge.TargetNodeId, targetNodeId, visited, path))
{
return true;
}
}
path.RemoveAt(path.Count - 1);
return false;
}
}
五、工作流版本管理
5.1 版本控制实现
// WorkflowVersioningService.cs
public class WorkflowVersioningService
{
private readonly IWorkflowStore _workflowStore;
private readonly ILogger<WorkflowVersioningService> _logger;
public WorkflowVersioningService(
IWorkflowStore workflowStore,
ILogger<WorkflowVersioningService> logger)
{
_workflowStore = workflowStore;
_logger = logger;
}
// 创建新版本
public async Task<WorkflowVersion> CreateVersionAsync(
string workflowId,
string changeDescription)
{
var workflow = await _workflowStore.GetWorkflowAsync(workflowId)
?? throw new InvalidOperationException($"工作流不存在: {workflowId}");
// 获取当前版本号
var currentVersion = await GetLatestVersionAsync(workflowId);
var newVersionNumber = currentVersion + 1;
// 创建版本快照
var version = new WorkflowVersion
{
Id = Guid.NewGuid().ToString(),
WorkflowId = workflowId,
VersionNumber = newVersionNumber,
Definition = JsonSerializer.Serialize(workflow),
ChangeDescription = changeDescription,
CreatedAt = DateTime.UtcNow,
CreatedBy = "system"
};
// 保存版本
await _workflowStore.SaveVersionAsync(version);
_logger.LogInformation("创建工作流版本: WorkflowId={WorkflowId}, Version={Version}",
workflowId, newVersionNumber);
return version;
}
// 回滚到指定版本
public async Task<WorkflowDefinition> RollbackToVersionAsync(
string workflowId,
int versionNumber)
{
var version = await _workflowStore.GetVersionAsync(workflowId, versionNumber)
?? throw new InvalidOperationException($"版本不存在: {versionNumber}");
var workflow = JsonSerializer.Deserialize<WorkflowDefinition>(version.Definition)
?? throw new InvalidOperationException("无法解析工作流定义");
// 更新当前工作流
workflow.UpdatedAt = DateTime.UtcNow;
await _workflowStore.SaveWorkflowAsync(workflow);
_logger.LogInformation("回滚工作流: WorkflowId={WorkflowId}, Version={Version}",
workflowId, versionNumber);
return workflow;
}
// 获取版本历史
public async Task<List<WorkflowVersion>> GetVersionHistoryAsync(string workflowId)
{
return await _workflowStore.GetVersionHistoryAsync(workflowId);
}
// 获取最新版本号
private async Task<int> GetLatestVersionAsync(string workflowId)
{
var versions = await _workflowStore.GetVersionHistoryAsync(workflowId);
return versions.Any() ? versions.Max(v => v.VersionNumber) : 0;
}
}
六、工作流监控与日志
6.1 执行监控
// WorkflowMonitor.cs
public class WorkflowMonitor
{
private readonly IMetrics _metrics;
private readonly ILogger<WorkflowMonitor> _logger;
public WorkflowMonitor(IMetrics metrics, ILogger<WorkflowMonitor> logger)
{
_metrics = metrics;
_logger = logger;
}
public void RecordExecutionStarted(string workflowName, string executionId)
{
_metrics.IncrementCounter("workflow.executions.started");
_metrics.IncrementCounter($"workflow.executions.started.{workflowName}");
_logger.LogInformation("工作流开始执行: Workflow={Workflow}, ExecutionId={ExecutionId}",
workflowName, executionId);
}
public void RecordExecutionCompleted(
string workflowName,
string executionId,
TimeSpan duration,
bool success)
{
_metrics.RecordHistogram("workflow.execution.duration", duration.TotalMilliseconds);
_metrics.RecordHistogram(
$"workflow.execution.duration.{workflowName}",
duration.TotalMilliseconds);
if (success)
{
_metrics.IncrementCounter("workflow.executions.completed");
_metrics.IncrementCounter($"workflow.executions.completed.{workflowName}");
}
else
{
_metrics.IncrementCounter("workflow.executions.failed");
_metrics.IncrementCounter($"workflow.executions.failed.{workflowName}");
}
_logger.LogInformation(
"工作流执行完成: Workflow={Workflow}, ExecutionId={ExecutionId}, Duration={Duration}ms, Success={Success}",
workflowName, executionId, duration.TotalMilliseconds, success);
}
public void RecordNodeExecution(
string workflowName,
string nodeName,
TimeSpan duration,
bool success)
{
_metrics.RecordHistogram(
$"workflow.node.execution.duration.{workflowName}.{nodeName}",
duration.TotalMilliseconds);
if (!success)
{
_metrics.IncrementCounter(
$"workflow.node.execution.failed.{workflowName}.{nodeName}");
}
}
public void RecordRetry(string workflowName, string nodeName, int attemptNumber)
{
_metrics.IncrementCounter("workflow.retries");
_metrics.IncrementCounter($"workflow.retries.{workflowName}.{nodeName}");
_logger.LogWarning(
"工作流重试: Workflow={Workflow}, Node={Node}, Attempt={Attempt}",
workflowName, nodeName, attemptNumber);
}
}
七、实战:客服工作流
7.1 智能客服工作流
// CustomerServiceWorkflow.cs
public class CustomerServiceWorkflow
{
public static WorkflowDefinition CreateCustomerServiceWorkflow()
{
var workflow = new WorkflowDefinition
{
Name = "智能客服工作流",
Description = "处理客户咨询、问题分类、转接等流程"
};
// 开始
var start = new WorkflowNode
{
Id = "start",
Name = "接收咨询",
Type = WorkflowNodeType.Start
};
// 理解用户意图
var understandIntent = new WorkflowNode
{
Id = "understand_intent",
Name = "理解意图",
Type = WorkflowNodeType.Agent,
Properties = new Dictionary<string, object>
{
{ "agentId", "intent_classifier" },
{ "prompt", "分析用户意图,分类为:订单查询、退款申请、技术支持、其他" }
}
};
// 意图分类
var classifyIntent = new WorkflowNode
{
Id = "classify_intent",
Name = "意图分类",
Type = WorkflowNodeType.Switch,
Properties = new Dictionary<string, object>
{
{ "expression", "{{intent}}" }
}
};
// 订单查询流程
var handleOrder = new WorkflowNode
{
Id = "handle_order",
Name = "处理订单查询",
Type = WorkflowNodeType.SubWorkflow,
Properties = new Dictionary<string, object>
{
{ "subWorkflowId", "order_inquiry" }
}
};
// 退款流程
var handleRefund = new WorkflowNode
{
Id = "handle_refund",
Name = "处理退款",
Type = WorkflowNodeType.SubWorkflow,
Properties = new Dictionary<string, object>
{
{ "subWorkflowId", "refund_process" }
}
};
// 技术支持流程
var handleTechSupport = new WorkflowNode
{
Id = "handle_tech_support",
Name = "技术支持",
Type = WorkflowNodeType.SubWorkflow,
Properties = new Dictionary<string, object>
{
{ "subWorkflowId", "tech_support" }
}
};
// 其他问题
var handleOther = new WorkflowNode
{
Id = "handle_other",
Name = "处理其他问题",
Type = WorkflowNodeType.Agent,
Properties = new Dictionary<string, object>
{
{ "agentId", "general_support_agent" }
}
};
// 满意度调查
var satisfactionSurvey = new WorkflowNode
{
Id = "satisfaction_survey",
Name = "满意度调查",
Type = WorkflowNodeType.Tool,
Properties = new Dictionary<string, object>
{
{ "toolName", "SatisfactionSurveyTool" }
}
};
// 结束
var end = new WorkflowNode
{
Id = "end",
Name = "结束",
Type = WorkflowNodeType.End
};
// 添加节点
workflow.Nodes.AddRange(new[]
{
start, understandIntent, classifyIntent,
handleOrder, handleRefund, handleTechSupport, handleOther,
satisfactionSurvey, end
});
// 添加边
workflow.Edges.AddRange(new[]
{
new WorkflowEdge { SourceNodeId = "start", TargetNodeId = "understand_intent" },
new WorkflowEdge { SourceNodeId = "understand_intent", TargetNodeId = "classify_intent" },
// 意图分支
new WorkflowEdge
{
SourceNodeId = "classify_intent",
TargetNodeId = "handle_order",
Properties = new Dictionary<string, object> { { "case", "order_inquiry" } }
},
new WorkflowEdge
{
SourceNodeId = "classify_intent",
TargetNodeId = "handle_refund",
Properties = new Dictionary<string, object> { { "case", "refund" } }
},
new WorkflowEdge
{
SourceNodeId = "classify_intent",
TargetNodeId = "handle_tech_support",
Properties = new Dictionary<string, object> { { "case", "tech_support" } }
},
new WorkflowEdge
{
SourceNodeId = "classify_intent",
TargetNodeId = "handle_other",
Properties = new Dictionary<string, object> { { "case", "other" } }
},
// 汇合到满意度调查
new WorkflowEdge { SourceNodeId = "handle_order", TargetNodeId = "satisfaction_survey" },
new WorkflowEdge { SourceNodeId = "handle_refund", TargetNodeId = "satisfaction_survey" },
new WorkflowEdge { SourceNodeId = "handle_tech_support", TargetNodeId = "satisfaction_survey" },
new WorkflowEdge { SourceNodeId = "handle_other", TargetNodeId = "satisfaction_survey" },
new WorkflowEdge { SourceNodeId = "satisfaction_survey", TargetNodeId = "end" }
});
return workflow;
}
}
八、总结与展望
通过本文的学习,我们已经掌握了工作流编排的核心技术:
✅ 工作流基础:理解工作流编排的概念和重要性
✅ 图引擎实现:基于DAG的工作流定义和执行
✅ 节点类型:Agent、Tool、Condition、Switch、Parallel等节点
✅ 订单处理示例:完整的订单处理工作流实现
✅ 动态工作流:可视化工作流设计器
✅ 版本控制:工作流的版本管理和回滚
✅ 监控日志:工作流执行的监控和日志记录
关键收获:
工作流编排是构建复杂业务系统的核心能力。通过合理的工作流设计,可以将复杂的业务逻辑分解为可管理的步骤,实现业务流程的自动化和可视化。在实际应用中,需要根据业务复杂度选择合适的工作流方案,并重视版本管理和监控。
下一篇文章预告:
在第七篇文章中,我们将探索自定义工具与中间件开发。我们将学习如何扩展Agent Framework的功能,创建自定义工具和中间件,以满足特定的业务需求。
实践建议:
从简单的工作流开始,逐步增加复杂性
为工作流添加完善的错误处理和重试机制
重视工作流的版本管理,便于问题排查和回滚
实现充分的监控和日志,便于运维和问题分析
相关资源:
Agent Framework工作流文档
DAG算法详解
工作流模式参考
"好的工作流设计让复杂的业务逻辑变得清晰可控。"
