Since Python is an interpreted language, when used for back-end development, such as in the combination of Python Django, compared to Java Spring, its response time will be a bit longer. However, as long as the code is reasonable, the difference is not too significant. Even when Django uses the multi-process mode, its concurrent processing ability is still much weaker. Python has some solutions to improve concurrent processing capabilities. For example, using the asynchronous framework FastAPI, with its asynchronous capabilities, the concurrent processing ability of I/O-intensive tasks can be greatly enhanced. FastAPI is one of the fastest Python frameworks.
Let's first take a brief look at how to use FastAPI.
Installation:
pip install fastapi
Simple Server-side Code:
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
Startup:
uvicorn app:app --reload
We can see that, compared with other frameworks, the interface of FastAPI only has an additional async keyword. The async keyword defines the interface as asynchronous. From the return result alone, we can't tell the difference between FastAPI and other Python frameworks. The difference lies in concurrent access. When the server threads of FastAPI handle route requests, such as http://127.0.0.1:8000/, if they encounter network I/O, they will no longer wait for it but handle other requests instead. When the network I/O is completed, the execution will resume. This asynchronous ability improves the processing ability of I/O-intensive tasks.
Let's look at another example. In the business code, an explicit asynchronous network request is initiated. For this network I/O, just like route requests, FastAPI will also handle it asynchronously.
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
If you want database I/O to be asynchronous, you need the support of asynchronous operations from the database driver or ORM.
The core implementation of FastAPI's asynchrony is asynchronous I/O. We can start a server with asynchronous processing capabilities directly using asynchronous I/O without using FastAPI.
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json') async def init(loop): # Use the event loop to monitor web requests app = web.Application(loop=loop) app.router.add_route('GET', '/', index) # Start the server, and the event loop monitors and processes web requests srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv # Explicitly get an event loop loop = asyncio.get_event_loop() # Start the event loop loop.run_until_complete(init(loop)) loop.run_forever()
When this example is started, the return result of http://127.0.0.1:8000/ is the same as that of Example 1. The underlying implementation principle of asynchronous I/O is "coroutines" and "event loops".
pip install fastapi
The function index is defined with async def, which means it is a coroutine. The await keyword is used before an I/O operation to tell the execution thread not to wait for this I/O operation. The calls of normal functions are implemented through the stack, and functions can only be called and executed one by one. However, a coroutine is a special kind of function (not a collaborative thread). It allows the thread to pause execution at the await mark and switch to execute other tasks. When the I/O operation is completed, the execution will continue.
Let's take a look at the effect of multiple coroutines executing concurrently.
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
Output:
uvicorn app:app --reload
We can see that the thread does not execute the three tasks one by one. When it encounters an I/O operation, it switches to execute other tasks. After the I/O operation is completed, it continues to execute. It can also be seen that the three coroutines basically start waiting for the I/O operation at the same time, so the final execution completion times are basically the same. Although the event loop is not used explicitly here, asyncio.run will use it implicitly.
Coroutines are implemented through generators. Generators can pause the execution of functions and also resume it, which are the characteristics of coroutines.
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
When running the generator with next(), when it encounters yield, it will pause. When next() is run again, it will continue running from the yield where it was paused last time. Before Python 3.5, coroutines were also written with "annotations" yeild. Starting from Python 3.5, async def await are used.
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json') async def init(loop): # Use the event loop to monitor web requests app = web.Application(loop=loop) app.router.add_route('GET', '/', index) # Start the server, and the event loop monitors and processes web requests srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv # Explicitly get an event loop loop = asyncio.get_event_loop() # Start the event loop loop.run_until_complete(init(loop)) loop.run_forever()
The pause and resume features of generators can be used for many things besides coroutines. For example, it can calculate while looping and store algorithms. For instance, implementing a Pascal's triangle (both ends of each row are 1, and the numbers in other positions are the sum of the two numbers above it).
async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json')
Output:
import asyncio from datetime import datetime async def coroutine3(): print(f"Coroutine 3 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 3 finished at {datetime.now()}") async def coroutine2(): print(f"Coroutine 2 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 2 finished at {datetime.now()}") async def coroutine1(): print(f"Coroutine 1 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 1 finished at {datetime.now()}") async def main(): print("Main started") # Create tasks to make coroutines execute concurrently task1 = asyncio.create_task(coroutine1()) task2 = asyncio.create_task(coroutine2()) task3 = asyncio.create_task(coroutine3()) # Wait for all tasks to complete await task1 await task2 await task3 print("Main finished") # Run the main coroutine asyncio.run(main())
Since coroutine execution can be paused, when will the coroutine resume execution? This requires the use of an event loop to tell the execution thread.
Main started Coroutine 1 started at 2024-12-27 12:28:01.661251 Coroutine 2 started at 2024-12-27 12:28:01.661276 Coroutine 3 started at 2024-12-27 12:28:01.665012 Coroutine 1 finished at 2024-12-27 12:28:02.665125 Coroutine 2 finished at 2024-12-27 12:28:02.665120 Coroutine 3 finished at 2024-12-27 12:28:02.665120 Main finished
The event loop uses the I/O multiplexing technology, constantly cycling to monitor events where coroutines can continue to execute. When they can be executed, the thread will continue to execute the coroutines.
To understand I/O multiplexing in a simple way: I'm the boss of a courier station. I don't need to actively ask each courier about the completion of their tasks. Instead, the couriers will come to me on their own after completing their tasks. This improves my task processing ability, and I can do more things.
select, poll, and epoll can all achieve I/O multiplexing. Compared with select and poll, epoll has better performance. Linux generally uses epoll by default, and macOS uses kqueue, which is similar to epoll and has similar performance.
pip install fastapi
Start the server socket to monitor the specified port. If running on a Linux system, selectors uses epoll as its implementation by default. The code uses epoll to register a request reception event (accept event). When a new request arrives, epoll will trigger and execute the event handling function, and at the same time, register a read event (read event) to process and respond to the request data. When accessed from the web side with http://127.0.0.1:8000/, the return result is the same as that of Example 1. Server running log:
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
Directly use Socket to start a server. When accessed with a browser at http://127.0.0.1:8080/ or using curl http://127.0.0.1:8080/, it will return {"Hello": "World"}
uvicorn app:app --reload
When accessed with curl http://127.0.0.1:8001/, Server running log:
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
Asynchronous I/O is implemented at the bottom layer using "coroutines" and "event loops". "Coroutines" ensure that when the thread encounters marked I/O operations during execution, it doesn't have to wait for the I/O to complete but can pause and let the thread execute other tasks without blocking. "Event loops" use the I/O multiplexing technology, constantly cycling to monitor I/O events. When a certain I/O event is completed, the corresponding callback is triggered, allowing the coroutine to continue execution.
Finally, let me introduce the ideal platform for deploying Flask/FastAPI: Leapcell.
Leapcell is a cloud computing platform designed specifically for modern distributed applications. Its pay-as-you-go pricing model ensures no idle costs, meaning users only pay for the resources they actually use.
The unique advantages of Leapcell for WSGI/ASGI applications:
Learn more in the documentation!
Leapcell Twitter: https://x.com/LeapcellHQ
The above is the detailed content of Mastering Python Async IO with FastAPI. For more information, please follow other related articles on the PHP Chinese website!