[db:标题]

摘要:前言 在物联网(IoT)蓬勃发展的今天,MQTT 协议已经成为设备通信的事实标准。无论是智能家居、工业自动化还是车联网,MQTT 都扮演着至关重要的角色。今天,我要为大家介绍一个完全使用 C# 实现的高性能 MQTT 库 这个库不仅提供了完
前言 在物联网(IoT)蓬勃发展的今天,MQTT 协议已经成为设备通信的事实标准。无论是智能家居、工业自动化还是车联网,MQTT 都扮演着至关重要的角色。今天,我要为大家介绍一个完全使用 C# 实现的高性能 MQTT 库 这个库不仅提供了完整的 MQTT 客户端实现,还包含了一个功能齐全的 Broker 服务器,支持桥接、集群等企业级特性。 核心特性 协议支持 MQTT 3.1.1 - 完整支持 MQTT 5.0 - 完整支持(包括用户属性、消息过期、主题别名等新特性) MQTT-SN - 基于 UDP 的轻量级 MQTT 变体,适合受限设备 CoAP - 约束应用协议网关支持 性能特性 高性能异步实现 零不必要的内存分配 缓冲区池技术 支持 10000+ 并发连接 企业级功能 Broker 桥接(多 Broker 消息同步) 集群支持(去中心化 P2P 架构) 灵活的认证与授权机制 TLS/SSL 加密传输 持久会话与离线消息存储 框架支持 .NET 6.0 .NET 8.0 .NET 10.0 技术实现 本项目采用了大量现代 .NET 高性能技术,下面详细介绍核心技术点。 内存管理技术 Span<T> 和 Memory<T> - 零拷贝处理 项目使用 ref struct 实现的二进制读写器,完全在栈上分配,避免堆内存压力: // 零拷贝的二进制读取器 public ref struct MqttBinaryReader { private readonly ReadOnlySpan<byte> _buffer; private int _position; // 零拷贝切片操作 [MethodImpl(MethodImplOptions.AggressiveInlining)] public ReadOnlySpan<byte> ReadBytes(int count) { var span = _buffer.Slice(_position, count); _position += count; return span; } } 技术优势: ref struct 只能在栈上分配,无 GC 压力 ReadOnlySpan<byte> 支持零拷贝切片 避免大量字节数组复制操作 ArrayPool<T> - 缓冲区复用 使用共享内存池减少频繁的内存分配: // 从共享池租借缓冲区 var buffer = ArrayPool<byte>.Shared.Rent(1024); try { await stream.ReadAsync(buffer.AsMemory(0, length), cancellationToken); // 处理数据... } finally { ArrayPool<byte>.Shared.Return(buffer); // 归还缓冲区 } stackalloc - 小缓冲区栈分配 对于小型临时缓冲区,直接在栈上分配: // 4 字节的可变长度编码缓冲区,栈分配 Span<byte> remainingLengthBytes = stackalloc byte[4]; var size = EncodeRemainingLength(length, remainingLengthBytes); 异步编程模型 async/await + ConfigureAwait 所有 IO 操作均采用异步模式,并使用 ConfigureAwait(false) 优化: public async Task<MqttConnectResult> ConnectAsync(CancellationToken cancellationToken = default) { // 建立 TCP 连接 await _tcpClient.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false); // TLS 握手 if (Options.UseTls) { await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken).ConfigureAwait(false); } // 发送 CONNECT 报文 await SendPacketAsync(connectPacket, cancellationToken).ConfigureAwait(false); } Channel<T> - 高性能事件队列 Broker 使用有界通道实现非阻塞的事件分发: public sealed class MqttBrokerEventDispatcher { private readonly Channel<BrokerEvent> _eventChannel; public MqttBrokerEventDispatcher(int capacity = 10000) { // 有界通道,队列满时丢弃最旧事件 _eventChannel = Channel.CreateBounded<BrokerEvent>(new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.DropOldest, SingleReader = true, SingleWriter = false }); } // 非阻塞事件发送 public void Dispatch<TEventArgs>(BrokerEventType type, TEventArgs args, EventHandler<TEventArgs>? handler) { _eventChannel.Writer.TryWrite(new BrokerEvent(type, args, handler)); } } TaskCompletionSource - 请求/响应模式 实现 QoS 1/2 的确认等待机制: private readonly Dictionary<ushort, TaskCompletionSource<object?>> _pendingPackets = new(); private async Task<object?> WaitForPacketAsync(ushort packetId, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously); _pendingPackets[packetId] = tcs; using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(TimeSpan.FromSeconds(30)); // 30 秒超时 using var registration = cts.Token.Register(() => tcs.TrySetCanceled()); return await tcs.Task.ConfigureAwait(false); } SemaphoreSlim - 发送同步 确保报文发送的串行化: private readonly SemaphoreSlim _sendLock = new(1, 1); private async Task SendPacketBytesAsync(byte[] packet, CancellationToken cancellationToken) { await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { await _stream.WriteAsync(packet.AsMemory(), cancellationToken).ConfigureAwait(false); await _stream.FlushAsync(cancellationToken).ConfigureAwait(false); } finally { _sendLock.Release(); } } 编译器优化 MethodImpl 特性 针对不同场景使用合适的编译器优化指令: // 强制内联 - 用于频繁调用的短方法 [MethodImpl(MethodImplOptions.AggressiveInlining)] public ushort ReadUInt16() { var value = (ushort)((_buffer[_position] << 8) | _buffer[_position + 1]); _position += 2; return value; } // 最积极的优化 - 用于热路径 [MethodImpl(MethodImplOptions.AggressiveOptimization)] private static async Task<int> DecodeRemainingLengthAsync(Stream stream, CancellationToken ct) { // 可变长度解码实现... } // 禁止内联 - 避免异常处理代码膨胀热路径 [MethodImpl(MethodImplOptions.NoInlining)] private void ThrowIfDisposed() { if (_disposed) throw new ObjectDisposedException(nameof(MqttClient)); } 网络编程 多层传输抽象 支持 TCP、UDP 等多种传输方式: public interface ITransportConnection : IAsyncDisposable { string ConnectionId { get; } TransportType TransportType { get; } EndPoint? RemoteEndPoint { get; } bool IsConnected { get; } ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default); ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default); ValueTask FlushAsync(CancellationToken cancellationToken = default); } TLS/SSL 支持 使用 SslStream 实现加密传输,支持 TLS 1.2 和 TLS 1.3: var sslOptions = new SslClientAuthenticationOptions { TargetHost = Options.Host, EnabledSslProtocols = SslProtocols.Tls12 | SslProtocols.Tls13, ClientCertificates = Options.ClientCertificate != null ? new X509CertificateCollection { Options.ClientCertificate } : null }; await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken); 协议序列化 工厂模式 + 延迟初始化 协议处理器采用单例 + 延迟初始化模式: public static class MqttProtocolHandlerFactory { private static readonly Lazy<IMqttProtocolHandler> _v311Handler = new(() => new V311ProtocolHandler()); private static readonly Lazy<IMqttProtocolHandler> _v500Handler = new(() => new V500ProtocolHandler()); public static IMqttProtocolHandler GetHandler(MqttProtocolVersion version) { return version switch { MqttProtocolVersion.V311 => _v311Handler.Value, MqttProtocolVersion.V500 => _v500Handler.Value, _ => throw new NotSupportedException() }; } } 可变长度整数编码 MQTT 协议特有的可变长度编码,1-4 字节可表示 0 到 268,435,455: public uint ReadVariableByteInteger() { uint value = 0; int multiplier = 1; byte encodedByte; do { encodedByte = _buffer[_position++]; value += (uint)((encodedByte & 0x7F) * multiplier); multiplier *= 128; } while ((encodedByte & 0x80) != 0); return value; } 并发数据结构 ConcurrentDictionary - 线程安全集合 用于管理客户端会话和订阅: private readonly ConcurrentDictionary<string, MqttClientSession> _sessions = new(); private readonly ConcurrentDictionary<string, MqttApplicationMessage> _retainedMessages = new(); public int ConnectedClients => _sessions.Count; public IEnumerable<MqttClientSession> Sessions => _sessions.Values; 设计模式应用 模式 应用场景 示例 工厂模式 协议处理器创建 MqttProtocolHandlerFactory 策略模式 不同协议版本实现 V311ProtocolHandler / V500ProtocolHandler 建造者模式 报文构建 IPublishPacketBuilder / IConnectPacketBuilder 观察者模式 事件系统 MessageReceived / ClientConnected 装饰器模式 传输层 TLS SslStream 装饰 NetworkStream 单例模式 协议处理器缓存 全局共享的处理器实例 技术栈总结 类别 技术 作用 内存管理 Span<T>, Memory<T>, ref struct, ArrayPool<T>, stackalloc 零拷贝、栈分配、缓冲区复用 异步编程 async/await, Channel<T>, TaskCompletionSource, SemaphoreSlim 高效并发、非阻塞事件处理 编译优化 AggressiveInlining, AggressiveOptimization, NoInlining JIT 编译器优化提示 网络层 TcpClient, TcpListener, SslStream, 传输抽象 多协议支持、安全传输 并发集合 ConcurrentDictionary, ConcurrentQueue 线程安全的数据结构 序列化 自定义二进制读写器、可变长度编码 高效的协议解析 性能优化建议 客户端优化 选择合适的 QoS:大多数场景 QoS 1 就足够了,QoS 2 开销较大 批量发送:如果有大量消息,考虑合并后发送 合理设置 KeepAlive:根据网络环境调整,一般 60 秒即可 使用持久会话:如果需要接收离线消息,设置 CleanSession = false Broker 优化 调整最大连接数:根据服务器性能设置 MaxConnections 限制消息大小:设置 MaxMessageSize 防止恶意大消息 离线消息限制:设置 MaxOfflineMessagesPerClient 防止内存溢出 使用集群:高可用场景使用集群部署 客户端使用指南 基础连接 using System.Net.MQTT; // 配置客户端选项 var options = new MqttClientOptions { Host = "localhost", Port = 1883, ClientId = "my-iot-device", CleanSession = true }; // 创建客户端 using var client = new MqttClient(options); // 连接到 Broker var result = await client.ConnectAsync(); if (result.IsSuccess) { Console.WriteLine("连接成功!"); } 订阅主题 // 订阅单个主题 await client.SubscribeAsync("sensors/temperature", MqttQualityOfService.AtLeastOnce); // 使用通配符订阅多个主题 await client.SubscribeAsync("sensors/#", MqttQualityOfService.AtLeastOnce); // 多级通配符 await client.SubscribeAsync("sensors/+/status", MqttQualityOfService.AtMostOnce); // 单级通配符 接收消息 client.MessageReceived += (sender, e) => { Console.WriteLine($"收到消息:"); Console.WriteLine($" 主题: {e.Message.Topic}"); Console.WriteLine($" 内容: {e.Message.PayloadAsString}"); Console.WriteLine($" QoS: {e.Message.QualityOfService}"); }; 发布消息 // 简单发布 await client.PublishAsync("sensors/temperature", "25.5"); // 指定 QoS 发布 await client.PublishAsync("sensors/humidity", "60%", MqttQualityOfService.AtLeastOnce); // 发布保留消息 await client.PublishAsync("device/status", "online", MqttQualityOfService.AtLeastOnce, retain: true); // 使用完整的消息对象 var message = MqttApplicationMessage.Create( topic: "sensors/data", payload: "{\"temp\": 25.5, \"humidity\": 60}", qos: MqttQualityOfService.ExactlyOnce, retain: false ); await client.PublishAsync(message); 遗嘱消息(Last Will) 遗嘱消息会在客户端异常断开时自动发布: var options = new MqttClientOptions { Host = "localhost", ClientId = "my-device", WillMessage = MqttApplicationMessage.Create( topic: "devices/my-device/status", payload: "offline", qos: MqttQualityOfService.AtLeastOnce, retain: true ) }; TLS 加密连接 var options = new MqttClientOptions { Host = "secure-broker.example.com", Port = 8883, UseTls = true, // 可选:客户端证书 ClientCertificate = new X509Certificate2("client.pfx", "password") }; 自动重连 var options = new MqttClientOptions { Host = "localhost", AutoReconnect = true, ReconnectDelayMs = 5000 // 5秒后重连 }; client.Connected += (s, e) => Console.WriteLine("已连接"); client.Disconnected += (s, e) => Console.WriteLine("连接断开,正在重连..."); 完整客户端示例 using System.Net.MQTT; var options = new MqttClientOptions { Host = "localhost", Port = 1883, ClientId = $"client-{Guid.NewGuid():N}", Username = "user", Password = "password", CleanSession = true, KeepAliveSeconds = 60, AutoReconnect = true }; using var client = new MqttClient(options); // 设置事件处理 client.Connected += (s, e) => Console.WriteLine("[事件] 已连接到 Broker"); client.Disconnected += (s, e) => Console.WriteLine("[事件] 连接已断开"); client.MessageReceived += (s, e) => { Console.WriteLine($"[消息] {e.Message.Topic}: {e.Message.PayloadAsString}"); }; // 连接 var result = await client.ConnectAsync(); if (!result.IsSuccess) { Console.WriteLine($"连接失败: {result.ReasonCode}"); return; } // 订阅 await client.SubscribeAsync("test/#", MqttQualityOfService.AtLeastOnce); // 发布测试消息 for (int i = 0; i < 10; i++) { await client.PublishAsync("test/counter", i.ToString()); await Task.Delay(1000); } // 断开连接 await client.DisconnectAsync(); 服务器(Broker)使用指南 启动基础 Broker using System.Net.MQTT.Broker; var options = new MqttBrokerOptions { Port = 1883, AllowAnonymous = true, EnableRetainedMessages = true, MaxConnections = 10000 }; using var broker = new MqttBroker(options); // 启动服务器 await broker.StartAsync(); Console.WriteLine("MQTT Broker 已启动,监听端口 1883"); // 保持运行 await Task.Delay(Timeout.Infinite); // 停止服务器 await broker.StopAsync(); 配置认证 // 使用简单认证器 broker.Authenticator = new SimpleAuthenticator() .AddUser("admin", "admin123") .AddUser("device1", "device1pass") .AddUser("device2", "device2pass"); var options = new MqttBrokerOptions { Port = 1883, AllowAnonymous = false // 禁用匿名访问 }; 自定义认证器 public class MyAuthenticator : IMqttAuthenticator { public Task<MqttAuthenticationResult> AuthenticateAsync( MqttAuthenticationContext context, CancellationToken cancellationToken) { // 从数据库验证用户 if (ValidateFromDatabase(context.Username, context.Password)) { return Task.FromResult(MqttAuthenticationResult.Success()); } return Task.FromResult(MqttAuthenticationResult.Failure( MqttConnectReasonCode.BadUserNameOrPassword)); } } broker.Authenticator = new MyAuthenticator(); Broker 事件处理 // 客户端连接事件 broker.ClientConnected += (s, e) => { Console.WriteLine($"[连接] 客户端 {e.Session.ClientId} 已连接"); Console.WriteLine($" 地址: {e.Session.RemoteEndpoint}"); Console.WriteLine($" 当前连接数: {broker.ConnectedClients}"); }; // 客户端断开事件 broker.ClientDisconnected += (s, e) => { Console.WriteLine($"[断开] 客户端 {e.Session.ClientId} 已断开"); }; // 客户端订阅事件 broker.ClientSubscribed += (s, e) => { Console.WriteLine($"[订阅] {e.Session.ClientId} 订阅了 {e.TopicFilter}"); }; // 消息发布事件 broker.MessagePublished += (s, e) => { Console.WriteLine($"[消息] {e.Message.Topic}: {e.Message.PayloadAsString}"); Console.WriteLine($" 来自: {e.SourceClientId}"); }; // 消息发布前拦截(可以阻止消息发布) broker.MessagePublishing += (s, e) => { // 检查敏感主题 if (e.Message.Topic.StartsWith("admin/") && e.SourceClientId != "admin") { e.Cancel = true; // 阻止非管理员发布到 admin 主题 } }; TLS 配置 var options = new MqttBrokerOptions { // 普通端口 Port = 1883, // TLS 端口 UseTls = true, TlsPort = 8883, ServerCertificate = new X509Certificate2("server.pfx", "password"), RequireClientCertificate = false }; 高级功能 Broker 桥接 桥接功能允许将多个 Broker 连接起来,实现消息的跨 Broker 同步。 var broker = new MqttBroker(new MqttBrokerOptions { Port = 2883 }); // 添加桥接到父 Broker var bridge = broker.AddBridge(new MqttBridgeOptions { Name = "parent-bridge", RemoteHost = "parent-broker.example.com", RemotePort = 1883, ClientId = "bridge-client-1", // 上行规则:本地消息 -> 远程 Broker UpstreamRules = { new MqttBridgeRule { LocalTopicFilter = "sensor/#", Enabled = true }, new MqttBridgeRule { LocalTopicFilter = "device/+/data", Enabled = true } }, // 下行规则:远程消息 -> 本地 DownstreamRules = { new MqttBridgeRule { LocalTopicFilter = "commands/#", Enabled = true }, new MqttBridgeRule { LocalTopicFilter = "config/#", Enabled = true } } }); // 桥接事件 bridge.Connected += (s, e) => Console.WriteLine("桥接已连接"); bridge.MessageForwarded += (s, e) => { var direction = e.Direction == BridgeDirection.Upstream ? "上行" : "下行"; Console.WriteLine($"[桥接-{direction}] {e.OriginalTopic}"); }; // 获取统计信息 var stats = bridge.GetStatistics(); Console.WriteLine($"上行消息: {stats.UpstreamMessageCount}"); Console.WriteLine($"下行消息: {stats.DownstreamMessageCount}"); 集群部署 集群功能实现了去中心化的 P2P 架构,任何节点都可以独立运行,支持自动故障检测和恢复。 var broker = new MqttBroker(new MqttBrokerOptions { Port = 1883 }); // 启用集群 broker.EnableCluster(new MqttClusterOptions { NodeId = "node-1", ClusterName = "my-cluster", ClusterPort = 11883, SeedNodes = new List<string> { "node2.example.com:11883", "node3.example.com:11883" }, HeartbeatIntervalMs = 5000, NodeTimeoutMs = 15000, EnableDeduplication = true // 防止消息重复 }); // 集群事件 broker.Cluster!.PeerJoined += (s, e) => Console.WriteLine($"节点加入: {e.Peer.NodeId}"); broker.Cluster!.PeerLeft += (s, e) => Console.WriteLine($"节点离开: {e.Peer.NodeId}"); broker.Cluster!.MessageForwarded += (s, e) => Console.WriteLine($"消息转发: {e.Topic}"); await broker.StartAsync(); MQTT-SN 网关 MQTT-SN 是基于 UDP 的轻量级协议,适合资源受限的嵌入式设备: var options = new MqttBrokerOptions { Port = 1883, EnableMqttSn = true, MqttSnPort = 1885 }; CoAP 网关 CoAP 网关允许 CoAP 设备与 MQTT 生态系统互通: var options = new MqttBrokerOptions { Port = 1883, EnableCoAP = true, CoapPort = 5683, CoapMqttPrefix = "mqtt" }; // CoAP 客户端可以通过以下方式访问 MQTT 主题: // GET coap://broker:5683/mqtt/sensors/temperature // PUT coap://broker:5683/mqtt/sensors/temperature (发布消息) QoS 服务质量 MQTT 定义了三种服务质量级别: QoS 名称 说明 适用场景 0 At Most Once 最多一次,不保证送达 传感器数据,丢失可接受 1 At Least Once 至少一次,可能重复 重要数据,可处理重复 2 Exactly Once 恰好一次,保证送达且不重复 计费、订单等关键数据 // QoS 0 - 最多一次 await client.PublishAsync("sensor/temp", "25", MqttQualityOfService.AtMostOnce); // QoS 1 - 至少一次 await client.PublishAsync("alert/fire", "detected", MqttQualityOfService.AtLeastOnce); // QoS 2 - 恰好一次 await client.PublishAsync("order/create", orderJson, MqttQualityOfService.ExactlyOnce); 主题通配符 通配符 说明 示例 + 匹配单个层级 sensor/+/temp 匹配 sensor/room1/temp # 匹配多个层级 sensor/# 匹配 sensor/room1/temp 和 sensor/room1/humidity // 订阅所有房间的温度 await client.SubscribeAsync("sensor/+/temperature", MqttQualityOfService.AtLeastOnce); // 订阅所有传感器数据 await client.SubscribeAsync("sensor/#", MqttQualityOfService.AtLeastOnce); MQTT 5.0 新特性 如果你使用 MQTT 5.0 协议,可以利用以下新特性: var options = new MqttClientOptions { Host = "localhost", ProtocolVersion = MqttProtocolVersion.V500 // 使用 MQTT 5.0 }; // 创建带有 MQTT 5.0 属性的消息 var message = MqttApplicationMessage.CreateWithProperties( topic: "request/data", payload: "{\"query\": \"temperature\"}", qos: MqttQualityOfService.AtLeastOnce, retain: false, // MQTT 5.0 特有属性 responseTopic: "response/client1", // 响应主题 correlationData: Encoding.UTF8.GetBytes("req-123"), // 关联数据 messageExpiryInterval: 60, // 消息60秒后过期 contentType: "application/json", // 内容类型 userProperties: new List<MqttUserProperty> // 用户自定义属性 { new MqttUserProperty("version", "1.0"), new MqttUserProperty("source", "sensor-hub") } ); await client.PublishAsync(message); 总结 是一个功能完整、性能优秀的 .NET MQTT 库,具有以下优势: 完整的协议支持:MQTT 3.1.1、MQTT 5.0、MQTT-SN、CoAP 全覆盖 高性能设计:异步 IO、零分配、缓冲区池 企业级特性:桥接、集群、认证授权 易于使用:简洁的 API 设计,丰富的示例代码 现代化:支持最新的 .NET 版本 无论你是构建 IoT 平台、实现设备通信,还是搭建消息中间件,这个库都能满足你的需求。 相关链接 源码地址:https://github.com/hnlyf1688/mqtt