/** * SSE 封装,使微信小程序等环境可通过 enableChunked + 分块解析使用 SSE * - 小程序:使用 uni.request enableChunked,按块接收后解析 SSE 格式并回调 onMessage * - H5:可优先使用原生 EventSource,不支持时回退到 chunked 方式 */ // 将 UTF-8 字节数组解码为字符串(兼容小程序无 TextDecoder、避免中文乱码) function utf8Decode(bytes) { if (!bytes || bytes.length === 0) return ''; if (typeof TextDecoder !== 'undefined') { return new TextDecoder('utf-8').decode(bytes instanceof Uint8Array ? bytes : new Uint8Array(bytes)); } const arr = bytes instanceof Uint8Array ? bytes : new Uint8Array(bytes); let s = ''; let i = 0; const len = arr.length; while (i < len) { const b = arr[i++]; if (b < 0x80) { s += String.fromCharCode(b); } else if (b < 0xe0 && i < len) { s += String.fromCharCode(((b & 0x1f) << 6) | (arr[i++] & 0x3f)); } else if (b < 0xf0 && i + 1 < len) { s += String.fromCharCode(((b & 0x0f) << 12) | ((arr[i++] & 0x3f) << 6) | (arr[i++] & 0x3f)); } else if (b < 0xf8 && i + 2 < len) { let c = ((b & 0x07) << 18) | ((arr[i++] & 0x3f) << 12) | ((arr[i++] & 0x3f) << 6) | (arr[i++] & 0x3f); c -= 0x10000; s += String.fromCharCode(0xd800 + (c >> 10), 0xdc00 + (c & 0x3ff)); } else { s += String.fromCharCode(b); } } return s; } // 将分块数据转为字符串(UTF-8 解码,避免中文乱码) function chunkToText(data) { if (data == null) return ''; if (typeof data === 'string') return data; if (data instanceof ArrayBuffer) data = new Uint8Array(data); if (data instanceof Uint8Array) return utf8Decode(data); return ''; } // 解析 SSE 缓冲区:按 \n\n 分割消息,解析 data:/event:/id: 行,返回 { messages, remaining } function parseSSEBuffer(buffer) { const messages = []; let rest = buffer; for (;;) { const idx = rest.indexOf('\n\n'); if (idx === -1) break; const block = rest.slice(0, idx); rest = rest.slice(idx + 2); let data = ''; let event = ''; let id = ''; block.split('\n').forEach((line) => { const s = line.trim(); if (s.startsWith('data:')) data = (data ? data + '\n' : '') + s.slice(5).trim(); else if (s.startsWith('event:')) event = s.slice(6).trim(); else if (s.startsWith('id:')) id = s.slice(3).trim(); }); messages.push({ data, event, id }); } return { messages, remaining: rest }; } /** * 创建 SSE 连接(兼容微信小程序等无原生 EventSource 的环境) * @param {string} url - 完整请求 URL * @param {object} options - { header, timeout, ... } 会透传给 uni.request * @returns {object} - { abort(), onMessage(cb), onOpen(cb), onError(cb) } */ export function createSSEConnection(url, options = {}) { let buffer = ''; const callbacks = { onMessage: null, onOpen: null, onError: null }; const { header = {}, timeout = 0, ...rest } = options; const requestTask = uni.request({ url, method: 'GET', header: { ...header }, enableChunked: true, timeout, ...rest, fail(err) { if (callbacks.onError) callbacks.onError(err); if (options.fail) options.fail(err); } }); if (requestTask && typeof requestTask.onChunkReceived === 'function') { requestTask.onChunkReceived((res) => { try { buffer += chunkToText(res.data); const { messages, remaining } = parseSSEBuffer(buffer); buffer = remaining; messages.forEach((msg) => { if (callbacks.onMessage) callbacks.onMessage(msg); }); } catch (e) { if (callbacks.onError) callbacks.onError(e); } }); } if (requestTask && typeof requestTask.onHeadersReceived === 'function') { requestTask.onHeadersReceived(() => { if (callbacks.onOpen) callbacks.onOpen(); }); } return { get requestTask() { return requestTask; }, onMessage(cb) { callbacks.onMessage = cb; return this; }, onOpen(cb) { callbacks.onOpen = cb; return this; }, onError(cb) { callbacks.onError = cb; return this; }, abort() { if (requestTask && typeof requestTask.abort === 'function') requestTask.abort(); } }; }