La réponse en streaming FastAPI ne diffuse pas avec la fonction générateur
Problème :
Une application FastAPI ne parvient pas à diffusez une réponse à partir d'une fonction génératrice à l'aide de StreamingResponse, ce qui entraîne l'envoi de la réponse entière dans son ensemble.
Réponse :
Il y a plusieurs facteurs à prendre en compte lors de l'utilisation de StreamingResponse avec fonctions génératrices :
1. Type de requête HTTP :
Le code fourni utilise une requête POST, qui n'est pas adaptée pour obtenir des données du serveur. Utilisez plutôt une requête GET pour récupérer des données.
2. Gestion des informations d'identification :
Pour des raisons de sécurité, évitez d'envoyer des informations d'identification (par exemple, « auth_key ») via la chaîne de requête URL. Utilisez plutôt des en-têtes ou des cookies.
3. Syntaxe de la fonction génératrice :
Les opérations de blocage ne doivent pas être exécutées dans la fonction génératrice de StreamingResponse. Utilisez def au lieu de async def pour la fonction génératrice, car FastAPI utilise un pool de threads pour gérer les opérations de blocage.
4. Utilisation de l'itérateur :
Dans votre code de test, request.iter_lines() parcourt les données de réponse une ligne à la fois. Si la réponse ne contient pas de sauts de ligne, utilisez iter_content() et spécifiez une taille de bloc pour éviter d'éventuels problèmes de mise en mémoire tampon.
5. Type de média :
Les navigateurs peuvent mettre en mémoire tampon les réponses avec media_type='text/plain'. Pour éviter cela, définissez media_type='text/event-stream' ou désactivez le reniflage MIME à l'aide de X-Content-Type-Options : nosniff dans les en-têtes de réponse.
Exemple de travail :
Voici un exemple fonctionnel dans app.py et test.py qui résout les problèmes mentionnés ci-dessus :
# app.py from fastapi import FastAPI, StreamingResponse import asyncio app = FastAPI() async def fake_data_streamer(): for i in range(10): yield b'some fake data\n\n' await asyncio.sleep(0.5) @app.get('/') async def main(): headers = {'X-Content-Type-Options': 'nosniff'} # Disable MIME Sniffing return StreamingResponse(fake_data_streamer(), media_type='text/event-stream', headers=headers) # test.py (using httpx) import httpx url = 'http://localhost:8000/' with httpx.stream('GET', url) as r: for chunk in r.iter_content(1024): print(chunk)
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!