基于ServerSendEvent实现AI实时群聊

如今,AI 底层基础研究突飞猛进,上层应用紧锣密鼓、遍地开花。作为应用层开发者,今天来研究下 AI 相关的工程技术—“SSE 服务端推送技术”。

我这里自己设计了一个产品需求,就是实现了一个“老崔 AI 畅聊群”,在群内大家可以互相聊天,也可以 at “AI 小助手” 进行 AI 问答。目前已经实现的功能有:

  • 群界面 UI。模仿了微信 UI 大概的样子。
  • 群成员列表。用户进群和离开会实时更新成员列表。(采用了“SSE”技术广播)。
  • 发言输入框,回车后可以在群内发言。所有成员都能实时看到各自的发言消息。(采用了“SSE”技术广播)。
  • 在群里可以 at 各个 ai 小助手,或者 at 其他用户。(at 富文本组件,采用我在微信时自研的 atEditor 组件)
  • 【TODO】可以 at 人工智能 ai 小助手发言,如果 at 的是 ai 小助手,则可得到 ai 的回答。(采用当下火热的 SSE 推送技术和光标跟随技术实现“类似 chaggpt/deepseek”的效果)
  • 成员无需登录,自动随机用户 id。
  • redis 保存最近历史消息。

效果图演示

您可以访问 https://product.cuiyongjian.com/chatroom 进行体验。

MacBook 效果:

iPad 效果:

what

说到 SSE 这个技术,上学时候就在著名的“JavaScript 红宝书”中看到这个技术,后来大概在 2016 年我刚刚毕业进入腾讯做运营管理后台的时候,那个内部系统中就使用了这个来异步通知服务器上 excel 文件的生成进度。

而如今他又大规模运用到了 ai 对话场景,可见他依然有他的技术优势和生命力。

其实对于多人聊天场景,使用 websocket 更能提高性能。毕竟你要“全双工发言”,而不是你问一句他回一句。但 AI 场景下是 “问一句+回一句”的串行模式,这种情况下,完全可以采用“原始 http 的请求+响应模型”即可,之所以各个厂商都把 SSE 引入,是因为 AI 在“回复”这一步的速度是很慢的而且是“流式的”,因此 AI 场景下使用 SSE 可以实现如下目的:

  1. 让用户尽快看到已经生成的答复内容,减缓用户焦虑。
  2. 充分利用后台流的能力和浏览器的“可分块接收流”的能力,将已经产生的流内容尽快扔到前端浏览器。此处这个流式传输技术就是 SSE(换个角度看,他又可以看作一种服务端推送技术,因为每次是 push 一个完整的消息给前端浏览器)。

本文也基于此,来研究 SSE 的使用和实战。

优势

缺点:

  • 相比 websocket 的全双工,这个 SSE 它是单工的。即它只能由服务器向前端 push,而不能由前端同时并行向服务器也传送数据。

他也有自己的优势:

  • SSE 默认支持断线重连,WebSocket 需要自己实现。
  • SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。

有人说 SSE 相对 websocket 而言对服务器的资源占用更少,说原因是 socket 要占用服务器连接不释放。 然而我不这么认为,其实 SSE 既然要推送消息他依然要维持这个 http 连接不释放,它本质上不依然是一个 tcp 连接么(只是浏览器没有给你暴露向服务器双工发送信息的 api 而已)。

原理

首先,http1.1 连接本来就是不关闭的

自从 http1.1 以来,一般服务器和浏览器都是采用 “keep-alive” 方式开启一条 http 请求通道。这个通道在用完后不会立刻关闭,而是会给后续其他请求服用。

所以你不能说 SSE 是一个长连接,因为即使你不用 SSE 的话,现代浏览器很多情况下的 http 也已经是 keepAlive 的长连接了。

直接用 http 发个普通请求能否做到服务器推送?

http1.1 尽管采用了链接通道复用(即 keepAlive),这仿佛让服务端推送有了前置条件,但是它依然采用 “单工+请求响应模型”。 即 “前端发一个请求,后端响应一个内容;前端不发请求,则后端不会响应内容”。

