如何实现低延迟WebRTC视频流与WebSocket状态驱动的大屏可视化?

摘要:Vue3 :封装 WebRTC 低延迟视频流与 WebSocket 实时状态驱动的大屏可视化 在工业互联网和智慧安防领域,实时监控大屏是核心业务场景之一。本文将分享在最近的“油罐车作业智能监控系统”中,如何利用 Vue3 &#x2B
Vue3 :封装 WebRTC 低延迟视频流与 WebSocket 实时状态驱动的大屏可视化 在工业互联网和智慧安防领域,实时监控大屏是核心业务场景之一。本文将分享在最近的“油罐车作业智能监控系统”中,如何利用 Vue3 + TypeScript 技术栈,实现低延迟的 WebRTC 视频流播放,以及基于 WebSocket 的全链路作业状态实时同步。 一、 业务背景与要求 我们公司需要开发一个监控大屏,实时展示油罐车在卸油作业过程中的监控画面,并同步显示 AI 识别出的作业状态(如:是否佩戴安全帽、是否连接静电球、卸油操作步骤等),原本是打算采用 videojs 来实现视频播放,但是在开发中发现,videojs 的延迟较高(3-10 秒),无法满足实时风控需求,后来使用了别的一些视频播放库,如 hls.js、flv.js 等,但是这些库的延迟也较高(1-3 秒),无法达到业主要求,最后去了解了下直播用的啥插件,尝试了了下 webRtc 效果还不错。 什么是 WebRTC? WebRTC (Web Real-Time Communication)是一项开源技术,旨在让浏览器和移动应用通过简单的 API 实现实时音视频通信和数据传输,而无需安装任何插件。它由 Google、Mozilla、Opera 等巨头推动,已成为 W3C 和 IETF 的国际标准。 WebRTC 的核心在于点对点 (P2P)通信能力。不同于传统的流媒体技术(如 HLS、RTMP)通常需要经过服务器中转和缓存,WebRTC 允许两个客户端直接建立连接,从而极大地降低了延迟。 核心用法: 信令交换 (Signaling):虽然 WebRTC 是 P2P 的,但在建立连接前,双方需要通过一个“中间人”(信令服务器,通常使用 WebSocket,用普通的 http 请求也可以)来交换元数据。 SDP (Session Description Protocol):交换媒体能力信息(如编码格式、分辨率)。双方通过 Offer 和 Answer 模式进行协商。 ICE (Interactive Connectivity Establishment):交换网络地址候选者 (ICE Candidates),用于穿越 NAT/防火墙建立连接。 建立连接:通过 RTCPeerConnection API 建立 P2P 通道。 媒体流传输:连接建立后,音视频流直接在两端传输,延迟通常控制在 500ms 以内。 关于 webRtc 信令交换原理,和更多用途,可参考管网(https://webrtc.org.cn/)。 技术优势: 低延迟:WebRTC 基于 P2P 通信,延迟通常在 500ms 以内,满足实时监控需求。 跨平台:支持所有现代浏览器(如 Chrome、Firefox、Safari)和移动应用(如 Android、iOS)。 无需插件:无需安装任何插件,直接在浏览器中运行。 安全:所有通信均在 HTTPS 环境下进行,确保数据隐私。 二、 WebRTC 播放器的优雅封装 为了复用逻辑并隔离底层复杂度,我封装了一个 WebRTCPlayer 类,专门负责与信令服务器交互和流媒体渲染。 1. 核心类设计 (WebRTCPlayer.ts) 我用 WebSocket 作为信令通道,设计了一套信令交互协议。 class WebRTCPlayer { ws: WebSocket | null = null; pc: RTCPeerConnection | null = null; pendingCandidates: any[] = []; // 暂存的 ICE 候选者,等待远程描述设置完成后添加 isConnecting = false; // 是否正在连接中 videoElement: HTMLVideoElement; // 视频播放元素 serverUrl: string; // WebSocket 信令服务器地址 taskId: string; // 任务ID,用于标识视频流 rtcConfig: RTCConfiguration; // WebRTC 配置(STUN/TURN 服务器) maxRetry =30; // 最大重连次数 retryCount = 0; // 当前重连次数 reconnectTimer: any = null; // 重连定时器 heartbeatTimer: any = null; // 心跳定时器 /** * 构造函数 * @param videoElement HTMLVideoElement 视频播放的 DOM 节点 * @param serverIp string 服务器 IP 地址 * @param taskId string 任务 ID */ constructor(videoElement: HTMLVideoElement, serverIp: string, taskId: string) { this.videoElement = videoElement; this.serverUrl = `ws://${serverIp}:8080/ws`; this.taskId = taskId; // 配置 ICE 服务器,包含 Google 的公共 STUN 和自建的 TURN 服务 this.rtcConfig = { iceServers: [ { urls: 'stun:stun.l.google.com:19302' }, // STUN { urls: 'turn:192.168.1.111:10002', // ZLMediaKit TURN username: 'your_username', credential: 'your_password' } ]}; } /** * 启动播放 * 重置重连计数并开始连接 WebSocket */ start() { this.retryCount = 0; this.connectWs(); } /** * 连接 WebSocket 信令服务器 */ connectWs() { // 如果 WebSocket 已连接,直接发送请求流指令 if (this.ws && this.ws.readyState === WebSocket.OPEN) { this.send({ type: 'request_stream', task_id: this.taskId }); return; } if (this.isConnecting) return; this.isConnecting = true; // 清理旧的 PeerConnection 和 WebSocket this.cleanupPeer(); if (this.ws) { try { this.ws.close(); } catch {} this.ws = null; } const ws = new WebSocket(this.serverUrl); this.ws = ws; ws.onopen = () => { this.isConnecting = false; this.retryCount = 0; // 连接成功后请求视频流 this.send({ type: 'request_stream', task_id: this.taskId }); this.startHeartbeat(); }; ws.onmessage = async (event) => { const msg = JSON.parse(event.data); await this.handleSignalingMessage(msg); }; ws.onerror = () => { this.isConnecting = false; this.scheduleReconnect(); }; ws.onclose = () => { this.isConnecting = false; this.stopHeartbeat(); this.scheduleReconnect(); }; } /** * 发送 WebSocket 消息 * @param payload 消息体 */ send(payload: any) { if (this.ws && this.ws.readyState === WebSocket.OPEN) { this.ws.send(JSON.stringify(payload)); } } /** * 处理信令消息 * @param msg 信令消息对象 */ async handleSignalingMessage(msg: any) { if (!this.pc) this.createPeerConnection(); const pc = this.pc!; switch (msg.type) { case 'offer': { // 收到服务器的 Offer,设置远程描述 await pc.setRemoteDescription({ type: 'offer', sdp: msg.sdp }); // 创建 Answer const answer = await pc.createAnswer(); // 设置本地描述 await pc.setLocalDescription(answer); // 发送 Answer 给服务器 this.send({ type: 'answer', sdp: answer.sdp }); // 处理暂存的 ICE 候选者 while (this.pendingCandidates.length) { const candidate = this.pendingCandidates.shift(); try { await pc.addIceCandidate(candidate); } catch (e) { console.error('Adding pending ICE candidate failed:', e); } } break; } case 'ice_candidate': { // 收到 ICE 候选者 if (msg.candidate) { const candidate = { candidate: msg.candidate, sdpMLineIndex: msg.sdpMLineIndex }; if (pc.remoteDescription) { try { await pc.addIceCandidate(candidate); } catch (e) { console.error('添加 ICE 候选失败:', e); } } else { // 如果远程描述还没设置好,先暂存 this.pendingCandidates.push(candidate); } } break; } case 'pong': // 收到心跳回应,不做处理 break; } } /** * 创建 WebRTC 连接对象 */ createPeerConnection() { this.cleanupPeer(); const pc = new RTCPeerConnection(this.rtcConfig); this.pc = pc; // 收到远程流时的回调 pc.ontrack = (event) => { console.log(`[${this.taskId}] ontrack`, event); const stream = event.streams[0]; this.videoElement.srcObject = stream; this.videoElement.play().catch(() => {}); // 监听流结束事件 stream.getTracks().forEach((t) => { t.onended = () => this.scheduleReconnect(); }); }; // 收集到本地 ICE 候选者时,发送给服务器 pc.onicecandidate = (event) => { if (event.candidate) { this.send({ type: 'ice_candidate', candidate: event.candidate.candidate, sdpMLineIndex: event.candidate.sdpMLineIndex }); } }; // 连接状态变化监听 pc.onconnectionstatechange = () => { const s = pc.connectionState as any; if (s === 'failed' || s === 'disconnected') { this.scheduleReconnect(); } }; pc.oniceconnectionstatechange = () => { const s = pc.iceConnectionState as any; if (s === 'failed' || s === 'disconnected') { this.scheduleReconnect(); } }; } /** * 调度重连 * 使用指数退避算法计算重连延迟 */ scheduleReconnect() { if (this.reconnectTimer) return; if (this.retryCount >= this.maxRetry) return; const delay = Math.min(30000, 1000 * Math.pow(2, this.retryCount)); this.retryCount++; this.reconnectTimer = setTimeout(() => { this.reconnectTimer = null; this.connectWs(); }, delay); } /** * 开始发送心跳 */ startHeartbeat() { this.stopHeartbeat(); this.heartbeatTimer = setInterval(() => { this.send({ type: 'ping' }); }, 15000); } /** * 停止心跳 */ stopHeartbeat() { if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer); this.heartbeatTimer = null; } } /** * 清理 WebRTC 连接资源 */ cleanupPeer() { if (this.pc) { try { this.pc.close(); } catch {} this.pc = null; } } /** * 停止播放并清理所有资源 */ stop() { this.stopHeartbeat(); if (this.ws) try { this.ws.close(); } catch {} this.ws = null; this.cleanupPeer(); if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } } } export default WebRTCPlayer; 2. 页面使用、信令交互流程 WebRTC 的核心在于 SDP (Session Description Protocol) 的交换。我们的实现流程如下: 使用 video 标签渲染视频流 <div class="video-card" v-for="(cfg, index) in playersConfig" :key="index"> <video :ref="(el) => (videoRefs[index] = el as HTMLVideoElement)" autoplay muted controls playsinline webkit-playsinline class="video-player" ></video> <div class="video-label">CAM-0{{ index + 1 }}</div> </div> <script lang="ts" setup name="AnalysisDashboard"> import { onMounted, ref, onUnmounted, unref, computed } from 'vue'; // 引入 WebRTC 类,根据项目需求,可根据实际情况调整引入路径 import WebRTCPlayerClass from '/@/components/Ljh/WebRTC/index'; const videoRefs = ref<HTMLVideoElement[]>([]); // serverIp我配置在环境变量中,可根据需求自行配置在哪,写在这也行。 const cameraServerIp = import.meta.env.VITE_CAMERA_SERVER_IP as string; // 配置前端和后端的服务器 serverIp 和任务 taskId(这个是和后端约定传的参数,这个项目根据这个参数来区分是前端流还是后端流,因为要展示两个流,如果没有特殊要求传参,就不用配) const playersConfig = ref([ { serverIp: cameraServerIp, taskId: 'front' }, { serverIp: cameraServerIp, taskId: 'backend' }, ]); onMounted(() => { // 初始化 WebRTC 连接 playersConfig.value.forEach((cfg, index) => { const el = videoRefs.value[index]; if (el) { const p = new WebRTCPlayerClass(el, cfg.serverIp, cfg.taskId); players.value[index] = p; try { p.start(); } catch (e) { console.error(e); } } }); }); onUnmounted(() => { players.value.forEach((p) => { try { p.stop(); } catch (e) { console.error(e); } }); }); </script> <style scoped> .video-card{ ...... // 写视频样式 } 前端发起请求 :连接 WS 成功后,发送 request_stream 指令。 后端响应 Offer :后端创建 WebRTC Peer,发送 offer SDP。 前端应答 Answer :前端收到 Offer,设置 Remote Description,创建 Answer SDP 并发送给后端。 ICE 穿透 :双方交换 ICE Candidate 信息,建立 P2P 连接(或通过 TURN 中转)。 最终实现效果(https://img2024.cnblogs.com/blog/2819675/202601/2819675-20260108132950357-835871349.png) 总结 通过 WebRTC ,我们将视频流延迟控制在了 500ms 以内,实现了“所见即所得”的监控体验;通过 WebSocket + Vue3体系,我们构建了一套高效的状态同步机制,让复杂的作业流程数据能够实时、准确地呈现在用户面前,当然这个需要后端配合,后端需要将传统流,转换为 WebRTC 流,具体事项可以参考 WebRTC 官方文档。 另外这种“实时流 + 实时信令”的架构模式,不仅适用于智慧安防,在远程医疗、在线教育等对实时性要求极高的场景中也具有广泛的应用价值。 最后功能实现后,建议可以详细去官网详细了解下 WebRTC 信令交互流程,上面提供有,代码里有注释,也是根据我自己的理解写的,不一定准确,而且还有其他一些有意思的功能,像是webRTC实现视频通话,视频会议这些。 如有问题,欢迎交流。