在本实用指南中,您将学习如何为您的应用程序创建具有内置 LLM 的高度可扩展的模型部署解决方案。
在您的示例中,我们将使用 Hugging Face 的 ChatGPT2 模型,但您可以轻松插入任何其他模型,包括 ChatGPT4、Claude 等。
无论您是设计具有 AI 功能的新应用程序,还是改进现有的 AI 系统,本指南都将帮助您逐步创建强大的 LLM 集成。
在开始编写代码之前,让我们弄清楚构建生产 LLM 集成需要什么。在构建生产就绪的 LLM 集成时,API 调用并不是您需要考虑的唯一事情,您还需要考虑可靠性、成本和稳定性等问题。您的生产应用程序必须解决服务中断、速率限制和响应时间变化等问题,同时控制成本。
这是我们将共同构建的内容:
在我们开始编码之前,请确保您拥有:
想跟随吗?完整的代码可以在您的 GitHub 存储库中找到。
让我们首先准备好您的开发环境。我们将创建一个干净的项目结构并安装所有必要的软件包。
首先,让我们创建项目目录并设置 Python 虚拟环境。打开终端并运行:
mkdir llm_integration && cd llm_integration python3 -m venv env syource env/bin/activate
现在让我们设置您的项目依赖项。使用这些基本包创建一个新的requirements.txt 文件:
transformers==4.36.0 huggingface-hub==0.19.4 redis==4.6.0 pydantic==2.5.0 pydantic-settings==2.1.0 tenacity==8.2.3 python-dotenv==1.0.0 fastapi==0.104.1 uvicorn==0.24.0 torch==2.1.0 numpy==1.24.3
让我们来分析一下为什么我们需要这些包:
使用以下命令安装所有软件包:
mkdir llm_integration && cd llm_integration python3 -m venv env syource env/bin/activate
让我们以干净的结构来组织您的项目。在您的项目目录中创建这些目录和文件:
transformers==4.36.0 huggingface-hub==0.19.4 redis==4.6.0 pydantic==2.5.0 pydantic-settings==2.1.0 tenacity==8.2.3 python-dotenv==1.0.0 fastapi==0.104.1 uvicorn==0.24.0 torch==2.1.0 numpy==1.24.3
让我们从您的LLM客户端开始,这是您申请中最重要的组成部分。这是我们与 ChatGPT 模型(或您喜欢的任何其他 LLM)交互的地方。将以下代码片段添加到您的 core/llm_client.py 文件中:
pip install -r requirements.txt
在 LLMClient 类的第一部分中,我们正在建立基础:
现在让我们添加与您的模型对话的方法:
llm_integration/ ├── core/ │ ├── llm_client.py # your main LLM interaction code │ ├── prompt_manager.py # Handles prompt templates │ └── response_handler.py # Processes LLM responses ├── cache/ │ └── redis_manager.py # Manages your caching system ├── config/ │ └── settings.py # Configuration management ├── api/ │ └── routes.py # API endpoints ├── utils/ │ ├── monitoring.py # Usage tracking │ └── rate_limiter.py # Rate limiting logic ├── requirements.txt └── main.py └── usage_logs.json
让我们分解一下这个完成方法中发生了什么:
接下来,我们需要添加响应处理程序来解析和构建 LLM 的原始输出。使用以下代码片段在 core/response_handler.py 文件中执行此操作:
import torch from transformers import AutoModelForCausalLM, AutoTokenizer from tenacity import retry, stop_after_attempt, wait_exponential from typing import Dict, Optional import logging class LLMClient: def __init__(self, model_name: str = "gpt2", timeout: int = 30): try: self.tokenizer = AutoTokenizer.from_pretrained(model_name) self.model = AutoModelForCausalLM.from_pretrained( model_name, device_map="auto", torch_dtype=torch.float16 ) except Exception as e: logging.error(f"Error loading model: {str(e)}") # Fallback to a simpler model if the specified one fails self.tokenizer = AutoTokenizer.from_pretrained("gpt2") self.model = AutoModelForCausalLM.from_pretrained("gpt2") self.timeout = timeout self.logger = logging.getLogger(__name__)
现在让我们创建您的缓存系统来提高应用程序性能并降低成本。将以下代码片段添加到您的cache/redis_manager.py 文件中:
@retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), reraise=True ) async def complete(self, prompt: str, temperature: float = 0.7, max_tokens: Optional[int] = None) -> Dict: """Get completion from the model with automatic retries""" try: inputs = self.tokenizer(prompt, return_tensors="pt").to( self.model.device ) with torch.no_grad(): outputs = self.model.generate( **inputs, max_new_tokens=max_tokens or 100, temperature=temperature, do_sample=True ) response_text = self.tokenizer.decode( outputs[0], skip_special_tokens=True ) # Calculate token usage for monitoring input_tokens = len(inputs.input_ids[0]) output_tokens = len(outputs[0]) - input_tokens return { 'content': response_text, 'usage': { 'prompt_tokens': input_tokens, 'completion_tokens': output_tokens, 'total_tokens': input_tokens + output_tokens }, 'model': "gpt2" } except Exception as e: self.logger.error(f"Error in LLM completion: {str(e)}") raise
在上面的代码片段中,我们创建了一个 CacheManager 类,它通过以下方式处理所有缓存操作:
让我们创建您的提示管理器来管理您的 LLM 模型的提示。将以下代码添加到您的 core/prompt_manager.py 中:
mkdir llm_integration && cd llm_integration python3 -m venv env syource env/bin/activate
然后使用代码片段在您的提示/content_moderation.json 文件中创建用于内容审核的示例提示模板:
transformers==4.36.0 huggingface-hub==0.19.4 redis==4.6.0 pydantic==2.5.0 pydantic-settings==2.1.0 tenacity==8.2.3 python-dotenv==1.0.0 fastapi==0.104.1 uvicorn==0.24.0 torch==2.1.0 numpy==1.24.3
现在,您的提示管理器将能够从 JSON 文件加载提示模板,并获得格式化的提示模板。
为了将所有 LLM 配置保存在一个位置并轻松地在您的应用程序中重复使用它们,让我们创建配置设置。将以下代码添加到您的 config/settings.py 文件中:
pip install -r requirements.txt
接下来,让我们实施速率限制来控制用户访问应用程序资源的方式。为此,请将以下代码添加到您的 utils/rate_limiter.py 文件中:
llm_integration/ ├── core/ │ ├── llm_client.py # your main LLM interaction code │ ├── prompt_manager.py # Handles prompt templates │ └── response_handler.py # Processes LLM responses ├── cache/ │ └── redis_manager.py # Manages your caching system ├── config/ │ └── settings.py # Configuration management ├── api/ │ └── routes.py # API endpoints ├── utils/ │ ├── monitoring.py # Usage tracking │ └── rate_limiter.py # Rate limiting logic ├── requirements.txt └── main.py └── usage_logs.json
在 RateLimiter 中,我们实现了一个可重复使用的 check_rate_limit 方法,该方法可在任何路由中使用,通过简单地传递每个用户在一段时间内允许的周期和请求数量来处理速率限制。
现在让我们在 api/routes.py 文件中创建 API 端点,以将 LLM 集成到您的应用程序中:
import torch from transformers import AutoModelForCausalLM, AutoTokenizer from tenacity import retry, stop_after_attempt, wait_exponential from typing import Dict, Optional import logging class LLMClient: def __init__(self, model_name: str = "gpt2", timeout: int = 30): try: self.tokenizer = AutoTokenizer.from_pretrained(model_name) self.model = AutoModelForCausalLM.from_pretrained( model_name, device_map="auto", torch_dtype=torch.float16 ) except Exception as e: logging.error(f"Error loading model: {str(e)}") # Fallback to a simpler model if the specified one fails self.tokenizer = AutoTokenizer.from_pretrained("gpt2") self.model = AutoModelForCausalLM.from_pretrained("gpt2") self.timeout = timeout self.logger = logging.getLogger(__name__)
这里我们在 APIRouter 类中定义了一个 /moderate 端点,它负责组织 API 路由。 @lru_cache 装饰器应用于依赖项注入函数(get_llm_client、get_response_handler、get_cache_manager 和 get_prompt_manager),以确保 LLMClient、CacheManager 和 PromptManager 的实例被缓存以获得更好的性能。用 @router.post 修饰的moderate_content函数定义了一个用于内容审核的POST路由,并利用FastAPI的Depends机制来注入这些依赖项。在函数内部,RateLimiter 类使用设置中的速率限制设置进行配置,强制执行请求限制。
最后,让我们更新您的 main.py 以将所有内容整合在一起:
@retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), reraise=True ) async def complete(self, prompt: str, temperature: float = 0.7, max_tokens: Optional[int] = None) -> Dict: """Get completion from the model with automatic retries""" try: inputs = self.tokenizer(prompt, return_tensors="pt").to( self.model.device ) with torch.no_grad(): outputs = self.model.generate( **inputs, max_new_tokens=max_tokens or 100, temperature=temperature, do_sample=True ) response_text = self.tokenizer.decode( outputs[0], skip_special_tokens=True ) # Calculate token usage for monitoring input_tokens = len(inputs.input_ids[0]) output_tokens = len(outputs[0]) - input_tokens return { 'content': response_text, 'usage': { 'prompt_tokens': input_tokens, 'completion_tokens': output_tokens, 'total_tokens': input_tokens + output_tokens }, 'model': "gpt2" } except Exception as e: self.logger.error(f"Error in LLM completion: {str(e)}") raise
在上面的代码中,我们使用 /api/v1 前缀下的 api.routes 创建了一个 FastAPI 应用程序和路由器。启用日志记录以显示带有时间戳的信息消息。该应用程序将使用 Uvicorn 运行 localhost:8000,并启用热重载。
现在所有组件都已就位,让我们开始启动并运行您的应用程序。首先,在项目根目录中创建一个 .env 文件并添加您的 HUGGINGFACE_API_KEY 和 REDIS_URL:
mkdir llm_integration && cd llm_integration python3 -m venv env syource env/bin/activate
然后确保 Redis 正在您的计算机上运行。在大多数基于 Unix 的系统上,您可以使用以下命令启动它:
transformers==4.36.0 huggingface-hub==0.19.4 redis==4.6.0 pydantic==2.5.0 pydantic-settings==2.1.0 tenacity==8.2.3 python-dotenv==1.0.0 fastapi==0.104.1 uvicorn==0.24.0 torch==2.1.0 numpy==1.24.3
现在您可以开始申请:
pip install -r requirements.txt
您的 FastAPI 服务器将开始在 http://localhost:8000 上运行。自动 API 文档将在 http://localhost:8000/docs 上提供 - 这对于测试您的端点非常有帮助!
让我们用真实的请求来测试您新创建的 API。打开一个新终端并运行以下curl命令:
llm_integration/ ├── core/ │ ├── llm_client.py # your main LLM interaction code │ ├── prompt_manager.py # Handles prompt templates │ └── response_handler.py # Processes LLM responses ├── cache/ │ └── redis_manager.py # Manages your caching system ├── config/ │ └── settings.py # Configuration management ├── api/ │ └── routes.py # API endpoints ├── utils/ │ ├── monitoring.py # Usage tracking │ └── rate_limiter.py # Rate limiting logic ├── requirements.txt └── main.py └── usage_logs.json
您应该在终端上看到如下响应:
现在让我们添加一些监控功能来跟踪应用程序的执行情况以及正在使用的资源量。将以下代码添加到您的 utils/monitoring.py 文件中:
import torch from transformers import AutoModelForCausalLM, AutoTokenizer from tenacity import retry, stop_after_attempt, wait_exponential from typing import Dict, Optional import logging class LLMClient: def __init__(self, model_name: str = "gpt2", timeout: int = 30): try: self.tokenizer = AutoTokenizer.from_pretrained(model_name) self.model = AutoModelForCausalLM.from_pretrained( model_name, device_map="auto", torch_dtype=torch.float16 ) except Exception as e: logging.error(f"Error loading model: {str(e)}") # Fallback to a simpler model if the specified one fails self.tokenizer = AutoTokenizer.from_pretrained("gpt2") self.model = AutoModelForCausalLM.from_pretrained("gpt2") self.timeout = timeout self.logger = logging.getLogger(__name__)
UsageMonitor 类将执行以下操作:
接下来,添加一个新的方法来计算使用情况统计:
@retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), reraise=True ) async def complete(self, prompt: str, temperature: float = 0.7, max_tokens: Optional[int] = None) -> Dict: """Get completion from the model with automatic retries""" try: inputs = self.tokenizer(prompt, return_tensors="pt").to( self.model.device ) with torch.no_grad(): outputs = self.model.generate( **inputs, max_new_tokens=max_tokens or 100, temperature=temperature, do_sample=True ) response_text = self.tokenizer.decode( outputs[0], skip_special_tokens=True ) # Calculate token usage for monitoring input_tokens = len(inputs.input_ids[0]) output_tokens = len(outputs[0]) - input_tokens return { 'content': response_text, 'usage': { 'prompt_tokens': input_tokens, 'completion_tokens': output_tokens, 'total_tokens': input_tokens + output_tokens }, 'model': "gpt2" } except Exception as e: self.logger.error(f"Error in LLM completion: {str(e)}") raise
更新您的API以添加UsageMonitor类中的监控功能:
from typing import Dict import logging class ResponseHandler: def __init__(self): self.logger = logging.getLogger(__name__) def parse_moderation_response(self, raw_response: str) -> Dict: """Parse and structure the raw LLM response for moderation""" try: # Default response structure structured_response = { "is_appropriate": True, "confidence_score": 0.0, "reason": None } # Simple keyword-based analysis lower_response = raw_response.lower() # Check for inappropriate content signals if any(word in lower_response for word in ['inappropriate', 'unsafe', 'offensive', 'harmful']): structured_response["is_appropriate"] = False structured_response["confidence_score"] = 0.9 # Extract reason if present if "because" in lower_response: reason_start = lower_response.find("because") structured_response["reason"] = raw_response[reason_start:].split('.')[0].strip() else: structured_response["confidence_score"] = 0.95 return structured_response except Exception as e: self.logger.error(f"Error parsing response: {str(e)}") return { "is_appropriate": True, "confidence_score": 0.5, "reason": "Failed to parse response" } def format_response(self, raw_response: Dict) -> Dict: """Format the final response with parsed content and usage stats""" try: return { "content": self.parse_moderation_response(raw_response["content"]), "usage": raw_response["usage"], "model": raw_response["model"] } except Exception as e: self.logger.error(f"Error formatting response: {str(e)}") raise
现在,通过运行以下curl 命令来测试您的 /stats 端点:
import redis from typing import Optional, Any import json import hashlib class CacheManager: def __init__(self, redis_url: str, ttl: int = 3600): self.redis = redis.from_url(redis_url) self.ttl = ttl def _generate_key(self, prompt: str, params: dict) -> str: """Generate a unique cache key""" cache_data = { 'prompt': prompt, 'params': params } serialized = json.dumps(cache_data, sort_keys=True) return hashlib.sha256(serialized.encode()).hexdigest() async def get_cached_response(self, prompt: str, params: dict) -> Optional[dict]: """Retrieve cached LLM response""" key = self._generate_key(prompt, params) cached = self.redis.get(key) return json.loads(cached) if cached else None async def cache_response(self, prompt: str, params: dict, response: dict) -> None: """Cache LLM response""" key = self._generate_key(prompt, params) self.redis.setex( key, self.ttl, json.dumps(response) )
上面的命令将向您显示 /moderate 端点上的请求的统计信息,如下面的屏幕截图所示:
通过本教程,我们学习了如何在生产应用程序中使用大型语言模型。您实现了 API 客户端、缓存、提示管理和错误处理等功能。作为这些概念的示例,您开发了一个内容审核系统。
现在您已经有了坚实的基础,您可以通过以下方式增强您的系统:
请记住,在示例中您使用了 ChatGPT2 模型,但您可以调整该系统以与任何 LLM 提供商合作。因此,请选择满足您的要求且在预算之内的型号。
如果您有疑问或想告诉我您正在使用此系统构建什么,请随时与我联系。
编码愉快! ?
以上是在生产应用程序中集成大型语言模型的详细内容。更多信息请关注PHP中文网其他相关文章!