由于Python是解释型语言,当用于后端开发时,例如与Python Django结合时,相对于Java Spring,其响应时间会长一些。不过,只要代码合理,差别并不会太大。即使Django使用多进程模式,其并发处理能力仍然弱很多。 Python有一些提高并发处理能力的解决方案。例如,使用异步框架FastAPI,凭借其异步能力,可以大大增强I/O密集型任务的并发处理能力。 FastAPI 是最快的 Python 框架之一。
我们先简单了解一下如何使用FastAPI。
安装:
pip install fastapi
简单的服务器端代码:
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
启动:
uvicorn app:app --reload
我们可以看到,与其他框架相比,FastAPI的接口只多了一个async关键字。 async 关键字将接口定义为异步。仅从返回结果来看,我们无法看出FastAPI与其他Python框架的区别。区别在于并发访问。 FastAPI的服务器线程在处理路由请求时,如http://127.0.0.1:8000/,如果遇到网络I/O,将不再等待,而是处理其他请求。当网络 I/O 完成时,执行将恢复。这种异步能力提高了 I/O 密集型任务的处理能力。
让我们看另一个例子。在业务代码中,发起显式的异步网络请求。对于这个网络I/O,就像路由请求一样,FastAPI也会异步处理。
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
如果希望数据库I/O是异步的,需要数据库驱动或者ORM异步操作的支持。
FastAPI异步的核心实现是异步I/O。我们可以直接使用异步I/O来启动一个具有异步处理能力的服务器,而不需要使用FastAPI。
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json') async def init(loop): # Use the event loop to monitor web requests app = web.Application(loop=loop) app.router.add_route('GET', '/', index) # Start the server, and the event loop monitors and processes web requests srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv # Explicitly get an event loop loop = asyncio.get_event_loop() # Start the event loop loop.run_until_complete(init(loop)) loop.run_forever()
本示例启动时,http://127.0.0.1:8000/的返回结果与示例1相同。异步I/O的底层实现原理是“协程”和“事件循环” .
pip install fastapi
函数索引是用 async def 定义的,这意味着它是一个协程。 await 关键字用在 I/O 操作之前,告诉执行线程不要等待本次 I/O 操作。普通函数的调用是通过栈来实现的,函数只能一个一个地调用和执行。然而,协程是一种特殊的函数(不是协作线程)。它允许线程在等待标记处暂停执行并切换到执行其他任务。当I/O操作完成后,会继续执行。
我们来看看多个协程并发执行的效果。
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
输出:
uvicorn app:app --reload
我们可以看到线程并没有一一执行这三个任务。当它遇到I/O操作时,它会切换到执行其他任务。 I/O操作完成后继续执行。还可以看出,三个协程基本上同时开始等待I/O操作,所以最终的执行完成时间基本相同。虽然这里没有显式使用事件循环,但 asyncio.run 会隐式使用它。
协程是通过生成器实现的。生成器可以暂停函数的执行,也可以恢复函数的执行,这是协程的特点。
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
用next()运行生成器时,遇到yield时会暂停。当 next() 再次运行时,它将从上次暂停的地方继续运行。在Python 3.5之前,协程也是用“注释”编写的。从Python 3.5开始,使用async def wait。
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json') async def init(loop): # Use the event loop to monitor web requests app = web.Application(loop=loop) app.router.add_route('GET', '/', index) # Start the server, and the event loop monitors and processes web requests srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv # Explicitly get an event loop loop = asyncio.get_event_loop() # Start the event loop loop.run_until_complete(init(loop)) loop.run_forever()
生成器的暂停和恢复功能除了协程之外还可以用于许多事情。例如,它可以循环计算和存储算法。例如,实现一个帕斯卡三角形(每行两端都是1,其他位置的数字是它上面两个数字的和)。
async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json')
输出:
import asyncio from datetime import datetime async def coroutine3(): print(f"Coroutine 3 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 3 finished at {datetime.now()}") async def coroutine2(): print(f"Coroutine 2 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 2 finished at {datetime.now()}") async def coroutine1(): print(f"Coroutine 1 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 1 finished at {datetime.now()}") async def main(): print("Main started") # Create tasks to make coroutines execute concurrently task1 = asyncio.create_task(coroutine1()) task2 = asyncio.create_task(coroutine2()) task3 = asyncio.create_task(coroutine3()) # Wait for all tasks to complete await task1 await task2 await task3 print("Main finished") # Run the main coroutine asyncio.run(main())
既然协程执行可以暂停,那么协程什么时候恢复执行呢?这就需要使用事件循环来告诉执行线程。
Main started Coroutine 1 started at 2024-12-27 12:28:01.661251 Coroutine 2 started at 2024-12-27 12:28:01.661276 Coroutine 3 started at 2024-12-27 12:28:01.665012 Coroutine 1 finished at 2024-12-27 12:28:02.665125 Coroutine 2 finished at 2024-12-27 12:28:02.665120 Coroutine 3 finished at 2024-12-27 12:28:02.665120 Main finished
事件循环使用I/O复用技术,不断循环监听协程可以继续执行的事件。当它们可以执行时,线程将继续执行协程。
简单理解I/O复用:我是一个快递站的老板。我不需要主动询问每个快递员的任务完成情况。相反,快递员完成任务后会自行来找我。这提高了我的任务处理能力,我可以做更多的事情。
select、poll、epoll都可以实现I/O复用。与select和poll相比,epoll具有更好的性能。 Linux一般默认使用epoll,macOS使用kqueue,与epoll类似,性能也差不多。
pip install fastapi
启动服务器socket来监听指定端口。如果运行在Linux系统上,选择器默认使用epoll作为其实现。代码中使用epoll来注册一个请求接收事件(accept事件)。当新的请求到来时,epoll会触发并执行事件处理函数,同时注册一个读事件(read event)来处理和响应请求数据。从Web端通过http://127.0.0.1:8000/访问,返回结果与示例1相同。服务器运行日志:
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
直接使用Socket启动服务器。使用浏览器访问http://127.0.0.1:8080/或者使用curl http://127.0.0.1:8080/访问时,会返回{"Hello": "World"}
uvicorn app:app --reload
使用curl http://127.0.0.1:8001/访问时,服务器运行日志:
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
异步I/O在底层使用“协程”和“事件循环”实现。 “协程”确保当线程在执行过程中遇到标记的 I/O 操作时,不必等待 I/O 完成而是可以暂停并让线程执行其他任务而不会阻塞。 “事件循环”使用I/O复用技术,不断循环监视I/O事件。当某个I/O事件完成时,会触发相应的回调,让协程继续执行。
最后介绍一下部署Flask/FastAPI的理想平台:Leapcell。
Leapcell是专为现代分布式应用程序设计的云计算平台。其按需付费的定价模式确保没有闲置成本,这意味着用户只需为他们实际使用的资源付费。
Leapcell对于WSGI/ASGI应用的独特优势:
在文档中了解更多信息!
Leapcell Twitter:https://x.com/LeapcellHQ
以上是使用 FastAPI 掌握 Python 异步 IO的详细内容。更多信息请关注PHP中文网其他相关文章!