Python プローブは呼び出し側ライブラリからのデータ抽出をどのように完了するのでしょうか?

王林
リリース: 2023-05-16 08:46:05
転載
1036 人が閲覧しました

1. シンプルで大雑把な方法 - mysql ライブラリをカプセル化する

実行プロセスをカウントするには、実行プロセスの開始位置と終了位置を知る必要があるため、最も単純で大雑把な方法は、要件 呼び出しメソッドはカプセル化され、MySQL ライブラリとフレームワークによって呼び出される MySQL ライブラリの間に中間層が実装されます。時間のかかる統計は中間層で完了します。例:

# 伪代码
def my_execute(conn, sql, param):
 # 针对MySql库的统计封装组件
 with MyTracer(conn, sql, param):
     # 以下为正常使用MySql库的代码
with conn.cursor as cursor:
 cursor.execute(sql, param)
...
ログイン後にコピー

実装すると非常に良さそうです。そして、この変更は非常に便利ですが、トップレベルの API 上で変更されるため、実際には非常に柔軟性がありません。同時に、いくつかの事前操作が で実行されます。 cursor.execute (SQL と param の結合、nextset の呼び出しによる現在のカーソルのクリア、データなど)。最終的に取得した時間や消費量などのデータは不正確で、エラー コードなどの詳細なメタデータを取得する方法はありませんでした。

最も直接的で有用なデータを取得したい場合, ソースコードを変更してからそのソースコードを呼び出すこともできますが、統計を作成するために各ライブラリがソースコードを変更する必要がある場合、それは非常に面倒です. 幸いなことに、Python にはプローブに似たインターフェイスもいくつか用意されており、それを使用できます統計を計算します。ライブラリのソース コードを置き換えてコードを完成させます。

2.Python プローブ

Python では、インポート フック関数は sys.meta_path を通じて実装できます。関連の操作, インポート関連のライブラリは、sys.meta_path で定義されたオブジェクトに基づいて変更されます。sys.meta_path のオブジェクトは、find_module メソッドを実装する必要があります。この find_module メソッドは None を返すか、load_module メソッドを実装するオブジェクトを返します。このオブジェクトを使用して、いくつかのライブラリをターゲットにすることができます。インポートするときに、関連するメソッドを置き換えます。簡単な使用法は次のとおりです。hooktime.sleep を使用して、スリープ中に消費された時間を出力します。

