Maison > développement back-end > Tutoriel Python > Comment exécuter des tâches en arrière-plan dans une application FastAPI ?

Comment exécuter des tâches en arrière-plan dans une application FastAPI ?

DDD
Libérer: 2024-12-05 06:52:10
original
637 Les gens l'ont consulté

How to Run Background Tasks in a FastAPI Application?

Comment exécuter un fil de discussion en arrière-plan pour FastAPI Python

Défi : incorporer une fonction d'arrière-plan indépendante dans FastAPI

Lors de la construction d'un serveur Python à l'aide FastAPI, il peut être nécessaire d'incorporer une fonction qui n'est pas liée à la logique de l'API mais qui doit s'exécuter en arrière-plan de manière récurrente. Par exemple, cela pourrait impliquer de vérifier des API externes et d'imprimer des informations en fonction des réponses.

Option 1 : Utiliser un fil

Une approche consiste à créer un fil qui exécute la fonction souhaitée. Voici comment cela peut être réalisé :

import threading

def start_worker():
    print('[main]: starting worker...')
    # Define and execute the worker function
    worker = my_worker.Worker()
    worker.working_loop()

if __name__ == '__main__':
    print('[main]: starting...')
    # Start the worker thread
    _worker_thread = Thread(target=start_worker, daemon=False)
    _worker_thread.start()
Copier après la connexion

Option 2 : Planification d'événements

Une méthode alternative consiste à utiliser un planificateur d'événements pour la tâche en arrière-plan :

import sched, time
from threading import Thread

s = sched.scheduler(time.time, time.sleep)

def print_event(sc):
    print("Hello")
    sc.enter(5, 1, print_event, (sc,))

def start_scheduler():
    s.enter(5, 1, print_event, (s,))
    s.run()

@app.on_event("startup")
async def startup_event():
    thread = Thread(target=start_scheduler)
    thread.start()
Copier après la connexion

Option 3 : Tâches asynchrones avec boucle d'événement

Si la tâche est une fonction de définition asynchrone, elle peut être ajoutée à la boucle d'événement actuelle en utilisant la fonction asyncio.create_task() :

from fastapi import FastAPI
import asyncio

async def print_task(s):
    while True:
        print('Hello')
        await asyncio.sleep(s)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Add the async task to the event loop
    asyncio.create_task(print_task(5))
    yield
    print('Shutting down...')

app = FastAPI(lifespan=lifespan)
Copier après la connexion

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal