这篇文章将着眼于使用 JavaScript Streams API,它允许进行 fetch HTTP 调用并以块的形式接收流响应,这允许客户端开始更多地响应服务器响应快速构建像 ChatGPT 这样的 UI。
作为一个激励性的示例,我们将实现一个函数来处理来自 OpenAI(或任何使用相同 http 流 API 的服务器)的流 LLM 响应,不使用 npm 依赖项,仅使用内置的 fetch。完整的代码在这里,包括指数退避重试、嵌入、非流式聊天以及用于与聊天完成和嵌入交互的更简单的 API。
如果您有兴趣了解如何将 HTTP 流返回给客户端,请查看这篇文章。
这是完整的示例。我们将看看下面的每一个部分:
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl + "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " + process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream<Uint8Array>) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment += data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i < parts.length - 1; i += 1) { yield parts[i]; } // Save the last part as the new last fragment lastFragment = parts[parts.length - 1]; } } finally { reader.releaseLock(); } }
请参阅此处的代码,了解具有流式和非流式参数变体的良好类型重载的版本,以及重试和其他改进。
帖子的其余部分是关于理解这段代码的作用。
这部分其实很简单。流式 HTTP 响应来自普通的 HTTP 请求:
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl + "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " + process.env.LLM_API_KEY, }, body: JSON.stringify(body), });
HTTP 标头按平常方式发送,无需特别设置任何内容即可启用流式传输。您仍然可以利用常规缓存标头进行 HTTP 流式传输。
关于客户端错误的故事对于 HTTP 流来说有点不幸。好处是,对于 HTTP 流式传输,客户端会在初始响应中立即获取状态代码,并可以检测到故障。 http 协议的缺点是,如果服务器返回成功,但随后在流中中断,则协议级别没有任何内容可以告诉客户端流已中断。我们将在下面看到 OpenAI 如何在最后编码“全部完成”哨兵来解决这个问题。
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }
为了读取 HTTP 流响应,客户端可以使用 response.body 属性,该属性是一个 ReadableStream,允许您使用 .getReader() 方法迭代从服务器传入的块。 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }
这会处理我们返回的每一位数据,但对于 OpenAI HTTP 协议,我们期望数据是由换行符分隔的 JSON,因此我们将拆分响应正文并按每行的形式“生成”它们重新完成。我们将正在进行的行缓冲到lastFragment中,并且只返回由两个换行符分隔的完整行:
// stream here is request.body async function* splitStream(stream: ReadableStream<Uint8Array>) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment += data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i < parts.length - 1; i += 1) { yield parts[i]; } // Save the last part as the new last fragment lastFragment = parts[parts.length - 1]; } } finally { reader.releaseLock(); } }
如果你不熟悉这个function*和yield语法,只需将function*视为可以循环返回多个内容的函数,而将yield视为从函数中多次返回内容的方式。
然后你可以循环这个 splitStream 函数,例如:
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }
如果这个“for wait”语法让您感到困惑,那么它正在使用所谓的“异步迭代器” - 就像您在 for 循环中使用的常规迭代器一样,但每次它获取下一个值时,都会等待它。
对于我们的示例,当我们从 OpenAI 获取一些文本并且正在等待更多文本时,for 循环将等待直到 splitStream 产生另一个值,这将在 wait reader.read() 返回一个完成的值时发生一行或多行文本。
接下来我们将研究返回异步迭代器的另一种方法,该迭代器不是 splitStream 等函数,因此调用者可以使用“for wait”循环来迭代此数据。
现在我们有一个返回整行文本的异步迭代器,我们可以只返回 splitStream(response.body),但我们希望拦截每一行并转换它们,同时仍然让函数的调用者迭代。
该方法类似于上面的 async function* 语法。这里我们将直接返回一个异步迭代器,而不是调用时返回一个的异步函数。不同之处在于类型是 AsyncIterator 而不是需要首先调用的 AsyncGenerator。 AsyncIterator 可以通过特定的命名函数来定义:Symbol.asyncIterator.2
return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };
当您想要返回与来自 splitStream 的数据不同的内容时,这非常有用。每次从流式 HTTP 请求中传入新行时,splitStream 都会生成它,该函数将在数据中接收它,并可以在将其生成给调用者之前执行一些操作。
接下来我们将了解如何在 OpenAI 的流式聊天完成 API 的情况下具体解释这些数据。
The OpenAI response protocol is a series of lines that start with data: or event:, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE] if the stream is done, otherwise it’s just JSON.
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }
Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on sdks or libraries. This allows you to hide latency, as your UI can immediately start updating, without consuming more bandwidth with multiple requests. You can use the above function like you would with the official openai npm package:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }
See the code here that also lets you make some utility functions to make this even easier by pre-configuring the model and extracting the .choices[0].delta.content:
const response = await chatStream(messages); for await (const content of response) { console.log(content); }
Before you copy the code, try to implement it yourself as an exercise in async functions.
Note: you can only have one reader of the stream at a time, so you generally don’t call .getReader() multiple times - you probabaly want .tee() in that case, and if you want to use .getReader() multiple times for some reason, make sure to have the first .releaseLock() first. ↩
Or alternatively you can If you aren’t familiar with Symbol, it’s used in a way to have keys in an object that aren’t strings or numbers. That way they don’t conflict if you added a key named asyncIterator. You could access the function with myIterator[Symbol.asyncIterator](). ↩
以上是使用 fetch 流式传输 HTTP 响应的详细内容。更多信息请关注PHP中文网其他相关文章!