import importlib
import sys
from functools import wraps
def func_wrapper(func):
    """这里通过一个装饰器来达到狸猫换太子和获取数据的效果"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 记录开始时间
        start = time.time()
        result = func(*args, **kwargs)
        # 统计消耗时间
        end = time.time()
        print(f"speed time:{end - start}")
        return result
    return wrapper
class MetaPathFinder:
    def find_module(self, fullname, path=None):
        # 执行时可以看出来在import哪些模块
        print(f'find module:{path}:{fullname}')
        return MetaPathLoader()
class MetaPathLoader:
    def load_module(self, fullname):
        # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder = sys.meta_path.pop(0)
        # 导入 module
        module = importlib.import_module(fullname)
        if fullname == 'time':
            # 替换函数
            module.sleep = func_wrapper(module.sleep)
        sys.meta_path.insert(0, finder)
        return module
sys.meta_path.insert(0, MetaPathFinder())
if __name__ == '__main__':
    import time
    time.sleep(1)
# 输出示例:
# find module:datetime
# find module:time
# load module:time
# find module:math
# find module:_datetime
# speed time:1.00073385238647468
ログイン後にコピー

3. プローブ モジュールを作成します

主なプロセスを理解した後、独自のプローブ モジュールの作成を開始できます。この例には aiomysql モジュールのみが含まれるため、MetaPathFinder.find_module で aiomysql モジュールのみを処理する必要があり、他のモジュールは最初に無視されます。ビジネスの観点から見ると、一般に、cursor.execute、cursor.fetchone、cursor.fetchall、cursor.executemany の主要な操作のみが必要なので、カーソルの詳細に進み、コードの変更方法を確認します。後者によってオーバーロードされている関数はどれですか?

まず、cursor.execute (cursor.executemanay と似ています) のソース コードを確認して、 self.nextset メソッドは、最初に前のリクエストからデータを取得するために呼び出され、次に SQL ステートメントをマージし、最後に self._query:

async def execute(self, query, args=None):
    """Executes the given operation
    Executes the given operation substituting any markers with
    the given parameters.
    For example, getting all rows where id is 5:
        cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
    :param query: ``str`` sql statement
    :param args: ``tuple`` or ``list`` of arguments for sql query
    :returns: ``int``, number of rows that has been produced of affected
    """
    conn = self._get_db()

    while (await self.nextset()):
        pass

    if args is not None:
        query = query % self._escape_args(args, conn)

    await self._query(query)
    self._executed = query
    if self._echo:
        logger.info(query)
        logger.info("%r", args)
    return self._rowcount
ログイン後にコピー

を通じてクエリを実行します。cursor.fetchone のソース コードを確認します (cursor .fetchall も同様です)、実際にはデータがキャッシュから取得されていることがわかりました。

これらのデータは、cursor.execute の実行中に取得されています:

def fetchone(self):
    """Fetch the next row """
    self._check_executed()
    fut = self._loop.create_future()
    if self._rows is None or self._rownumber >= len(self._rows):
        fut.set_result(None)
        return fut
    result = self._rows[self._rownumber]
    self._rownumber += 1
    fut = self._loop.create_future()
    fut.set_result(result)
    return fut
ログイン後にコピー

上記の分析に基づいて、必要なデータを取得するには、コア メソッド self._query をオーバーロードするだけで済みます。ソース コードから、self._query に渡される self パラメーターと SQL パラメーターを取得できることがわかります。 self に基づいてクエリの結果を取得します。同時に、デコレータを通じて実行時間を取得でき、必要なデータはすべて基本的に利用可能です。わかりました。

に従って変更されたコードアイデアは次のとおりです:

import importlib
import time
import sys
from functools import wraps
from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING
from types import ModuleType
if TYPE_CHECKING:
    import aiomysql
def func_wrapper(func: Callable):
    @wraps(func)
    async def wrapper(*args, **kwargs) -> Any:
        start: float = time.time()
        func_result: Any = await func(*args, **kwargs)
        end: float = time.time()
        # 根据_query可以知道, 第一格参数是self, 第二个参数是sql
        self: aiomysql.Cursor = args[0]
        sql: str = args[1]
        # 通过self,我们可以拿到其他的数据
        db: str = self._connection.db
        user: str = self._connection.user
        host: str = self._connection.host
        port: str = self._connection.port
        execute_result: Tuple[Tuple] = self._rows
        # 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了, 
        # 这里只是打印一部分数据出来
        print({
            "sql": sql,
            "db": db,
            "user": user,
            "host": host,
            "port": port,
            "result": execute_result,
            "speed time": end - start
        })
        return func_result
    return cast(Callable, wrapper)
class MetaPathFinder:

    @staticmethod
    def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]:
        if fullname == 'aiomysql':
            # 只有aiomysql才进行hook
            return MetaPathLoader()
        else:
            return None
class MetaPathLoader:
    @staticmethod
    def load_module(fullname: str):
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder: "MetaPathFinder" = sys.meta_path.pop(0)
        # 导入 module
        module: ModuleType = importlib.import_module(fullname)
        # 针对_query进行hook
        module.Cursor._query = func_wrapper(module.Cursor._query)
        sys.meta_path.insert(0, finder)
        return module
async def test_mysql() -> None:
    import aiomysql
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='123123', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

if __name__ == '__main__':
    sys.meta_path.insert(0, MetaPathFinder())
    import asyncio
    asyncio.run(test_mysql())
# 输出示例:
# 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间
# {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}
ログイン後にコピー

この例は非常に良さそうですが、呼び出しの入り口でロジックを明示的に呼び出す必要があります。通常、プロジェクトには複数のエントリがある場合があります。各エントリは呼び出しを示しています。このロジックは非常に面倒で、インポートする前に最初にフック ロジックを呼び出す必要があります。このように導入仕様を設定しておかないと、フックが一部で成功しない可能性があります。パーサー起動直後に実行するように設定すれば、この問題は完全に解決できます 情報を確認したところ、Python インタプリタの初期化時に、PYTHONPATH 配下に存在する sitecustomize モジュールと usercustomize モジュールが自動的にインポートされることがわかりました。モジュールを作成し、モジュールに置換関数を記述するだけです。

.
├── __init__.py
├── hook_aiomysql.py
├── sitecustomize.py
└── test_auto_hook.py
ログイン後にコピー

hook_aiomysql.py は例としてのプローブ コードであり、sitecustomize.py に格納されているコードは次のとおりです。非常に簡単で、プローブ コードを導入して sys.meta_path に挿入するだけです:

import sys
from hook_aiomysql import MetaPathFinder
sys.meta_path.insert(0, MetaPathFinder())
ログイン後にコピー

test_auto_hook.py はテスト コードです:

import asyncio
from hook_aiomysql import test_mysql
asyncio.run(test_mysql())
ログイン後にコピー

次に、PYTHONPATH を設定してコードを実行します (プロジェクトの場合、通常はスーパーバイザーによって開始されます) , その後、構成ファイルに PYTHONPATH を設定できます):

(.venv) ➜  python_hook git:(master) ✗ export PYTHONPATH=.      
(.venv) ➜  python_hook git:(master) ✗ python test_auto_hook.py 
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}
ログイン後にコピー

4. メソッド

を直接置き換えます。上記のメソッドは非常にうまく動作し、簡単に埋め込むことができることがわかります。 project. ですが、sitecustomize.py ファイルに依存しており、サードパーティのライブラリに抽出することが難しいため、サードパーティのライブラリに抽出したい場合は、他の方法がないか検討する必要があります。上で MetaPathLoader を紹介するときに、sys.module について説明しました。sys.modules は繰り返しの導入を減らすために使用されます。

class MetaPathLoader:
    def load_module(self, fullname):
        # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止递归调用
        finder = sys.meta_path.pop(0)
        # 导入 module
        module = importlib.import_module(fullname)
        if fullname == 'time':
            # 替换函数
            module.sleep = func_wrapper(module.sleep)
        sys.meta_path.insert(0, finder)
        return module
ログイン後にコピー

繰り返しの導入を減らす原理は、モジュールが導入されるたびに、モジュールが次の場所に保存されることです。 sys .modules を繰り返し導入すると、最新のインポートされたモジュールに直接更新されます。上記の繰り返しインポートを減らすことを検討する理由は、プログラムの実行中にサードパーティ ライブラリの依存関係をアップグレードしないためです。同じ名前で異なる実装を持つモジュールを繰り返し導入することを考慮する必要がなく、sys.modules がインポートされたモジュールをキャッシュするという事実を利用して、上記のロジックを単純化してモジュールをインポートできます -> 現在のモジュール メソッドを次のように置き換えます。変更したフックメソッド。

import time
from functools import wraps
from typing import Any, Callable, Tuple, cast
import aiomysql
def func_wrapper(func: Callable):
    """和上面一样的封装函数, 这里简单略过"""
# 判断是否hook过
_IS_HOOK: bool = False
# 存放原来的_query
_query: Callable = aiomysql.Cursor._query
# hook函数
def install_hook() -> None:
    _IS_HOOK = False
    if _IS_HOOK:
        return
    aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query)
    _IS_HOOK = True
# 还原到原来的函数方法
def reset_hook() -> None:
    aiomysql.Cursor._query = _query
    _IS_HOOK = False
ログイン後にコピー

代码简单明了,接下来跑一跑刚才的测试:

import asyncio
import aiomysql
from demo import install_hook, reset_hook
async def test_mysql() -> None:
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

print("install hook")
install_hook()
asyncio.run(test_mysql())
print("reset hook")
reset_hook()
asyncio.run(test_mysql())
print("end")
ログイン後にコピー

通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息

install hook
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875}
reset hook
end
ログイン後にコピー

以上がPython プローブは呼び出し側ライブラリからのデータ抽出をどのように完了するのでしょうか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:yisu.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート