Pour compter un processus d'exécution, vous devez connaître la position de début et la position de fin du processus d'exécution, donc la méthode la plus simple et la plus grossière consiste à encapsuler en fonction de la méthode à utiliser. appelé, Implémentez une couche intermédiaire entre le framework appelant la bibliothèque MySQL et la bibliothèque MySQL, et complétez des statistiques chronophages dans la couche intermédiaire, telles que :
# 伪代码 def my_execute(conn, sql, param): # 针对MySql库的统计封装组件 with MyTracer(conn, sql, param): # 以下为正常使用MySql库的代码 with conn.cursor as cursor: cursor.execute(sql, param) ...
Cela semble être très bon à implémenter, et c'est très pratique à changer, mais comme il s'agit d'une API de niveau supérieur, les modifications sont en fait très rigides. En même temps, certaines pré-opérations seront effectuées dans curseur.execute, comme l'épissage de sql et param, en appelant nextset pour effacer le. données actuelles du curseur, etc. Les données que nous avons finalement obtenues, telles que le temps et la consommation, étaient également inexactes, et il n'y avait aucun moyen d'obtenir des métadonnées détaillées, telles que des codes d'erreur, etc.
Si nous voulons obtenir les données les plus directes et les plus utiles, nous pouvons changez uniquement le code source, puis appelez le code source, mais si chaque bibliothèque doit modifier le code source pour compter, ce serait trop gênant. Heureusement, Python fournit également des interfaces similaires aux sondes et le code source de la bibliothèque. peut être mesuré via des sondes. Remplacez et complétez notre code.
En Python, la fonction de hook d'importation peut être implémentée via sys.meta_path Lorsque les opérations liées à l'importation sont effectuées, les bibliothèques liées à l'importation le seront. être mis en correspondance en fonction des objets définis par sys.meta_path. Pour modifier les objets dans .sys.meta_path, vous devez implémenter une méthode find_module renvoie None ou un objet qui implémente la méthode load_module. remplacez les méthodes pertinentes lors de l'importation de certaines bibliothèques. L'utilisation est la suivante. Utilisez hooktime.sleep pour imprimer le temps consommé pendant le sommeil.
import importlib import sys from functools import wraps def func_wrapper(func): """这里通过一个装饰器来达到狸猫换太子和获取数据的效果""" @wraps(func) def wrapper(*args, **kwargs): # 记录开始时间 start = time.time() result = func(*args, **kwargs) # 统计消耗时间 end = time.time() print(f"speed time:{end - start}") return result return wrapper class MetaPathFinder: def find_module(self, fullname, path=None): # 执行时可以看出来在import哪些模块 print(f'find module:{path}:{fullname}') return MetaPathLoader() class MetaPathLoader: def load_module(self, fullname): # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import if fullname in sys.modules: return sys.modules[fullname] # 防止递归调用 finder = sys.meta_path.pop(0) # 导入 module module = importlib.import_module(fullname) if fullname == 'time': # 替换函数 module.sleep = func_wrapper(module.sleep) sys.meta_path.insert(0, finder) return module sys.meta_path.insert(0, MetaPathFinder()) if __name__ == '__main__': import time time.sleep(1) # 输出示例: # find module:datetime # find module:time # load module:time # find module:math # find module:_datetime # speed time:1.00073385238647468
D'abord le code source de curseur.execute (cursor.executemanay également similaire), on constate que la méthode de self.nextset sera d'abord appelée pour récupérer les données de la requête précédente, puis fusionner l'instruction sql. , et enfin interrogez via self._query:
async def execute(self, query, args=None): """Executes the given operation Executes the given operation substituting any markers with the given parameters. For example, getting all rows where id is 5: cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,)) :param query: ``str`` sql statement :param args: ``tuple`` or ``list`` of arguments for sql query :returns: ``int``, number of rows that has been produced of affected """ conn = self._get_db() while (await self.nextset()): pass if args is not None: query = query % self._escape_args(args, conn) await self._query(query) self._executed = query if self._echo: logger.info(query) logger.info("%r", args) return self._rowcount
Regardez le code source de curseur.fetchone (cursor.fetchall Également similaire), on constate que les données sont en fait obtenues à partir du cache
Ces données ont été obtenues. lors de l'exécution de curseur.execute :def fetchone(self):
"""Fetch the next row """
self._check_executed()
fut = self._loop.create_future()
if self._rows is None or self._rownumber >= len(self._rows):
fut.set_result(None)
return fut
result = self._rows[self._rownumber]
self._rownumber += 1
fut = self._loop.create_future()
fut.set_result(result)
return fut
import importlib import time import sys from functools import wraps from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING from types import ModuleType if TYPE_CHECKING: import aiomysql def func_wrapper(func: Callable): @wraps(func) async def wrapper(*args, **kwargs) -> Any: start: float = time.time() func_result: Any = await func(*args, **kwargs) end: float = time.time() # 根据_query可以知道, 第一格参数是self, 第二个参数是sql self: aiomysql.Cursor = args[0] sql: str = args[1] # 通过self,我们可以拿到其他的数据 db: str = self._connection.db user: str = self._connection.user host: str = self._connection.host port: str = self._connection.port execute_result: Tuple[Tuple] = self._rows # 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了, # 这里只是打印一部分数据出来 print({ "sql": sql, "db": db, "user": user, "host": host, "port": port, "result": execute_result, "speed time": end - start }) return func_result return cast(Callable, wrapper) class MetaPathFinder: @staticmethod def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]: if fullname == 'aiomysql': # 只有aiomysql才进行hook return MetaPathLoader() else: return None class MetaPathLoader: @staticmethod def load_module(fullname: str): if fullname in sys.modules: return sys.modules[fullname] # 防止递归调用 finder: "MetaPathFinder" = sys.meta_path.pop(0) # 导入 module module: ModuleType = importlib.import_module(fullname) # 针对_query进行hook module.Cursor._query = func_wrapper(module.Cursor._query) sys.meta_path.insert(0, finder) return module async def test_mysql() -> None: import aiomysql pool: aiomysql.Pool = await aiomysql.create_pool( host='127.0.0.1', port=3306, user='root', password='123123', db='mysql' ) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;") (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed() if __name__ == '__main__': sys.meta_path.insert(0, MetaPathFinder()) import asyncio asyncio.run(test_mysql()) # 输出示例: # 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间 # {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}
. ├── __init__.py ├── hook_aiomysql.py ├── sitecustomize.py └── test_auto_hook.py
import sys from hook_aiomysql import MetaPathFinder sys.meta_path.insert(0, MetaPathFinder())
. test_auto_hook.py Voici le code de test :
import asyncio from hook_aiomysql import test_mysql asyncio.run(test_mysql())
(.venv) ➜ python_hook git:(master) ✗ export PYTHONPATH=. (.venv) ➜ python_hook git:(master) ✗ python test_auto_hook.py {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}
class MetaPathLoader: def load_module(self, fullname): # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import if fullname in sys.modules: return sys.modules[fullname] # 防止递归调用 finder = sys.meta_path.pop(0) # 导入 module module = importlib.import_module(fullname) if fullname == 'time': # 替换函数 module.sleep = func_wrapper(module.sleep) sys.meta_path.insert(0, finder) return module
import time from functools import wraps from typing import Any, Callable, Tuple, cast import aiomysql def func_wrapper(func: Callable): """和上面一样的封装函数, 这里简单略过""" # 判断是否hook过 _IS_HOOK: bool = False # 存放原来的_query _query: Callable = aiomysql.Cursor._query # hook函数 def install_hook() -> None: _IS_HOOK = False if _IS_HOOK: return aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query) _IS_HOOK = True # 还原到原来的函数方法 def reset_hook() -> None: aiomysql.Cursor._query = _query _IS_HOOK = False
代码简单明了,接下来跑一跑刚才的测试:
import asyncio import aiomysql from demo import install_hook, reset_hook async def test_mysql() -> None: pool: aiomysql.Pool = await aiomysql.create_pool( host='127.0.0.1', port=3306, user='root', password='', db='mysql' ) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;") (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed() print("install hook") install_hook() asyncio.run(test_mysql()) print("reset hook") reset_hook() asyncio.run(test_mysql()) print("end")
通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息
install hook {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875} reset hook end
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!