Avant de plonger dans les détails techniques, comprenons pourquoi une journalisation appropriée est importante :
Pour ceux qui découvrent la journalisation Python, voici un exemple de base utilisant logging.basicConfig :
# Simple python logging example import logging # Basic logger in python example logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Create a logger logger = logging.getLogger(__name__) # Logger in python example logger.info("This is an information message") logger.warning("This is a warning message")
Cet exemple montre les bases du module de journalisation en Python et montre comment utiliser la journalisation Python Logger dans votre application.
Commençons par une configuration de journalisation simple :
import logging # Basic configuration logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Your first logger logger = logging.getLogger(__name__) # Using the logger logger.info("Application started") logger.warning("Watch out!") logger.error("Something went wrong")
La journalisation Python est livrée avec cinq niveaux standards :
Level | Numeric Value | When to Use |
---|---|---|
DEBUG | 10 | Detailed information for diagnosing problems |
INFO | 20 | General operational events |
WARNING | 30 | Something unexpected happened |
ERROR | 40 | More serious problem |
CRITICAL | 50 | Program may not be able to continue |
Pourquoi choisir la journalisation plutôt que les relevés imprimés ?
# Simple python logging example import logging # Basic logger in python example logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Create a logger logger = logging.getLogger(__name__) # Logger in python example logger.info("This is an information message") logger.warning("This is a warning message")
Pour les applications plus complexes :
import logging # Basic configuration logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Your first logger logger = logging.getLogger(__name__) # Using the logger logger.info("Application started") logger.warning("Watch out!") logger.error("Something went wrong")
La journalisation structurée fournit un format cohérent et lisible par machine, essentiel pour l'analyse et la surveillance des journaux. Pour un aperçu complet des modèles de journalisation structurée et des meilleures pratiques, consultez le guide de journalisation structurée. Implémentons la journalisation structurée en Python :
logging.basicConfig( filename='app.log', filemode='w', format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.DEBUG, datefmt='%Y-%m-%d %H:%M:%S' )
Une journalisation appropriée des erreurs est cruciale pour le débogage des problèmes de production. Voici une approche globale :
config = { 'version': 1, 'formatters': { 'detailed': { 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s' } }, 'handlers': { 'console': { 'class': 'logging.StreamHandler', 'level': 'INFO', 'formatter': 'detailed' }, 'file': { 'class': 'logging.FileHandler', 'filename': 'app.log', 'level': 'DEBUG', 'formatter': 'detailed' } }, 'loggers': { 'myapp': { 'handlers': ['console', 'file'], 'level': 'DEBUG', 'propagate': True } } } logging.config.dictConfig(config)
Lorsque vous vous connectez à des applications multithread, vous devez garantir la sécurité des threads :
import json import logging from datetime import datetime class JSONFormatter(logging.Formatter): def __init__(self): super().__init__() def format(self, record): # Create base log record log_obj = { "timestamp": self.formatTime(record, self.datefmt), "name": record.name, "level": record.levelname, "message": record.getMessage(), "module": record.module, "function": record.funcName, "line": record.lineno } # Add exception info if present if record.exc_info: log_obj["exception"] = self.formatException(record.exc_info) # Add custom fields from extra if hasattr(record, "extra_fields"): log_obj.update(record.extra_fields) return json.dumps(log_obj) # Usage Example logger = logging.getLogger(__name__) handler = logging.StreamHandler() handler.setFormatter(JSONFormatter()) logger.addHandler(handler) # Log with extra fields logger.info("User logged in", extra={"extra_fields": {"user_id": "123", "ip": "192.168.1.1"}})
Différents environnements d'application nécessitent des approches de journalisation spécifiques. Que vous travailliez avec des applications Web, des microservices ou des tâches en arrière-plan, chaque environnement a des exigences de journalisation et des bonnes pratiques uniques. Explorons comment mettre en œuvre une journalisation efficace dans différents scénarios de déploiement.
Voici une configuration complète de la journalisation Django :
import traceback import sys from contextlib import contextmanager class ErrorLogger: def __init__(self, logger): self.logger = logger @contextmanager def error_context(self, operation_name, **context): """Context manager for error logging with additional context""" try: yield except Exception as e: # Capture the current stack trace exc_type, exc_value, exc_traceback = sys.exc_info() # Format error details error_details = { "operation": operation_name, "error_type": exc_type.__name__, "error_message": str(exc_value), "context": context, "stack_trace": traceback.format_exception(exc_type, exc_value, exc_traceback) } # Log the error with full context self.logger.error( f"Error in {operation_name}: {str(exc_value)}", extra={"error_details": error_details} ) # Re-raise the exception raise # Usage Example logger = logging.getLogger(__name__) error_logger = ErrorLogger(logger) with error_logger.error_context("user_authentication", user_id="123", attempt=2): # Your code that might raise an exception authenticate_user(user_id)
Flask fournit son propre système de journalisation qui peut être personnalisé :
import threading import logging from queue import Queue from logging.handlers import QueueHandler, QueueListener def setup_thread_safe_logging(): """Set up thread-safe logging with a queue""" # Create the queue log_queue = Queue() # Create handlers console_handler = logging.StreamHandler() file_handler = logging.FileHandler('app.log') # Create queue handler and listener queue_handler = QueueHandler(log_queue) listener = QueueListener( log_queue, console_handler, file_handler, respect_handler_level=True ) # Configure root logger root_logger = logging.getLogger() root_logger.addHandler(queue_handler) # Start the listener in a separate thread listener.start() return listener # Usage listener = setup_thread_safe_logging() def worker_function(): logger = logging.getLogger(__name__) logger.info(f"Worker thread {threading.current_thread().name} starting") # Do work... logger.info(f"Worker thread {threading.current_thread().name} finished") # Create and start threads threads = [ threading.Thread(target=worker_function) for _ in range(3) ] for thread in threads: thread.start()
FastAPI peut exploiter la journalisation de Python avec certaines améliorations du middleware :
# settings.py LOGGING = { 'version': 1, 'disable_existing_loggers': False, 'formatters': { 'verbose': { 'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}', 'style': '{', }, 'simple': { 'format': '{levelname} {message}', 'style': '{', }, }, 'filters': { 'require_debug_true': { '()': 'django.utils.log.RequireDebugTrue', }, }, 'handlers': { 'console': { 'level': 'INFO', 'filters': ['require_debug_true'], 'class': 'logging.StreamHandler', 'formatter': 'simple' }, 'file': { 'level': 'ERROR', 'class': 'logging.FileHandler', 'filename': 'django-errors.log', 'formatter': 'verbose' }, 'mail_admins': { 'level': 'ERROR', 'class': 'django.utils.log.AdminEmailHandler', 'include_html': True, } }, 'loggers': { 'django': { 'handlers': ['console'], 'propagate': True, }, 'django.request': { 'handlers': ['file', 'mail_admins'], 'level': 'ERROR', 'propagate': False, }, 'myapp': { 'handlers': ['console', 'file'], 'level': 'INFO', } } }
Pour les microservices, le traçage distribué et les identifiants de corrélation sont essentiels :
import logging from logging.handlers import RotatingFileHandler from flask import Flask, request app = Flask(__name__) def setup_logger(): # Create formatter formatter = logging.Formatter( '[%(asctime)s] %(levelname)s in %(module)s: %(message)s' ) # File Handler file_handler = RotatingFileHandler( 'flask_app.log', maxBytes=10485760, # 10MB backupCount=10 ) file_handler.setLevel(logging.INFO) file_handler.setFormatter(formatter) # Add request context class RequestFormatter(logging.Formatter): def format(self, record): record.url = request.url record.remote_addr = request.remote_addr return super().format(record) # Configure app logger app.logger.addHandler(file_handler) app.logger.setLevel(logging.INFO) return app.logger # Usage in routes @app.route('/api/endpoint') def api_endpoint(): app.logger.info(f'Request received from {request.remote_addr}') # Your code here return jsonify({'status': 'success'})
Pour les tâches en arrière-plan, nous devons assurer une gestion et une rotation appropriées des journaux :
from fastapi import FastAPI, Request from typing import Callable import logging import time app = FastAPI() # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Middleware for request logging @app.middleware("http") async def log_requests(request: Request, call_next: Callable): start_time = time.time() response = await call_next(request) duration = time.time() - start_time log_dict = { "url": str(request.url), "method": request.method, "client_ip": request.client.host, "duration": f"{duration:.2f}s", "status_code": response.status_code } logger.info(f"Request processed: {log_dict}") return response # Example endpoint with logging @app.get("/items/{item_id}") async def read_item(item_id: int): logger.info(f"Retrieving item {item_id}") # Your code here return {"item_id": item_id}
Mise en œuvre du suivi des demandes dans votre application :
import logging import contextvars from uuid import uuid4 # Create context variable for trace ID trace_id_var = contextvars.ContextVar('trace_id', default=None) class TraceIDFilter(logging.Filter): def filter(self, record): trace_id = trace_id_var.get() record.trace_id = trace_id if trace_id else 'no_trace' return True def setup_microservice_logging(service_name): logger = logging.getLogger(service_name) # Create formatter with trace ID formatter = logging.Formatter( '%(asctime)s - %(name)s - [%(trace_id)s] - %(levelname)s - %(message)s' ) # Add handlers with trace ID filter handler = logging.StreamHandler() handler.setFormatter(formatter) handler.addFilter(TraceIDFilter()) logger.addHandler(handler) logger.setLevel(logging.INFO) return logger # Usage in microservice logger = setup_microservice_logging('order_service') def process_order(order_data): # Generate or get trace ID from request trace_id_var.set(str(uuid4())) logger.info("Starting order processing", extra={ 'order_id': order_data['id'], 'customer_id': order_data['customer_id'] }) # Process order... logger.info("Order processed successfully")
Suivez les actions des utilisateurs en toute sécurité :
# Simple python logging example import logging # Basic logger in python example logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Create a logger logger = logging.getLogger(__name__) # Logger in python example logger.info("This is an information message") logger.warning("This is a warning message")
Un dépannage efficace des problèmes de journalisation nécessite une compréhension des problèmes courants et de leurs solutions. Cette section couvre les défis les plus fréquents auxquels les développeurs sont confrontés lors de la mise en œuvre de la journalisation et fournit des solutions pratiques pour déboguer les configurations de journalisation.
import logging # Basic configuration logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Your first logger logger = logging.getLogger(__name__) # Using the logger logger.info("Application started") logger.warning("Watch out!") logger.error("Something went wrong")
logging.basicConfig( filename='app.log', filemode='w', format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.DEBUG, datefmt='%Y-%m-%d %H:%M:%S' )
config = { 'version': 1, 'formatters': { 'detailed': { 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s' } }, 'handlers': { 'console': { 'class': 'logging.StreamHandler', 'level': 'INFO', 'formatter': 'detailed' }, 'file': { 'class': 'logging.FileHandler', 'filename': 'app.log', 'level': 'DEBUG', 'formatter': 'detailed' } }, 'loggers': { 'myapp': { 'handlers': ['console', 'file'], 'level': 'DEBUG', 'propagate': True } } } logging.config.dictConfig(config)
import json import logging from datetime import datetime class JSONFormatter(logging.Formatter): def __init__(self): super().__init__() def format(self, record): # Create base log record log_obj = { "timestamp": self.formatTime(record, self.datefmt), "name": record.name, "level": record.levelname, "message": record.getMessage(), "module": record.module, "function": record.funcName, "line": record.lineno } # Add exception info if present if record.exc_info: log_obj["exception"] = self.formatException(record.exc_info) # Add custom fields from extra if hasattr(record, "extra_fields"): log_obj.update(record.extra_fields) return json.dumps(log_obj) # Usage Example logger = logging.getLogger(__name__) handler = logging.StreamHandler() handler.setFormatter(JSONFormatter()) logger.addHandler(handler) # Log with extra fields logger.info("User logged in", extra={"extra_fields": {"user_id": "123", "ip": "192.168.1.1"}})
import traceback import sys from contextlib import contextmanager class ErrorLogger: def __init__(self, logger): self.logger = logger @contextmanager def error_context(self, operation_name, **context): """Context manager for error logging with additional context""" try: yield except Exception as e: # Capture the current stack trace exc_type, exc_value, exc_traceback = sys.exc_info() # Format error details error_details = { "operation": operation_name, "error_type": exc_type.__name__, "error_message": str(exc_value), "context": context, "stack_trace": traceback.format_exception(exc_type, exc_value, exc_traceback) } # Log the error with full context self.logger.error( f"Error in {operation_name}: {str(exc_value)}", extra={"error_details": error_details} ) # Re-raise the exception raise # Usage Example logger = logging.getLogger(__name__) error_logger = ErrorLogger(logger) with error_logger.error_context("user_authentication", user_id="123", attempt=2): # Your code that might raise an exception authenticate_user(user_id)
import threading import logging from queue import Queue from logging.handlers import QueueHandler, QueueListener def setup_thread_safe_logging(): """Set up thread-safe logging with a queue""" # Create the queue log_queue = Queue() # Create handlers console_handler = logging.StreamHandler() file_handler = logging.FileHandler('app.log') # Create queue handler and listener queue_handler = QueueHandler(log_queue) listener = QueueListener( log_queue, console_handler, file_handler, respect_handler_level=True ) # Configure root logger root_logger = logging.getLogger() root_logger.addHandler(queue_handler) # Start the listener in a separate thread listener.start() return listener # Usage listener = setup_thread_safe_logging() def worker_function(): logger = logging.getLogger(__name__) logger.info(f"Worker thread {threading.current_thread().name} starting") # Do work... logger.info(f"Worker thread {threading.current_thread().name} finished") # Create and start threads threads = [ threading.Thread(target=worker_function) for _ in range(3) ] for thread in threads: thread.start()
# settings.py LOGGING = { 'version': 1, 'disable_existing_loggers': False, 'formatters': { 'verbose': { 'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}', 'style': '{', }, 'simple': { 'format': '{levelname} {message}', 'style': '{', }, }, 'filters': { 'require_debug_true': { '()': 'django.utils.log.RequireDebugTrue', }, }, 'handlers': { 'console': { 'level': 'INFO', 'filters': ['require_debug_true'], 'class': 'logging.StreamHandler', 'formatter': 'simple' }, 'file': { 'level': 'ERROR', 'class': 'logging.FileHandler', 'filename': 'django-errors.log', 'formatter': 'verbose' }, 'mail_admins': { 'level': 'ERROR', 'class': 'django.utils.log.AdminEmailHandler', 'include_html': True, } }, 'loggers': { 'django': { 'handlers': ['console'], 'propagate': True, }, 'django.request': { 'handlers': ['file', 'mail_admins'], 'level': 'ERROR', 'propagate': False, }, 'myapp': { 'handlers': ['console', 'file'], 'level': 'INFO', } } }
import logging from logging.handlers import RotatingFileHandler from flask import Flask, request app = Flask(__name__) def setup_logger(): # Create formatter formatter = logging.Formatter( '[%(asctime)s] %(levelname)s in %(module)s: %(message)s' ) # File Handler file_handler = RotatingFileHandler( 'flask_app.log', maxBytes=10485760, # 10MB backupCount=10 ) file_handler.setLevel(logging.INFO) file_handler.setFormatter(formatter) # Add request context class RequestFormatter(logging.Formatter): def format(self, record): record.url = request.url record.remote_addr = request.remote_addr return super().format(record) # Configure app logger app.logger.addHandler(file_handler) app.logger.setLevel(logging.INFO) return app.logger # Usage in routes @app.route('/api/endpoint') def api_endpoint(): app.logger.info(f'Request received from {request.remote_addr}') # Your code here return jsonify({'status': 'success'})
from fastapi import FastAPI, Request from typing import Callable import logging import time app = FastAPI() # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Middleware for request logging @app.middleware("http") async def log_requests(request: Request, call_next: Callable): start_time = time.time() response = await call_next(request) duration = time.time() - start_time log_dict = { "url": str(request.url), "method": request.method, "client_ip": request.client.host, "duration": f"{duration:.2f}s", "status_code": response.status_code } logger.info(f"Request processed: {log_dict}") return response # Example endpoint with logging @app.get("/items/{item_id}") async def read_item(item_id: int): logger.info(f"Retrieving item {item_id}") # Your code here return {"item_id": item_id}
import logging import contextvars from uuid import uuid4 # Create context variable for trace ID trace_id_var = contextvars.ContextVar('trace_id', default=None) class TraceIDFilter(logging.Filter): def filter(self, record): trace_id = trace_id_var.get() record.trace_id = trace_id if trace_id else 'no_trace' return True def setup_microservice_logging(service_name): logger = logging.getLogger(service_name) # Create formatter with trace ID formatter = logging.Formatter( '%(asctime)s - %(name)s - [%(trace_id)s] - %(levelname)s - %(message)s' ) # Add handlers with trace ID filter handler = logging.StreamHandler() handler.setFormatter(formatter) handler.addFilter(TraceIDFilter()) logger.addHandler(handler) logger.setLevel(logging.INFO) return logger # Usage in microservice logger = setup_microservice_logging('order_service') def process_order(order_data): # Generate or get trace ID from request trace_id_var.set(str(uuid4())) logger.info("Starting order processing", extra={ 'order_id': order_data['id'], 'customer_id': order_data['customer_id'] }) # Process order... logger.info("Order processed successfully")
Loguru fournit une interface de journalisation plus simple avec des fonctionnalités puissantes prêtes à l'emploi :
from logging.handlers import RotatingFileHandler import logging import threading from datetime import datetime class BackgroundTaskLogger: def __init__(self, task_name): self.logger = logging.getLogger(f'background_task.{task_name}') self.setup_logging() def setup_logging(self): # Create logs directory if it doesn't exist import os os.makedirs('logs', exist_ok=True) # Setup rotating file handler handler = RotatingFileHandler( filename=f'logs/task_{datetime.now():%Y%m%d}.log', maxBytes=5*1024*1024, # 5MB backupCount=5 ) # Create formatter formatter = logging.Formatter( '%(asctime)s - [%(threadName)s] - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) self.logger.addHandler(handler) self.logger.setLevel(logging.INFO) def log_task_status(self, status, **kwargs): """Log task status with additional context""" extra = { 'thread_id': threading.get_ident(), 'timestamp': datetime.now().isoformat(), **kwargs } self.logger.info(f"Task status: {status}", extra=extra) # Usage example def background_job(): logger = BackgroundTaskLogger('data_processing') try: logger.log_task_status('started', job_id=123) # Do some work... logger.log_task_status('completed', records_processed=1000) except Exception as e: logger.logger.error(f"Task failed: {str(e)}", exc_info=True)
Structlog est excellent pour la journalisation structurée avec contexte :
import logging from contextlib import contextmanager import threading import uuid # Store request ID in thread-local storage _request_id = threading.local() class RequestIDFilter(logging.Filter): def filter(self, record): record.request_id = getattr(_request_id, 'id', 'no_request_id') return True @contextmanager def request_context(request_id=None): """Context manager for request tracking""" if request_id is None: request_id = str(uuid.uuid4()) old_id = getattr(_request_id, 'id', None) _request_id.id = request_id try: yield request_id finally: if old_id is None: del _request_id.id else: _request_id.id = old_id # Setup logging with request ID def setup_request_logging(): logger = logging.getLogger() formatter = logging.Formatter( '%(asctime)s - [%(request_id)s] - %(levelname)s - %(message)s' ) handler = logging.StreamHandler() handler.setFormatter(formatter) handler.addFilter(RequestIDFilter()) logger.addHandler(handler) return logger # Usage example logger = setup_request_logging() def process_request(data): with request_context() as request_id: logger.info("Processing request", extra={ 'data': data, 'operation': 'process_request' }) # Process the request... logger.info("Request processed successfully")
Pour la journalisation au format JSON :
# Simple python logging example import logging # Basic logger in python example logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Create a logger logger = logging.getLogger(__name__) # Logger in python example logger.info("This is an information message") logger.warning("This is a warning message")
import logging # Basic configuration logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Your first logger logger = logging.getLogger(__name__) # Using the logger logger.info("Application started") logger.warning("Watch out!") logger.error("Something went wrong")
logging.basicConfig( filename='app.log', filemode='w', format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.DEBUG, datefmt='%Y-%m-%d %H:%M:%S' )
config = { 'version': 1, 'formatters': { 'detailed': { 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s' } }, 'handlers': { 'console': { 'class': 'logging.StreamHandler', 'level': 'INFO', 'formatter': 'detailed' }, 'file': { 'class': 'logging.FileHandler', 'filename': 'app.log', 'level': 'DEBUG', 'formatter': 'detailed' } }, 'loggers': { 'myapp': { 'handlers': ['console', 'file'], 'level': 'DEBUG', 'propagate': True } } } logging.config.dictConfig(config)
import json import logging from datetime import datetime class JSONFormatter(logging.Formatter): def __init__(self): super().__init__() def format(self, record): # Create base log record log_obj = { "timestamp": self.formatTime(record, self.datefmt), "name": record.name, "level": record.levelname, "message": record.getMessage(), "module": record.module, "function": record.funcName, "line": record.lineno } # Add exception info if present if record.exc_info: log_obj["exception"] = self.formatException(record.exc_info) # Add custom fields from extra if hasattr(record, "extra_fields"): log_obj.update(record.extra_fields) return json.dumps(log_obj) # Usage Example logger = logging.getLogger(__name__) handler = logging.StreamHandler() handler.setFormatter(JSONFormatter()) logger.addHandler(handler) # Log with extra fields logger.info("User logged in", extra={"extra_fields": {"user_id": "123", "ip": "192.168.1.1"}})
import traceback import sys from contextlib import contextmanager class ErrorLogger: def __init__(self, logger): self.logger = logger @contextmanager def error_context(self, operation_name, **context): """Context manager for error logging with additional context""" try: yield except Exception as e: # Capture the current stack trace exc_type, exc_value, exc_traceback = sys.exc_info() # Format error details error_details = { "operation": operation_name, "error_type": exc_type.__name__, "error_message": str(exc_value), "context": context, "stack_trace": traceback.format_exception(exc_type, exc_value, exc_traceback) } # Log the error with full context self.logger.error( f"Error in {operation_name}: {str(exc_value)}", extra={"error_details": error_details} ) # Re-raise the exception raise # Usage Example logger = logging.getLogger(__name__) error_logger = ErrorLogger(logger) with error_logger.error_context("user_authentication", user_id="123", attempt=2): # Your code that might raise an exception authenticate_user(user_id)
Ce guide couvre les aspects essentiels de la journalisation Python, de la configuration de base aux implémentations avancées. N'oubliez pas que la journalisation fait partie intégrante de l'observabilité et de la maintenance des applications. Mettez-le en œuvre de manière réfléchie et entretenez-le régulièrement pour obtenir les meilleurs résultats.
N'oubliez pas de revoir et de mettre à jour périodiquement votre implémentation de journalisation à mesure que votre application évolue et que de nouvelles exigences apparaissent.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!