如何构建飞书.NET SDK事件处理的去重与幂等性机制?

摘要:飞书事件处理过程中如何让你的应用不再"重复劳动",如何用三层防护筑起安全墙,结合内存与 Redis 双重保障,让你的飞书应用稳如磐石——不再重复处理,告别混乱状态。 为什么需要&a
飞书事件处理过程中如何让你的应用不再"重复劳动",如何用三层防护筑起安全墙,结合内存与 Redis 双重保障,让你的飞书应用稳如磐石——不再重复处理,告别混乱状态。 为什么需要"去重"? 想象一下这样的场景: 你在飞书里收到一条消息,应用收到通知后创建了待办事项。但因为网络不稳定,飞书以为你没收到,又发了一遍同样的通知——结果呢?你的应用又创建了一次待办,同一个任务出现了两次。 这就是我们所说的"重复处理"问题。 什么时候会出现这种情况? 在飞书事件驱动的世界里,以下情况都可能导致同一事件被多次送达: 📡 网络波动:飞书服务器没收到你的确认,于是重发 🔄 服务重启:内存清空,之前的事件又来了 👥 多实例运行:多个实例同时收到同一事件 🔌 断线重连:WebSocket 重连后可能重复消息 真实案例:一分钟内的混乱 时间线: ───────────────────────────────────────────────────────────── 09:00:00 飞书推送:收到一条新消息 09:00:01 实例A 接收并处理 → 创建待办 ✅ 09:00:05 飞书没收到确认,再次推送 09:00:06 实例B 接收并处理 → 又创建待办 ❌ ───────────────────────────────────────────────────────────── 后果有多严重? 问题 实际影响 📉 数据重复 用户收到两条相同的待办 💰 资金损失 订单重复扣款,退钱都退不完 📧 骚扰用户 同一条通知发十次 🔄 状态混乱 数据库里说"已处理",实际只做了一半 三层防护:像保安一样层层把关 Mud.Feishu SDK 设计了一套三层递进式防护机制,就像小区的三道岗哨,从外到内层层把关,确保不会有"坏人"(重复事件)混进来。 graph TB subgraph AppLayer["应用层去重(业务键)"] AppHandler["IdempotentFeishuEventHandler<T>"] AppDesc["基于业务主键(消息ID、订单ID等)去重"] end subgraph DispatchLayer["分发层去重(EventId)"] EventDedup["IFeishuEventDeduplicator / IFeishuEventDistributedDeduplicator"] EventDesc["基于飞书事件ID去重,24小时窗口期"] end subgraph ProtocolLayer["协议层去重(SeqID/Nonce)"] WS_Dedup["WebSocket: IFeishuSeqIDDeduplicator"] Webhook_Dedup["Webhook: IFeishuNonceDistributedDeduplicator"] ProtoDesc["基于消息序列号去重,过滤重复消息"] end AppHandler -->|处理器内部业务逻辑| EventDedup EventDedup -->|事件路由与分发| WS_Dedup EventDedup -->|事件路由与分发| Webhook_Dedup style AppLayer fill:#e1f5ff style DispatchLayer fill:#fff4e1 style ProtocolLayer fill:#ffe1e1 这三层分别负责什么? 可以把这三层想象成工厂流水线上的三个质检员: 质检员 检查什么? 在哪检查? 过滤范围 适合什么时候用? 协议层 消息编号(SeqID) 消息刚到达时 每条消息 过滤网络重复,最外层防护 分发层 事件ID(EventId) 事件分发前 每个事件 过滤飞书重发,中间层防护 应用层 业务键(TaskId) 业务处理时 每次业务操作 防止逻辑重复,最后一道防线 分发层:事件的"身份证检查" 这是最核心的一层,就像门口的保安,每个进来的事件都要先出示身份证(EventId),保安查过记录后才能放行。 public interface IFeishuEventDeduplicator { /// <summary> /// 尝试标记事件为处理中 /// </summary> /// <returns> /// true: 已处理或正在处理中(跳过) /// false: 新事件(继续处理) /// </returns> bool TryMarkAsProcessing(string eventId); /// <summary> /// 标记事件为已完成 /// </summary> void MarkAsCompleted(string eventId); /// <summary> /// 回滚处理中状态(异常时调用) /// </summary> void RollbackProcessing(string eventId); /// <summary> /// 检查事件是否已处理 /// </summary> bool IsProcessed(string eventId); } 事件的一生:三种状态流转 每个事件在去重系统里都像过安检一样,经历三种状态的变化: stateDiagram-v2 [*] --> Pending: 收到新事件 Pending --> Processing: TryMarkAsProcessing() Processing --> Completed: 处理成功 Processing --> Pending: 超时/异常(Rollback) Completed --> Pending: 缓存过期(24小时后) note right of Processing 处理中超时(默认5分钟) 允许重新处理 end note note right of Completed 24小时后自动清理 支持TTL配置 end note 真实场景:WebSocket 是怎么去重的? 来看看 WebSocket 收到事件后做了什么(FeishuEventMessageHandler.cs#104-147): // 1. 去重检查 bool isProcessing = false; if (_options.EnableDistributedDeduplication && _distributedDeduplicator != null) { // 优先使用分布式去重(Redis) isProcessing = await _distributedDeduplicator.TryMarkAsProcessedAsync( eventData.EventId, cancellationToken: cancellationToken); } else if (_options.EnableEventDeduplication && _deduplicator != null) { // 使用内存去重 isProcessing = _deduplicator.TryMarkAsProcessing(eventData.EventId); } // 2. 跳过已处理事件 if (isProcessing) { _logger.LogDebug("事件 {EventId} 已在处理中或已处理,跳过", eventData.EventId); return; } // 3. 处理事件 try { await _eventHandlerFactory.HandleEventParallelAsync( eventData.EventType, eventData, cancellationToken); // 4. 处理成功,标记为已完成 if (_options.EnableEventDeduplication && _deduplicator != null) { _deduplicator.MarkAsCompleted(eventData.EventId); } } catch (Exception ex) { // 5. 处理失败,回滚状态 if (_options.EnableEventDeduplication && _deduplicator != null) { _deduplicator.RollbackProcessing(eventData.EventId); } throw; } 方案一:内存去重(适合开发和小规模应用) 就像在大脑里记笔记:把每个事件ID记在内存里,收到新事件时先查查笔记,如果有就跳过。 特点: 🧠 全靠脑子记:所有数据存在内存里 ⏰ 记24小时:超过时间就自动忘掉 🧹 定期打扫:每5分钟清理一次过期的记录 ⏱️ 超时保护:处理超过5分钟还没完成,就允许重试 核心代码(FeishuEventDeduplicator.cs#86-135): public bool TryMarkAsProcessing(string eventId) { lock (_lock) { // 检查是否已存在 if (_eventCache.TryGetValue(eventId, out var entry)) { // 如果已处理,返回 true(跳过) if (entry.Status == DeduplicationStatus.Completed) return true; // 如果已在处理中,检查是否超时 if (entry.Status == DeduplicationStatus.Processing) { if (DateTimeOffset.UtcNow - entry.ProcessedAt > _processingTimeout) { // 处理中超时,允许重新处理 _logger?.LogWarning("事件 {EventId} 处理中超时,允许重新处理", eventId); _eventCache.Remove(eventId); // 继续处理 } else { // 仍在处理中,跳过 return true; } } } // 标记为处理中 _eventCache[eventId] = new EventCacheEntry { ProcessedAt = DateTimeOffset.UtcNow, EventId = eventId, Status = DeduplicationStatus.Processing }; return false; // 未处理,新事件 } } 方案二:Redis 分布式去重(生产环境首选) 就像共享笔记本:用 Redis 把去重记录记在外部,所有实例都能查到。就算重启服务,笔记本还在,不会忘记。 特点: 📒 写在共享笔记本上:所有实例一起看 🔄 自动翻页:到期自动清理,不用管 🤝 大家一起用:多实例部署没问题 ⚡ 原子操作:SETNX + EXPIRE 确保不会写错 核心代码(RedisFeishuEventDistributedDeduplicator.cs#53-89): public async Task<bool> TryMarkAsProcessedAsync( string eventId, TimeSpan? ttl = null, CancellationToken cancellationToken = default) { var actualTtl = ttl ?? _defaultCacheExpiration; var redisKey = $"{_keyPrefix}{eventId}"; // 使用 SETNX + EXPIRE 实现原子性去重 // 仅当键不存在时设置,并设置过期时间 var setResult = await _database.StringSetAsync( redisKey, "1", actualTtl, When.NotExists); if (!setResult) { _logger?.LogDebug("事件 {EventId} 已处理过,跳过", eventId); return true; // 已处理 } _logger?.LogDebug("事件 {EventId} 标记为已处理,TTL: {Ttl}", eventId, actualTtl); return false; // 未处理,新事件 } 两种方案怎么选? 对比项 内存去重(自己记) Redis 去重(共享笔记本) 速度 ⚡ 像闪电一样快(直接读内存) 🚀 很快(但需要网络) 可靠性 ⚠️ 重启就忘光 ✅ 永远记着,重启也还在 多实例 ❌ 每个人各记各的 ✅ 大家一起看同一本笔记 麻烦程度 🟢 零依赖,开箱即用 🟡 需要部署 Redis 适合谁 🏠 开发环境、单机运行 🏢 生产环境、多实例部署 协议层:消息的"排队号检查" WebSocket 的 SeqID 是什么? 飞书 WebSocket 用的是 ProtoBuf 二进制协议,每条消息都带着一个递增的序号,就像去银行办事拿的排队号。通过记住已经处理过的排队号,就能在消息层面就把重复的挡在外面。 SeqID 去重怎么工作? public interface IFeishuSeqIDDeduplicator { /// <summary> /// 尝试标记 SeqID 为已处理 /// </summary> /// <returns> /// true: 已处理过(跳过) /// false: 新消息(继续处理) /// </returns> bool TryMarkAsProcessed(ulong seqId); /// <summary> /// 检查 SeqID 是否已处理 /// </summary> bool IsProcessed(ulong seqId); /// <summary> /// 异步检查 SeqID 是否已处理 /// </summary> Task<bool> IsProcessedAsync(ulong seqId); /// <summary> /// 清空缓存 /// </summary> void ClearCache(); /// <summary> /// 获取缓存中的 SeqID 数量 /// </summary> int GetCacheCount(); /// <summary> /// 获取已处理的最大 SeqID /// </summary> ulong GetMaxProcessedSeqId(); } 实现机制 graph LR A[收到ProtoBuf消息] --> B{SeqID检查} B -->|已处理| C[跳过消息] B -->|未处理| D[记录SeqID] D --> E[处理消息] E --> F[发送ACK确认] D --> G[更新最大SeqID] G --> H[定时清理过期SeqID] 核心实现 内存实现(FeishuSeqIDDeduplicator.cs): public bool TryMarkAsProcessed(ulong seqId) { lock (_lock) { // 检查是否已存在 if (_processedSeqIds.Contains(seqId)) { _logger?.LogDebug("SeqID {SeqId} 已处理过,跳过", seqId); return true; // 已处理 } // 记录新 SeqID _processedSeqIds.Add(seqId); _seqIdTimestamps[seqId] = DateTimeOffset.UtcNow; // 更新最大 SeqID if (seqId > _maxProcessedSeqId) { _maxProcessedSeqId = seqId; } return false; // 未处理,新消息 } } 使用位置(BinaryMessageProcessor.cs#186-194): // SeqID 去重检查 if (_seqIdDeduplicator != null && _seqIdDeduplicator.TryMarkAsProcessed(frame.SeqID)) { _logger.LogDebug("SeqID {SeqID} 已处理过,跳过", frame.SeqID); eventArgs.SkipReason = $"SeqID {frame.SeqID} 已处理过"; BinaryMessageReceived?.Invoke(this, eventArgs); // 仍然发送ACK确认 await SendAckMessageAsync(frame, true, cancellationToken); return; } 特性总结 ✅ 基于 HashSet 高效查找:O(1) 查询复杂度 ✅ 自动跟踪最大 SeqID:支持顺序性验证 ✅ 24小时过期机制:定期清理历史数据 ✅ 内存友好:仅存储 SeqID,不存储完整消息 Webhook 的 Nonce:防重放攻击的秘密武器 飞书 Webhook 的请求里带有一个 nonce(随机数),这就像一次性密码——用过的密码就不能再用了,这样就能防止"重放攻击"(坏人拿同一个请求重复发送)。 防重放攻击流程 攻击者尝试重放请求: ───────────────────────────────────────────────────────────── 原始请求:Nonce="abc123", Timestamp="1234567890" ✅ 正常处理(首次接收) 重放请求:Nonce="abc123", Timestamp="1234567890" ❌ 拒绝处理(Nonce 已记录) ───────────────────────────────────────────────────────────── Redis 实现 public class RedisFeishuNonceDistributedDeduplicator : IFeishuNonceDistributedDeduplicator { private readonly IDatabase _database; private readonly TimeSpan _defaultTtl = TimeSpan.FromMinutes(5); // 5分钟过期 public async Task<bool> TryMarkAsProcessedAsync(string nonce, CancellationToken cancellationToken = default) { var redisKey = $"{_keyPrefix}{nonce}"; // SETNX + EXPIRE:5分钟内重复 nonce 会被拒绝 var setResult = await _database.StringSetAsync( redisKey, "1", _defaultTtl, When.NotExists); if (!setResult) { _logger?.LogWarning("检测到重复的 Nonce: {Nonce},可能为重放攻击", nonce); return true; // 已处理,拒绝 } return false; // 首次处理 } } 应用层:给每个业务操作发"身份证" 为什么还需要这一层? 就算前面两层都挡住了,业务逻辑还是有可能重复执行,比如: 🔀 同一事件触发了多个处理器(并行处理) 🔄 处理器内部多次访问同一个资源 📡 第三方接口重试导致重复调用 实际的幂等性实现方式 在实际的 Mud.Feishu SDK 中,业务层幂等性主要通过以下两种方式实现: 方式1:使用 DefaultFeishuObjectEventHandler<T>(推荐) SDK 提供了 DefaultFeishuObjectEventHandler<T> 基类,专门用于处理对象类型的事件。你只需要继承这个基类,并在业务逻辑中检查数据是否已存在即可。 方式2:使用 IdempotentFeishuEventHandler<T> SDK 还提供了一个 IdempotentFeishuEventHandler<T> 基类,提供了基于业务键的自动去重能力。你需要重写 GetBusinessKey() 方法和 HandleEventInternalAsync() 方法,基类会自动处理业务去重。 手把手教你用:三个实际案例 案例 1:消息处理——防止重复创建待办 public class MessageEventHandler : DefaultFeishuObjectEventHandler<MessageReceiveResult> { private readonly IMessageService _messageService; public MessageEventHandler( ILogger<MessageEventHandler> logger, IMessageService messageService) : base(logger) { _messageService = messageService; } protected override async Task ProcessBusinessLogicAsync( EventData eventData, ObjectEventResult<MessageReceiveResult>? messageData, CancellationToken cancellationToken = default) { if (eventData == null) throw new ArgumentNullException(nameof(eventData)); // 检查消息是否已处理(业务层幂等性) var messageId = messageData?.Object.MessageId; if (string.IsNullOrEmpty(messageId)) { _logger.LogWarning("消息ID为空,跳过处理"); return; } var alreadyProcessed = await _messageService.IsMessageProcessedAsync(messageId, cancellationToken); if (alreadyProcessed) { _logger.LogInformation("消息 {MessageId} 已处理,跳过", messageId); return; } // 处理消息 await _messageService.ProcessMessageAsync(messageData!.Object, cancellationToken); } } 案例 2:部门处理——避免重复创建 public class DepartmentCreatedEventHandler : DefaultFeishuObjectEventHandler<DepartmentCreatedResult> { private readonly IDepartmentService _departmentService; public DepartmentCreatedEventHandler( ILogger<DepartmentCreatedEventHandler> logger, IDepartmentService departmentService) : base(logger) { _departmentService = departmentService; } protected override async Task ProcessBusinessLogicAsync( EventData eventData, ObjectEventResult<DepartmentCreatedResult>? departmentData, CancellationToken cancellationToken = default) { if (eventData == null) throw new ArgumentNullException(nameof(eventData)); if (departmentData?.Object == null) { _logger.LogWarning("部门数据为空,跳过处理"); return; } // 检查部门是否已存在(业务层幂等性) var departmentId = departmentData.Object.DepartmentId; var exists = await _departmentService.ExistsAsync(departmentId, cancellationToken); if (exists) { _logger.LogInformation("部门 {DepartmentId} 已存在,跳过创建", departmentId); return; } // 创建部门 await _departmentService.CreateAsync(departmentData.Object, cancellationToken); } } 案例 3:用户创建——避免重复注册 public class UserCreatedEventHandler : DefaultFeishuObjectEventHandler<UserCreatedResult> { private readonly IUserService _userService; public UserCreatedEventHandler( ILogger<UserCreatedEventHandler> logger, IUserService userService) : base(logger) { _userService = userService; } protected override async Task ProcessBusinessLogicAsync( EventData eventData, ObjectEventResult<UserCreatedResult>? userData, CancellationToken cancellationToken = default) { if (eventData == null) throw new ArgumentNullException(nameof(eventData)); if (userData?.Object == null) { _logger.LogWarning("用户数据为空,跳过处理"); return; } // 检查用户是否已存在(业务层幂等性) var userId = userData.Object.UserId; var exists = await _userService.ExistsAsync(userId, cancellationToken); if (exists) { _logger.LogInformation("用户 {UserId} 已存在,跳过创建", userId); return; } // 创建用户 await _userService.CreateAsync(userData.Object, cancellationToken); } } 怎么设计业务键?有套路吗? 业务场景 业务键长什么样? 为什么这么设计? 收到消息 im.message.receive_v1:om_xxxxxxxxxx 事件类型 + 消息ID,一眼看出是哪条消息 创建部门 contact.department.created_v3:od_xxxxxxxxxx 事件类型 + 部门ID,避免和其他ID冲突 创建用户 contact.user.created_v3:ou_xxxxxxxxxx 事件类型 + 用户ID,唯一标识用户 删除部门 contact.department.deleted_v3:od_xxxxxxxxxx 事件类型 + 部门ID,处理删除事件 消息已读 im.message.message_read_v1:ou_xxxxxxxxxx:om_xxxxxxxxxx 事件类型 + 用户ID + 消息ID 四条黄金法则: 🔑 唯一性:一个操作对应一个键,不能有两个操作撞车 👀 可读性:看日志时能快速知道这是什么 🧱 稳定性:别用时间戳这种会变的字段做键 ✂️ 简洁性:别写太长,省内存、好查询 配置实战:开发 vs 生产环境 WebSocket 怎么配置? // 使用建造者模式配置 WebSocket builder.Services.AddFeishuWebSocketServiceBuilder(configuration) .ConfigureOptions(options => { // 事件去重配置 options.EnableEventDeduplication = true; // 启用事件去重 options.EnableDistributedDeduplication = false; // 单机场景使用内存去重 options.EventDeduplicationCacheExpirationMs = 86400000; // 24小时过期 options.EventDeduplicationCleanupIntervalMs = 300000; // 5分钟清理间隔 }); 生产环境必备配置(一定要看!) // 1. 配置 Redis(在 appsettings.json 中) // { // "Feishu": { // "Redis": { // "ConnectionString": "your-redis-server" // } // } // } // 2. 注册 Redis 服务和去重服务 builder.Services .AddFeishuRedis() .AddFeishuRedisDeduplicators(); // 3. 启用分布式去重 builder.Services.AddFeishuWebSocketServiceBuilder(configuration) .ConfigureOptions(options => { options.EnableDistributedDeduplication = true; // 启用分布式去重 options.EventDeduplicationCacheExpirationMs = 86400000; // 24小时 }); Webhook 配置 // 注册 Webhook 服务 builder.Services.AddFeishuWebhookServiceBuilder(configuration) .ConfigureOptions(options => { options.VerificationToken = "your_verification_token"; options.EncryptKey = "your_encrypt_key"; options.EventHandlingTimeoutMs = 30000; // 30秒超时 options.MaxConcurrentEvents = 10; // 最大并发数 }); // 注册 Redis 去重 builder.Services .AddFeishuRedis() .AddFeishuRedisEventDeduplicator(); 一张表看懂开发与生产的区别 配置项 开发环境 生产环境 说明 EnableEventDeduplication true true 始终启用内存去重 EnableDistributedDeduplication false true 生产环境启用Redis去重 EventDeduplicationCacheExpirationMs 3600000 (1小时) 86400000 (24小时) 生产环境延长窗口期 EventDeduplicationCleanupIntervalMs 60000 (1分钟) 300000 (5分钟) 生产环境降低清理频率 完整配置:从零到上线一次搞定 public void ConfigureServices(IServiceCollection services) { // 配置 Redis(appsettings.json 中配置) services .AddFeishuRedis() .AddFeishuRedisDeduplicators(); // 注册 WebSocket 服务 services.AddFeishuWebSocketServiceBuilder(configuration) .ConfigureOptions(wsOptions => { wsOptions.EnableEventDeduplication = true; wsOptions.EnableDistributedDeduplication = true; wsOptions.EventDeduplicationCacheExpirationMs = 86400000; wsOptions.AutoReconnect = true; wsOptions.MaxReconnectAttempts = 5; wsOptions.HeartbeatIntervalMs = 30000; }) .AddHandler<MessageEventHandler>() .AddHandler<DepartmentCreatedEventHandler>() .Build(); // 注册 Webhook 服务 services.AddFeishuWebhookServiceBuilder(configuration) .ConfigureOptions(webhookOptions => { webhookOptions.VerificationToken = "your_token"; webhookOptions.EncryptKey = "your_key"; webhookOptions.EventHandlingTimeoutMs = 30000; webhookOptions.MaxConcurrentEvents = 20; }) .AddHandler<UserCreatedEventHandler>() .Build(); } 还有个贴心功能:配置错误自动提醒 SDK 会自动检查配置,如果你把去重功能全关了,它会直接报错提醒你: // FeishuWebSocketOptions.Validate() 自动执行 // SDK 会在服务启动时检查配置,如果去重功能全关闭会抛出警告 踩坑指南:五个常见问题和解决方案 问题 1:服务重启,重复事件又来了 现场是这样的: 现象: 09:00 服务接收 EventId="evt_123" ✅ 处理成功 09:05 服务重启(内存缓存清空) 09:06 飞书重发 EventId="evt_123" ❌ 重复处理 为什么? 🧠 内存去重就像短期记忆,重启就忘光了 📡 飞书没收到你的确认,以为你没收到,继续发 怎么解决? ✅ 方案1:启用 Redis 分布式去重(推荐) // 配置 Redis(appsettings.json 中配置连接信息) services .AddFeishuRedis() .AddFeishuRedisDeduplicators(); services.AddFeishuWebSocketServiceBuilder(configuration) .ConfigureOptions(options => { options.EnableDistributedDeduplication = true; // 使用Redis options.EnableEventDeduplication = false; // 禁用内存去重 }); ✅ 方案2:实现去重状态持久化 public class PersistentEventDeduplicator : IFeishuEventDeduplicator { private readonly IDatabase _database; public bool TryMarkAsProcessing(string eventId) { // 尝试写入数据库 var exists = _database.EventExists(eventId); if (!exists) { _database.MarkProcessing(eventId); return false; } return true; } } 问题 2:多实例一起跑,重复处理挡不住 现场是这样的: 实例A 和 实例B 同时启动 飞书推送 EventId="evt_123" 实例A 收到 → 处理 ✅ 实例B 收到 → 处理 ❌ (重复!) 为什么? 🧠 每个实例都有自己独立的"小本本" 🚫 实例之间互相看不到对方的记录 怎么解决? ✅ 必须使用 Redis 分布式去重 // 所有实例连接到同一个 Redis(在配置文件中配置) services .AddFeishuRedis() .AddFeishuRedisDeduplicators(); services.AddFeishuWebSocketServiceBuilder(configuration) .ConfigureOptions(options => { options.EnableDistributedDeduplication = true; }); 一张图看懂区别: graph TB subgraph Wrong["❌ 错误架构(内存去重)"] A1["实例A<br/>内存缓存A<br/>evt_123 ✅"] A2["实例B<br/>内存缓存B<br/>evt_123 ❌"] end subgraph Right["✅ 正确架构(Redis去重)"] B1["实例A"] B2["实例B"] Redis["Redis 去重<br/>evt_123 ✅"] end B1 --> Redis B2 --> Redis style Wrong fill:#ffebee style Right fill:#e8f5e9 style Redis fill:#e3f2fd 问题 3:处理超时,重复事件趁虚而入 现场是这样的: 09:00 开始处理 EventId="evt_123" 09:01 处理超时(超时30秒) 09:01 回滚 Processing 状态 09:02 飞书重发 EventId="evt_123" 09:02 重复处理 ❌ (实际已成功!) 为什么? ⏱️ 超时后系统以为没完成,把"处理中"标记撤回了 ✅ 但业务可能已经做完了,只是响应慢了一点点 怎么解决? ✅ 方案1:增加最终一致性检查 public class DepartmentCreatedEventHandler : DefaultFeishuObjectEventHandler<DepartmentCreatedResult> { private readonly IDepartmentService _departmentService; protected override async Task ProcessBusinessLogicAsync( EventData eventData, ObjectEventResult<DepartmentCreatedResult>? departmentData, CancellationToken ct) { if (departmentData?.Object == null) return; var departmentId = departmentData.Object.DepartmentId; // 先检查部门是否已存在(幂等性) var existingDepartment = await _departmentService.GetAsync(departmentId, ct); if (existingDepartment != null) { _logger.LogInformation("部门 {DepartmentId} 已存在,跳过创建", departmentId); return; } // 创建部门 await _departmentService.CreateAsync(departmentData.Object, ct); } } ✅ 方案2:实现处理续期机制 public class LongRunningTaskHandler : DefaultFeishuEventHandler { private readonly IFeishuEventDeduplicator _deduplicator; protected override async Task ProcessBusinessLogicAsync(EventData eventData, CancellationToken ct) { var eventId = eventData.EventId; // 定期续期处理时间(每30秒) var renewalTask = Task.Run(async () => { while (!ct.IsCancellationRequested) { await Task.Delay(30000, ct); _deduplicator.RenewProcessing(eventId); } }, ct); try { await ProcessLongRunningTask(eventData, ct); } finally { await renewalTask; } } } 问题 4:Redis 宕机,去重防护全失效 现场是这样的: Redis 宕机 服务无法调用 TryMarkAsProcessedAsync() 返回 false(允许处理) 多个实例重复处理同一事件 ❌ 为什么? 🚫 Redis 挂了,代码为了不阻塞选择"允许处理" ❌ 结果重复事件全涌进来 怎么解决? ✅ 方案1:实现降级到内存去重 public class HybridEventDeduplicator : IFeishuEventDeduplicator { private readonly IFeishuEventDistributedDeduplicator _redis; private readonly IFeishuEventDeduplicator _memory; public bool TryMarkAsProcessing(string eventId) { try { // 优先使用 Redis return _redis.TryMarkAsProcessedAsync(eventId).GetAwaiter().GetResult(); } catch (Exception ex) { _logger.LogError(ex, "Redis去重失败,降级到内存去重"); // 降级到内存去重 return _memory.TryMarkAsProcessing(eventId); } } } ✅ 方案2:配置熔断机制 public class CircuitBreakerEventDeduplicator : IFeishuEventDeduplicator { private readonly CircuitBreaker _circuitBreaker; private int _failureCount; private DateTime _lastFailureTime; public bool TryMarkAsProcessing(string eventId) { if (_circuitBreaker.IsOpen()) { _logger.LogWarning("Redis去重熔断,拒绝处理"); return true; // 拒绝处理,保护下游 } try { var result = _redis.TryMarkAsProcessedAsync(eventId).GetAwaiter().GetResult(); _circuitBreaker.RecordSuccess(); return result; } catch (Exception ex) { _circuitBreaker.RecordFailure(); throw; } } } 问题 5:缓存越积越多,内存快爆了 现场是这样的: 服务运行24小时 去重缓存条目:100万+ 内存占用:超过500MB GC压力增大,性能下降 为什么? 📈 高频事件(比如接收消息)一天能产生几十万条记录 🕐 清理间隔太长,旧记录堆积如山 怎么解决? ✅ 方案1:缩短缓存过期时间 services.AddFeishuWebSocketServiceBuilder(configuration) .ConfigureOptions(options => { // 根据实际业务调整 options.EventDeduplicationCacheExpirationMs = 3600000; // 缩短为1小时 options.EventDeduplicationCleanupIntervalMs = 60000; // 提高清理频率为1分钟 }); ✅ 方案2:使用 LRU 缓存 public class LRUEventDeduplicator : IFeishuEventDeduplicator { private readonly LRUCache<string, EventCacheEntry> _cache; private const int MaxCacheSize = 10000; // 最多保留1万条 public LRUEventDeduplicator() { _cache = new LRUCache<string, EventCacheEntry>(MaxCacheSize); } public bool TryMarkAsProcessing(string eventId) { if (_cache.TryGetValue(eventId, out var entry)) return true; _cache.Add(eventId, new EventCacheEntry { /* ... */ }); return false; } } ✅ 方案3:业务层优化 // 某些高频事件不需要去重 public class SystemEventHandler : DefaultFeishuEventHandler { // 不继承 IdempotentFeishuEventHandler // 直接处理,减少业务层去重压力 protected override Task ProcessBusinessLogicAsync(EventData eventData, CancellationToken ct) { // 仅记录日志,不进行业务操作 _logger.LogInformation("系统事件:{EventType}", eventData.EventType); return Task.CompletedTask; } } 给系统做体检:监控与调试 采集什么数据最有效? public class DeduplicatorMetrics { public long TotalProcessed { get; set; } public long DuplicateSkipped { get; set; } public long ProcessingTimeout { get; set; } public long CacheHitCount { get; set; } public long CacheMissCount { get; set; } public double DuplicateRate => TotalProcessed > 0 ? (double)DuplicateSkipped / TotalProcessed : 0; public double CacheHitRate => (CacheHitCount + CacheMissCount) > 0 ? (double)CacheHitCount / (CacheHitCount + CacheMissCount) : 0; } 暴露一个接口,随时查看健康状态 [ApiController] [Route("api/[controller]")] public class DeduplicatorController : ControllerBase { private readonly IFeishuEventDeduplicator _deduplicator; [HttpGet("stats")] public ActionResult<DeduplicatorStats> GetStats() { return Ok(new DeduplicatorStats { CacheCount = _deduplicator.GetCacheCount(), MaxProcessedSeqId = _seqIdDeduplicator?.GetMaxProcessedSeqId(), DuplicateRate = _metrics.DuplicateRate, CacheHitRate = _metrics.CacheHitRate }); } [HttpGet("check/{eventId}")] public ActionResult<bool> CheckEventId(string eventId) { var isProcessed = _deduplicator.IsProcessed(eventId); return Ok(new { eventId, isProcessed }); } } 日志怎么写才容易排查问题? public class EnhancedEventDeduplicator : IFeishuEventDeduplicator { private readonly ILogger<EnhancedEventDeduplicator> _logger; public bool TryMarkAsProcessing(string eventId) { lock (_lock) { if (_eventCache.TryGetValue(eventId, out var entry)) { // 详细记录去重命中原因 _logger.LogInformation( "[Deduplication] EventId={EventId} 已处理," + "Status={Status}, ProcessedAt={ProcessedAt}", eventId, entry.Status, entry.ProcessedAt); return true; } _logger.LogDebug("[Deduplication] EventId={EventId} 新事件", eventId); // ... return false; } } } 快速回顾与行动清单 三层防护一表览 去重层级 去重依据 实现方式 推荐配置 协议层 SeqID / Nonce IFeishuSeqIDDeduplicator / IFeishuNonceDistributedDeduplicator 始终启用 分发层 EventId IFeishuEventDeduplicator / IFeishuEventDistributedDeduplicator 生产环境启用 Redis 应用层 数据存在性检查 DefaultFeishuObjectEventHandler<T> 中的业务逻辑 按需实现 还想了解更多? 📖 飞书官方文档:事件订阅 💻 MudFeishu GitHub 仓库 💻 MudFeishu Gitee 仓库 🔒 Redis 分布式锁最佳实践