这种由浏览器和 web 服务器约定好的“请求后再响应”的模型,让协议处理变得简单,但基于此也导致它无法实现服务器按需多次响应。

SSE 是怎么搞的?

既然 http 协议不允许服务器主动往浏览器推送。那岂不是浪费了 keepAlive 的这条通道? 那我能否改一改让他能主动推呢?

于是浏览器和服务器约定了一种 MIME type, 当服务器返回 “text/event-stream” 这个 ContentType 类型的时候,那么浏览器就认为这个 http 请求通道中,服务端可能会持续往浏览器追加响应内容。于是,一条“可用于服务端推送”的通道就这样建立起来了。

这种 “流式” 的响应,其实就像视频内容一样,它要一点一点的流给浏览器,就间接实现了 “服务器推送”。当服务端响应 “text/event-stream”这个类型的时候 ,浏览器就认为数据还没有发送完毕,他会持续等数据并且根据每次 push 过来的数据内容给 js 进程内回调事件。

以上,就是 SSE 的实现原理。

AI 聊天室架构设计

整体设计

对于 chatgpt/deepseek 之类的对话应用,他是“单对单”聊天场景。 从 deepSeek 的网页端分析来看,他是通过“每次问答都创建一次 SSE 链接”来实现的。即 “完成一次问答互动”就关闭,当你再次输入框输入下一个问题回车,则“再次创建一个新的 SSE 链接”。所以服务器只需要在响应一次问答结束后,直接 res.end 关掉这个 SSE http 通道连接即可。

而我这个群聊场景“聊天群” ,属于“广播场景”,需要 A 发出提问或消息后,广播给所有人;服务器如果产生了回答,也要广播给群内所有人。于是,nodejs 层需要将 sse 的 socket 链接从“局部” 挪到外层 “全局”,从而让每个 http 会话做响应时直接广播给所有 socket。

而且我的这个聊天场景下所有用户的 SSE socket 链接都不应该从服务端进行主动关闭,只能由客户端自行退出时才触发 socket 关闭,因为只要用户没有离开界面,服务端就没有“结束”的概念。

简单画一下时序图:

ai 对话部分实现

at 选到小助手后的 ai 对话部分,这里相比 deepseek 的难点在于:我不止要将 ai 流持续推给“提问的用户”,而且要把 ai 回复持续推给所有被广播用户。而在此期间,任何用户都可能在群内已经发出新的言论,因此 ai 回复的广播必须重新定义 SSE 的 event 类型和消息体。

我打算这样设计:

1
2
3
4
5
event: chunkmessage, // 代表这不是一条新群消息,而是过往某一条消息的"后补部分"。基于此,前端可以交给 chunkmessage 的处理逻辑来处理。
data: {
msgId: 111, // 代表这是111 这个序号的消息的后续chunk 数据。前端可以去找到这条消息进行后补填充。
chunktext: '' // chunk 内容。
}

在单条 AI 消息的拼凑逻辑方面,直接借鉴 deepseek 等 ai 产品的结构即可。以下是 deepseek 的答复 SSE 回复,消息会持续不断的返回:

因时间原因,我暂时还未实现这块逻辑。先列为 TODO。

服务端实现

为了能向 nodejs 的原生 response 对象写入内容,在 koa 框架下我们需要访问 ctx.res 这个原始 http 响应对象。通过 write 方法写入数据:

1
ctx.res.write(`data: ${JSON.stringify(data)}\n\n`);

由于 koa 一般只让你操作 koa.body,而如果 koa.body 是一个流,则底层会 pipe 到 ctx.res 上。所以在 koa 下也有一个简便办法就是利用一个虚拟流 PassThrough:

1
2
3
const s = new PassThrough();
ctx.body = s;
s.write();

我这里没有使用 ctx.body ,而是直接操纵 ctx.res。 为此,我们需要将 koa 的自身 respond 逻辑关掉:

1
ctx.respond = false;

为了能提高推送数据的实时性,可以将 socket 的一些延迟配置关掉,如:

