Table des matières
1. Méthode simple et brute - encapsuler la bibliothèque MySQL
2. Sonde Python
3. commencez à créer votre propre module de sonde, puisque l'exemple concerne uniquement le module aiomysql, alors seul le module aiomysql doit être traité dans MetaPathFinder.find_module, et les autres seront ignorés en premier. Nous devons ensuite déterminer quelle fonction d'aiomysql nous voulons. à remplacer. D'un point de vue commercial, nous n'avons généralement besoin que de curseur.execute , curseur.fetchone, curseur.fetchall, curseur.execute et de nombreuses opérations principales, vous devez donc approfondir le curseur pour voir comment changer le code et quelle fonction ce dernier surcharge.
Maison développement back-end Tutoriel Python Comment Python sonde-t-il l'extraction complète des données de la bibliothèque appelante ?

Comment Python sonde-t-il l'extraction complète des données de la bibliothèque appelante ?

May 16, 2023 am 08:46 AM
python

1. Méthode simple et brute - encapsuler la bibliothèque MySQL

Pour compter un processus d'exécution, vous devez connaître la position de début et la position de fin du processus d'exécution, donc la méthode la plus simple et la plus grossière consiste à encapsuler en fonction de la méthode à utiliser. appelé, Implémentez une couche intermédiaire entre le framework appelant la bibliothèque MySQL et la bibliothèque MySQL, et complétez des statistiques chronophages dans la couche intermédiaire, telles que :

# 伪代码
def my_execute(conn, sql, param):
 # 针对MySql库的统计封装组件
 with MyTracer(conn, sql, param):
     # 以下为正常使用MySql库的代码
with conn.cursor as cursor:
 cursor.execute(sql, param)
...
Copier après la connexion

Cela semble être très bon à implémenter, et c'est très pratique à changer, mais comme il s'agit d'une API de niveau supérieur, les modifications sont en fait très rigides. En même temps, certaines pré-opérations seront effectuées dans curseur.execute, comme l'épissage de sql et param, en appelant nextset pour effacer le. données actuelles du curseur, etc. Les données que nous avons finalement obtenues, telles que le temps et la consommation, étaient également inexactes, et il n'y avait aucun moyen d'obtenir des métadonnées détaillées, telles que des codes d'erreur, etc.

Si nous voulons obtenir les données les plus directes et les plus utiles, nous pouvons changez uniquement le code source, puis appelez le code source, mais si chaque bibliothèque doit modifier le code source pour compter, ce serait trop gênant. Heureusement, Python fournit également des interfaces similaires aux sondes et le code source de la bibliothèque. peut être mesuré via des sondes. Remplacez et complétez notre code.

2. Sonde Python

En Python, la fonction de hook d'importation peut être implémentée via sys.meta_path Lorsque les opérations liées à l'importation sont effectuées, les bibliothèques liées à l'importation le seront. être mis en correspondance en fonction des objets définis par sys.meta_path. Pour modifier les objets dans .sys.meta_path, vous devez implémenter une méthode find_module renvoie None ou un objet qui implémente la méthode load_module. remplacez les méthodes pertinentes lors de l'importation de certaines bibliothèques. L'utilisation est la suivante. Utilisez hooktime.sleep pour imprimer le temps consommé pendant le sommeil.

