Python은 해석형 언어이기 때문에 Python Django와의 조합 등 백엔드 개발에 사용할 경우 Java Spring에 비해 응답 시간이 조금 더 길어집니다. 그러나 코드가 합리적이라면 그 차이는 그리 크지 않습니다. Django가 다중 프로세스 모드를 사용하더라도 동시 처리 능력은 여전히 훨씬 약합니다. Python에는 동시 처리 기능을 향상시키는 몇 가지 솔루션이 있습니다. 예를 들어, 비동기 기능을 갖춘 비동기 프레임워크 FastAPI를 사용하면 I/O 집약적인 작업의 동시 처리 능력이 크게 향상될 수 있습니다. FastAPI는 가장 빠른 Python 프레임워크 중 하나입니다.
먼저 FastAPI 사용법을 간단히 살펴보겠습니다.
설치:
pip install fastapi
간단한 서버측 코드:
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
스타트업:
uvicorn app:app --reload
다른 프레임워크와 비교하여 FastAPI의 인터페이스에는 추가 async 키워드만 있는 것을 볼 수 있습니다. async 키워드는 인터페이스를 비동기식으로 정의합니다. 반환 결과만으로는 FastAPI와 다른 Python 프레임워크 간의 차이점을 알 수 없습니다. 차이점은 동시 액세스에 있습니다. FastAPI의 서버 스레드가 http://127.0.0.1:8000/과 같은 경로 요청을 처리할 때 네트워크 I/O가 발생하면 더 이상 기다리지 않고 대신 다른 요청을 처리합니다. 네트워크 I/O가 완료되면 실행이 재개됩니다. 이러한 비동기식 기능은 I/O 집약적인 작업의 처리 능력을 향상시킵니다.
또 다른 예를 살펴보겠습니다. 비즈니스 코드에서는 명시적인 비동기 네트워크 요청이 시작됩니다. 이 네트워크 I/O의 경우 경로 요청과 마찬가지로 FastAPI도 이를 비동기식으로 처리합니다.
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
데이터베이스 I/O를 비동기식으로 수행하려면 데이터베이스 드라이버 또는 ORM의 비동기 작업 지원이 필요합니다.
FastAPI 비동기의 핵심 구현은 비동기 I/O입니다. FastAPI를 사용하지 않고 비동기 I/O를 사용하여 비동기 처리 기능을 갖춘 서버를 직접 시작할 수 있습니다.
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json') async def init(loop): # Use the event loop to monitor web requests app = web.Application(loop=loop) app.router.add_route('GET', '/', index) # Start the server, and the event loop monitors and processes web requests srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv # Explicitly get an event loop loop = asyncio.get_event_loop() # Start the event loop loop.run_until_complete(init(loop)) loop.run_forever()
이 예제가 시작되면 http://127.0.0.1:8000/의 반환 결과는 예제 1과 동일합니다. 비동기 I/O의 기본 구현 원칙은 "코루틴"과 "이벤트 루프"입니다. .
pip install fastapi
함수 인덱스는 async def로 정의되는데, 이는 코루틴임을 의미합니다. Wait 키워드는 I/O 작업 전에 실행 스레드에 이 I/O 작업을 기다리지 않도록 지시하는 데 사용됩니다. 일반 함수의 호출은 스택을 통해 구현되며, 함수는 하나씩만 호출하고 실행할 수 있습니다. 그러나 코루틴은 특별한 종류의 함수입니다(협업 스레드가 아님). 이를 통해 스레드는 대기 표시에서 실행을 일시 중지하고 다른 작업을 실행하도록 전환할 수 있습니다. I/O 작업이 완료되면 실행이 계속됩니다.
여러 코루틴이 동시에 실행되는 효과를 살펴보겠습니다.
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
출력:
uvicorn app:app --reload
스레드가 세 가지 작업을 하나씩 실행하지 않는 것을 볼 수 있습니다. I/O 작업이 발생하면 다른 작업을 실행하도록 전환됩니다. I/O 작업이 완료된 후에도 계속 실행됩니다. 또한 3개의 코루틴은 기본적으로 동시에 I/O 작업을 기다리기 시작하므로 최종 실행 완료 시간은 기본적으로 동일하다는 것을 알 수 있습니다. 여기서는 이벤트 루프가 명시적으로 사용되지 않지만 asyncio.run은 이를 암시적으로 사용합니다.
코루틴은 생성기를 통해 구현됩니다. 제너레이터는 함수 실행을 일시 중지하고 다시 시작할 수도 있는데, 이것이 코루틴의 특징입니다.
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
next()를 사용하여 생성기를 실행할 때 Yield가 발생하면 일시 중지됩니다. next()가 다시 실행되면 마지막으로 일시 중지된 항복에서 계속 실행됩니다. Python 3.5 이전에는 코루틴도 "주석"으로 작성되었습니다. Python 3.5부터는 async defawait가 사용됩니다.
import asyncio from aiohttp import web async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json') async def init(loop): # Use the event loop to monitor web requests app = web.Application(loop=loop) app.router.add_route('GET', '/', index) # Start the server, and the event loop monitors and processes web requests srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv # Explicitly get an event loop loop = asyncio.get_event_loop() # Start the event loop loop.run_until_complete(init(loop)) loop.run_forever()
제너레이터의 일시 중지 및 재개 기능은 코루틴 외에도 다양한 용도로 사용될 수 있습니다. 예를 들어, 반복하면서 계산하고 알고리즘을 저장할 수 있습니다. 예를 들어, 파스칼의 삼각형을 구현합니다(각 행의 양쪽 끝은 1이고 다른 위치의 숫자는 그 위에 있는 두 숫자의 합입니다).
async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json')
출력:
import asyncio from datetime import datetime async def coroutine3(): print(f"Coroutine 3 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 3 finished at {datetime.now()}") async def coroutine2(): print(f"Coroutine 2 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 2 finished at {datetime.now()}") async def coroutine1(): print(f"Coroutine 1 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 1 finished at {datetime.now()}") async def main(): print("Main started") # Create tasks to make coroutines execute concurrently task1 = asyncio.create_task(coroutine1()) task2 = asyncio.create_task(coroutine2()) task3 = asyncio.create_task(coroutine3()) # Wait for all tasks to complete await task1 await task2 await task3 print("Main finished") # Run the main coroutine asyncio.run(main())
코루틴 실행은 일시 중지될 수 있는데, 코루틴은 언제 실행을 재개하나요? 이를 위해서는 실행 스레드에 알리기 위해 이벤트 루프를 사용해야 합니다.
Main started Coroutine 1 started at 2024-12-27 12:28:01.661251 Coroutine 2 started at 2024-12-27 12:28:01.661276 Coroutine 3 started at 2024-12-27 12:28:01.665012 Coroutine 1 finished at 2024-12-27 12:28:02.665125 Coroutine 2 finished at 2024-12-27 12:28:02.665120 Coroutine 3 finished at 2024-12-27 12:28:02.665120 Main finished
이벤트 루프는 I/O 멀티플렉싱 기술을 사용하여 지속적으로 순환하여 코루틴이 계속 실행될 수 있는 이벤트를 모니터링합니다. 실행할 수 있게 되면 스레드는 코루틴을 계속 실행합니다.
I/O 다중화를 간단하게 이해하려면: 저는 택배회사의 사장입니다. 각 택배사에게 작업 완료에 대해 적극적으로 물어볼 필요가 없습니다. 대신 택배기사들이 일을 마친 후 스스로 나에게 올 것입니다. 이를 통해 업무 처리 능력이 향상되고, 더 많은 일을 할 수 있게 됩니다.
select, poll 및 epoll은 모두 I/O 멀티플렉싱을 달성할 수 있습니다. select와 poll에 비해 epoll의 성능이 더 좋습니다. Linux는 일반적으로 기본적으로 epoll을 사용하고, macOS는 epoll과 유사하고 성능도 유사한 kqueue를 사용합니다.
pip install fastapi
지정된 포트를 모니터링하려면 서버 소켓을 시작하세요. Linux 시스템에서 실행 중인 경우 선택기는 기본적으로 epoll을 구현으로 사용합니다. 코드는 epoll을 사용하여 요청 수신 이벤트(accept 이벤트)를 등록합니다. 새로운 요청이 도착하면 epoll은 이벤트 처리 기능을 트리거 및 실행하는 동시에 읽기 이벤트(읽기 이벤트)를 등록하여 요청 데이터를 처리하고 응답합니다. http://127.0.0.1:8000/으로 웹측에서 접속 시, 반환 결과는 예시 1과 동일합니다. 서버 실행 로그:
# app.py from typing import Union from fastapi import FastAPI app = FastAPI() @app.get("/") async def read_root(): return {"Hello": "World"}
소켓을 직접 사용하여 서버를 시작합니다. 브라우저로 http://127.0.0.1:8080/ 또는 컬 http://127.0.0.1:8080/을 사용하여 액세스하면 {"Hello": "World"}
가 반환됩니다.
uvicorn app:app --reload
curl http://127.0.0.1:8001/로 접속 시, 서버 실행 로그:
# app.py from fastapi import FastAPI, HTTPException import httpx app = FastAPI() # Example of an asynchronous GET request @app.get("/external-api") async def call_external_api(): url = "https://leapcell.io" async with httpx.AsyncClient() as client: response = await client.get(url) if response.status_code!= 200: raise HTTPException(status_code=response.status_code, detail="Failed to fetch data") return response.json()
비동기 I/O는 "코루틴"과 "이벤트 루프"를 사용하여 하위 계층에 구현됩니다. "코루틴"은 스레드가 실행 중에 표시된 I/O 작업을 만날 때 I/O가 완료될 때까지 기다릴 필요 없이 일시 중지하고 스레드가 차단 없이 다른 작업을 실행할 수 있도록 보장합니다. "이벤트 루프"는 I/O 멀티플렉싱 기술을 사용하여 지속적으로 순환하여 I/O 이벤트를 모니터링합니다. 특정 I/O 이벤트가 완료되면 해당 콜백이 트리거되어 코루틴이 계속 실행될 수 있습니다.
마지막으로 Flask/FastAPI 배포에 이상적인 플랫폼인 Leapcell을 소개하겠습니다.
Leapcell은 최신 분산 애플리케이션을 위해 특별히 설계된 클라우드 컴퓨팅 플랫폼입니다. 종량제 가격 모델을 통해 유휴 비용이 발생하지 않으므로 사용자는 실제로 사용한 리소스에 대해서만 비용을 지불하면 됩니다.
WSGI/ASGI 애플리케이션을 위한 Leapcell의 고유한 장점:
문서에서 자세히 알아보세요!
리프셀 트위터: https://x.com/LeapcellHQ
위 내용은 FastAPI로 Python 비동기 IO 마스터하기의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!