Pourquoi ma réponse FastAPI StreamingResponse ne diffuse-t-elle pas avec une fonction génératrice ?

Mary-Kate Olsen
Libérer: 2024-11-10 02:55:02
original
765 Les gens l'ont consulté

Why is my FastAPI StreamingResponse not streaming with a generator function?

La réponse en streaming FastAPI ne diffuse pas avec la fonction générateur

Problème :

Une application FastAPI ne parvient pas à diffusez une réponse à partir d'une fonction génératrice à l'aide de StreamingResponse, ce qui entraîne l'envoi de la réponse entière dans son ensemble.

Réponse :

Il y a plusieurs facteurs à prendre en compte lors de l'utilisation de StreamingResponse avec fonctions génératrices :

1. Type de requête HTTP :

Le code fourni utilise une requête POST, qui n'est pas adaptée pour obtenir des données du serveur. Utilisez plutôt une requête GET pour récupérer des données.

2. Gestion des informations d'identification :

Pour des raisons de sécurité, évitez d'envoyer des informations d'identification (par exemple, « auth_key ») via la chaîne de requête URL. Utilisez plutôt des en-têtes ou des cookies.

3. Syntaxe de la fonction génératrice :

Les opérations de blocage ne doivent pas être exécutées dans la fonction génératrice de StreamingResponse. Utilisez def au lieu de async def pour la fonction génératrice, car FastAPI utilise un pool de threads pour gérer les opérations de blocage.

4. Utilisation de l'itérateur :

Dans votre code de test, request.iter_lines() parcourt les données de réponse une ligne à la fois. Si la réponse ne contient pas de sauts de ligne, utilisez iter_content() et spécifiez une taille de bloc pour éviter d'éventuels problèmes de mise en mémoire tampon.

5. Type de média :

Les navigateurs peuvent mettre en mémoire tampon les réponses avec media_type='text/plain'. Pour éviter cela, définissez media_type='text/event-stream' ou désactivez le reniflage MIME à l'aide de X-Content-Type-Options : nosniff dans les en-têtes de réponse.

Exemple de travail :

Voici un exemple fonctionnel dans app.py et test.py qui résout les problèmes mentionnés ci-dessus :

# app.py

from fastapi import FastAPI, StreamingResponse
import asyncio

app = FastAPI()

async def fake_data_streamer():
    for i in range(10):
        yield b'some fake data\n\n'
        await asyncio.sleep(0.5)

@app.get('/')
async def main():
    headers = {'X-Content-Type-Options': 'nosniff'}  # Disable MIME Sniffing
    return StreamingResponse(fake_data_streamer(), media_type='text/event-stream', headers=headers)

# test.py (using httpx)

import httpx

url = 'http://localhost:8000/'

with httpx.stream('GET', url) as r:
    for chunk in r.iter_content(1024):
        print(chunk)
Copier après la connexion

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Derniers articles par auteur
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal