Dans les applications LLM, le traitement parallèle est particulièrement adapté pour :
from typing import List, Dict, Any from dataclasses import dataclass import asyncio from langchain.chat_models import ChatOpenAI from langchain.callbacks import AsyncCallbackHandler @dataclass class BatchConfig: """Batch processing configuration""" batch_size: int = 5 max_concurrent_tasks: int = 3 timeout_seconds: int = 30 retry_attempts: int = 2 class BatchProcessor: def __init__(self, config: BatchConfig): self.config = config self.llm = ChatOpenAI( temperature=0, request_timeout=config.timeout_seconds ) self.semaphore = asyncio.Semaphore( config.max_concurrent_tasks ) async def process_batch( self, items: List[Any] ) -> List[Dict]: """Main batch processing function""" batches = self._create_batches(items) results = [] for batch in batches: batch_results = await self._process_batch_with_semaphore( batch ) results.extend(batch_results) return results
class AsyncBatchProcessor(BatchProcessor): async def _process_single_item( self, item: Any ) -> Dict: """Process single item""" async with self.semaphore: for attempt in range(self.config.retry_attempts): try: return await self._execute_processing(item) except Exception as e: if attempt == self.config.retry_attempts - 1: return self._create_error_response(item, e) await asyncio.sleep(2 ** attempt) async def _execute_processing( self, item: Any ) -> Dict: """Execute specific processing logic""" task = asyncio.create_task( self.llm.agenerate([item]) ) try: result = await asyncio.wait_for( task, timeout=self.config.timeout_seconds ) return { "status": "success", "input": item, "result": result } except asyncio.TimeoutError: task.cancel() raise
class DocumentBatchProcessor: def __init__(self): self.config = BatchConfig( batch_size=10, max_concurrent_tasks=5 ) self.processor = AsyncBatchProcessor(self.config) self.results_manager = ResultsManager() async def process_documents( self, documents: List[str] ) -> Dict: """Process document batches""" try: preprocessed = await self._preprocess_documents( documents ) results = await self.processor.process_batch( preprocessed ) return await self.results_manager.merge_results( results ) except Exception as e: return self._handle_batch_error(e, documents)
class ResourceController: def __init__(self): self.token_limit = 4096 self.request_limit = 100 self._request_count = 0 self._token_count = 0 self._reset_time = None async def check_limits(self) -> bool: """Check resource limits""" await self._update_counters() return ( self._request_count < self.request_limit and self._token_count < self.token_limit ) async def track_usage( self, tokens_used: int ): """Track resource usage""" self._token_count += tokens_used self._request_count += 1 async def wait_if_needed(self): """Wait for resource release if necessary""" if not await self.check_limits(): wait_time = self._calculate_wait_time() await asyncio.sleep(wait_time)
class ResultsManager: def __init__(self): self.merge_strategies = { "text": self._merge_text_results, "embeddings": self._merge_embedding_results, "classifications": self._merge_classification_results } async def merge_results( self, results: List[Dict] ) -> Dict: """Merge processing results""" merged = { "success_count": 0, "error_count": 0, "results": [] } for result in results: if result["status"] == "success": merged["success_count"] += 1 merged["results"].append( await self._process_result(result) ) else: merged["error_count"] += 1 return merged
class MemoryManager: def __init__(self, max_memory_mb: int = 1024): self.max_memory = max_memory_mb * 1024 * 1024 self.current_usage = 0 async def monitor_memory(self): """Monitor memory usage""" import psutil process = psutil.Process() memory_info = process.memory_info() if memory_info.rss > self.max_memory: await self._trigger_memory_cleanup() async def _trigger_memory_cleanup(self): """Trigger memory cleanup""" import gc gc.collect()
class PerformanceMonitor: def __init__(self): self.metrics = { "processing_times": [], "error_rates": [], "throughput": [] } async def record_metrics( self, batch_size: int, duration: float, errors: int ): """Record performance metrics""" self.metrics["processing_times"].append(duration) self.metrics["error_rates"].append(errors / batch_size) self.metrics["throughput"].append( batch_size / duration )
Optimisation du traitement par lots
Contrôle de la concurrence
Gestion des erreurs
Niveau système
Niveau d'application
Le traitement parallèle est crucial pour créer des applications LLM hautes performances. Points clés à retenir :
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!