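Change streams in MongoDB let your application react to real-time data changes as they happen. In this post, I'll show you how to set up and use change streams with Python without diving deep into the theory. We'll build a simple program that listens for database events, starting with inserts and then extending it to other event types.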
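Change streams let your application listen for specific database events, such as inserts or updates, and respond immediately. Imagine a user updating their profile: with change streams, you can reflect the change in your application right away, without the user having to refresh the page. Before this feature, you had to poll the database constantly or use complex workarounds such as tailing the MongoDB oplog. Change streams simplify this by providing a much friendlier API.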
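To make the idea concrete, here is a minimal sketch of a listener that only receives inserts and updates. It assumes a pymongo `Collection` is passed in; `watch_collection` and `build_pipeline` are illustrative names, not part of pymongo. The filter is an aggregation `$match` on `operationType`, so it runs on the server. Keep in mind that change streams only work against a replica set or sharded cluster, not a standalone `mongod`.

```python
from typing import Any, Callable, Dict, Iterable, List

ChangeEvent = Dict[str, Any]


def build_pipeline(operation_types: Iterable[str]) -> List[Dict[str, Any]]:
    """Aggregation pipeline that only lets the given operation types through."""
    return [{"$match": {"operationType": {"$in": list(operation_types)}}}]


def watch_collection(
    collection,
    handle: Callable[[ChangeEvent], None],
    operation_types: Iterable[str] = ("insert", "update"),
) -> None:
    """Block and call handle() for every matching change event.

    `collection` is assumed to be a pymongo Collection on a replica set.
    """
    pipeline = build_pipeline(operation_types)
    # full_document="updateLookup" makes update events include the current
    # version of the document, not just the changed fields
    with collection.watch(pipeline, full_document="updateLookup") as stream:
        for change in stream:
            handle(change)


# Hypothetical usage (connection string, database and collection names are placeholders):
# from pymongo import MongoClient
# users = MongoClient("mongodb://localhost:27017/?replicaSet=rs0")["app"]["users"]
# watch_collection(users, lambda c: print(c["operationType"], c["documentKey"]["_id"]))
```

Passing the filter as a pipeline keeps unwanted events off the network, instead of discarding them one by one in Python.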
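Suppose I have an API for uploading invoices. The client uploads an invoice image, which is stored in MongoDB; we then use AI to extract the information and update the invoice. Here's the code for uploading an invoice: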
from typing import Any, Dict

import yaml
from bson import ObjectId
from pymongo import MongoClient
from pymongo.errors import PyMongoError


def read_config(path: str) -> Dict[str, Any]:
    # Stand-in for the author's config helper (assumed): load a YAML file into a dict
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file
        self.config_path = config_path
        self.config = read_config(path=self.config_path)
        # Initialize the MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the auto-generated _id as a string
        result = self.collection.insert_one(data)
        return str(result.inserted_id)

    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            # _id is stored as an ObjectId, so convert the string ID back before querying;
            # filtering on the raw string would silently match nothing
            self.collection.update_one({"_id": ObjectId(document_id)}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")
First, I wrapped pymongo in a class, just in case :)). The upload endpoint then looks like this:
from datetime import datetime, timezone

from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse

app = FastAPI()
# mongo_db is a MongoDatabase instance; is_base64 and valid_base64_image are the author's
# helpers (not shown) that check and normalize the base64 image


@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse the JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")
        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {},
        }
        # MongoDB generates the _id; it doubles as the invoice ID returned to the client
        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})
        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"},
        )
    except Exception as e:
        # Any unexpected failure becomes a 500 with the error message
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)},
        )
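The endpoint relies on `is_base64` and `valid_base64_image`, which the post doesn't show. As a hypothetical stand-in for `is_base64` only, a strict check could look like the sketch below; accepting a `data:` URL prefix is an assumption about what the client sends, not something the original code states.

```python
import base64


def is_base64(value) -> bool:
    """Return True if value is a string that decodes as strict base64 (hypothetical helper)."""
    if not isinstance(value, str):
        return False
    # Assumption: clients may send a data URL such as "data:image/png;base64,...."
    if value.startswith("data:") and "," in value:
        value = value.split(",", 1)[1]
    try:
        # validate=True rejects characters outside the base64 alphabet
        # instead of silently skipping them
        base64.b64decode(value, validate=True)
        return True
    except ValueError:  # binascii.Error is a subclass of ValueError
        return False
```

Bad padding, stray characters, and non-ASCII input all end up in the `except` branch, so the endpoint can answer with a 400 instead of a 500.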
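A fair question is: why not wait until the AI model has processed the image and then update? The problem is that processing takes around 4 to 5 minutes, and we don't want to make the user wait that long.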
Another option is Kafka: we could publish the image to a Kafka topic and let another service process the data.
Pros:
Cons:
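Here's a basic implementation that shows how to handle the invoice upload process with Kafka.
The user uploads an invoice through the API endpoint. The invoice image is saved in MongoDB, and a message is sent to a Kafka topic for further processing.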
import json
from datetime import datetime, timezone

from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from kafka import KafkaProducer

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # default=str serializes the datetime and the ObjectId "_id" (added by insert_one) as strings;
    # a plain json.dumps would raise TypeError on both
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
)

# mongo_db (a MongoDatabase instance), is_base64 and valid_base64_image are assumed to be defined elsewhere


@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")
        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }
        # Save the document to MongoDB; insert_one also adds the generated "_id" to invoice_document
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})
        # Publish the invoice to the Kafka topic for the extraction service
        producer.send('invoice_topic', invoice_document)
        producer.flush()
        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"},
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)},
        )
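The Kafka consumer listens on invoice_topic. When it receives a message, it processes the invoice (for example, by extracting information from the image) and updates the corresponding document in MongoDB.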
import json

from bson import ObjectId
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

# mongo_db and extract_invoice_data (the AI extraction step) are assumed to be defined elsewhere
for message in consumer:
    invoice_document = message.value
    # "_id" arrives as a string after JSON serialization; convert it back to an ObjectId
    invoice_uuid = ObjectId(invoice_document["_id"])
    # Process the invoice: extract information from the image, then update the MongoDB document
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data,
        "status": "extracted",
    })
    print(f"Processed and updated invoice: {invoice_uuid}")
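Because extraction takes minutes, it matters when the consumer commits its offsets. The sketch below assumes a kafka-python `KafkaConsumer` created with `enable_auto_commit=False` and a hypothetical `process` callback that does the extraction and the MongoDB update. It commits only after each invoice is handled, so a crash mid-extraction replays the message (at-least-once delivery) instead of losing it; re-processing an invoice just overwrites `invoice_info` with a fresh result.

```python
from typing import Any, Callable, Dict


def consume_invoices(consumer, process: Callable[[Dict[str, Any]], None]) -> int:
    """Process each message, then commit its offset; returns how many were processed.

    `consumer` is assumed to be a kafka-python KafkaConsumer with
    enable_auto_commit=False; `process` is a hypothetical callback.
    """
    processed = 0
    for message in consumer:
        process(message.value)
        # Commit only after the invoice was handled, so a crash replays it
        consumer.commit()
        processed += 1
    return processed


# Hypothetical consumer setup (group name is a placeholder):
# consumer = KafkaConsumer(
#     "invoice_topic",
#     bootstrap_servers=["localhost:9092"],
#     group_id="invoice-service",
#     enable_auto_commit=False,
#     # kafka-python defaults this to 300000 ms (5 minutes), close to the
#     # 4-5 minute extraction time, so it likely needs raising
#     max_poll_interval_ms=600000,
#     value_deserializer=lambda m: json.loads(m.decode("utf-8")),
# )
```

A `group_id` is needed for committed offsets to mean anything: it is what lets a restarted consumer continue from the last committed position.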
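Flow summary:
1. The client uploads an invoice through the API, and the document is saved in MongoDB with status "not extracted".
2. The API publishes the invoice document to invoice_topic.
3. The Kafka consumer receives the message, extracts the invoice data, and updates the MongoDB document with invoice_info and status "extracted".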
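Wow, I can't believe I wrote all of that myself! It really shows how much effort is involved, and that's before counting the complexity of managing and configuring three services: MongoDB, Kafka, and the invoice service.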
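Here is the complete code for the change stream version, including the extra methods and functions that process invoices when the change stream fires.
We'll start by creating a MongoDB wrapper class that handles database operations such as creating documents and listening to change streams.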
import threading
from typing import Any, Dict

import yaml
from pymongo import MongoClient
from pymongo.errors import PyMongoError


class MongoDatabase:
    # ... __init__, create_document and update_document_by_id as before ...

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # extract_invoice_data is the AI extraction step (not shown)
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]
            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {e}")

    def start_change_stream_listener(self):
        """Start listening to the collection's change stream in a background thread."""
        # Change streams require a replica set or sharded cluster; a standalone mongod rejects watch()
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        # Only react to new invoices; the updates made by upload_invoice and
                        # process_invoice also appear in the stream and are skipped here
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {e}")

        # daemon=True so the listener doesn't keep the process alive on shutdown
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()
For convenience, I put process_invoice in the MongoDatabase class, but you should keep it somewhere else.
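As written, the listener thread prints the error and exits if the change stream fails, and any invoices inserted while it is down are never processed. One way to handle this is MongoDB's resume token: each change event's `_id` is a token that can be passed back to `watch()` as `resume_after`. The sketch below assumes a pymongo `Collection` and a hypothetical `save_token` callback that persists the token somewhere durable; resuming only works while that position is still in the oplog.

```python
from typing import Any, Callable, Dict, Optional

ResumeToken = Dict[str, Any]


def listen_with_resume(
    collection,
    handle: Callable[[Dict[str, Any]], None],
    resume_token: Optional[ResumeToken] = None,
    save_token: Callable[[ResumeToken], None] = lambda token: None,
) -> None:
    """Process insert events, resuming after `resume_token` if one was saved.

    `collection` is assumed to be a pymongo Collection; `save_token` is a
    hypothetical callback (e.g. writing to a small checkpoint collection).
    """
    # Filter on the server so only inserts reach this loop
    pipeline = [{"$match": {"operationType": "insert"}}]
    # resume_after=None starts at "now"; a saved token replays the events after it
    with collection.watch(pipeline, resume_after=resume_token) as stream:
        for change in stream:
            handle(change["fullDocument"])
            # The event's _id is its resume token; save it only after handling succeeds
            save_token(change["_id"])
```

Wrapped in a retry loop that reloads the last saved token, this could replace the body of `listen()`, with `self.process_invoice` as the `handle` callback.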
The upload API stays the same as before; the only addition is starting the listener when the app starts:
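mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
# Start the background change stream listener once, when the app module loads.
# Note: if the server runs several worker processes, each one starts its own listener,
# and every insert would be processed once per worker.
mongo_db.start_change_stream_listener()


@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        # ... same code as before ...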
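Flow summary:
1. The client uploads an invoice through /api/v1/invoices/upload, and the document is saved in MongoDB with status "not extracted".
2. The change stream listener receives the insert event, including the full document.
3. process_invoice extracts the information and updates the document with invoice_info and status "extracted".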
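With MongoDB change streams, you can handle real-time changes in your database efficiently. By extending this example to other events, such as updates and deletes, you can make your application even more reactive and responsive.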
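Extending the listener to other event types mostly means routing on `operationType`. Here is a minimal sketch with hypothetical handler names. It relies on the shape of MongoDB change events: insert events carry `fullDocument`, update events carry `updateDescription.updatedFields`, and delete events only carry `documentKey`, since the document no longer exists.

```python
from typing import Any, Callable, Dict, Optional

ChangeEvent = Dict[str, Any]
Handler = Callable[[ChangeEvent], Any]


def make_dispatcher(handlers: Dict[str, Handler]) -> Callable[[ChangeEvent], Optional[Any]]:
    """Route each change event to the handler registered for its operationType."""
    def dispatch(change: ChangeEvent) -> Optional[Any]:
        handler = handlers.get(change.get("operationType"))
        # Event types without a handler (e.g. "drop") are ignored
        return handler(change) if handler is not None else None
    return dispatch


# Example handlers (hypothetical): each one just describes the event
def describe_insert(change: ChangeEvent) -> str:
    return f"inserted {change['fullDocument']['_id']}"


def describe_update(change: ChangeEvent) -> str:
    fields = sorted(change["updateDescription"]["updatedFields"])
    return f"updated {change['documentKey']['_id']}: {', '.join(fields)}"


def describe_delete(change: ChangeEvent) -> str:
    return f"deleted {change['documentKey']['_id']}"


dispatch = make_dispatcher({
    "insert": describe_insert,
    "update": describe_update,
    "delete": describe_delete,
})
```

Inside the listener, the loop body would become `dispatch(change)` instead of the inline `operationType` check, and `watch()` would need to let updates and deletes through.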