Maison > interface Web > js tutoriel > Diffusion de réponses HTTP à l'aide de fetch

Diffusion de réponses HTTP à l'aide de fetch

王林
Libérer: 2024-07-24 11:35:41
original
588 Les gens l'ont consulté

Streaming HTTP Responses using fetch

Cet article examinera l'utilisation de l'API JavaScript Streams qui permet d'effectuer un appel HTTP de récupération et de recevoir une réponse en streaming par morceaux, ce qui permet à un client de commencer à répondre davantage à une réponse du serveur. rapidement et créez des interfaces utilisateur comme ChatGPT.

À titre d'exemple motivant, nous implémenterons une fonction pour gérer la réponse LLM en streaming d'OpenAI (ou de tout serveur utilisant la même API de streaming http), en n'utilisant aucune dépendance npm, juste la récupération intégrée. Le code complet est ici, y compris les tentatives avec interruption exponentielle, les intégrations, le chat sans streaming et des API plus simples pour interagir avec les complétions et les intégrations de chat.

Si vous souhaitez savoir comment renvoyer également un flux HTTP aux clients, consultez cet article.

Exemple de code complet

Voici l’exemple complet. Nous examinerons chaque pièce ci-dessous :

async function createChatCompletion(body: ChatCompletionCreateParams) {
  // Making the request
  const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
  const response = await fetch(baseUrl + "/v1/chat/completions", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Authorization": "Bearer " + process.env.LLM_API_KEY,
    },
    body: JSON.stringify(body),
  });
  // Handling errors
  if (!response.ok) {
    const error = await response.text();
    throw new Error(`Failed (${response.status}): ${error}`,
  }
  if (!body.stream) { // the non-streaming case
    return response.json();
  }
  const stream = response.body;
  if (!stream) throw new Error("No body in response");
  // Returning an async iterator
  return {
    [Symbol.asyncIterator]: async function* () {
      for await (const data of splitStream(stream)) {
        // Handling the OpenAI HTTP streaming protocol
        if (data.startsWith("data:")) {
          const json = data.substring("data:".length).trimStart();
          if (json.startsWith("[DONE]")) {
            return;
          }
          yield JSON.parse(json);
        }
      }
    },
  };
}

// Reading the stream  
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment += data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i < parts.length - 1; i += 1) {
        yield parts[i];
      }
      // Save the last part as the new last fragment
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}
Copier après la connexion

Voir le code ici pour une version qui comporte de belles surcharges typées pour les variantes de paramètres de streaming et non-streaming, ainsi que des tentatives et d'autres améliorations.

Le reste de l'article concerne la compréhension de ce que fait ce code.

Faire la demande

Cette partie est en fait très simple. Une réponse HTTP en streaming provient d'une requête HTTP normale :

const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
const response = await fetch(baseUrl + "/v1/chat/completions", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "Authorization": "Bearer " + process.env.LLM_API_KEY,
  },
  body: JSON.stringify(body),
});
Copier après la connexion

Les en-têtes HTTP sont envoyés comme d'habitude et ne nécessitent rien de particulier pour activer le streaming. Et vous pouvez toujours exploiter les en-têtes de mise en cache classiques pour le streaming HTTP.

Gestion des erreurs

L'histoire des erreurs côté client est un peu malheureuse pour le streaming HTTP. L'avantage est que pour le streaming HTTP, le client obtient immédiatement les codes d'état dans la réponse initiale et peut y détecter un échec. L'inconvénient du protocole http est que si le serveur renvoie un succès mais s'interrompt ensuite en cours de flux, rien au niveau du protocole n'indiquera au client que le flux a été interrompu. Nous verrons ci-dessous comment OpenAI encode une sentinelle « tout est fait » à la fin pour contourner ce problème.

if (!response.ok) {
  const error = await response.text();
  throw new Error(`Failed (${response.status}): ${error}`,
}
Copier après la connexion

Lire le flux

Afin de lire une réponse de streaming HTTP, le client peut utiliser la propriété Response.body qui est un ReadableStream vous permettant de parcourir les morceaux à mesure qu'ils arrivent du serveur à l'aide de la méthode .getReader(). 1

const reader = request.body.getReader();
try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      const text = TextDecoder().decode(value);
      //... do something with the chunk
    }
} finally {
  reader.releaseLock();
}
Copier après la connexion

Cela gère chaque bit de données que nous récupérons, mais pour le protocole HTTP OpenAI, nous nous attendons à ce que les données soient en JSON séparées par des nouvelles lignes, nous allons donc diviser le corps de la réponse et « céder » chaque ligne au fur et à mesure. re terminé. Nous tamponnons la ligne en cours dans lastFragment et ne renvoyons que les lignes complètes séparées par deux nouvelles lignes :

// stream here is request.body
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment += data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i < parts.length - 1; i += 1) {
        yield parts[i];
      }
      // Save the last part as the new last fragment
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}
Copier après la connexion