1
2
3
ctx.request.socket.setTimeout(0); // 只要客户端不死,咱服务器就永远别触发超时事件(默认情况下如果 socket 不活动则会触发超时事件,即使超时了也得手动关闭)。
ctx.request.socket.setKeepAlive(true); // 让底层tcp 走心跳包保活,以避免客户端超时掉线。 这个配置是否有检测客户端离开的能力?还待设置成 false测试。
ctx.request.socket.setNoDelay(true); // 这个会开启 tcp 的 nagle算法,即使没有 ack 也会把新数据立刻发出去。

还要处理异常情况,例如客户端关闭后,咱们要准备退出。

1
2
3
4
5
ctx.request.socket.on("close", (hadError) => {
// 浏览器关闭 tab 等行为都会触发 close。经过测试,发现例如 safari 浏览器,我直接活动监视器杀掉,他也能 close 事件触发。可能是操作系统行为?
console.log("socket 关闭了", hadError);
eventBus.emit("removeClient", userInfo);
});

小技巧

SSE 建联时,服务器端务必要先返回一个 http 报文头。否则前端会一直 pending 导致 SSE 报错。如果你 nodejs 中仅仅调用了 ctx.set 设置响应头,他底层其实只是 ctx.res.setHeader 进行设置,不一定写到了底层 tcp 协议缓冲区。为此你需要主动调用 ctx.res.write 发送一点欢迎消息。

或者,直接调用 ctx.res.flushHeaders(); 将响应头发送给浏览器,则可以解决该问题。

消息顺序一致性的实现

群聊场景,以服务端接收时机生成 seq 顺序自增,所有群成员以服务端 seq 顺序为准。

nginx 配置

注意这个 nginx 必须得禁用缓存,否则会导致前端一直 pending(估计如果你不设置的话, nginx 会缓存 sse 响应内容导致浏览器等不到报文头):

1
2
3
4
5
location /sse {
proxy_pass http://localhost:3000;
proxy_buffering off; # 禁用缓冲
proxy_cache off; # 禁用缓存
}

总之,sse 连接的 http 报文头很重要,一定要第一时间返回给浏览器,让浏览器知道你是一个 SSE 建联的响应。

前端实现

SSE 建联:

1
2
3
4
5
6
const sseUrl =
"https://api.cuiyongjian.com/api/chatroom/waitRoomMsg?id=1&r=" +
Math.random();
const eventSource = new window.EventSource(sseUrl, {
withCredentials: true,
});

首个欢迎消息(即个人身份信息)处理:

1
2
3
4
const myUserId = ref("");
eventSource.addEventListener("whoame", (event) => {
myUserId.value = event.data;
});

发言消息广播接收后的处理:

1
2
3
4
5
6
7
8
9
10
11
12
eventSource.addEventListener("message", (event) => {
const msgObj = JSON.parse(event.data);
const uid = cookies.get("user_chat_id");
const msgInfo = {
isMe: uid === msgObj.from, // 发言身份处理
...msgObj,
};
messageInfoList.value.push(msgInfo);
nextTick(() => {
// 滚动条处理
});
});

群成员更新消息处理:

1
2
3
4
5
6
7
8
9
10
11
const personList = ref<Array<Record<string, string>>>([]);
eventSource.addEventListener("list", (event) => {
const tmp = JSON.parse(event.data);
debugger;
personList.value = tmp.map((item: Record<string, string>) => {
return {
...item,
isMe: myUserId.value === item.userChatId,
};
});
});

发言,发送消息:

1
2
3
4
5
6
7
8
async function sendMessage() {
console.log("send message", messageText.value);
await myApiAxios.post("/api/chatroom/send", {
text: messageText.value,
whoame: myUserId.value,
});
messageText.value = "";
}

光标跟随的实现

AI 消息由于是动态拼凑出来的,因此一般需要光标跟随效果。

todo。这里可采用 css 技巧实现。

refer

https://juejin.cn/post/7205412097953808445
https://www.ruanyifeng.com/blog/2017/05/server-sent_events.html
https://juejin.cn/post/7337250744515035146
https://blog.csdn.net/qq_40074694/article/details/120250835