使用服务器发送事件(SSE)向 Web 客户端发送消息
本文的主要内容是如何在服务端创建 SSE 接口以及如何在 Web 端如何发起和处理 SSE 请求。
2024-05-12
服务器发送事件(Server Sent Event,简称 SSE)是一种服务器推送技术。使用 SSE 技术,在客户端与服务端建立连接后,服务端可以持续地向客户端发送消息。SSE 常用于客户端发送消息更新或连续数据流,如果在 Web 客户端中请求,可以直接使用 JavaScript 提供的 EventSource 接口进行请求。
EventSource
如何在服务端创建 SSE 接口?
在服务器创建 SSE 接口的基本流程为:
- 将响应的 设置为
Content-Type
;text/event-stream
- 通过设置响应头禁用缓存:;
Cache-Control: no-cache
- 通过设置响应头确保长链接:;
Connection: keep-alive
- 发送消息(message 为默认的事件):
resp.write("data: message\n\n")
- 发送自定义事件:
resp.write("event: close\ndata: data\n\n")
下面是一个使用
express
// yarn add express
const express = require("express");
const app = express();
app.use(express.json());
app.get("/api/sse", (req, resp) => {
console.log(`[INFO] got connection from ${req.ip}`);
resp.status(200);
resp.setHeader("Content-Type", "text/event-stream");
resp.setHeader("Cache-Control", "no-cache");
resp.setHeader("Connection", "keep-alive"); // 可选
let tick = Number((req.query.tick || "10").toString());
const timer = setInterval(() => {
const now = new Date().toLocaleTimeString();
// 默认的 event type 为 message
resp.write(`data: ${JSON.stringify({ time: now })}\n\n`);
if (tick > 0) {
tick -= 1;
} else {
clearInterval(timer);
// 也可以自定义事件类型
resp.write(`event: close\ndata: ${JSON.stringify({ message: "bye" })}\n\n`);
resp.end();
}
}, 1000);
});
app.listen(3001, () => console.log(`[INFO] Server is running on port 3001`));
index.js
启动服务并通过下面的方式验证:
curl -H "Accept: text/event-stream" http://127.0.0.1:3001/api/sse
如何在 Web 应用中发起和处理 SSE 请求
一般而言,在 Web 应用中处理 SSE 响应主要是通过 接口实现的,一个简单的例子如下:EventSource
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>SSE 示例</title>
</head>
<body>
<main>
<div>
<button id="send-button">发起 SSE 请求</button>
<button id="close-button">关闭 SSE 请求</button>
</div>
<ul id="event-list"></ul>
</main>
<script>
const state = {};
function send() {
const evtSource = new EventSource("/api/sse");
const eventList = document.querySelector("ul");
// 当服务端消息未指定 `event` 时,默认为 `message`
evtSource.onmessage = (e) => {
const newElement = document.createElement("li");
newElement.textContent = `message: ${e.data}`;
eventList.appendChild(newElement);
};
// 处理连接建立 `open` 事件
evtSource.onopen = (e) => {
const newElement = document.createElement("li");
newElement.textContent = `connected: ${JSON.stringify(e)}`;
eventList.appendChild(newElement);
state.evtSource = evtSource;
};
// 处理连接创建失败 `error` 事件
evtSource.onerror = (e) => {
alert("connect error: " + JSON.stringify(e));
};
// 处理自定义事件 - close,当服务端发送的消息中指定的 event 字段为 `close` 字段时触发
evtSource.addEventListener("close", (e) => {
alert("close event: " + event.data);
evtSource.close();
state.evtSource = undefined;
});
}
// 点击 send-button 发起一个 SSE 请求
document.querySelector("#send-button").addEventListener("click", (e) => {
send();
});
// 手动停止 SSE 请求
function close() {
if (state.evtSource) {
state.evtSource.close();
state.evtSource = undefined;
alert("sse closed");
}
}
// 点击 close-button 主动停止 SSE 请求
document.querySelector("#close-button").addEventListener("click", (e) => {
close();
});
</script>
</body>
</html>
index.html
要注意的是,浏览器原生的
EventSource
headers
Authorization
script
const evtSource = EventSourcePolyfill("/target", {
headers: { Authorization: "Bearer <token>" }
})
还有一点要注意的是,
EventSource
GET
GET
fetch
在 Taro、微信小程序中处理 SSE 请求
微信小程序中并没有提供
EventSource
requestTask
request
enableChunked
由于微信小程序中没有提供
TextDecoder
fastestsmallesttextencoderdecoder-encodeinto
npm install fastestsmallesttextencoderdecoder-encodeinto
安装后可以使用下面的代码处理 SSE 响应:
import Taro, { RequestTask } from "@tarojs/taro";
// 解码 SSE 响应
//// SSE 推送消息的格式为:
//// type: <type_name>
//// id: <event_id>
//// data: <data string>
////
//// type: ....
//// 使用该方法将消息格式化
function extractSSEEvents(buffer: any[]) {
const uint8 = new Uint8Array(buffer);
const text = new TextDecoder().decode(uint8);
return text.split("\n\n").filter(p => p.trim()).map((event_str: string) => {
const [type, id, data] = event_str.split("\n").map(p => (p.split(": ").pop() || "").trim());
return { type, id, data }
});
}
// 发送 SSE 请求
function startSSE(url: string, callback: Function) {
const task = Taro.request({
url,
timeout: 30 * 60_000, // 设置超市时间
method: "GET",
enableChunked: true,
header: {}, // 可以自定义 header,如添加 Authorization
success: (_) => {
task.abort(); // 在响应结束后终止 request task
callback();
},
fail: (error) => {
Taro.showToast({
title: error.errMsg == 'request: fail timeout' ? '请求超时,请稍后重试' : '网络错误',
icon: "none",
duration: 1000
})
}
});
return task;
}
function sendAndHandleSSE() {
const task = startSSE(url, () => console.log("done"));
task.onChunkReceived((resp: any) => {
const events = extractSSEEvents(resp.data);
for (const event of events) {
switch (event.type) {
case "message":
console.log(`[INFO] message 事件: ${event.data}`)
break;
case "error":
console.log(`[INFO] error 事件: ${event.data}`)
// 注意:使用 request 无法接收到 open 事件,
// 因此如果需要在连接打开后处理某些任务,
// 需要通过自定义事件或其他方法进行处理
case "open":
default:
break
}
}
})
}
sse.ts
部分内容参考自这篇文章。
配置 Nginx 以兼容 SSE 请求
如果使用 Nginx 反向代理一个 SSE 接口,如果不进行特定的配置,Nginx 会缓存该请求的结果然后等到响应结束后再一次性返回所有内容,添加下面的配置选项以避免此问题:
# ... location /sse/ { # 启用 event-stream 输出格式,可以直接在 api 服务器中配置 # default_type text/event-stream; # add_header Cache-Control: no-cache; # 关闭 Nginx 内置的缓存机制 sendfile off; aio off; proxy_buffering off; # 禁用 proxy buffering # ... 其他配置 } # ...