Si cette fonction* et cette syntaxe de rendement ne vous sont pas familières, traitez simplement function* comme une fonction qui peut renvoyer plusieurs éléments dans une boucle, et rendement comme un moyen de renvoyer quelque chose plusieurs fois à partir d'une fonction.

Vous pouvez ensuite parcourir cette fonction splitStream comme :

for await (const data of splitStream(response.body)) {
  // data here is a full line of text. For OpenAI, it might look like
  // "data: {...some json object...}" or "data: [DONE]" at the end
}
Copier après la connexion

Si cette syntaxe "for wait" vous déstabilise, elle utilise ce qu'on appelle un "itérateur asynchrone" - comme un itérateur normal que vous utiliseriez avec une boucle for, mais chaque fois qu'il obtient la valeur suivante, il est attendu.

Pour notre exemple, lorsque nous avons obtenu du texte d'OpenAI et que nous attendons plus, la boucle for attendra que splitStream donne une autre valeur, ce qui se produira lorsque wait reader.read() retournera une valeur qui se termine une ou plusieurs lignes de texte.

Ensuite, nous examinerons une autre façon de renvoyer un itérateur asynchrone qui n'est pas une fonction comme splitStream, afin qu'un appelant puisse utiliser une boucle « for wait » pour parcourir ces données.

Renvoyer un itérateur asynchrone

Maintenant que nous avons un itérateur asynchrone renvoyant des lignes complètes de texte, nous pourrions simplement renvoyer splitStream(response.body), mais nous voulons intercepter chacune des lignes et les transformer, tout en laissant l'appelant de notre fonction itérer .

L'approche est similaire à la syntaxe de la fonction asynchrone* ci-dessus. Ici, nous renverrons directement un itérateur asynchrone, au lieu d'une fonction asynchrone qui en renvoie un lorsqu'elle est appelée. La différence est que le type est AsyncIterator au lieu de AsyncGenerator qui doit être appelé en premier. Un AsyncIterator peut être défini en ayant une certaine fonction nommée : Symbol.asyncIterator.2

      return {
        [Symbol.asyncIterator]: async function* () {
          for await (const data of splitStream(stream)) {
            //handle the data
            yield data;
          }
        },
      };
Copier après la connexion

Ceci est utile lorsque vous souhaitez renvoyer quelque chose de différent des données provenant de splitStream. Chaque fois qu'une nouvelle ligne provient de la requête HTTP de streaming, splitStream la donnera, cette fonction la recevra sous forme de données et pourra faire quelque chose avant de la donner à son appelant.

Nous verrons ensuite comment interpréter ces données spécifiquement dans le cas de l'API de complétion de chat en streaming d'OpenAI.

Handling the OpenAI HTTP streaming protocol

The OpenAI response protocol is a series of lines that start with data: or event:, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE] if the stream is done, otherwise it’s just JSON.

for await (const data of splitStream(stream)) {
  if (data.startsWith("data:")) {
    const json = data.substring("data:".length).trimStart();
    if (json.startsWith("[DONE]")) {
      return;
    }
    yield JSON.parse(json);
  } else {
    console.debug("Unexpected data:", data);
  }
}
Copier après la connexion

Bringing it all together

Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on sdks or libraries. This allows you to hide latency, as your UI can immediately start updating, without consuming more bandwidth with multiple requests. You can use the above function like you would with the official openai npm package:

  const response = await createChatCompletion({
    model: "llama3",
    messages: [...your messages...],
    stream: true,
  });
  for await (const chunk of response) {
    if (chunk.choices[0].delta?.content) {
      console.log(chunk.choices[0].delta.content);
    }
  }
Copier après la connexion

See the code here that also lets you make some utility functions to make this even easier by pre-configuring the model and extracting the .choices[0].delta.content:

const response = await chatStream(messages);
for await (const content of response) {
  console.log(content);
}
Copier après la connexion

Before you copy the code, try to implement it yourself as an exercise in async functions.

More resources

  • For information about returning HTTP streaming data from your own server endpoint, check out this post on AI Chat with HTTP Streaming that both streams data from OpenAI (or similar) to your server and simultaneously streams it down to a client, while doing custom logic as it goes (such as saving chunks to a database).
  • The MDN docs, as always, are great. Beyond the links above, here’s a guide on the readable streams API that shows how to connect a readable stream to an tag to stream in an image request. Note: this guide uses response.body as an async iterator, but currently that is not widely implemented and not in the TypeScript types.
    1. Note: you can only have one reader of the stream at a time, so you generally don’t call .getReader() multiple times - you probabaly want .tee() in that case, and if you want to use .getReader() multiple times for some reason, make sure to have the first .releaseLock() first. ↩

    2. Or alternatively you can If you aren’t familiar with Symbol, it’s used in a way to have keys in an object that aren’t strings or numbers. That way they don’t conflict if you added a key named asyncIterator. You could access the function with myIterator[Symbol.asyncIterator](). ↩

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:dev.to
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal