Cet article examinera l'utilisation de l'API JavaScript Streams qui permet d'effectuer un appel HTTP de récupération et de recevoir une réponse en streaming par morceaux, ce qui permet à un client de commencer à répondre davantage à une réponse du serveur. rapidement et créez des interfaces utilisateur comme ChatGPT.
À titre d'exemple motivant, nous implémenterons une fonction pour gérer la réponse LLM en streaming d'OpenAI (ou de tout serveur utilisant la même API de streaming http), en n'utilisant aucune dépendance npm, juste la récupération intégrée. Le code complet est ici, y compris les tentatives avec interruption exponentielle, les intégrations, le chat sans streaming et des API plus simples pour interagir avec les complétions et les intégrations de chat.
Si vous souhaitez savoir comment renvoyer également un flux HTTP aux clients, consultez cet article.
Voici l’exemple complet. Nous examinerons chaque pièce ci-dessous :
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl + "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " + process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream<Uint8Array>) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment += data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i < parts.length - 1; i += 1) { yield parts[i]; } // Save the last part as the new last fragment lastFragment = parts[parts.length - 1]; } } finally { reader.releaseLock(); } }
Voir le code ici pour une version qui comporte de belles surcharges typées pour les variantes de paramètres de streaming et non-streaming, ainsi que des tentatives et d'autres améliorations.
Le reste de l'article concerne la compréhension de ce que fait ce code.
Cette partie est en fait très simple. Une réponse HTTP en streaming provient d'une requête HTTP normale :
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl + "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " + process.env.LLM_API_KEY, }, body: JSON.stringify(body), });
Les en-têtes HTTP sont envoyés comme d'habitude et ne nécessitent rien de particulier pour activer le streaming. Et vous pouvez toujours exploiter les en-têtes de mise en cache classiques pour le streaming HTTP.
L'histoire des erreurs côté client est un peu malheureuse pour le streaming HTTP. L'avantage est que pour le streaming HTTP, le client obtient immédiatement les codes d'état dans la réponse initiale et peut y détecter un échec. L'inconvénient du protocole http est que si le serveur renvoie un succès mais s'interrompt ensuite en cours de flux, rien au niveau du protocole n'indiquera au client que le flux a été interrompu. Nous verrons ci-dessous comment OpenAI encode une sentinelle « tout est fait » à la fin pour contourner ce problème.
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }
Afin de lire une réponse de streaming HTTP, le client peut utiliser la propriété Response.body qui est un ReadableStream vous permettant de parcourir les morceaux à mesure qu'ils arrivent du serveur à l'aide de la méthode .getReader(). 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }
Cela gère chaque bit de données que nous récupérons, mais pour le protocole HTTP OpenAI, nous nous attendons à ce que les données soient en JSON séparées par des nouvelles lignes, nous allons donc diviser le corps de la réponse et « céder » chaque ligne au fur et à mesure. re terminé. Nous tamponnons la ligne en cours dans lastFragment et ne renvoyons que les lignes complètes séparées par deux nouvelles lignes :
// stream here is request.body async function* splitStream(stream: ReadableStream<Uint8Array>) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment += data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i < parts.length - 1; i += 1) { yield parts[i]; } // Save the last part as the new last fragment lastFragment = parts[parts.length - 1]; } } finally { reader.releaseLock(); } }
Si cette fonction* et cette syntaxe de rendement ne vous sont pas familières, traitez simplement function* comme une fonction qui peut renvoyer plusieurs éléments dans une boucle, et rendement comme un moyen de renvoyer quelque chose plusieurs fois à partir d'une fonction.
Vous pouvez ensuite parcourir cette fonction splitStream comme :
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }
Si cette syntaxe "for wait" vous déstabilise, elle utilise ce qu'on appelle un "itérateur asynchrone" - comme un itérateur normal que vous utiliseriez avec une boucle for, mais chaque fois qu'il obtient la valeur suivante, il est attendu.
Pour notre exemple, lorsque nous avons obtenu du texte d'OpenAI et que nous attendons plus, la boucle for attendra que splitStream donne une autre valeur, ce qui se produira lorsque wait reader.read() retournera une valeur qui se termine une ou plusieurs lignes de texte.
Ensuite, nous examinerons une autre façon de renvoyer un itérateur asynchrone qui n'est pas une fonction comme splitStream, afin qu'un appelant puisse utiliser une boucle « for wait » pour parcourir ces données.
Maintenant que nous avons un itérateur asynchrone renvoyant des lignes complètes de texte, nous pourrions simplement renvoyer splitStream(response.body), mais nous voulons intercepter chacune des lignes et les transformer, tout en laissant l'appelant de notre fonction itérer .
L'approche est similaire à la syntaxe de la fonction asynchrone* ci-dessus. Ici, nous renverrons directement un itérateur asynchrone, au lieu d'une fonction asynchrone qui en renvoie un lorsqu'elle est appelée. La différence est que le type est AsyncIterator au lieu de AsyncGenerator qui doit être appelé en premier. Un AsyncIterator peut être défini en ayant une certaine fonction nommée : Symbol.asyncIterator.2
return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };
Ceci est utile lorsque vous souhaitez renvoyer quelque chose de différent des données provenant de splitStream. Chaque fois qu'une nouvelle ligne provient de la requête HTTP de streaming, splitStream la donnera, cette fonction la recevra sous forme de données et pourra faire quelque chose avant de la donner à son appelant.
Nous verrons ensuite comment interpréter ces données spécifiquement dans le cas de l'API de complétion de chat en streaming d'OpenAI.
The OpenAI response protocol is a series of lines that start with data: or event:, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE] if the stream is done, otherwise it’s just JSON.
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }
Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on sdks or libraries. This allows you to hide latency, as your UI can immediately start updating, without consuming more bandwidth with multiple requests. You can use the above function like you would with the official openai npm package:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }
See the code here that also lets you make some utility functions to make this even easier by pre-configuring the model and extracting the .choices[0].delta.content:
const response = await chatStream(messages); for await (const content of response) { console.log(content); }
Before you copy the code, try to implement it yourself as an exercise in async functions.
Note: you can only have one reader of the stream at a time, so you generally don’t call .getReader() multiple times - you probabaly want .tee() in that case, and if you want to use .getReader() multiple times for some reason, make sure to have the first .releaseLock() first. ↩
Or alternatively you can If you aren’t familiar with Symbol, it’s used in a way to have keys in an object that aren’t strings or numbers. That way they don’t conflict if you added a key named asyncIterator. You could access the function with myIterator[Symbol.asyncIterator](). ↩
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!