다음을 사용하여 Python 서버를 구성하는 경우 FastAPI의 경우 API 로직과 관련이 없지만 백그라운드에서 반복적으로 실행되어야 하는 기능을 통합해야 할 수 있습니다. 예를 들어 외부 API를 확인하고 응답에 따라 정보를 인쇄하는 작업이 포함될 수 있습니다.
한 가지 접근 방식은 원하는 기능을 실행하는 스레드를 생성하는 것입니다. 이를 달성하는 방법은 다음과 같습니다.
import threading def start_worker(): print('[main]: starting worker...') # Define and execute the worker function worker = my_worker.Worker() worker.working_loop() if __name__ == '__main__': print('[main]: starting...') # Start the worker thread _worker_thread = Thread(target=start_worker, daemon=False) _worker_thread.start()
백그라운드 작업에 이벤트 스케줄러를 사용하는 방법도 있습니다.
import sched, time from threading import Thread s = sched.scheduler(time.time, time.sleep) def print_event(sc): print("Hello") sc.enter(5, 1, print_event, (sc,)) def start_scheduler(): s.enter(5, 1, print_event, (s,)) s.run() @app.on_event("startup") async def startup_event(): thread = Thread(target=start_scheduler) thread.start()
작업이 비동기 정의 함수인 경우 추가할 수 있습니다. asyncio.create_task() 함수를 사용하여 현재 이벤트 루프에:
from fastapi import FastAPI import asyncio async def print_task(s): while True: print('Hello') await asyncio.sleep(s) @asynccontextmanager async def lifespan(app: FastAPI): # Add the async task to the event loop asyncio.create_task(print_task(5)) yield print('Shutting down...') app = FastAPI(lifespan=lifespan)
위 내용은 FastAPI 애플리케이션에서 백그라운드 작업을 실행하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!