In diesem Beitrag geht es um die Arbeit mit der JavaScript Streams API, die es ermöglicht, einen Fetch-HTTP-Aufruf durchzuführen und eine Streaming-Antwort in Blöcken zu empfangen, wodurch ein Client mehr auf eine Serverantwort reagieren kann schnell und erstellen Sie Benutzeroberflächen wie ChatGPT.
Als motivierendes Beispiel implementieren wir eine Funktion, um die Streaming-LLM-Antwort von OpenAI (oder einem beliebigen Server, der dieselbe http-Streaming-API verwendet) zu verarbeiten, ohne npm-Abhängigkeiten – nur den integrierten Abruf. Der vollständige Code ist hier, einschließlich Wiederholungsversuchen mit exponentiellem Backoff, Einbettungen, Nicht-Streaming-Chat und einfacheren APIs für die Interaktion mit Chat-Abschlüssen und Einbettungen.
Wenn Sie wissen möchten, wie Sie auch einen HTTP-Stream an Clients zurückgeben können, schauen Sie sich diesen Beitrag an.
Hier ist das vollständige Beispiel. Wir werden uns jedes Stück unten ansehen:
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl + "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " + process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream<Uint8Array>) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment += data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i < parts.length - 1; i += 1) { yield parts[i]; } // Save the last part as the new last fragment lastFragment = parts[parts.length - 1]; } } finally { reader.releaseLock(); } }
Sehen Sie sich den Code hier für eine Version an, die schöne typisierte Überladungen für Streaming- und Nicht-Streaming-Parametervarianten sowie Wiederholungsversuche und andere Verbesserungen bietet.
Im Rest des Beitrags geht es darum, zu verstehen, was dieser Code bewirkt.
Dieser Teil ist eigentlich sehr einfach. Eine Streaming-HTTP-Antwort kommt von einer normalen HTTP-Anfrage:
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl + "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " + process.env.LLM_API_KEY, }, body: JSON.stringify(body), });
Die HTTP-Header werden wie gewohnt gesendet und es müssen keine besonderen Einstellungen vorgenommen werden, um Streaming zu ermöglichen. Und Sie können weiterhin normale Caching-Header für HTTP-Streaming nutzen.
Die Geschichte rund um Fehler auf der Clientseite ist für HTTP-Streaming etwas unglücklich. Der Vorteil besteht darin, dass der Client beim HTTP-Streaming sofort in der ersten Antwort Statuscodes erhält und dort einen Fehler erkennen kann. Der Nachteil des http-Protokolls besteht darin, dass, wenn der Server Erfolg meldet, dann aber mitten im Stream abbricht, es auf Protokollebene nichts gibt, was dem Client mitteilt, dass der Stream unterbrochen wurde. Wir werden unten sehen, wie OpenAI am Ende einen „Alles erledigt“-Sentinel codiert, um dieses Problem zu umgehen.
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }
Um eine HTTP-Streaming-Antwort zu lesen, kann der Client die Eigenschaft „response.body“ verwenden, bei der es sich um einen ReadableStream handelt, der es Ihnen ermöglicht, mit der Methode .getReader() über die vom Server eingehenden Blöcke zu iterieren. 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }
Dies verarbeitet jedes Datenbit, das wir zurückerhalten, aber für das OpenAI-HTTP-Protokoll erwarten wir, dass die Daten durch Zeilenumbrüche getrennt im JSON-Format vorliegen. Stattdessen teilen wir den Antworttext auf und „ergeben“ jede Zeile so, wie sie ist. wieder abgeschlossen. Wir puffern die in Bearbeitung befindliche Zeile in lastFragment und geben nur vollständige Zeilen zurück, die durch zwei Zeilenumbrüche getrennt wurden:
// stream here is request.body async function* splitStream(stream: ReadableStream<Uint8Array>) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment += data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i < parts.length - 1; i += 1) { yield parts[i]; } // Save the last part as the new last fragment lastFragment = parts[parts.length - 1]; } } finally { reader.releaseLock(); } }
Wenn Ihnen diese Funktions*- und Yield-Syntax nicht vertraut ist, betrachten Sie Funktion* einfach als eine Funktion, die mehrere Dinge in einer Schleife zurückgeben kann, und Yield als die Möglichkeit, etwas mehrmals von einer Funktion zurückzugeben.
Sie können diese SplitStream-Funktion dann wie folgt durchlaufen:
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }
Wenn Sie diese „for-await“-Syntax abschreckt, verwendet sie einen sogenannten „asynchronen Iterator“ – wie einen regulären Iterator, den Sie mit einer for-Schleife verwenden würden, aber jedes Mal, wenn er den nächsten Wert erhält, wird darauf gewartet.
Wenn wir in unserem Beispiel Text von OpenAI erhalten haben und auf weiteren Text warten, wartet die for-Schleife, bis splitStream einen anderen Wert liefert eine oder mehrere Textzeilen.
Als nächstes schauen wir uns eine andere Möglichkeit an, einen asynchronen Iterator zurückzugeben, der keine Funktion wie „splitStream“ ist, sodass ein Aufrufer eine „for-await“-Schleife verwenden kann, um über diese Daten zu iterieren.
Da wir nun einen asynchronen Iterator haben, der vollständige Textzeilen zurückgibt, könnten wir einfach splitStream(response.body) zurückgeben, aber wir möchten jede der Zeilen abfangen und transformieren, während wir dem Aufrufer unserer Funktion weiterhin die Iteration ermöglichen .
Der Ansatz ähnelt der oben genannten Syntax der asynchronen Funktion*. Hier geben wir direkt einen asynchronen Iterator zurück, anstelle einer asynchronen Funktion, die einen zurückgibt, wenn sie aufgerufen wird. Der Unterschied besteht darin, dass der Typ AsyncIterator und nicht AsyncGenerator ist, der zuerst aufgerufen werden muss. Ein AsyncIterator kann durch eine bestimmte benannte Funktion definiert werden: Symbol.asyncIterator.2
return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };
Dies ist nützlich, wenn Sie etwas anderes als die von splitStream stammenden Daten zurückgeben möchten. Jedes Mal, wenn eine neue Zeile von der Streaming-HTTP-Anfrage eingeht, gibt splitStream diese aus. Diese Funktion empfängt sie in Daten und kann etwas tun, bevor sie sie an ihren Aufrufer weitergibt.
Als nächstes schauen wir uns an, wie diese Daten speziell im Fall der Streaming-Chat-Vervollständigungs-API von OpenAI interpretiert werden.
The OpenAI response protocol is a series of lines that start with data: or event:, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE] if the stream is done, otherwise it’s just JSON.
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }
Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on sdks or libraries. This allows you to hide latency, as your UI can immediately start updating, without consuming more bandwidth with multiple requests. You can use the above function like you would with the official openai npm package:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }
See the code here that also lets you make some utility functions to make this even easier by pre-configuring the model and extracting the .choices[0].delta.content:
const response = await chatStream(messages); for await (const content of response) { console.log(content); }
Before you copy the code, try to implement it yourself as an exercise in async functions.
Note: you can only have one reader of the stream at a time, so you generally don’t call .getReader() multiple times - you probabaly want .tee() in that case, and if you want to use .getReader() multiple times for some reason, make sure to have the first .releaseLock() first. ↩
Or alternatively you can If you aren’t familiar with Symbol, it’s used in a way to have keys in an object that aren’t strings or numbers. That way they don’t conflict if you added a key named asyncIterator. You could access the function with myIterator[Symbol.asyncIterator](). ↩
Das obige ist der detaillierte Inhalt vonStreaming von HTTP-Antworten mithilfe von Fetch. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!