FastAPI StreamingResponse streamt nicht mit Generatorfunktion
Problem:
Eine FastAPI-Anwendung schlägt fehl Streamen Sie eine Antwort von einer Generatorfunktion mithilfe von StreamingResponse, was dazu führt, dass die gesamte Antwort als gesendet wird ganz.
Antwort:
Bei der Verwendung von StreamingResponse mit Generatorfunktionen sind mehrere Faktoren zu berücksichtigen:
1. HTTP-Anfragetyp:
Der bereitgestellte Code verwendet eine POST-Anfrage, die nicht zum Abrufen von Daten vom Server geeignet ist. Verwenden Sie stattdessen eine GET-Anfrage, um Daten abzurufen.
2. Umgang mit Anmeldeinformationen:
Vermeiden Sie aus Sicherheitsgründen das Senden von Anmeldeinformationen (z. B. „auth_key“) über die URL-Abfragezeichenfolge. Verwenden Sie stattdessen Header oder Cookies.
3. Syntax der Generatorfunktion:
Blockierungsvorgänge sollten nicht innerhalb der Generatorfunktion von StreamingResponse ausgeführt werden. Verwenden Sie def anstelle von async def für die Generatorfunktion, da FastAPI einen Thread-Pool verwendet, um Blockierungsvorgänge zu verwalten.
4. Verwendung des Iterators:
In Ihrem Testcode iteriert „requests.iter_lines()“ zeilenweise durch die Antwortdaten. Wenn die Antwort keine Zeilenumbrüche enthält, verwenden Sie iter_content() und geben Sie eine Blockgröße an, um potenzielle Pufferprobleme zu vermeiden.
5. Medientyp:
Browser können Antworten mit media_type='text/plain' puffern. Um dies zu vermeiden, legen Sie media_type='text/event-stream' fest oder deaktivieren Sie MIME Sniffing mit X-Content-Type-Options: nosniff in den Antwortheadern.
Arbeitsbeispiel:
Hier ist ein funktionierendes Beispiel in app.py und test.py, das die oben genannten Probleme behebt:
# app.py from fastapi import FastAPI, StreamingResponse import asyncio app = FastAPI() async def fake_data_streamer(): for i in range(10): yield b'some fake data\n\n' await asyncio.sleep(0.5) @app.get('/') async def main(): headers = {'X-Content-Type-Options': 'nosniff'} # Disable MIME Sniffing return StreamingResponse(fake_data_streamer(), media_type='text/event-stream', headers=headers) # test.py (using httpx) import httpx url = 'http://localhost:8000/' with httpx.stream('GET', url) as r: for chunk in r.iter_content(1024): print(chunk)
Das obige ist der detaillierte Inhalt vonWarum streamt meine FastAPI StreamingResponse nicht mit einer Generatorfunktion?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!