Siaran ini akan melihat cara bekerja dengan JavaScript Streams API yang membolehkan membuat panggilan HTTP ambil dan menerima respons penstriman dalam ketulan, yang membolehkan pelanggan mula membalas respons pelayan dengan lebih banyak lagi cepat dan bina UI seperti ChatGPT.
Sebagai contoh yang memotivasikan, kami akan melaksanakan fungsi untuk mengendalikan respons LLM penstriman daripada OpenAI (atau mana-mana pelayan yang menggunakan API penstriman http yang sama), tidak menggunakan kebergantungan npm—hanya pengambilan terbina dalam. Kod penuh ada di sini termasuk percubaan semula dengan penyingkiran eksponen, pembenaman, sembang bukan penstriman dan API yang lebih mudah untuk berinteraksi dengan pelengkapan sembang dan pembenaman.
Jika anda berminat untuk melihat cara untuk mengembalikan strim HTTP kepada pelanggan, lihat siaran ini.
Berikut ialah contoh penuh. Kami akan melihat setiap bahagian di bawah:
async function createChatCompletion(body: ChatCompletionCreateParams) { // Making the request const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl + "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " + process.env.LLM_API_KEY, }, body: JSON.stringify(body), }); // Handling errors if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, } if (!body.stream) { // the non-streaming case return response.json(); } const stream = response.body; if (!stream) throw new Error("No body in response"); // Returning an async iterator return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { // Handling the OpenAI HTTP streaming protocol if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } } }, }; } // Reading the stream async function* splitStream(stream: ReadableStream<Uint8Array>) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment += data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i < parts.length - 1; i += 1) { yield parts[i]; } // Save the last part as the new last fragment lastFragment = parts[parts.length - 1]; } } finally { reader.releaseLock(); } }
Lihat kod di sini untuk versi yang mempunyai kelebihan taip yang bagus untuk varian parameter penstriman & bukan penstriman, bersama-sama dengan percubaan semula dan peningkatan lain.
Siaran selebihnya adalah tentang memahami perkara yang dilakukan oleh kod ini.
Bahagian ini sebenarnya sangat mudah. Respons HTTP penstriman datang daripada permintaan HTTP biasa:
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com"; const response = await fetch(baseUrl + "/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", "Authorization": "Bearer " + process.env.LLM_API_KEY, }, body: JSON.stringify(body), });
Pengepala HTTP dihantar seperti biasa, dan tidak perlu menetapkan apa-apa khususnya untuk mendayakan penstriman. Dan anda masih boleh memanfaatkan pengepala caching biasa untuk penstriman HTTP.
Cerita tentang ralat pada bahagian klien agak malang untuk penstriman HTTP. Kelebihannya ialah untuk penstriman HTTP, pelanggan mendapat kod status serta-merta dalam respons awal dan boleh mengesan kegagalan di sana. Kelemahan protokol http ialah jika pelayan mengembalikan kejayaan tetapi kemudian memecahkan pertengahan strim, tidak ada apa-apa pada tahap protokol yang akan memberitahu klien bahawa strim telah terganggu. Kita akan lihat di bawah cara OpenAI mengekodkan sentinel "selesai" pada penghujungnya untuk menangani perkara ini.
if (!response.ok) { const error = await response.text(); throw new Error(`Failed (${response.status}): ${error}`, }
Untuk membaca respons penstriman HTTP, pelanggan boleh menggunakan sifat response.body yang merupakan ReadableStream yang membolehkan anda mengulangi bahagian apabila ia masuk dari pelayan menggunakan kaedah .getReader(). 1
const reader = request.body.getReader(); try { while (true) { const { value, done } = await reader.read(); if (done) break; const text = TextDecoder().decode(value); //... do something with the chunk } } finally { reader.releaseLock(); }
Ini mengendalikan setiap bit data yang kami dapat semula, tetapi untuk protokol HTTP OpenAI kami menjangkakan data tersebut akan JSON dipisahkan oleh baris baharu, jadi sebaliknya kami akan membahagikan badan tindak balas dan "menghasilkan" setiap baris kerana ia' selesai semula. Kami menampan baris dalam proses menjadi lastFragment dan hanya mengembalikan baris penuh yang telah dipisahkan oleh dua baris baharu:
// stream here is request.body async function* splitStream(stream: ReadableStream<Uint8Array>) { const reader = stream.getReader(); let lastFragment = ""; try { while (true) { const { value, done } = await reader.read(); if (done) { // Flush the last fragment now that we're done if (lastFragment !== "") { yield lastFragment; } break; } const data = new TextDecoder().decode(value); lastFragment += data; const parts = lastFragment.split("\n\n"); // Yield all except for the last part for (let i = 0; i < parts.length - 1; i += 1) { yield parts[i]; } // Save the last part as the new last fragment lastFragment = parts[parts.length - 1]; } } finally { reader.releaseLock(); } }
Jika fungsi* dan sintaks hasil ini tidak anda kenali, cuma layan fungsi* sebagai fungsi yang boleh mengembalikan berbilang perkara dalam satu gelung dan hasil sebagai cara mengembalikan sesuatu berbilang kali daripada fungsi.
Anda kemudiannya boleh mengulangi fungsi splitStream ini seperti:
for await (const data of splitStream(response.body)) { // data here is a full line of text. For OpenAI, it might look like // "data: {...some json object...}" or "data: [DONE]" at the end }
Jika sintaks "untuk menunggu" ini tidak membantu anda, ia menggunakan apa yang dipanggil "peulang async" - seperti lelaran biasa yang anda akan gunakan dengan gelung for, tetapi setiap kali ia mendapat nilai seterusnya, ia ditunggu-tunggu.
Untuk contoh kami, apabila kami mendapat beberapa teks daripada OpenAI dan kami menunggu lebih banyak lagi, gelung for akan menunggu sehingga splitStream menghasilkan nilai lain, yang akan berlaku apabila menunggu reader.read() mengembalikan nilai yang selesai satu atau lebih baris teks.
Seterusnya kita akan melihat cara lain untuk mengembalikan iterator async yang bukan fungsi seperti splitStream, jadi pemanggil boleh menggunakan gelung "untuk menunggu" untuk mengulangi data ini.
Sekarang kami mempunyai iterator async yang mengembalikan baris teks penuh, kami hanya boleh mengembalikan splitStream(response.body), tetapi kami mahu memintas setiap baris dan mengubahnya, sambil masih membenarkan pemanggil fungsi kami untuk melelaran .
Pendekatannya serupa dengan sintaks fungsi async* di atas. Di sini kami akan mengembalikan iterator async secara langsung, bukannya fungsi async yang mengembalikannya apabila ia dipanggil. Perbezaannya ialah jenis AsyncIterator dan bukannya AsyncGenerator yang perlu dipanggil terlebih dahulu. AsyncIterator boleh ditakrifkan dengan mempunyai fungsi bernama tertentu: Symbol.asyncIterator.2
return { [Symbol.asyncIterator]: async function* () { for await (const data of splitStream(stream)) { //handle the data yield data; } }, };
Ini berguna apabila anda ingin mengembalikan sesuatu yang berbeza daripada data yang datang daripada splitStream. Setiap kali baris baharu masuk daripada permintaan HTTP penstriman, splitStream akan menghasilkannya, fungsi ini akan menerimanya dalam data dan boleh melakukan sesuatu sebelum menyerahkannya kepada pemanggilnya.
Seterusnya kita akan melihat cara mentafsir data ini secara khusus dalam kes API penyiapan sembang penstriman OpenAI.
The OpenAI response protocol is a series of lines that start with data: or event:, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE] if the stream is done, otherwise it’s just JSON.
for await (const data of splitStream(stream)) { if (data.startsWith("data:")) { const json = data.substring("data:".length).trimStart(); if (json.startsWith("[DONE]")) { return; } yield JSON.parse(json); } else { console.debug("Unexpected data:", data); } }
Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on sdks or libraries. This allows you to hide latency, as your UI can immediately start updating, without consuming more bandwidth with multiple requests. You can use the above function like you would with the official openai npm package:
const response = await createChatCompletion({ model: "llama3", messages: [...your messages...], stream: true, }); for await (const chunk of response) { if (chunk.choices[0].delta?.content) { console.log(chunk.choices[0].delta.content); } }
See the code here that also lets you make some utility functions to make this even easier by pre-configuring the model and extracting the .choices[0].delta.content:
const response = await chatStream(messages); for await (const content of response) { console.log(content); }
Before you copy the code, try to implement it yourself as an exercise in async functions.
Note: you can only have one reader of the stream at a time, so you generally don’t call .getReader() multiple times - you probabaly want .tee() in that case, and if you want to use .getReader() multiple times for some reason, make sure to have the first .releaseLock() first. ↩
Or alternatively you can If you aren’t familiar with Symbol, it’s used in a way to have keys in an object that aren’t strings or numbers. That way they don’t conflict if you added a key named asyncIterator. You could access the function with myIterator[Symbol.asyncIterator](). ↩
Atas ialah kandungan terperinci Menstrim Respons HTTP menggunakan fetch. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!