MongoDB の変更ストリームにより、アプリケーションはリアルタイムのデータ変更に即座に対応できます。このブログ投稿では、理論にはあまり立ち入りませんが、Python で変更ストリームを設定して使用する方法を説明します。最初に挿入に焦点を当て、それを他のイベント タイプに拡張して、データベース イベントをリッスンする単純なプログラムを作成します。
変更ストリームを使用すると、アプリは挿入や更新などの特定のデータベース イベントをリッスンし、すぐに応答できます。ユーザーが自分のプロフィールを更新するシナリオを想像してください。変更ストリームを使用すると、ユーザーがページを更新しなくても、この変更をアプリ全体に即座に反映できます。この機能が登場する前は、データベースを継続的にポーリングするか、MongoDB Oplog を追跡するなどの複雑な方法を使用する必要がありました。変更ストリームは、よりユーザーフレンドリーな API を提供することでこれを簡素化します。
請求書をアップロードするための API があるとします。お客様が請求書の画像をMongoDBにアップロードしていただき、AIで情報を抽出して請求書を更新するという流れになります。請求書をアップロードするコードの例は次のとおりです:
from pymongo import MongoClient class MongoDatabase: def __init__(self, config_path: str): # Load the YAML configuration file using the provided utility function self.config_path = config_path self.config = read_config(path=self.config_path) # Initialize MongoDB connection self.client = MongoClient(self.config['mongodb']['uri']) self.db = self.client[self.config['mongodb']['database']] self.collection = self.db[self.config['mongodb']['collection']] def create_document(self, data: Dict[str, Any]) -> str: # Insert a new document and return the automatically generated document ID result = self.collection.insert_one(data) return str(result.inserted_id) def update_document_by_id(self, document_id: str, data: Dict[str, Any]): try: self.collection.update_one({"_id": document_id}, {"$set": data}) except PyMongoError as e: print(f"Error updating document: {e}")
念のため、まず pymongo をクラス内にラップします:))
@app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: # Parse JSON body body = await request.json() img = body.get("img") user_uuid = body.get("user_uuid") if not img or not is_base64(img): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"status": "error", "message": "Base64 image is required"}, ) # Generate invoice UUID current_time = datetime.now(timezone.utc) img = valid_base64_image(img) invoice_document = { "invoice_type": None, "created_at": current_time, "created_by": user_uuid, "last_modified_at": None, "last_modified_by": None, "status": "not extracted", "invoice_image_base64": img, "invoice_info": {} } invoice_uuid = mongo_db.create_document(invoice_document) print('Result saved to MongoDB:', invoice_uuid) mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid}) return JSONResponse( status_code=status.HTTP_201_CREATED, content={"invoice_uuid": invoice_uuid, "message": "Upload successful"} ) except Exception as e: # Handle errors return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"status": "error", "message": str(e)} )
合理的な質問は次のとおりです。AI モデルが画像を処理するまで待ってから更新しないのはなぜですか?問題は、処理に約 4 ~ 5 分かかることですが、ユーザー エクスペリエンスに影響を与えたくないということです。
もう 1 つのオプションは、Kafka を使用することです。イメージを Kafka トピックに公開すると、別のサービスがデータを処理できます。
長所:
短所:
ここでは、Kafka を使用して請求書のアップロード プロセスを処理する方法を示す基本的な実装を示します。
ユーザーは API エンドポイントを通じて請求書をアップロードします。請求書の画像は MongoDB に保存され、メッセージはさらなる処理のために Kafka トピックに送信されます。
from kafka import KafkaProducer import json from fastapi import FastAPI, Request, status from fastapi.responses import JSONResponse from datetime import datetime, timezone app = FastAPI() producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') ) @app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: body = await request.json() img = body.get("img") user_uuid = body.get("user_uuid") if not img or not is_base64(img): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"status": "error", "message": "Base64 image is required"}, ) current_time = datetime.now(timezone.utc) img = valid_base64_image(img) invoice_document = { "invoice_type": None, "created_at": current_time, "created_by": user_uuid, "status": "not extracted", "invoice_image_base64": img, } # Save the document to MongoDB invoice_uuid = mongo_db.create_document(invoice_document) mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid}) # Send a message to Kafka topic producer.send('invoice_topic', invoice_document) producer.flush() return JSONResponse( status_code=status.HTTP_201_CREATED, content={"message": "Invoice upload received and will be processed"} ) except Exception as e: return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"status": "error", "message": str(e)} )
Kafka コンシューマーは、invoice_topic をリッスンします。メッセージを受信すると、請求書を処理し (画像から情報を抽出するなど)、MongoDB 内の対応するドキュメントを更新します。
from kafka import KafkaConsumer import json consumer = KafkaConsumer( 'invoice_topic', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('utf-8')) ) for message in consumer: invoice_document = message.value # Process the invoice, extract information, and update the document in MongoDB invoice_uuid = invoice_document["_id"] extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"]) mongo_db.update_document_by_id(invoice_uuid, { "invoice_info": extracted_data, "status": "extracted" }) print(f"Processed and updated invoice: {invoice_uuid}")
フローの概要:
わあ、これを自分で書くことができたなんて信じられません!それは、それに伴う努力を本当に強調しています。それには、MongoDB、Kafka、Invoice サービスの 3 つのサービスの管理と構成の複雑さは考慮されていません。
ここでは、MongoDB 変更ストリームを示すために Markdown で書き直された完全なコードを示します。これには、変更ストリームによってトリガーされる請求書処理を処理する追加のメソッドと関数が含まれます。
ドキュメントの作成や変更ストリームの待機などのデータベース操作を処理する MongoDB ラッパー クラスを作成することから始めます。
from pymongo import MongoClient from pymongo.errors import PyMongoError from typing import Dict, Any import threading import yaml class MongoDatabase: # Same code as before # def process_invoice(self, invoice_document: Dict[str, Any]): """Process the invoice by extracting data and updating the document in MongoDB.""" try: # Simulate extracting information from the invoice image extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"]) invoice_uuid = invoice_document["_id"] # Update the invoice document with the extracted data self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"}) print(f"Processed and updated invoice: {invoice_uuid}") except Exception as e: print(f"Error processing invoice: {str(e)}") def start_change_stream_listener(self): """Start listening to the change stream for the collection.""" def listen(): try: with self.collection.watch() as stream: for change in stream: if change['operationType'] == 'insert': invoice_document = change['fullDocument'] print(f"New invoice detected: {invoice_document['_id']}") self.process_invoice(invoice_document) except PyMongoError as e: print(f"Change stream error: {str(e)}") # Start the change stream listener in a separate thread listener_thread = threading.Thread(target=listen, daemon=True) listener_thread.start()
簡単にするために、MongoDatabase クラス内に process_invoice を追加します。でも、それは別の場所に置いておくべきです
アップロード API はオリジナルのものと同様である必要があります。
mongo_db = MongoDatabase(config_path='path_to_your_config.yaml') mongo_db.start_change_stream_listener() @app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: # Parse JSON body body = await request.json() # same code as before
フローの概要:
MongoDB 変更ストリームを使用すると、データベース内のリアルタイムの変更を効率的に処理できます。この例を拡張すると、更新や削除などのさまざまなイベントを処理でき、アプリケーションの反応性と応答性が向上します。
以上がMongoDB 変更ストリームと Python を使用したリアルタイム データ処理の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。