FastAPI 提供 StreamingResponse 类,用于在 API 调用期间将数据流式传输到客户端。虽然此功能旨在以非阻塞方式流式传输数据,但在使用具有阻塞操作或不当使用的生成器函数时,可能会出现问题。
确保成功流式传输,请考虑以下事项:
考虑以下 Python 代码:
# app.py from fastapi import FastAPI, StreamingResponse from fastapi.responses import StreamingResponse import asyncio app = FastAPI() async def fake_data_streamer(): for i in range(10): yield b'some fake data\n\n' await asyncio.sleep(0.5) @app.get('/') async def main(): return StreamingResponse(fake_data_streamer(), media_type='text/event-stream') # or, use: ''' headers = {'X-Content-Type-Options': 'nosniff'} return StreamingResponse(fake_data_streamer(), headers=headers, media_type='text/plain') ''' # test.py (using httpx) import httpx url = 'http://127.0.0.1:8000/' with httpx.stream('GET', url) as r: for chunk in r.iter_raw(): # or, for line in r.iter_lines(): print(chunk)
此代码演示了如何从 FastAPI 应用程序中的生成器函数流式传输数据并使用 httpx 库使用它。
以上是如何在 FastAPI 中有效处理流响应?的详细内容。更多信息请关注PHP中文网其他相关文章!