在 FastAPI 端点中进行并发 HTTP 调用时,如何避免资源紧张和潜在的崩溃?

Mary-Kate Olsen
发布: 2024-11-15 22:02:02
原创
412 人浏览过

How can I avoid resource strain and potential crashes when making concurrent HTTP calls in a FastAPI endpoint?

FastAPI 端点中使用 HTTPX 的异步调用

关于在 FastAPI 中使用 ThreadPoolExecutor 的担忧

您对在 FastAPI 端点中使用并发.futures.ThreadPoolExecutor 的影响表示担忧,特别是在线程创建、主机饥饿和应用程序崩溃方面。这些担忧是合理的,因为过多的线程创建可能会导致资源紧张,并可能导致性能问题。

建议:HTTPX Async API

为了避免这些潜在的陷阱,建议利用 HTTPX 库的异步功能而不是并发.futures.ThreadPoolExecutor。 HTTPX 提供了高效的异步 API,允许您发出 HTTP 请求,而无需显式的线程管理。这种方法有几个优点:

  • 受控连接池:HTTPX 使您能够控制连接池的大小,确保维持合理数量的连接以获得最佳性能,而无需
  • 异步请求:异步请求不会阻塞事件循环,为其他操作释放资源,从而防止性能瓶颈。
  • 优雅关闭: HTTPX 允许您显式关闭 AsyncClient 实例,确保正确清理并防止资源泄漏。

工作示例

以下代码代码片段演示了如何在 FastAPI 端点中使用 HTTPX 实现异步 HTTP 请求:

from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import httpx
import asyncio

URLS = ['https://www.foxnews.com/',
        'https://edition.cnn.com/',
        'https://www.nbcnews.com/',
        'https://www.bbc.co.uk/',
        'https://www.reuters.com/']

@asynccontextmanager
async def lifespan(app: FastAPI):
    # customise settings
    limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
    timeout = httpx.Timeout(5.0, read=15.0)  # 15s timeout on read. 5s timeout elsewhere.

    # Initialise the Client on startup and add it to the state
    async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
        yield {'client': client}
        # The Client closes on shutdown 

app = FastAPI(lifespan=lifespan)

async def send(url, client):
    return await client.get(url)

@app.get('/')
async def main(request: Request):
    client = request.state.client
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    return [r.text[:50] for r in responses]  # for demo purposes, only return the first 50 chars of each response
登录后复制

替代方案:流式传输响应以避免 RAM 使用

如果读取整个响应正文进入 RAM 是一个问题,请考虑利用 HTTPX 的 Streaming 响应以及 FastAPI 的 StreamingResponse:

... # (same as previous code snippet)

async def iter_content(responses):
     for r in responses:
        async for chunk in r.aiter_text():
            yield chunk[:50]  # for demo purposes, return only the first 50 chars of each response and then break the loop
            yield '\n\n'
            break
        await r.aclose()

@app.get('/')
async def main(request: Request):
    client = request.state.client
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    return StreamingResponse(iter_content(responses), media_type='text/event-stream')
登录后复制

以上是在 FastAPI 端点中进行并发 HTTP 调用时,如何避免资源紧张和潜在的崩溃?的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:php.cn
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
作者最新文章
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板