FastAPI でスケジュールされたタスクと定期的なタスクを実装する方法

WBOY
リリース: 2023-07-30 15:53:12
オリジナル
3577 人が閲覧しました

FastAPI でスケジュールされたタスクと定期的なタスクを実装する方法

はじめに:
FastAPI は、API アプリケーションの構築に重点を置いた最新の高パフォーマンスの Python フレームワークです。ただし、場合によっては、FastAPI アプリケーションでスケジュールされたタスクや定期的なタスクを実行する必要があります。この記事では、これらのタスクを FastAPI アプリケーションに実装する方法について説明し、対応するコード例を示します。

1. スケジュールされたタスクの実装

  1. APScheduler ライブラリの使用
    APScheduler は、スケジュールされたタスクのスケジュールと管理のための強力な Python ライブラリです。日付、時間間隔、Cron 式などに基づく複数のタスク スケジューラをサポートします。 APScheduler を使用して FastAPI でスケジュールされたタスクを実装する手順は次のとおりです。

    1. APScheduler ライブラリをインストールします。ターミナルでコマンド pip install apscheduler を実行して、APScheduler ライブラリをインストールします。
    2. スケジュールされたタスク モジュールを作成する: FastAPI アプリケーションのルート ディレクトリに、スケジュールされたタスクを定義するための tasks.py という名前のファイルを作成します。
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

@scheduler.scheduled_job('interval', seconds=10)
def job():
    print("This is a scheduled job")

scheduler.start()
ログイン後にコピー
  1. スケジュールされたタスク モジュールを登録します。FastAPI アプリケーションのメイン ファイルで、スケジュールされたタスク モジュールをインポートし、FastAPI のサブアプリケーションとして登録します。応用。
from fastapi import FastAPI
from .tasks import scheduler

app = FastAPI()

app.mount("/tasks", scheduler.app)
ログイン後にコピー
  1. Celery ライブラリの使用
    Celery は、非同期タスクとスケジュールされたタスクをサポートする強力な分散タスク キュー ライブラリです。 Celery を使用して FastAPI でスケジュールされたタスクを実装する手順は次のとおりです。

    1. Celery ライブラリをインストールします。ターミナルでコマンド pip install celery を実行して、Celery ライブラリをインストールします。
    2. スケジュールされたタスク モジュールを作成する: FastAPI アプリケーションのルート ディレクトリに、スケジュールされたタスクを定義するための tasks.py という名前のファイルを作成します。
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def job():
    print("This is a scheduled job")
ログイン後にコピー
  1. スケジュールされたタスク モジュールを登録します。FastAPI アプリケーションのメイン ファイルで、スケジュールされたタスク モジュールをインポートし、FastAPI のサブアプリケーションとして登録します。応用。
from fastapi import FastAPI
from .tasks import app as celery_app

app = FastAPI()

app.mount("/tasks", celery_app)
ログイン後にコピー

2. 定期タスクの実装

  1. APScheduler ライブラリの使用
    APScheduler ライブラリは、定期タスクのスケジュール設定もサポートしています。 APScheduler を使用して FastAPI アプリケーションに定期タスクを実装する手順は次のとおりです。

    1. APScheduler ライブラリをインストールします。前の記事の手順 1 を参照してください。
    2. 定期タスク モジュールを作成します。前の記事の手順 2 を参照してください。
from apscheduler.triggers.cron import CronTrigger

scheduler = BackgroundScheduler()

@scheduler.scheduled_job(CronTrigger.from_crontab('* * * * *'))
def job():
    print("This is a periodic job")

scheduler.start()
ログイン後にコピー
  1. Celery ライブラリの使用
    Celery ライブラリは、定期的なタスクのスケジュール設定もサポートしています。 Celery を使用して FastAPI アプリケーションに定期タスクを実装する手順は次のとおりです。

    1. Celery ライブラリをインストールします。前の記事の手順 1 を参照してください。
    2. 定期タスク モジュールを作成します。前の記事の手順 2 を参照してください。
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost:6379')

@app.task
def job():
    print("This is a periodic job")

app.conf.beat_schedule = {
    'job': {
        'task': 'tasks.job',
        'schedule': crontab(minute='*'),
    },
}
ログイン後にコピー

結論:
APScheduler または Celery ライブラリを使用すると、FastAPI アプリケーションにスケジュールされたタスクと定期的なタスクを簡単に実装できます。上記のコード例は、FastAPI プロジェクトでこれらのタスク関数を迅速に実装するのに役立つ参照として使用できます。上記は単純な例ですが、これらの例に基づいて独自のタスク ロジックをさらに拡張およびカスタマイズできます。

参考資料:

  • APScheduler 公式ドキュメント: https://apscheduler.readthedocs.io/
  • Celery 公式ドキュメント: https://docs.celeryproject. org/

(この記事は参考用です。実際のニーズに応じて調整および変更してください。)

以上がFastAPI でスケジュールされたタスクと定期的なタスクを実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート