Cara menggunakan baris gilir mesej untuk pemprosesan tugas asynchronous dalam FastAPI
Pengenalan:
Dalam aplikasi web, selalunya dihadapi bahawa tugas yang memakan masa perlu diproses, seperti menghantar e-mel, menjana laporan, dsb. Jika tugasan ini diletakkan dalam proses permintaan-tindak balas segerak, pengguna perlu menunggu untuk masa yang lama, mengurangkan pengalaman pengguna dan kelajuan tindak balas pelayan. Untuk menyelesaikan masalah ini, kami boleh menggunakan baris gilir mesej untuk pemprosesan tugas tak segerak. Artikel ini akan memperkenalkan cara menggunakan baris gilir mesej untuk memproses tugas tak segerak dalam rangka kerja FastAPI dan memberikan contoh kod yang sepadan.
1. Apakah itu baris gilir mesej?
Baris gilir mesej ialah mekanisme untuk komunikasi tak segerak antara komponen aplikasi. Ia membolehkan pengirim menghantar mesej ke baris gilir, dan penerima untuk mendapatkan dan memproses mesej ini daripada baris gilir. Kelebihan baris gilir mesej ialah penghantar dan penerima dipisahkan Pengirim tidak perlu menunggu penerima menyelesaikan pemprosesan sebelum meneruskan melaksanakan tugas lain, dengan itu meningkatkan prestasi pemprosesan dan keselarasan sistem.
2. Pilih perkhidmatan baris gilir mesej yang sesuai
Sebelum menggunakan baris gilir mesej, kita perlu memilih perkhidmatan baris gilir mesej yang sesuai. Pada masa ini, perkhidmatan baris gilir mesej yang lebih biasa digunakan termasuk RabbitMQ, Kafka, ActiveMQ, dll. Perkhidmatan baris gilir mesej ini menyediakan fungsi yang kaya dan jaminan kebolehpercayaan, dan kami boleh memilih perkhidmatan yang sesuai mengikut keperluan sebenar.
3 Menggunakan baris gilir mesej dalam FastAPI
Untuk menggunakan baris gilir mesej dalam FastAPI, kami perlu memasang perpustakaan pelanggan baris gilir mesej yang sepadan. Mengambil RabbitMQ sebagai contoh, anda boleh memasangnya melalui arahan pip install aio-pika
. Selepas pemasangan selesai, kami boleh memperkenalkan kebergantungan dan modul yang sepadan dalam fail utama FastAPI. pip install aio-pika
进行安装。安装完成后,我们可以在FastAPI的主文件中引入相应的依赖项和模块。
from fastapi import FastAPI from fastapi import BackgroundTasks from aio_pika import connect, IncomingMessage
接下来,我们需要配置消息队列的连接信息,并编写处理消息的函数。
AMQP_URL = "amqp://guest:guest@localhost/" QUEUE_NAME = "task_queue" async def process_message(message: IncomingMessage): # 在这里编写异步任务的处理逻辑 # 例如发送邮件、生成报表等 print(f"Received message: {message.body}") # 这里可以根据实际情况进行任务处理 # ... message.ack()
然后,我们需要在FastAPI应用程序中定义一个接口,用来接收需要进行异步处理的任务。
app = FastAPI() @app.post("/task") async def handle_task(request: dict, background_tasks: BackgroundTasks): connection = await connect(AMQP_URL) channel = await connection.channel() queue = await channel.declare_queue(QUEUE_NAME) # 发送任务给消息队列 await queue.publish( body=str(request).encode(), routing_key=QUEUE_NAME ) connection.close() return {"message": "Task submitted successfully"}
上述代码定义了一个POST接口/task
async def listen_to_queue(): connection = await connect(AMQP_URL) channel = await connection.channel() queue = await channel.declare_queue(QUEUE_NAME) # 持续监听消息队列 async with queue.iterator() as queue_iterator: async for message in queue_iterator: async with message.process(): await process_message(message)
app = FastAPI() @app.on_event("startup") async def startup_event(): # 启动消息队列监听 await listen_to_queue()
rrreee
Kod di atas mentakrifkan antara muka POSTrrreee
Di pintu masuk aplikasi FastAPI, kita perlu memulakan fungsi tak segerak untuk mendengar baris gilir mesej.
Pada ketika ini, kami telah menyelesaikan konfigurasi dan pengekodan menggunakan baris gilir mesej untuk pemprosesan tugas tak segerak dalam FastAPI.
Kesimpulan:
Atas ialah kandungan terperinci Cara menggunakan baris gilir mesej untuk pemprosesan tugas tak segerak dalam FastAPI. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!