开始之前… #
最近又接到一个模块的需求,起初是需要跟后端对接ai对话的api,这是最初版的,当时还是用的其他的返回格式 application/x-ndjson
。
但是由于最近的deepseek,后端又将ai换成了deepseek的了,deepseek的返回格式是 text/event-stream
,用的协议是 SSE (Server-Sent Events)
。
返回数据格式 #
可以参考deepseek api文档,可知我们需要处理的数据的格式是怎样的:
data: {"id": "1f633d8bfc032625086f14113c411638", "choices": [{"index": 0, "delta": {"content": "", "role": "assistant"}, "finish_reason": null, "logprobs": null}], "created": 1718345013, "model": "deepseek-chat", "system_fingerprint": "fp_a49d71b8a1", "object": "chat.completion.chunk", "usage": null}
data: {"choices": [{"delta": {"content": "Hello", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}
data: {"choices": [{"delta": {"content": "!", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}
data: {"choices": [{"delta": {"content": " How", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}
data: {"choices": [{"delta": {"content": " can", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}
data: {"choices": [{"delta": {"content": " I", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}
data: {"choices": [{"delta": {"content": " assist", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}
data: {"choices": [{"delta": {"content": " you", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}
data: {"choices": [{"delta": {"content": " today", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}
data: {"choices": [{"delta": {"content": "?", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}
data: {"choices": [{"delta": {"content": "", "role": null}, "finish_reason": "stop", "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1", "usage": {"completion_tokens": 9, "prompt_tokens": 17, "total_tokens": 26}}
data: [DONE]
TIP
要注意的是每一条数据之间都是有\n\n
分隔的,标准SSE事件,每条事件就是用’\n\n’分隔的。这个信息在后面处理数据时有用。
处理方法 #
其实这个在网上搜索就知道了,我也看过,很多答案都是用一个前端的API EventSource
来处理的,或者就是用 event-source 的改库 event-source-polyfill
。
EventSource
限制在于只能使用 'GET' 方法,请求需要的参数都只能明文传输。
event-source-polyfill
虽然是基于(XHR)的版本,理论上我们是可以改造xhr来发post请求的。但是为了兼容SSE协议规范,还是会默认使用get方法;另外就算可以模拟post请求,但也已经偏离了标准SSE,相当于自己造轮子了。
我的原则是还是少用点库,有些能自己解决就更好。
所有我还是用常规请求,用fetch和axios都可以,但是fetch是原生支持SSE的,用axios需要指定下适配器 adapter
。
代码实现 #
请求方法 #
先找你的后端拿到对应的接口地址先。
// const params = {}
const response = await fetch('/v1/chat/completions', {
method: 'POST',
body: JSON.stringify(params), // 需要序列化
headers: {
'Content-Type': 'application/json',
},
})
axios版
const response = await axios.post('/v1/chat/completions', params, {
responseType: 'stream', // axios 需要指定响应格式
adapter: 'fetch'
})
解析流数据 #
返回的数据打印你看不到内容,只能知道它是一个 ReadableStream
。最简单的处理方式就是:
const reader = response.body.getReader()
let data = ''
while (true) {
const { done, value } = await reader.read()
if (done)
break
data += new TextDecoder().decode(value)
}
这就是一个常用的办法来处理的。但是吧,这应该是针对后端已经帮你处理好了ai api返回的数据,能直接使用的。如果是原始的数据格式,那还需要我们自己处理一下。
TIP
如果是 axios 那么是通过 response.data
接下来我还是用更加完善的另一种方法实现数据处理。
在mdn文档中还看得到另外一种处理流的方法,就是异步的for循环:
for await (const chunk of response.body) {
// ...
}
现在是需要将 response.body
改造成遍历后能直接使用的数据,而不是这一串字符串。
data: {"id": "1f633d8bfc032625086f14113c411638", "choices": [{"index": 0, "delta": {"content": "", "role": "assistant"}, "finish_reason": null, "logprobs": null}], "created": 1718345013, "model": "deepseek-chat", "system_fingerprint": "fp_a49d71b8a1", "object": "chat.completion.chunk", "usage": null}
需要先了解几个方法:
TextDecoderStream
这个方法跟TextDecoder
是类似的,不同就在于TextCoder
是一次性拿到完整二进制数据解析成文本;而TextDecoderStream
则是实时将数据流转成文本流(边传边解析)。不过TextDecoder
也可以实现,只不过需要{ stream: true }
手动模拟流。
pipeThrough(transformStream, option)
提供将当前流管道输出到一个转换(transform)流或可写/可读流对的链式方法。
就是把一个流的数据通过转换流处理一下,输出新的流。通俗讲就是对流的数据边收边改、边流边处理。 跟TextDecoder
的read和write类似,只不过更方便更现代化。
new TransformStream({})
这个就是pipeThrough
需要用到的转换流对象,它包含了transform
(每一段流的处理过程)和flush
(接收完所有流后的收尾工作)
具体实现
function transformStream(readableStream) {
const decoderStream = new TextDeCoderStream()
const stream = readableStream
.pipeThrough(decoderStream)
.pipeThrough(function () {
let buffer = ''
return new TransformStream({
transform(streamChunk, controller) {
buffer += streamChunk
const parts = buffer.split('\n\n')
parts.slice(0, -1).forEach((part) => {
controller.enqueue(part)
})
buffer = parts[parts.length - 1]
},
flush(controller) {
if ((buffer ?? '').trim() !== '') {
controller.enqueue(buffer)
}
}
})
}())
.pipeThrough(new TransformStream({
transform(chunk, controller) {
const lines = chunk.split('\n')
const sseEvent = lines.reduce((acc, line) => {
const separatorIndex = line.indexOf(':')
if (separatorIndex === -1) {
throw new Error('The key-value separator ":" is not found in the sse line chunk!')
}
const key = line.slice(0, separatorIndex)
const value = line.slice(separatorIndex + 1)
return {
...acc,
[key]: value
}
}, {})
if (Object.keys(sseEvent).length === 0)
return
controller.enqueue(sseEvent)
}
}))
return stream
}
步骤解析
- 第一个
pipeThrough()
这个步骤目的是将原始的二进制流转成字符串流。这个二进制流你可以通过浏览器F12,在network栏查看流请求的二进制数据。
Uint8Array([100, 97, 116, 97, 58, 32, 123, ...])
=> "data: {\"id\": \"1f633...\"}"
- 第二个
pipeThrough()
这个步骤目的是将完整的字符串切成每条独立的SSE消息。 最后一条消息也同样是有\n\n
分隔符的,所以最后一项一定是空字符串,所以处理的时候要排除掉最后一项的空字符串。
- 第三个
pipeThrough()
这个步骤目的是将每条SSE消息字符串解析成一个对象。 因为每条SSE消息可能会包含多个data行的,它们通过’\n’分隔。拆分后,再通过reduce
方法或其他办法处理成一个对象 { data: "{...}" }
。
最后在异步for循环中遍历得到的 chunk
就是处理后的数据: { data: "{...}" }
。
TIP
其实还可以最后一层的transform()处理一下,把data的值解析成一个对象
const parsed = JSON.parse(sseEvent.data)
// controller.enqueue(sseEvent) 改成
controller.enqueue(parsed)
处理解析后的结果 #
let fullContent = ''
for await (const chunk of transformStream(response.body)) {
const { data } = chunk
// 返回的字符串流,它们可能会包含一个空格符在前面 eg: "id": "1f633d8bfc032625086f14113c411638"
if (data.trim() === '[DONE]')
break
const parsed = JSON.parse(data)
const content = parse.choices[0].delta.content ?? '' // 具体格式看你请求具体返回
fullContent += content
}
这就可以逐步拿到整个内容了!
代码优化 #
这样看着很冗长,我们可以拆分业务到单独的方法中去(这里我把三个方法拆分开看这方便一点):
async function handleStream() {
const params = {}
const response = await fetchStream(params)
for await (const chunk of transformStream(response.body)) {
// ...
}
}
function transformStream(readablesStream) {
const decoderStream = new TextCoderStream()
const stream = readableStream
.pipeThrough(decoderStream)
.pipeThrough(splitStream())
.pipeThrough(splitParts())
return stream
}
function splitStream() {
let buffer = ''
return new TransformStream({
transform(streamChunk, controller) {
buffer += streamChunk
const parts = buffer.split('\n\n')
parts.slice(0, -1).forEach((part) => {
controller.enqueue(part)
})
buffer = parts[parts.length - 1]
},
flush(controller) {
if ((buffer ?? '').trim() !== '') {
controller.enqueue(buffer)
}
}
})
}
function splitParts() {
return new TransformStream({
transform(chunk, controller) {
const lines = chunk.split('\n')
const sseEvent = lines.reduce((acc, line) => {
const separatorIndex = line.indexOf(':')
if (separatorIndex === -1) {
throw new Error('The key-value separator ":" is not found in the sse line chunk!')
}
const key = line.slice(0, separatorIndex)
const value = line.slice(separatorIndex + 1)
return {
...acc,
[key]: value
}
}, {})
if (Object.keys(sseEvent).length === 0)
return
controller.enqueue(sseEvent)
}
})
}
async function fetchStream(params) {
const response = await fetch('/v1/chat/completions', {
method: 'POST',
body: JSON.stringify(params), // 需要序列化
headers: {
'Content-Type': 'application/json',
},
})
return response
}
这样最终版本就完成了。这个版本优势我觉得在于还可以进一步封装来支持其他协议的流,做到真正通用的处理流数据的方法。