
Streaming HTTP Responses using fetch

王林
Published: 2024-07-24


This article looks at the JavaScript Streams API, which lets a fetch HTTP call receive a streaming response in chunks. The client can start acting on the server's response as it arrives, which is how UIs like ChatGPT build up their output so quickly.

As a motivating example, we'll implement a function to process the streaming LLM responses from OpenAI (or any server using the same HTTP streaming API), using no npm dependencies, just the built-in fetch. The full code is here, including retries with exponential backoff, embeddings, non-streaming chat, and simpler APIs for interacting with chat completions and embeddings.

If you're interested in how to return an HTTP stream to a client, check out this post.

The full example code

Here's the full example. We'll walk through each part below:

async function createChatCompletion(body: ChatCompletionCreateParams) {
  // Making the request
  const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
  const response = await fetch(baseUrl + "/v1/chat/completions", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Authorization": "Bearer " + process.env.LLM_API_KEY,
    },
    body: JSON.stringify(body),
  });
  // Handling errors
  if (!response.ok) {
    const error = await response.text();
    throw new Error(`Failed (${response.status}): ${error}`);
  }
  if (!body.stream) { // the non-streaming case
    return response.json();
  }
  const stream = response.body;
  if (!stream) throw new Error("No body in response");
  // Returning an async iterator
  return {
    [Symbol.asyncIterator]: async function* () {
      for await (const data of splitStream(stream)) {
        // Handling the OpenAI HTTP streaming protocol
        if (data.startsWith("data:")) {
          const json = data.substring("data:".length).trimStart();
          if (json.startsWith("[DONE]")) {
            return;
          }
          yield JSON.parse(json);
        }
      }
    },
  };
}

// Reading the stream  
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment += data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i < parts.length - 1; i += 1) {
        yield parts[i];
      }
      // Save the last part as the new last fragment
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}

See the code here for a version with nice typed overloads for the streaming and non-streaming parameter variants, plus retries and other improvements.

The rest of this post is about understanding what this code does.

Making the request

This part is actually easy. A streaming HTTP response comes from a perfectly normal HTTP request:

const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
const response = await fetch(baseUrl + "/v1/chat/completions", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "Authorization": "Bearer " + process.env.LLM_API_KEY,
  },
  body: JSON.stringify(body),
});

The HTTP headers are sent as usual; nothing special needs to be set to enable streaming. You can still use regular caching headers with HTTP streaming.

Handling errors

The story around client-side errors is a bit unfortunate for HTTP streaming. The upside is that the client gets a status code immediately in the initial response and can detect failure there. The downside of the HTTP protocol is that if the server returns success but then breaks mid-stream, nothing at the protocol level tells the client the stream was interrupted. We'll see below how OpenAI works around this by encoding an "all done" sentinel at the end.

if (!response.ok) {
  const error = await response.text();
  throw new Error(`Failed (${response.status}): ${error}`);
}

Reading the stream

To read an HTTP streaming response, the client can use the response.body property, a ReadableStream that lets you iterate over the chunks as they come in from the server using the .getReader() method.¹

const reader = response.body.getReader();
try {
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    const text = new TextDecoder().decode(value);
    // ... do something with the chunk
  }
} finally {
  reader.releaseLock();
}

This handles every bit of data we get back, but for the OpenAI HTTP protocol we expect the data to be newline-delimited JSON, so instead we'll split up the response body and yield each line as it's completed. We buffer the in-progress line in lastFragment and only yield complete lines, delimited by two newlines:

// stream here is response.body
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment += data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i < parts.length - 1; i += 1) {
        yield parts[i];
      }
      // Save the last part as the new last fragment
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}

If you're not familiar with the function* and yield syntax, just think of function* as a function that can return multiple things in a loop, and yield as the way of returning things from the function multiple times.
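As a quick self-contained illustration (countTo and collect are just names for this sketch), here's a small async generator and a loop that collects its values:

```typescript
// A minimal sketch of function* and yield: each `yield` hands one
// value back to the caller's loop, and the function resumes afterwards.
async function* countTo(n: number): AsyncGenerator<number> {
  for (let i = 1; i <= n; i += 1) {
    yield i;
  }
}

async function collect(): Promise<number[]> {
  const values: number[] = [];
  for await (const v of countTo(3)) {
    values.push(v); // receives 1, then 2, then 3
  }
  return values;
}
```

Note that calling countTo(3) doesn't run the body yet; each step of the loop resumes the function until its next yield.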

You can then loop over this splitStream function, for example:

for await (const data of splitStream(response.body)) {
  // data here is a full line of text. For OpenAI, it might look like
  // "data: {...some json object...}" or "data: [DONE]" at the end
}
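To see the buffering behavior without making a network call, you can feed splitStream a hand-built ReadableStream. This sketch repeats the splitStream function from above and adds a made-up streamOf helper; the chunk boundaries deliberately fall mid-line to show the reassembly:

```typescript
// splitStream as in the article: buffer partial lines, yield complete ones.
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        if (lastFragment !== "") yield lastFragment;
        break;
      }
      lastFragment += new TextDecoder().decode(value);
      const parts = lastFragment.split("\n\n");
      for (let i = 0; i < parts.length - 1; i += 1) yield parts[i];
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}

// Hypothetical helper: a ReadableStream that emits the given strings as bytes.
function streamOf(chunks: string[]): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream({
    start(controller) {
      for (const c of chunks) controller.enqueue(encoder.encode(c));
      controller.close();
    },
  });
}

async function demo(): Promise<string[]> {
  const lines: string[] = [];
  // "data: one" arrives split across two chunks; splitStream reassembles it.
  for await (const line of splitStream(streamOf(["data: o", "ne\n\ndata: two\n\n"]))) {
    lines.push(line);
  }
  return lines;
}
```

Even though the first chunk ends in the middle of a line, the caller only ever sees the two complete lines.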

If this "for await" syntax is confusing, it's using what's called an "async iterator" - like a regular iterator you'd use with a for loop, but every time it gets the next value, it's awaited.

In our example, when we've received some text from OpenAI and are waiting for more, the for loop will wait until splitStream yields another value, which happens when await reader.read() returns a value that completes one or more lines of text.

Next we'll look at another way to return an async iterator that isn't a function like splitStream, so a caller can use a "for await" loop to iterate over this data.

Returning an async iterator

Now that we have an async iterator that yields full lines of text, we could just return splitStream(response.body), but we want to intercept each line and transform it, while still letting the caller of our function iterate.

The approach is similar to the async function* syntax above. Here, instead of an async function that returns an async iterator when called, we return the async iterator directly. The difference is the type is AsyncIterator instead of an AsyncGenerator, which needs to be called first. An AsyncIterator can be defined by having a certain named function: Symbol.asyncIterator.²

return {
  [Symbol.asyncIterator]: async function* () {
    for await (const data of splitStream(stream)) {
      // handle the data
      yield data;
    }
  },
};

This is useful when you want to return something different from the data that comes from splitStream. Every time a new line comes in from the streaming HTTP request, splitStream yields it, this function receives it in data, and it can do something with it before yielding it to its caller.
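As a self-contained sketch of that pattern (source and stripPrefix are made-up names for illustration), here's an iterator that wraps another one and transforms each value before yielding it:

```typescript
// Wrap a source async iterable and transform each value before handing
// it to the caller, mirroring how createChatCompletion wraps splitStream.
async function* source(): AsyncGenerator<string> {
  yield "data: hello";
  yield "data: world";
}

function stripPrefix(iter: AsyncIterable<string>): AsyncIterable<string> {
  return {
    [Symbol.asyncIterator]: async function* () {
      for await (const line of iter) {
        yield line.replace(/^data:\s*/, ""); // transform before yielding
      }
    },
  };
}

async function run(): Promise<string[]> {
  const out: string[] = [];
  for await (const word of stripPrefix(source())) {
    out.push(word);
  }
  return out;
}
```

The caller of stripPrefix iterates as if it were the original stream, but only ever sees the transformed values.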

Next we'll look at how to interpret this data specifically in the case of OpenAI's streaming chat completion API.

Handling the OpenAI HTTP streaming protocol

The OpenAI response protocol is a series of lines that start with data: or event:, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE] if the stream is done, otherwise it’s just JSON.

for await (const data of splitStream(stream)) {
  if (data.startsWith("data:")) {
    const json = data.substring("data:".length).trimStart();
    if (json.startsWith("[DONE]")) {
      return;
    }
    yield JSON.parse(json);
  } else {
    console.debug("Unexpected data:", data);
  }
}

Bringing it all together

Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on SDKs or libraries. This lets you hide latency, since your UI can start updating immediately, without consuming more bandwidth through multiple requests. You can use the above function like you would the official openai npm package:

const response = await createChatCompletion({
  model: "llama3",
  messages: [...your messages...],
  stream: true,
});
for await (const chunk of response) {
  if (chunk.choices[0].delta?.content) {
    console.log(chunk.choices[0].delta.content);
  }
}

See the code here, which also includes some utility functions that make this even easier by pre-configuring the model and extracting the .choices[0].delta.content:

const response = await chatStream(messages);
for await (const content of response) {
  console.log(content);
}

Before you copy the code, try to implement it yourself as an exercise in async functions.

More resources

  • For information about returning HTTP streaming data from your own server endpoint, check out this post on AI Chat with HTTP Streaming that both streams data from OpenAI (or similar) to your server and simultaneously streams it down to a client, while doing custom logic as it goes (such as saving chunks to a database).
  • The MDN docs, as always, are great. Beyond the links above, here's a guide on the readable streams API that shows how to connect a readable stream to an <img> tag to stream in an image request. Note: this guide uses response.body as an async iterator, but currently that is not widely implemented and not in the TypeScript types.
    1. Note: you can only have one reader of the stream at a time, so you generally don't call .getReader() multiple times - you probably want .tee() in that case, and if you do want to use .getReader() multiple times for some reason, make sure the first reader calls .releaseLock() first. ↩
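A small sketch of what .tee() looks like in practice (numberStream and readAll are illustrative helpers): it forks one stream into two independent branches, each with its own reader:

```typescript
// A trivial stream of two numbers, just to have something to fork.
function numberStream(): ReadableStream<number> {
  return new ReadableStream<number>({
    start(controller) {
      controller.enqueue(1);
      controller.enqueue(2);
      controller.close();
    },
  });
}

// Drain a stream into an array, releasing the reader's lock when done.
async function readAll(stream: ReadableStream<number>): Promise<number[]> {
  const reader = stream.getReader();
  const out: number[] = [];
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      if (value !== undefined) out.push(value);
    }
  } finally {
    reader.releaseLock();
  }
  return out;
}

async function demoTee(): Promise<[number[], number[]]> {
  // tee() returns two streams; each can be read independently.
  const [a, b] = numberStream().tee();
  return [await readAll(a), await readAll(b)];
}
```

Both branches see the same chunks, so this is the way to go when two consumers each need their own reader.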

    2. If you aren't familiar with Symbol, it's a way to have keys in an object that aren't strings or numbers, so they don't conflict with a regular key named asyncIterator. You could access the function with myIterator[Symbol.asyncIterator](). ↩
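For example (a throwaway sketch), a symbol key lives alongside a plain string key named asyncIterator without any conflict:

```typescript
// A symbol-keyed entry never collides with a string key, even one
// literally spelled "asyncIterator".
const obj = {
  asyncIterator: "just a string property",
  [Symbol.asyncIterator]: async function* (): AsyncGenerator<number> {
    yield 42;
  },
};

async function firstValue(): Promise<number | undefined> {
  // for await looks up obj[Symbol.asyncIterator], not obj.asyncIterator.
  for await (const v of obj) {
    return v;
  }
  return undefined;
}
```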

Source: dev.to