実行プロセスをカウントするには、実行プロセスの開始位置と終了位置を知る必要があるため、最も単純で大雑把な方法は、要件 呼び出しメソッドはカプセル化され、MySQL ライブラリとフレームワークによって呼び出される MySQL ライブラリの間に中間層が実装されます。時間のかかる統計は中間層で完了します。例:
# 伪代码 def my_execute(conn, sql, param): # 针对MySql库的统计封装组件 with MyTracer(conn, sql, param): # 以下为正常使用MySql库的代码 with conn.cursor as cursor: cursor.execute(sql, param) ...
実装すると非常に良さそうです。そして、この変更は非常に便利ですが、トップレベルの API 上で変更されるため、実際には非常に柔軟性がありません。同時に、いくつかの事前操作が で実行されます。 cursor.execute (SQL と param の結合、nextset の呼び出しによる現在のカーソルのクリア、データなど)。最終的に取得した時間や消費量などのデータは不正確で、エラー コードなどの詳細なメタデータを取得する方法はありませんでした。
最も直接的で有用なデータを取得したい場合, ソースコードを変更してからそのソースコードを呼び出すこともできますが、統計を作成するために各ライブラリがソースコードを変更する必要がある場合、それは非常に面倒です. 幸いなことに、Python にはプローブに似たインターフェイスもいくつか用意されており、それを使用できます統計を計算します。ライブラリのソース コードを置き換えてコードを完成させます。
Python では、インポート フック関数は sys.meta_path を通じて実装できます。関連の操作, インポート関連のライブラリは、sys.meta_path で定義されたオブジェクトに基づいて変更されます。sys.meta_path のオブジェクトは、find_module メソッドを実装する必要があります。この find_module メソッドは None を返すか、load_module メソッドを実装するオブジェクトを返します。このオブジェクトを使用して、いくつかのライブラリをターゲットにすることができます。インポートするときに、関連するメソッドを置き換えます。簡単な使用法は次のとおりです。hooktime.sleep を使用して、スリープ中に消費された時間を出力します。
import importlib import sys from functools import wraps def func_wrapper(func): """这里通过一个装饰器来达到狸猫换太子和获取数据的效果""" @wraps(func) def wrapper(*args, **kwargs): # 记录开始时间 start = time.time() result = func(*args, **kwargs) # 统计消耗时间 end = time.time() print(f"speed time:{end - start}") return result return wrapper class MetaPathFinder: def find_module(self, fullname, path=None): # 执行时可以看出来在import哪些模块 print(f'find module:{path}:{fullname}') return MetaPathLoader() class MetaPathLoader: def load_module(self, fullname): # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import if fullname in sys.modules: return sys.modules[fullname] # 防止递归调用 finder = sys.meta_path.pop(0) # 导入 module module = importlib.import_module(fullname) if fullname == 'time': # 替换函数 module.sleep = func_wrapper(module.sleep) sys.meta_path.insert(0, finder) return module sys.meta_path.insert(0, MetaPathFinder()) if __name__ == '__main__': import time time.sleep(1) # 输出示例: # find module:datetime # find module:time # load module:time # find module:math # find module:_datetime # speed time:1.00073385238647468
主なプロセスを理解した後、独自のプローブ モジュールの作成を開始できます。この例には aiomysql モジュールのみが含まれるため、MetaPathFinder.find_module で aiomysql モジュールのみを処理する必要があり、他のモジュールは最初に無視されます。ビジネスの観点から見ると、一般に、cursor.execute、cursor.fetchone、cursor.fetchall、cursor.executemany の主要な操作のみが必要なので、カーソルの詳細に進み、コードの変更方法を確認します。後者によってオーバーロードされている関数はどれですか?
まず、cursor.execute (cursor.executemanay と似ています) のソース コードを確認して、 self.nextset メソッドは、最初に前のリクエストからデータを取得するために呼び出され、次に SQL ステートメントをマージし、最後に self._query:
async def execute(self, query, args=None): """Executes the given operation Executes the given operation substituting any markers with the given parameters. For example, getting all rows where id is 5: cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,)) :param query: ``str`` sql statement :param args: ``tuple`` or ``list`` of arguments for sql query :returns: ``int``, number of rows that has been produced of affected """ conn = self._get_db() while (await self.nextset()): pass if args is not None: query = query % self._escape_args(args, conn) await self._query(query) self._executed = query if self._echo: logger.info(query) logger.info("%r", args) return self._rowcount
を通じてクエリを実行します。cursor.fetchone のソース コードを確認します (cursor .fetchall も同様です)、実際にはデータがキャッシュから取得されていることがわかりました。
これらのデータは、cursor.execute の実行中に取得されています:
def fetchone(self): """Fetch the next row """ self._check_executed() fut = self._loop.create_future() if self._rows is None or self._rownumber >= len(self._rows): fut.set_result(None) return fut result = self._rows[self._rownumber] self._rownumber += 1 fut = self._loop.create_future() fut.set_result(result) return fut
上記の分析に基づいて、必要なデータを取得するには、コア メソッド self._query をオーバーロードするだけで済みます。ソース コードから、self._query に渡される self パラメーターと SQL パラメーターを取得できることがわかります。 self に基づいてクエリの結果を取得します。同時に、デコレータを通じて実行時間を取得でき、必要なデータはすべて基本的に利用可能です。わかりました。
に従って変更されたコードアイデアは次のとおりです:
import importlib import time import sys from functools import wraps from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING from types import ModuleType if TYPE_CHECKING: import aiomysql def func_wrapper(func: Callable): @wraps(func) async def wrapper(*args, **kwargs) -> Any: start: float = time.time() func_result: Any = await func(*args, **kwargs) end: float = time.time() # 根据_query可以知道, 第一格参数是self, 第二个参数是sql self: aiomysql.Cursor = args[0] sql: str = args[1] # 通过self,我们可以拿到其他的数据 db: str = self._connection.db user: str = self._connection.user host: str = self._connection.host port: str = self._connection.port execute_result: Tuple[Tuple] = self._rows # 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了, # 这里只是打印一部分数据出来 print({ "sql": sql, "db": db, "user": user, "host": host, "port": port, "result": execute_result, "speed time": end - start }) return func_result return cast(Callable, wrapper) class MetaPathFinder: @staticmethod def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]: if fullname == 'aiomysql': # 只有aiomysql才进行hook return MetaPathLoader() else: return None class MetaPathLoader: @staticmethod def load_module(fullname: str): if fullname in sys.modules: return sys.modules[fullname] # 防止递归调用 finder: "MetaPathFinder" = sys.meta_path.pop(0) # 导入 module module: ModuleType = importlib.import_module(fullname) # 针对_query进行hook module.Cursor._query = func_wrapper(module.Cursor._query) sys.meta_path.insert(0, finder) return module async def test_mysql() -> None: import aiomysql pool: aiomysql.Pool = await aiomysql.create_pool( host='127.0.0.1', port=3306, user='root', password='123123', db='mysql' ) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;") (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed() if __name__ == '__main__': sys.meta_path.insert(0, MetaPathFinder()) import asyncio asyncio.run(test_mysql()) # 输出示例: # 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间 # {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}
この例は非常に良さそうですが、呼び出しの入り口でロジックを明示的に呼び出す必要があります。通常、プロジェクトには複数のエントリがある場合があります。各エントリは呼び出しを示しています。このロジックは非常に面倒で、インポートする前に最初にフック ロジックを呼び出す必要があります。このように導入仕様を設定しておかないと、フックが一部で成功しない可能性があります。パーサー起動直後に実行するように設定すれば、この問題は完全に解決できます 情報を確認したところ、Python インタプリタの初期化時に、PYTHONPATH 配下に存在する sitecustomize モジュールと usercustomize モジュールが自動的にインポートされることがわかりました。モジュールを作成し、モジュールに置換関数を記述するだけです。
. ├── __init__.py ├── hook_aiomysql.py ├── sitecustomize.py └── test_auto_hook.py
hook_aiomysql.py は例としてのプローブ コードであり、sitecustomize.py に格納されているコードは次のとおりです。非常に簡単で、プローブ コードを導入して sys.meta_path に挿入するだけです:
import sys from hook_aiomysql import MetaPathFinder sys.meta_path.insert(0, MetaPathFinder())
test_auto_hook.py はテスト コードです:
import asyncio from hook_aiomysql import test_mysql asyncio.run(test_mysql())
次に、PYTHONPATH を設定してコードを実行します (プロジェクトの場合、通常はスーパーバイザーによって開始されます) , その後、構成ファイルに PYTHONPATH を設定できます):
(.venv) ➜ python_hook git:(master) ✗ export PYTHONPATH=. (.venv) ➜ python_hook git:(master) ✗ python test_auto_hook.py {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}
を直接置き換えます。上記のメソッドは非常にうまく動作し、簡単に埋め込むことができることがわかります。 project. ですが、sitecustomize.py ファイルに依存しており、サードパーティのライブラリに抽出することが難しいため、サードパーティのライブラリに抽出したい場合は、他の方法がないか検討する必要があります。上で MetaPathLoader を紹介するときに、sys.module について説明しました。sys.modules は繰り返しの導入を減らすために使用されます。
class MetaPathLoader: def load_module(self, fullname): # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import if fullname in sys.modules: return sys.modules[fullname] # 防止递归调用 finder = sys.meta_path.pop(0) # 导入 module module = importlib.import_module(fullname) if fullname == 'time': # 替换函数 module.sleep = func_wrapper(module.sleep) sys.meta_path.insert(0, finder) return module
繰り返しの導入を減らす原理は、モジュールが導入されるたびに、モジュールが次の場所に保存されることです。 sys .modules を繰り返し導入すると、最新のインポートされたモジュールに直接更新されます。上記の繰り返しインポートを減らすことを検討する理由は、プログラムの実行中にサードパーティ ライブラリの依存関係をアップグレードしないためです。同じ名前で異なる実装を持つモジュールを繰り返し導入することを考慮する必要がなく、sys.modules がインポートされたモジュールをキャッシュするという事実を利用して、上記のロジックを単純化してモジュールをインポートできます -> 現在のモジュール メソッドを次のように置き換えます。変更したフックメソッド。
import time from functools import wraps from typing import Any, Callable, Tuple, cast import aiomysql def func_wrapper(func: Callable): """和上面一样的封装函数, 这里简单略过""" # 判断是否hook过 _IS_HOOK: bool = False # 存放原来的_query _query: Callable = aiomysql.Cursor._query # hook函数 def install_hook() -> None: _IS_HOOK = False if _IS_HOOK: return aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query) _IS_HOOK = True # 还原到原来的函数方法 def reset_hook() -> None: aiomysql.Cursor._query = _query _IS_HOOK = False
代码简单明了,接下来跑一跑刚才的测试:
import asyncio import aiomysql from demo import install_hook, reset_hook async def test_mysql() -> None: pool: aiomysql.Pool = await aiomysql.create_pool( host='127.0.0.1', port=3306, user='root', password='', db='mysql' ) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 42;") (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed() print("install hook") install_hook() asyncio.run(test_mysql()) print("reset hook") reset_hook() asyncio.run(test_mysql()) print("end")
通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息
install hook {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875} reset hook end
以上がPython プローブは呼び出し側ライブラリからのデータ抽出をどのように完了するのでしょうか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。