前端如何处理流式数据(SSE)

Apr 28 · 12min

开始之前…

最近又接到一个模块的需求,起初是需要跟后端对接ai对话的api,这是最初版的,当时还是用的其他的返回格式 application/x-ndjson

但是由于最近的deepseek,后端又将ai换成了deepseek的了,deepseek的返回格式是 text/event-stream,用的协议是 SSE (Server-Sent Events)

返回数据格式

可以参考deepseek api文档,可知我们需要处理的数据的格式是怎样的:

data: {"id": "1f633d8bfc032625086f14113c411638", "choices": [{"index": 0, "delta": {"content": "", "role": "assistant"}, "finish_reason": null, "logprobs": null}], "created": 1718345013, "model": "deepseek-chat", "system_fingerprint": "fp_a49d71b8a1", "object": "chat.completion.chunk", "usage": null}

data: {"choices": [{"delta": {"content": "Hello", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}

data: {"choices": [{"delta": {"content": "!", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}

data: {"choices": [{"delta": {"content": " How", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}

data: {"choices": [{"delta": {"content": " can", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}

data: {"choices": [{"delta": {"content": " I", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}

data: {"choices": [{"delta": {"content": " assist", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}

data: {"choices": [{"delta": {"content": " you", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}

data: {"choices": [{"delta": {"content": " today", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}

data: {"choices": [{"delta": {"content": "?", "role": "assistant"}, "finish_reason": null, "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1"}

data: {"choices": [{"delta": {"content": "", "role": null}, "finish_reason": "stop", "index": 0, "logprobs": null}], "created": 1718345013, "id": "1f633d8bfc032625086f14113c411638", "model": "deepseek-chat", "object": "chat.completion.chunk", "system_fingerprint": "fp_a49d71b8a1", "usage": {"completion_tokens": 9, "prompt_tokens": 17, "total_tokens": 26}}

data: [DONE]

TIP

要注意的是每一条数据之间都是有\n\n分隔的,标准SSE事件,每条事件就是用’\n\n’分隔的。这个信息在后面处理数据时有用。

处理方法

其实这个在网上搜索就知道了,我也看过,很多答案都是用一个前端的API EventSource 来处理的,或者就是用 event-source 的改库 event-source-polyfill

EventSource 限制在于只能使用 'GET' 方法,请求需要的参数都只能明文传输。

event-source-polyfill 虽然是基于(XHR)的版本,理论上我们是可以改造xhr来发post请求的。但是为了兼容SSE协议规范,还是会默认使用get方法;另外就算可以模拟post请求,但也已经偏离了标准SSE,相当于自己造轮子了。

我的原则是还是少用点库,有些能自己解决就更好。

所有我还是用常规请求,用fetch和axios都可以,但是fetch是原生支持SSE的,用axios需要指定下适配器 adapter

代码实现

请求方法

先找你的后端拿到对应的接口地址先。

// const params = {}

const response = await fetch('/v1/chat/completions', {
  method: 'POST',
  body: JSON.stringify(params), // 需要序列化
  headers: {
    'Content-Type': 'application/json',
  },
})
axios版
const response = await axios.post('/v1/chat/completions', params, {
  responseType: 'stream', // axios 需要指定响应格式
  adapter: 'fetch'
})

解析流数据

返回的数据打印你看不到内容,只能知道它是一个 ReadableStream。最简单的处理方式就是:

const reader = response.body.getReader()

let data = ''

while (true) {
  const { done, value } = await reader.read()
  if (done)
    break

  data += new TextDecoder().decode(value)
}

这就是一个常用的办法来处理的。但是吧,这应该是针对后端已经帮你处理好了ai api返回的数据,能直接使用的。如果是原始的数据格式,那还需要我们自己处理一下。

TIP

如果是 axios 那么是通过 response.data

接下来我还是用更加完善的另一种方法实现数据处理。

在mdn文档中还看得到另外一种处理流的方法,就是异步的for循环:

for await (const chunk of response.body) {
  // ...
}

现在是需要将 response.body 改造成遍历后能直接使用的数据,而不是这一串字符串。

data: {"id": "1f633d8bfc032625086f14113c411638", "choices": [{"index": 0, "delta": {"content": "", "role": "assistant"}, "finish_reason": null, "logprobs": null}], "created": 1718345013, "model": "deepseek-chat", "system_fingerprint": "fp_a49d71b8a1", "object": "chat.completion.chunk", "usage": null}

需要先了解几个方法:

  • TextDecoderStream

这个方法跟TextDecoder是类似的,不同就在于TextCoder是一次性拿到完整二进制数据解析成文本;而TextDecoderStream则是实时将数据流转成文本流(边传边解析)。不过TextDecoder也可以实现,只不过需要{ stream: true }手动模拟流。

  • pipeThrough(transformStream, option)

提供将当前流管道输出到一个转换(transform)流或可写/可读流对的链式方法。

就是把一个流的数据通过转换流处理一下,输出新的流。通俗讲就是对流的数据边收边改、边流边处理。 跟TextDecoder的read和write类似,只不过更方便更现代化。

  • new TransformStream({})

这个就是pipeThrough需要用到的转换流对象,它包含了transform(每一段流的处理过程)和flush(接收完所有流后的收尾工作)

具体实现

function transformStream(readableStream) {
  const decoderStream = new TextDeCoderStream()

  const stream = readableStream
    .pipeThrough(decoderStream)
    .pipeThrough(function () {
      let buffer = ''
      return new TransformStream({
        transform(streamChunk, controller) {
          buffer += streamChunk

          const parts = buffer.split('\n\n')
          parts.slice(0, -1).forEach((part) => {
            controller.enqueue(part)
          })

          buffer = parts[parts.length - 1]
        },
        flush(controller) {
          if ((buffer ?? '').trim() !== '') {
            controller.enqueue(buffer)
          }
        }
      })
    }())
    .pipeThrough(new TransformStream({
      transform(chunk, controller) {
        const lines = chunk.split('\n')
        const sseEvent = lines.reduce((acc, line) => {
          const separatorIndex = line.indexOf(':')
          if (separatorIndex === -1) {
            throw new Error('The key-value separator ":" is not found in the sse line chunk!')
          }

          const key = line.slice(0, separatorIndex)
          const value = line.slice(separatorIndex + 1)

          return {
            ...acc,
            [key]: value
          }
        }, {})

        if (Object.keys(sseEvent).length === 0)
          return

        controller.enqueue(sseEvent)
      }
    }))

  return stream
}

步骤解析

  1. 第一个 pipeThrough()

这个步骤目的是将原始的二进制流转成字符串流。这个二进制流你可以通过浏览器F12,在network栏查看流请求的二进制数据。

Uint8Array([100, 97, 116, 97, 58, 32, 123, ...]) => "data: {\"id\": \"1f633...\"}"

  1. 第二个 pipeThrough()

这个步骤目的是将完整的字符串切成每条独立的SSE消息。 最后一条消息也同样是有\n\n分隔符的,所以最后一项一定是空字符串,所以处理的时候要排除掉最后一项的空字符串。

  1. 第三个 pipeThrough()

这个步骤目的是将每条SSE消息字符串解析成一个对象。 因为每条SSE消息可能会包含多个data行的,它们通过’\n’分隔。拆分后,再通过reduce方法或其他办法处理成一个对象 { data: "{...}" }

最后在异步for循环中遍历得到的 chunk 就是处理后的数据: { data: "{...}" }

TIP

其实还可以最后一层的transform()处理一下,把data的值解析成一个对象

const parsed = JSON.parse(sseEvent.data)

// controller.enqueue(sseEvent) 改成
controller.enqueue(parsed)

处理解析后的结果

let fullContent = ''

for await (const chunk of transformStream(response.body)) {
  const { data } = chunk

  // 返回的字符串流,它们可能会包含一个空格符在前面 eg: "id": "1f633d8bfc032625086f14113c411638"
  if (data.trim() === '[DONE]')
    break

  const parsed = JSON.parse(data)
  const content = parse.choices[0].delta.content ?? '' // 具体格式看你请求具体返回

  fullContent += content
}

这就可以逐步拿到整个内容了!

代码优化

这样看着很冗长,我们可以拆分业务到单独的方法中去(这里我把三个方法拆分开看这方便一点):

async function handleStream() {
  const params = {}

  const response = await fetchStream(params)

  for await (const chunk of transformStream(response.body)) {
    // ...
  }
}

function transformStream(readablesStream) {
  const decoderStream = new TextCoderStream()

  const stream = readableStream
    .pipeThrough(decoderStream)
    .pipeThrough(splitStream())
    .pipeThrough(splitParts())

  return stream
}
function splitStream() {
  let buffer = ''
  return new TransformStream({
    transform(streamChunk, controller) {
      buffer += streamChunk

      const parts = buffer.split('\n\n')
      parts.slice(0, -1).forEach((part) => {
        controller.enqueue(part)
      })

      buffer = parts[parts.length - 1]
    },
    flush(controller) {
      if ((buffer ?? '').trim() !== '') {
        controller.enqueue(buffer)
      }
    }
  })
}
function splitParts() {
  return new TransformStream({
    transform(chunk, controller) {
      const lines = chunk.split('\n')
      const sseEvent = lines.reduce((acc, line) => {
        const separatorIndex = line.indexOf(':')
        if (separatorIndex === -1) {
          throw new Error('The key-value separator ":" is not found in the sse line chunk!')
        }

        const key = line.slice(0, separatorIndex)
        const value = line.slice(separatorIndex + 1)

        return {
          ...acc,
          [key]: value
        }
      }, {})

      if (Object.keys(sseEvent).length === 0)
        return

      controller.enqueue(sseEvent)
    }
  })
}
async function fetchStream(params) {
  const response = await fetch('/v1/chat/completions', {
    method: 'POST',
    body: JSON.stringify(params), // 需要序列化
    headers: {
      'Content-Type': 'application/json',
    },
  })

  return response
}

这样最终版本就完成了。这个版本优势我觉得在于还可以进一步封装来支持其他协议的流,做到真正通用的处理流数据的方法。

CC BY-NC-SA 4.0 2023-PRESENT © Leet