sse.js 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. /**
  2. * SSE 封装,使微信小程序等环境可通过 enableChunked + 分块解析使用 SSE
  3. * - 小程序:使用 uni.request enableChunked,按块接收后解析 SSE 格式并回调 onMessage
  4. * - H5:可优先使用原生 EventSource,不支持时回退到 chunked 方式
  5. */
  6. // 将 UTF-8 字节数组解码为字符串(兼容小程序无 TextDecoder、避免中文乱码)
  7. function utf8Decode(bytes) {
  8. if (!bytes || bytes.length === 0) return '';
  9. if (typeof TextDecoder !== 'undefined') {
  10. return new TextDecoder('utf-8').decode(bytes instanceof Uint8Array ? bytes : new Uint8Array(bytes));
  11. }
  12. const arr = bytes instanceof Uint8Array ? bytes : new Uint8Array(bytes);
  13. let s = '';
  14. let i = 0;
  15. const len = arr.length;
  16. while (i < len) {
  17. const b = arr[i++];
  18. if (b < 0x80) {
  19. s += String.fromCharCode(b);
  20. } else if (b < 0xe0 && i < len) {
  21. s += String.fromCharCode(((b & 0x1f) << 6) | (arr[i++] & 0x3f));
  22. } else if (b < 0xf0 && i + 1 < len) {
  23. s += String.fromCharCode(((b & 0x0f) << 12) | ((arr[i++] & 0x3f) << 6) | (arr[i++] & 0x3f));
  24. } else if (b < 0xf8 && i + 2 < len) {
  25. let c = ((b & 0x07) << 18) | ((arr[i++] & 0x3f) << 12) | ((arr[i++] & 0x3f) << 6) | (arr[i++] & 0x3f);
  26. c -= 0x10000;
  27. s += String.fromCharCode(0xd800 + (c >> 10), 0xdc00 + (c & 0x3ff));
  28. } else {
  29. s += String.fromCharCode(b);
  30. }
  31. }
  32. return s;
  33. }
  34. // 将分块数据转为字符串(UTF-8 解码,避免中文乱码)
  35. function chunkToText(data) {
  36. if (data == null) return '';
  37. if (typeof data === 'string') return data;
  38. if (data instanceof ArrayBuffer) data = new Uint8Array(data);
  39. if (data instanceof Uint8Array) return utf8Decode(data);
  40. return '';
  41. }
  42. // 解析 SSE 缓冲区:按 \n\n 分割消息,解析 data:/event:/id: 行,返回 { messages, remaining }
  43. function parseSSEBuffer(buffer) {
  44. const messages = [];
  45. let rest = buffer;
  46. for (;;) {
  47. const idx = rest.indexOf('\n\n');
  48. if (idx === -1) break;
  49. const block = rest.slice(0, idx);
  50. rest = rest.slice(idx + 2);
  51. let data = '';
  52. let event = '';
  53. let id = '';
  54. block.split('\n').forEach((line) => {
  55. const s = line.trim();
  56. if (s.startsWith('data:')) data = (data ? data + '\n' : '') + s.slice(5).trim();
  57. else if (s.startsWith('event:')) event = s.slice(6).trim();
  58. else if (s.startsWith('id:')) id = s.slice(3).trim();
  59. });
  60. messages.push({ data, event, id });
  61. }
  62. return { messages, remaining: rest };
  63. }
  64. /**
  65. * 创建 SSE 连接(兼容微信小程序等无原生 EventSource 的环境)
  66. * @param {string} url - 完整请求 URL
  67. * @param {object} options - { header, timeout, ... } 会透传给 uni.request
  68. * @returns {object} - { abort(), onMessage(cb), onOpen(cb), onError(cb) }
  69. */
  70. export function createSSEConnection(url, options = {}) {
  71. let buffer = '';
  72. const callbacks = { onMessage: null, onOpen: null, onError: null };
  73. const { header = {}, timeout = 0, ...rest } = options;
  74. const requestTask = uni.request({
  75. url,
  76. method: 'GET',
  77. header: { ...header },
  78. enableChunked: true,
  79. timeout,
  80. ...rest,
  81. fail(err) {
  82. if (callbacks.onError) callbacks.onError(err);
  83. if (options.fail) options.fail(err);
  84. }
  85. });
  86. if (requestTask && typeof requestTask.onChunkReceived === 'function') {
  87. requestTask.onChunkReceived((res) => {
  88. try {
  89. buffer += chunkToText(res.data);
  90. const { messages, remaining } = parseSSEBuffer(buffer);
  91. buffer = remaining;
  92. messages.forEach((msg) => {
  93. if (callbacks.onMessage) callbacks.onMessage(msg);
  94. });
  95. } catch (e) {
  96. if (callbacks.onError) callbacks.onError(e);
  97. }
  98. });
  99. }
  100. if (requestTask && typeof requestTask.onHeadersReceived === 'function') {
  101. requestTask.onHeadersReceived(() => {
  102. if (callbacks.onOpen) callbacks.onOpen();
  103. });
  104. }
  105. return {
  106. get requestTask() {
  107. return requestTask;
  108. },
  109. onMessage(cb) {
  110. callbacks.onMessage = cb;
  111. return this;
  112. },
  113. onOpen(cb) {
  114. callbacks.onOpen = cb;
  115. return this;
  116. },
  117. onError(cb) {
  118. callbacks.onError = cb;
  119. return this;
  120. },
  121. abort() {
  122. if (requestTask && typeof requestTask.abort === 'function') requestTask.abort();
  123. }
  124. };
  125. }