import importlib
import sys
from functools import wraps
def func_wrapper(func):
    """这里通过一个装饰器来达到狸猫换太子和获取数据的效果"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 记录开始时间
        start = time.time()
        result = func(*args, **kwargs)
        # 统计消耗时间
        end = time.time()
        print(f"speed time:{end - start}")
        return result
    return wrapper
class MetaPathFinder:
    def find_module(self, fullname, path=None):
        # 执行时可以看出来在import哪些模块
        print(f'find module:{path}:{fullname}')
        return MetaPathLoader()
class MetaPathLoader:
    def load_module(self, fullname):
        # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder = sys.meta_path.pop(0)
        # 导入 module
        module = importlib.import_module(fullname)
        if fullname == 'time':
            # 替换函数
            module.sleep = func_wrapper(module.sleep)
        sys.meta_path.insert(0, finder)
        return module
sys.meta_path.insert(0, MetaPathFinder())
if __name__ == '__main__':
    import time
    time.sleep(1)
# 输出示例:
# find module:datetime
# find module:time
# load module:time
# find module:math
# find module:_datetime
# speed time:1.00073385238647468
Copier après la connexion

3. commencez à créer votre propre module de sonde, puisque l'exemple concerne uniquement le module aiomysql, alors seul le module aiomysql doit être traité dans MetaPathFinder.find_module, et les autres seront ignorés en premier. Nous devons ensuite déterminer quelle fonction d'aiomysql nous voulons. à remplacer. D'un point de vue commercial, nous n'avons généralement besoin que de curseur.execute , curseur.fetchone, curseur.fetchall, curseur.execute et de nombreuses opérations principales, vous devez donc approfondir le curseur pour voir comment changer le code et quelle fonction ce dernier surcharge.

D'abord le code source de curseur.execute (cursor.executemanay également similaire), on constate que la méthode de self.nextset sera d'abord appelée pour récupérer les données de la requête précédente, puis fusionner l'instruction sql. , et enfin interrogez via self._query:

async def execute(self, query, args=None):
    """Executes the given operation
    Executes the given operation substituting any markers with
    the given parameters.
    For example, getting all rows where id is 5:
        cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
    :param query: ``str`` sql statement
    :param args: ``tuple`` or ``list`` of arguments for sql query
    :returns: ``int``, number of rows that has been produced of affected
    """
    conn = self._get_db()

    while (await self.nextset()):
        pass

    if args is not None:
        query = query % self._escape_args(args, conn)

    await self._query(query)
    self._executed = query
    if self._echo:
        logger.info(query)
        logger.info("%r", args)
    return self._rowcount
Copier après la connexion

Regardez le code source de curseur.fetchone (cursor.fetchall Également similaire), on constate que les données sont en fait obtenues à partir du cache

Ces données ont été obtenues. lors de l'exécution de curseur.execute :

def fetchone(self):
    """Fetch the next row """
    self._check_executed()
    fut = self._loop.create_future()
    if self._rows is None or self._rownumber >= len(self._rows):
        fut.set_result(None)
        return fut
    result = self._rows[self._rownumber]
    self._rownumber += 1
    fut = self._loop.create_future()
    fut.set_result(result)
    return fut
Copier après la connexion
Sur la base de l'analyse ci-dessus, il suffit de surcharger la méthode principale self._query Nous pouvons obtenir les données que nous voulons À partir du code source, nous pouvons savoir que nous pouvons obtenir les données. Paramètres self et sql transmis à self._query. Nous pouvons également obtenir les résultats de la requête basés sur self. En même temps, nous pouvons obtenir les résultats en cours via le décorateur. Avec le temps, toutes les données requises sont essentiellement disponibles. Le code modifié selon l'idée est le suivant :

import importlib
import time
import sys
from functools import wraps
from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING
from types import ModuleType
if TYPE_CHECKING:
    import aiomysql
def func_wrapper(func: Callable):
    @wraps(func)
    async def wrapper(*args, **kwargs) -> Any:
        start: float = time.time()
        func_result: Any = await func(*args, **kwargs)
        end: float = time.time()
        # 根据_query可以知道, 第一格参数是self, 第二个参数是sql
        self: aiomysql.Cursor = args[0]
        sql: str = args[1]
        # 通过self,我们可以拿到其他的数据
        db: str = self._connection.db
        user: str = self._connection.user
        host: str = self._connection.host
        port: str = self._connection.port
        execute_result: Tuple[Tuple] = self._rows
        # 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了, 
        # 这里只是打印一部分数据出来
        print({
            "sql": sql,
            "db": db,
            "user": user,
            "host": host,
            "port": port,
            "result": execute_result,
            "speed time": end - start
        })
        return func_result
    return cast(Callable, wrapper)
class MetaPathFinder:

    @staticmethod
    def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]:
        if fullname == 'aiomysql':
            # 只有aiomysql才进行hook
            return MetaPathLoader()
        else:
            return None
class MetaPathLoader:
    @staticmethod
    def load_module(fullname: str):
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder: "MetaPathFinder" = sys.meta_path.pop(0)
        # 导入 module
        module: ModuleType = importlib.import_module(fullname)
        # 针对_query进行hook
        module.Cursor._query = func_wrapper(module.Cursor._query)
        sys.meta_path.insert(0, finder)
        return module
async def test_mysql() -> None:
    import aiomysql
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='123123', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

if __name__ == '__main__':
    sys.meta_path.insert(0, MetaPathFinder())
    import asyncio
    asyncio.run(test_mysql())
# 输出示例:
# 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间
# {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}
Copier après la connexion

Cet exemple semble très bon, mais la logique doit être appelée explicitement à l'entrée de l'appel, généralement pour un projet. Il peut y avoir plusieurs entrées. Chaque entrée montre que. appeler cette logique sera très gênant, et notre logique de hook doit d'abord être appelée avant l'importation, de cette façon, les spécifications d'introduction doivent être définies, sinon le hook peut ne pas réussir à certains endroits. Si c'est possible, ce problème peut être parfaitement résolu. en faisant en sorte que la logique d'introduction du hook soit exécutée immédiatement après le démarrage de l'analyseur.Après avoir vérifié les informations, nous avons constaté que lorsque l'interpréteur python est initialisé, il importera automatiquement les modules sitecustomize et usercustomize qui existent sous PYTHONPATH. le module et écrivez notre fonction de remplacement dans le module.

.
├── __init__.py
├── hook_aiomysql.py
├── sitecustomize.py
└── test_auto_hook.py
Copier après la connexion
hook_aiomysql.py est notre code pour créer des sondes à titre d'exemple, et le code stocké dans sitecustomize.py est le suivant. C'est très simple, introduisez simplement notre code de sonde et insérez-le dans sys.meta_path :
import sys
from hook_aiomysql import MetaPathFinder
sys.meta_path.insert(0, MetaPathFinder())
Copier après la connexion

. test_auto_hook.py Voici le code de test :

import asyncio
from hook_aiomysql import test_mysql
asyncio.run(test_mysql())
Copier après la connexion

Ensuite, il nous suffit de définir PYTHONPATH et d'exécuter notre code (s'il s'agit d'un projet, il est généralement démarré par le superviseur, et PYTHONPATH peut être défini dans le fichier de configuration) :

(.venv) ➜  python_hook git:(master) ✗ export PYTHONPATH=.      
(.venv) ➜  python_hook git:(master) ✗ python test_auto_hook.py 
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}
Copier après la connexion
4. Méthode de remplacement direct

Vous pouvez voir que la méthode ci-dessus fonctionne très bien et peut être facilement intégrée à notre projet. Cependant, il est difficile de l'extraire dans une bibliothèque tierce si cela dépend du. sitecustomize.py Si vous souhaitez séparer une bibliothèque tierce, vous devez vous demander s'il existe d'autres méthodes. Lors de l'introduction de MetaPathLoader ci-dessus, j'ai mentionné sys.module, dans lequel sys.modules est utilisé pour réduire les introductions répétées :

class MetaPathLoader:
    def load_module(self, fullname):
        # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder = sys.meta_path.pop(0)
        # 导入 module
        module = importlib.import_module(fullname)
        if fullname == 'time':
            # 替换函数
            module.sleep = func_wrapper(module.sleep)
        sys.meta_path.insert(0, finder)
        return module
Copier après la connexion

Le principe de la réduction des introductions répétées est que chaque fois qu'un module est introduit, il sera stocké dans sys.modules. S'il est introduit à plusieurs reprises, il sera directement actualisé vers le dernier module introduit. La raison pour laquelle nous envisageons de réduire les importations répétées ci-dessus est que nous ne mettrons pas à niveau les dépendances des bibliothèques tierces lorsque le programme est en cours d'exécution. Profitant du fait que nous n'avons pas besoin d'envisager d'introduire à plusieurs reprises des modules portant le même nom et des implémentations différentes, et que sys.modules mettra en cache les modules importés, nous pouvons simplifier la logique ci-dessus dans l'importation de modules -> Remplacer la méthode de module actuelle par la méthode hook que nous avons modifiée.

import time
from functools import wraps
from typing import Any, Callable, Tuple, cast
import aiomysql
def func_wrapper(func: Callable):
    """和上面一样的封装函数, 这里简单略过"""
# 判断是否hook过
_IS_HOOK: bool = False
# 存放原来的_query
_query: Callable = aiomysql.Cursor._query
# hook函数
def install_hook() -> None:
    _IS_HOOK = False
    if _IS_HOOK:
        return
    aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query)
    _IS_HOOK = True
# 还原到原来的函数方法
def reset_hook() -> None:
    aiomysql.Cursor._query = _query
    _IS_HOOK = False
Copier après la connexion

代码简单明了,接下来跑一跑刚才的测试:

import asyncio
import aiomysql
from demo import install_hook, reset_hook
async def test_mysql() -> None:
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

print("install hook")
install_hook()
asyncio.run(test_mysql())
print("reset hook")
reset_hook()
asyncio.run(test_mysql())
print("end")
Copier après la connexion

通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息

install hook
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875}
reset hook
end
Copier après la connexion

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn

Outils d'IA chauds

Undresser.AI Undress

Undresser.AI Undress

Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover

AI Clothes Remover

Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool

Undress AI Tool

Images de déshabillage gratuites

Clothoff.io

Clothoff.io

Dissolvant de vêtements AI

AI Hentai Generator

AI Hentai Generator

Générez AI Hentai gratuitement.

Article chaud

R.E.P.O. Crystals d'énergie expliqués et ce qu'ils font (cristal jaune)
3 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Meilleurs paramètres graphiques
3 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Comment réparer l'audio si vous n'entendez personne
3 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: Comment déverrouiller tout dans Myrise
3 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌

Outils chauds

Bloc-notes++7.3.1

Bloc-notes++7.3.1

Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise

SublimeText3 version chinoise

Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1

Envoyer Studio 13.0.1

Puissant environnement de développement intégré PHP

Dreamweaver CS6

Dreamweaver CS6

Outils de développement Web visuel

SublimeText3 version Mac

SublimeText3 version Mac

Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Quelle est la raison pour laquelle PS continue de montrer le chargement? Quelle est la raison pour laquelle PS continue de montrer le chargement? Apr 06, 2025 pm 06:39 PM

Les problèmes de «chargement» PS sont causés par des problèmes d'accès aux ressources ou de traitement: la vitesse de lecture du disque dur est lente ou mauvaise: utilisez Crystaldiskinfo pour vérifier la santé du disque dur et remplacer le disque dur problématique. Mémoire insuffisante: améliorez la mémoire pour répondre aux besoins de PS pour les images à haute résolution et le traitement complexe de couche. Les pilotes de la carte graphique sont obsolètes ou corrompues: mettez à jour les pilotes pour optimiser la communication entre le PS et la carte graphique. Les chemins de fichier sont trop longs ou les noms de fichiers ont des caractères spéciaux: utilisez des chemins courts et évitez les caractères spéciaux. Problème du PS: réinstaller ou réparer le programme d'installation PS.

Comment résoudre le problème du chargement lorsque PS est démarré? Comment résoudre le problème du chargement lorsque PS est démarré? Apr 06, 2025 pm 06:36 PM

Un PS est coincé sur le "chargement" lors du démarrage peut être causé par diverses raisons: désactiver les plugins corrompus ou conflictuels. Supprimer ou renommer un fichier de configuration corrompu. Fermez des programmes inutiles ou améliorez la mémoire pour éviter une mémoire insuffisante. Passez à un entraînement à semi-conducteurs pour accélérer la lecture du disque dur. Réinstaller PS pour réparer les fichiers système corrompus ou les problèmes de package d'installation. Afficher les informations d'erreur pendant le processus de démarrage de l'analyse du journal d'erreur.

Comment accélérer la vitesse de chargement de PS? Comment accélérer la vitesse de chargement de PS? Apr 06, 2025 pm 06:27 PM

La résolution du problème du démarrage lent Photoshop nécessite une approche à plusieurs volets, notamment: la mise à niveau du matériel (mémoire, lecteur à semi-conducteurs, CPU); des plug-ins désinstallés ou incompatibles; nettoyer régulièrement les déchets du système et des programmes de fond excessifs; clôture des programmes non pertinents avec prudence; Éviter d'ouvrir un grand nombre de fichiers pendant le démarrage.

Comment résoudre le problème du chargement lorsque le PS ouvre le fichier? Comment résoudre le problème du chargement lorsque le PS ouvre le fichier? Apr 06, 2025 pm 06:33 PM

Le bégaiement "Chargement" se produit lors de l'ouverture d'un fichier sur PS. Les raisons peuvent inclure: un fichier trop grand ou corrompu, une mémoire insuffisante, une vitesse du disque dur lente, des problèmes de pilote de carte graphique, des conflits de version PS ou du plug-in. Les solutions sont: vérifier la taille et l'intégrité du fichier, augmenter la mémoire, mettre à niveau le disque dur, mettre à jour le pilote de carte graphique, désinstaller ou désactiver les plug-ins suspects et réinstaller PS. Ce problème peut être résolu efficacement en vérifiant progressivement et en faisant bon usage des paramètres de performances PS et en développant de bonnes habitudes de gestion des fichiers.

Comment résoudre le problème du chargement lorsque PS montre toujours qu'il se charge? Comment résoudre le problème du chargement lorsque PS montre toujours qu'il se charge? Apr 06, 2025 pm 06:30 PM

La carte PS est "Chargement"? Les solutions comprennent: la vérification de la configuration de l'ordinateur (mémoire, disque dur, processeur), nettoyage de la fragmentation du disque dur, mise à jour du pilote de carte graphique, ajustement des paramètres PS, réinstaller PS et développer de bonnes habitudes de programmation.

Comment les plumes PS contrôlent-elles la douceur de la transition? Comment les plumes PS contrôlent-elles la douceur de la transition? Apr 06, 2025 pm 07:33 PM

La clé du contrôle des plumes est de comprendre sa nature progressive. Le PS lui-même ne fournit pas la possibilité de contrôler directement la courbe de gradient, mais vous pouvez ajuster de manière flexible le rayon et la douceur du gradient par plusieurs plumes, des masques correspondants et des sélections fines pour obtenir un effet de transition naturel.

Comment utiliser MySQL après l'installation Comment utiliser MySQL après l'installation Apr 08, 2025 am 11:48 AM

L'article présente le fonctionnement de la base de données MySQL. Tout d'abord, vous devez installer un client MySQL, tel que MySQLWorkBench ou le client de ligne de commande. 1. Utilisez la commande MySQL-UROot-P pour vous connecter au serveur et connecter avec le mot de passe du compte racine; 2. Utilisez Createdatabase pour créer une base de données et utilisez Sélectionner une base de données; 3. Utilisez CreateTable pour créer une table, définissez des champs et des types de données; 4. Utilisez InsertInto pour insérer des données, remettre en question les données, mettre à jour les données par mise à jour et supprimer les données par Supprimer. Ce n'est qu'en maîtrisant ces étapes, en apprenant à faire face à des problèmes courants et à l'optimisation des performances de la base de données que vous pouvez utiliser efficacement MySQL.

Comment optimiser les performances de la base de données après l'installation de MySQL Comment optimiser les performances de la base de données après l'installation de MySQL Apr 08, 2025 am 11:36 AM

L'optimisation des performances MySQL doit commencer à partir de trois aspects: configuration d'installation, indexation et optimisation des requêtes, surveillance et réglage. 1. Après l'installation, vous devez ajuster le fichier my.cnf en fonction de la configuration du serveur, tel que le paramètre innodb_buffer_pool_size, et fermer query_cache_size; 2. Créez un index approprié pour éviter les index excessifs et optimiser les instructions de requête, telles que l'utilisation de la commande Explication pour analyser le plan d'exécution; 3. Utilisez le propre outil de surveillance de MySQL (ShowProcessList, Showstatus) pour surveiller la santé de la base de données, et sauvegarde régulièrement et organisez la base de données. Ce n'est qu'en optimisant en continu ces étapes que les performances de la base de données MySQL peuvent être améliorées.

See all articles