使用構建FastAPI,可能需要合併一個與API 邏輯無關但需要在背景重複執行的函數。例如,這可能涉及檢查外部 API 並根據回應列印訊息。
一種方法是建立一個執行所需函數的執行緒。實作方法如下:
import threading def start_worker(): print('[main]: starting worker...') # Define and execute the worker function worker = my_worker.Worker() worker.working_loop() if __name__ == '__main__': print('[main]: starting...') # Start the worker thread _worker_thread = Thread(target=start_worker, daemon=False) _worker_thread.start()
另一種方法涉及使用事件調度程序來執行後台任務:
import sched, time from threading import Thread s = sched.scheduler(time.time, time.sleep) def print_event(sc): print("Hello") sc.enter(5, 1, print_event, (sc,)) def start_scheduler(): s.enter(5, 1, print_event, (s,)) s.run() @app.on_event("startup") async def startup_event(): thread = Thread(target=start_scheduler) thread.start()
如果任務是async def函數,可以將其加入使用asyncio.create_task() 函數的目前事件循環:
from fastapi import FastAPI import asyncio async def print_task(s): while True: print('Hello') await asyncio.sleep(s) @asynccontextmanager async def lifespan(app: FastAPI): # Add the async task to the event loop asyncio.create_task(print_task(5)) yield print('Shutting down...') app = FastAPI(lifespan=lifespan)
以上是如何在 FastAPI 應用程式中執行後台任務?的詳細內容。更多資訊請關注PHP中文網其他相關文章!