Cara melaksanakan tugas berjadual dan tugas berkala dalam FastAPI
Pengenalan:
FastAPI ialah rangka kerja Python moden dan berprestasi tinggi yang memfokuskan pada membina aplikasi API. Walau bagaimanapun, kadangkala kita perlu melaksanakan tugas berjadual dan tugas berkala dalam aplikasi FastAPI. Artikel ini menerangkan cara melaksanakan tugas ini dalam aplikasi FastAPI dan menyediakan contoh kod yang sepadan.
1. Pelaksanaan tugas berjadual
Menggunakan perpustakaan APScheduler
APScheduler ialah perpustakaan Python yang berkuasa untuk menjadualkan dan mengurus tugas berjadual. Ia menyokong berbilang penjadual tugas, seperti berdasarkan tarikh, selang masa dan ungkapan Cron. Berikut ialah langkah untuk menggunakan APScheduler untuk melaksanakan tugas berjadual dalam FastAPI:
pip install apscheduler
dalam terminal untuk memasang pustaka APScheduler. pip install apscheduler
来安装APScheduler库。tasks.py
的文件,用于定义定时任务。from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() @scheduler.scheduled_job('interval', seconds=10) def job(): print("This is a scheduled job") scheduler.start()
from fastapi import FastAPI from .tasks import scheduler app = FastAPI() app.mount("/tasks", scheduler.app)
使用Celery库
Celery是一个强大的分布式任务队列库,支持异步和定时任务。以下是在FastAPI中使用Celery实现定时任务的步骤:
pip install celery
来安装Celery库。tasks.py
tasks.py
untuk mentakrifkan tugas berjadual. from celery import Celery app = Celery('tasks', broker='redis://localhost:6379') @app.task def job(): print("This is a scheduled job")
from fastapi import FastAPI from .tasks import app as celery_app app = FastAPI() app.mount("/tasks", celery_app)
Saderi ialah perpustakaan baris gilir tugas teragih yang berkuasa yang menyokong tugas tak segerak dan berjadual. Berikut ialah langkah untuk menggunakan Celery untuk melaksanakan tugas berjadual dalam FastAPI:
pip install celery
dalam terminal untuk memasang pustaka Celery. Buat modul tugas berjadual: Dalam direktori akar aplikasi FastAPI, buat fail bernama tasks.py
untuk mentakrifkan tugas berjadual. from apscheduler.triggers.cron import CronTrigger scheduler = BackgroundScheduler() @scheduler.scheduled_job(CronTrigger.from_crontab('* * * * *')) def job(): print("This is a periodic job") scheduler.start()
from celery import Celery from celery.schedules import crontab app = Celery('tasks', broker='redis://localhost:6379') @app.task def job(): print("This is a periodic job") app.conf.beat_schedule = { 'job': { 'task': 'tasks.job', 'schedule': crontab(minute='*'), }, }
Buat modul tugasan berkala: rujuk langkah 2 dalam artikel sebelumnya.
Pasang perpustakaan Saderi: Rujuk langkah 1 dalam artikel sebelumnya.
🎜Buat modul tugasan berkala: rujuk langkah 2 dalam artikel sebelumnya. 🎜🎜🎜🎜rrreee🎜Kesimpulan: 🎜Dengan menggunakan perpustakaan APScheduler atau Celery, kami boleh melaksanakan tugas berjadual dan tugas berkala dengan mudah dalam aplikasi FastAPI. Contoh kod yang disediakan di atas boleh digunakan sebagai rujukan untuk membantu anda melaksanakan fungsi tugas ini dengan cepat dalam projek FastAPI anda. Walaupun perkara di atas adalah contoh mudah, anda boleh melanjutkan dan menyesuaikan logik tugas anda sendiri berdasarkan contoh ini. 🎜🎜Bahan rujukan: 🎜🎜🎜Dokumentasi rasmi APSscheduler: https://apscheduler.readthedocs.io/🎜🎜Dokumentasi rasmi Celery: https://docs.celeryproject.org/🎜🎜🎜 (Artikel ini untuk rujukan sahaja rujuk situasi sebenar Laraskan dan ubah suai sewajarnya jika perlu)🎜.Atas ialah kandungan terperinci Cara melaksanakan tugas berjadual dan tugas berkala dalam FastAPI. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!