cover

使用服务器发送事件(SSE)向 Web 客户端发送消息

本文的主要内容是如何在服务端创建 SSE 接口以及如何在 Web 端如何发起和处理 SSE 请求。

2024-05-12

服务器发送事件(Server Sent Event,简称 SSE)是一种服务器推送技术。使用 SSE 技术,在客户端与服务端建立连接后,服务端可以持续地向客户端发送消息。SSE 常用于客户端发送消息更新或连续数据流,如果在 Web 客户端中请求,可以直接使用 JavaScript 提供的 EventSource 接口进行请求。EventSource API 已经被 WHATWG 标准化为 HTML5 的一部分,常用的浏览器均支持该功能特性。

heading

如何在服务端创建 SSE 接口?

服务器创建 SSE 接口的基本流程为:

  1. 将响应的 Content-Type 设置为 text/event-stream
  2. 通过设置响应头禁用缓存:Cache-Control: no-cache
  3. 通过设置响应头确保长链接:Connection: keep-alive
  4. 发送消息(message 为默认的事件):resp.write("data: message\n\n")
  5. 发送自定义事件:resp.write("event: close\ndata: data\n\n")

下面是一个使用 express 创建的 SSE 接口的代码示例:

// yarn add express const express = require("express"); const app = express(); app.use(express.json()); app.get("/api/sse", (req, resp) => { console.log(`[INFO] got connection from ${req.ip}`); resp.status(200); resp.setHeader("Content-Type", "text/event-stream"); resp.setHeader("Cache-Control", "no-cache"); resp.setHeader("Connection", "keep-alive"); // 可选 let tick = Number((req.query.tick || "10").toString()); const timer = setInterval(() => { const now = new Date().toLocaleTimeString(); // 默认的 event type 为 message resp.write(`data: ${JSON.stringify({ time: now })}\n\n`); if (tick > 0) { tick -= 1; } else { clearInterval(timer); // 也可以自定义事件类型 resp.write(`event: close\ndata: ${JSON.stringify({ message: "bye" })}\n\n`); resp.end(); } }, 1000); }); app.listen(3001, () => console.log(`[INFO] Server is running on port 3001`));
index.js

启动服务并通过下面的方式验证:

curl -H "Accept: text/event-stream" http://127.0.0.1:3001/api/sse
heading

如何在 Web 应用中发起和处理 SSE 请求

一般而言,在 Web 应用中处理 SSE 响应主要是通过 EventSource 接口实现的,一个简单的例子如下:

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" /> <title>SSE 示例</title> </head> <body> <main> <div> <button id="send-button">发起 SSE 请求</button> <button id="close-button">关闭 SSE 请求</button> </div> <ul id="event-list"></ul> </main> <script> const state = {}; function send() { const evtSource = new EventSource("/api/sse"); const eventList = document.querySelector("ul"); // 当服务端消息未指定 `event` 时,默认为 `message` evtSource.onmessage = (e) => { const newElement = document.createElement("li"); newElement.textContent = `message: ${e.data}`; eventList.appendChild(newElement); }; // 处理连接建立 `open` 事件 evtSource.onopen = (e) => { const newElement = document.createElement("li"); newElement.textContent = `connected: ${JSON.stringify(e)}`; eventList.appendChild(newElement); state.evtSource = evtSource; }; // 处理连接创建失败 `error` 事件 evtSource.onerror = (e) => { alert("connect error: " + JSON.stringify(e)); }; // 处理自定义事件 - close,当服务端发送的消息中指定的 event 字段为 `close` 字段时触发 evtSource.addEventListener("close", (e) => { alert("close event: " + event.data); evtSource.close(); state.evtSource = undefined; }); } // 点击 send-button 发起一个 SSE 请求 document.querySelector("#send-button").addEventListener("click", (e) => { send(); }); // 手动停止 SSE 请求 function close() { if (state.evtSource) { state.evtSource.close(); state.evtSource = undefined; alert("sse closed"); } } // 点击 close-button 主动停止 SSE 请求 document.querySelector("#close-button").addEventListener("click", (e) => { close(); }); </script> </body> </html>
index.html

要注意的是,浏览器原生的 EventSource 不支持自定义 headers,因此无法通过添加 Authorization 头的方式进行用户认证。如果有自定义 Headers 的需求,可以尝试使用 Polyfill。在 script 标签中引入 polyfill 后,使用下面的方式创建请求即可:

const evtSource = EventSourcePolyfill("/target", { headers: { Authorization: "Bearer <token>" } })

还有一点要注意的是,EventSource 并不支持调用非 GET 方法的 SSE 接口,如果希望调用非 GET 方法的接口,需要通过第三方库或基于 fetch 自行实现,具体的方法请参考这篇文章

heading

在 Taro、微信小程序中处理 SSE 请求

微信小程序中并没有提供 EventSource 接口,但是可以通过 requestTask 接口和 request 方法的 enableChunked 选项变相地实现。

由于微信小程序中没有提供 TextDecoder 类,因此需要先安装 fastestsmallesttextencoderdecoder-encodeinto 库(注意:该库仅支持 utf8 编码):

npm install fastestsmallesttextencoderdecoder-encodeinto

安装后可以使用下面的代码处理 SSE 响应:

import Taro, { RequestTask } from "@tarojs/taro"; // 解码 SSE 响应 //// SSE 推送消息的格式为: //// type: <type_name> //// id: <event_id> //// data: <data string> //// //// type: .... //// 使用该方法将消息格式化 function extractSSEEvents(buffer: any[]) { const uint8 = new Uint8Array(buffer); const text = new TextDecoder().decode(uint8); return text.split("\n\n").filter(p => p.trim()).map((event_str: string) => { const [type, id, data] = event_str.split("\n").map(p => (p.split(": ").pop() || "").trim()); return { type, id, data } }); } // 发送 SSE 请求 function startSSE(url: string, callback: Function) { const task = Taro.request({ url, timeout: 30 * 60_000, // 设置超市时间 method: "GET", enableChunked: true, header: {}, // 可以自定义 header,如添加 Authorization success: (_) => { task.abort(); // 在响应结束后终止 request task callback(); }, fail: (error) => { Taro.showToast({ title: error.errMsg == 'request: fail timeout' ? '请求超时,请稍后重试' : '网络错误', icon: "none", duration: 1000 }) } }); return task; } function sendAndHandleSSE() { const task = startSSE(url, () => console.log("done")); task.onChunkReceived((resp: any) => { const events = extractSSEEvents(resp.data); for (const event of events) { switch (event.type) { case "message": console.log(`[INFO] message 事件: ${event.data}`) break; case "error": console.log(`[INFO] error 事件: ${event.data}`) // 注意:使用 request 无法接收到 open 事件, // 因此如果需要在连接打开后处理某些任务, // 需要通过自定义事件或其他方法进行处理 case "open": default: break } } }) }
sse.ts

部分内容参考自这篇文章

heading

配置 Nginx 以兼容 SSE 请求

如果使用 Nginx 反向代理一个 SSE 接口,如果不进行特定的配置,Nginx 会缓存该请求的结果然后等到响应结束后再一次性返回所有内容,添加下面的配置选项以避免此问题:

# ... location /sse/ { # 启用 event-stream 输出格式,可以直接在 api 服务器中配置 # default_type text/event-stream; # add_header Cache-Control: no-cache; # 关闭 Nginx 内置的缓存机制 sendfile off; aio off; proxy_buffering off; # 禁用 proxy buffering # ... 其他配置 } # ...