この投稿では、フェッチ HTTP 呼び出しを実行し、ストリーミング応答をチャンクで受信できるようにする JavaScript Streams API の操作について説明します。これにより、クライアントはサーバー応答への応答を開始できるようになります。 ChatGPT のような UI をすばやく構築できます。
やる気を起こさせる例として、npm 依存関係を使用せず、組み込みフェッチのみを使用して、OpenAI (または同じ http ストリーミング API を使用するサーバー) からのストリーミング LLM 応答を処理する関数を実装します。指数バックオフによる再試行、埋め込み、非ストリーミング チャット、チャットの完了と埋め込みを操作するためのより単純な API を含む完全なコードはここにあります。
クライアントに HTTP ストリームを返す方法にも興味がある場合は、この投稿を参照してください。
これが完全な例です。以下で各部分を見ていきます:
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl + "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " + process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream<Uint8Array>) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment += data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i < parts.length - 1; i += 1) { yield parts[i]; } // Save the last part as the new last fragment lastFragment = parts[parts.length - 1]; } } finally { reader.releaseLock(); } }
ストリーミングおよび非ストリーミング パラメーターのバリアントに対する優れた型付きオーバーロード、再試行およびその他の改善を備えたバージョンについては、ここのコードを参照してください。
投稿の残りの部分は、このコードの機能を理解することについてです。
この部分は実際にはとても簡単です。ストリーミング HTTP 応答は、通常の HTTP 要求から来ます:
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl + "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " + process.env.LLM_API_KEY, }, body: JSON.stringify(body), });
HTTP ヘッダーは通常どおり送信されるため、ストリーミングを有効にするために特に何も設定する必要はありません。また、HTTP ストリーミングでは通常のキャッシュ ヘッダーを引き続き利用できます。
クライアント側のエラーに関する話は、HTTP ストリーミングにとって少し残念です。 HTTP ストリーミングの利点は、クライアントが最初の応答でステータス コードをすぐに取得し、そこで障害を検出できることです。 http プロトコルの欠点は、サーバーが成功を返してもストリームの途中で中断した場合、ストリームが中断されたことをクライアントに伝えるものがプロトコル レベルで何もないことです。これを回避するために、OpenAI が最後に「すべて完了」センチネルをエンコードする方法を以下で見ていきます。
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }
HTTP ストリーミング応答を読み取るために、クライアントは ReadableStream であるresponse.body プロパティを使用できます。これにより、.getReader() メソッドを使用してサーバーから受信したチャンクを反復処理できます。 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }
これは返されるデータのあらゆるビットを処理しますが、OpenAI HTTP プロトコルの場合、データは改行で区切られた JSON であることが期待されるため、代わりに応答本文を分割し、各行をそのまま「出力」します。再完成しました。進行中の行を lastFragment にバッファリングし、2 つの改行で区切られた完全な行のみを返します:
// stream here is request.body async function* splitStream(stream: ReadableStream<Uint8Array>) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment += data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i < parts.length - 1; i += 1) { yield parts[i]; } // Save the last part as the new last fragment lastFragment = parts[parts.length - 1]; } } finally { reader.releaseLock(); } }
この function* と yield 構文に慣れていない場合は、function* をループ内で複数のものを返すことができる関数として扱い、yield を関数から複数回何かを返す方法として扱ってください。
次に、この SplitStream 関数を次のようにループできます。
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }
この「for await」構文が気になる場合は、「非同期イテレータ」と呼ばれるものが使用されています。for ループで使用する通常のイテレータのようなものですが、次の値を取得するたびに待機されます。
この例では、OpenAI からテキストを取得し、さらに待機している場合、for ループは、splitStream が別の値を生成するまで待機します。これは、await Reader.read() が終了値を返したときに発生します。 1 行以上のテキスト。
次に、splitStream のような関数ではない非同期イテレータを返す別の方法を見ていきます。これにより、呼び出し元は「for await」ループを使用してこのデータを反復処理できます。
完全なテキスト行を返す非同期反復子ができたので、splitStream(response.body) を返すだけで済みますが、関数の呼び出し元に反復処理をさせながら、各行をインターセプトして変換したいと考えています。 。
このアプローチは、上記の async function* 構文と似ています。ここでは、呼び出されたときに非同期イテレータを返す非同期関数の代わりに、非同期イテレータを直接返します。違いは、型が AsyncGenerator ではなく AsyncIterator であり、最初に呼び出す必要があることです。 AsyncIterator は、特定の名前付き関数 Symbol.asyncIterator.2
を使用して定義できます。return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };
これは、splitStream からのデータとは異なるものを返したい場合に便利です。ストリーミング HTTP リクエストから新しい行が入るたびに、splitStream がそれを生成し、この関数がそれをデータで受け取り、呼び出し元に渡す前に何かを行うことができます。
次に、OpenAI のストリーミング チャット完了 API の場合に特にこのデータを解釈する方法を見ていきます。
The OpenAI response protocol is a series of lines that start with data: or event:, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE] if the stream is done, otherwise it’s just JSON.
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }
Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on sdks or libraries. This allows you to hide latency, as your UI can immediately start updating, without consuming more bandwidth with multiple requests. You can use the above function like you would with the official openai npm package:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }
See the code here that also lets you make some utility functions to make this even easier by pre-configuring the model and extracting the .choices[0].delta.content:
const response = await chatStream(messages); for await (const content of response) { console.log(content); }
Before you copy the code, try to implement it yourself as an exercise in async functions.
Note: you can only have one reader of the stream at a time, so you generally don’t call .getReader() multiple times - you probabaly want .tee() in that case, and if you want to use .getReader() multiple times for some reason, make sure to have the first .releaseLock() first. ↩
Or alternatively you can If you aren’t familiar with Symbol, it’s used in a way to have keys in an object that aren’t strings or numbers. That way they don’t conflict if you added a key named asyncIterator. You could access the function with myIterator[Symbol.asyncIterator](). ↩
以上がフェッチを使用した HTTP 応答のストリーミングの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。