This post will look at working with the JavaScript Streams API which allows making a fetch HTTP call and receiving a streaming response in chunks, which allows a client to start responding to a server response more quickly and build UIs like ChatGPT.
As a motivating example, we’ll implement a function to handle the streaming LLM response from OpenAI (or any server using the same http streaming API), using no npm dependencies—just the built-in fetch. The full code is here including retries with exponential backoff, embeddings, non-streaming chat, and a simpler APIs for interacting with chat completions and embeddings.
If you’re interested in seeing how to also return an HTTP stream to clients, check out this post.
Here’s the full example. We’ll look at each piece below:
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl + "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " + process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream<Uint8Array>) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment += data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i < parts.length - 1; i += 1) { yield parts[i]; } // Save the last part as the new last fragment lastFragment = parts[parts.length - 1]; } } finally { reader.releaseLock(); } }
See the code here for a version that has nice typed overloads for streaming & non-streaming parameter variants, along with retries and other improvements.
The rest of the post is about understanding what this code does.
This part is actually very easy. A streaming HTTP response comes from a normal HTTP request:
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl + "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " + process.env.LLM_API_KEY, }, body: JSON.stringify(body), });
The HTTP headers are sent up per usual, and don’t have to set anything in particular to enable streaming. And you can still leverage regular caching headers for HTTP streaming.
The story around errors on the client side is a little unfortunate for HTTP streaming. The upside is that for HTTP streaming, the client gets status codes right away in the initial response and can detect failure there. The downside to the http protocol is that if the server returns success but then breaks mid-stream, there isn’t anything at the protocol level that will tell the client that the stream was interrupted. We’ll see below how OpenAI encodes an “all done” sentinel at the end to work around this.
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }
In order to read an HTTP streaming response, the client can use the response.body property which is a ReadableStream allowing you to iterate over the chunks as they come in from the server using the .getReader() method.1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }
This handles every bit of data that we get back, but for the OpenAI HTTP protocol we are expecting the data to be JSON separated by newlines, so instead we will split up the response body and “yield” each line as they’re completed. We buffer the in-progress line into lastFragment and only return full lines that have been separated by two newlines:
// stream here is request.body async function* splitStream(stream: ReadableStream<Uint8Array>) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment += data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i < parts.length - 1; i += 1) { yield parts[i]; } // Save the last part as the new last fragment lastFragment = parts[parts.length - 1]; } } finally { reader.releaseLock(); } }
If this function* and yield syntax is unfamiliar to you, just treat function* as a function that can return multiple things in a loop, and yield as the way of returning something multiple times from a function.
You can then loop over this splitStream function like:
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }
If this "for await" syntax throws you off, it's using what’s called an “async iterator” - like a regular iterator you’d use with a for loop, but every time it gets the next value, it’s awaited.
For our example, when we’ve gotten some text from OpenAI and we’re waiting for more, the for loop will wait until splitStream yields another value, which will happen when await reader.read() returns a value that finishes one or more lines of text.
Next up we’ll look at another way of returning an async iterator that isn’t a function like splitStream, so a caller can use a “for await” loop to iterate over this data.
Now that we have an async iterator returning full lines of text, we could just return splitStream(response.body), but we want to intercept each of the lines and transform them, while still letting the caller of our function to iterate.
The approach is similar to to the async function* syntax above. Here we’ll return an async iterator directly, instead of an async function that returns one when it’s called. The difference is the type is AsyncIterator instead of AsyncGenerator which needs to be called first. An AsyncIterator can be defined by having a certain named function: Symbol.asyncIterator.2
return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };
This is useful when you want to return something different from the data coming from splitStream. Every time a new line comes in from the streaming HTTP request, splitStream will yield it, this function will receive it in data and can do something before yielding it to its caller.
Next we’ll look at how to interpret this data specifically in the case of OpenAI’s streaming chat completion API.
The OpenAI response protocol is a series of lines that start with data: or event:, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE] if the stream is done, otherwise it’s just JSON.
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }
Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on sdks or libraries. This allows you to hide latency, as your UI can immediately start updating, without consuming more bandwidth with multiple requests. You can use the above function like you would with the official openai npm package:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }
See the code here that also lets you make some utility functions to make this even easier by pre-configuring the model and extracting the .choices[0].delta.content:
const response = await chatStream(messages); for await (const content of response) { console.log(content); }
Before you copy the code, try to implement it yourself as an exercise in async functions.
Note: you can only have one reader of the stream at a time, so you generally don’t call .getReader() multiple times - you probabaly want .tee() in that case, and if you want to use .getReader() multiple times for some reason, make sure to have the first .releaseLock() first. ↩
Or alternatively you can If you aren’t familiar with Symbol, it’s used in a way to have keys in an object that aren’t strings or numbers. That way they don’t conflict if you added a key named asyncIterator. You could access the function with myIterator[Symbol.asyncIterator](). ↩
The above is the detailed content of Streaming HTTP Responses using fetch. For more information, please follow other related articles on the PHP Chinese website!