ホームページ > バックエンド開発 > Python チュートリアル > マスター Python コルーチン: 強力な同時実行アプリ用のカスタム非同期ツールを作成する

マスター Python コルーチン: 強力な同時実行アプリ用のカスタム非同期ツールを作成する

DDD
リリース: 2024-11-29 12:18:14
オリジナル
1059 人が閲覧しました

Master Python Coroutines: Create Custom Async Tools for Powerful Concurrent Apps

Python のコルーチンは、非同期コードを作成するための強力なツールです。これらは同時操作の処理方法に革命をもたらし、スケーラブルで効率的なアプリケーションの構築を容易にしました。私はコルーチンの使用に多くの時間を費やしてきました。カスタムの非同期プリミティブの作成に関する洞察を共有できることを嬉しく思います。

基本から始めましょう。コルーチンは、一時停止および再開が可能な特別な関数であり、協調的なマルチタスクを可能にします。これらは Python の async/await 構文の基礎です。コルーチンを定義すると、基本的に、制御をイベント ループに戻し、他のタスクを実行できるようにする関数を作成することになります。

カスタムの待機可能オブジェクトを作成するには、await メソッドを実装する必要があります。このメソッドはイテレータを返す必要があります。簡単な例を次に示します:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42
ログイン後にコピー
ログイン後にコピー

この CustomAwaitable クラスは、組み込みの awaitable と同様に、await キーワードとともに使用できます。待機中は、制御を一度譲り、その後その値を返します。

しかし、より複雑な非同期プリミティブを作成したい場合はどうすればよいでしょうか?カスタム セマフォの実装を見てみましょう。セマフォは、複数のコルーチンによる共有リソースへのアクセスを制御するために使用されます:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())
ログイン後にコピー
ログイン後にコピー

この CustomSemaphore クラスは、取得メソッドと解放メソッド、および非同期コンテキスト マネージャー プロトコル (aenter および aexit) を実装します。最大 2 つのコルーチンが同時にセマフォを取得できるようになります。

ここで、効率的なイベント ループの作成について話しましょう。 Python の asyncio は堅牢なイベント ループ実装を提供しますが、カスタムの実装が必要になる場合もあります。カスタム イベント ループの基本的な例を次に示します:

import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()

        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        try:
            if self._done:
                return
            result = self._coro.send(None)
            if isinstance(result, SleepHandle):
                result._task = self
                self._loop.call_soon(result._wake_up)
            else:
                self._loop.call_soon(self._step)
        except StopIteration as e:
            self.set_result(e.value)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        self._start_time = time.time()

    def _wake_up(self):
        if time.time() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

async def sleep(duration):
    return SleepHandle(duration)

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)
ログイン後にコピー

このカスタム イベント ループは、タスクの実行、コルーチンの処理、さらには単純なスリープ関数などの基本機能を実装します。 Python の組み込みイベント ループほど機能は豊富ではありませんが、中心となる概念を示しています。

非同期コードを記述する際の課題の 1 つは、タスクの優先順位を管理することです。 Python の asyncio はタスク用の組み込みの優先キューを提供しませんが、独自の優先キューを実装できます。

import asyncio
import heapq

class PriorityEventLoop(asyncio.AbstractEventLoop):
    def __init__(self):
        self._ready = []
        self._stopping = False
        self._clock = 0

    def call_at(self, when, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        heapq.heappush(self._ready, (when, handle))
        return handle

    def call_later(self, delay, callback, *args, context=None):
        return self.call_at(self._clock + delay, callback, *args, context=context)

    def call_soon(self, callback, *args, context=None):
        return self.call_at(self._clock, callback, *args, context=context)

    def time(self):
        return self._clock

    def stop(self):
        self._stopping = True

    def is_running(self):
        return not self._stopping

    def run_forever(self):
        while self._ready and not self._stopping:
            self._run_once()

    def _run_once(self):
        if not self._ready:
            return
        when, handle = heapq.heappop(self._ready)
        self._clock = when
        handle._run()

    def create_task(self, coro):
        return asyncio.Task(coro, loop=self)

    def run_until_complete(self, future):
        asyncio.futures._chain_future(future, self.create_future())
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

    def create_future(self):
        return asyncio.Future(loop=self)

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def main():
    loop = asyncio.get_event_loop()
    loop.call_later(0.1, loop.create_task, low_priority_task())
    loop.call_later(0, loop.create_task, high_priority_task())
    await asyncio.sleep(3)

asyncio.run(main())
ログイン後にコピー

この PriorityEventLoop はヒープ キューを使用して、スケジュールされた実行時間に基づいてタスクを管理します。さまざまな遅延を伴うタスクをスケジュールすることで、優先順位を割り当てることができます。

キャンセルを適切に処理することは、コルーチンを操作する際のもう 1 つの重要な側面です。以下は、キャンセル可能なタスクを実装する方法の例です:

import asyncio

async def cancellable_operation():
    try:
        print("Operation started")
        await asyncio.sleep(5)
        print("Operation completed")
    except asyncio.CancelledError:
        print("Operation was cancelled")
        # Perform any necessary cleanup
        raise  # Re-raise the CancelledError

async def main():
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())
ログイン後にコピー

この例では、cancellable_operation が CancelledError をキャッチし、必要なクリーンアップを実行してから、例外を再発生させます。これにより、キャンセル ステータスを伝達しながら、キャンセルを適切に処理できます。

カスタムの非同期イテレータの実装を見てみましょう。これらは、非同期で反復できるシーケンスを作成するのに役立ちます:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42
ログイン後にコピー
ログイン後にコピー

この AsyncRange クラスは、非同期反復子プロトコルを実装し、非同期 for ループで使用できるようにします。

最後に、カスタムの非同期コンテキスト マネージャーの実装を見てみましょう。これらは、非同期で取得および解放する必要があるリソースを管理するのに役立ちます:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())
ログイン後にコピー
ログイン後にコピー

この AsyncResource クラスは、aenter メソッドと aexit メソッドを実装し、async with ステートメントで使用できるようにします。

結論として、Python のコルーチン システムは、カスタムの非同期プリミティブを構築するための強力な基盤を提供します。基礎となるメカニズムとプロトコルを理解することで、特定の非同期の課題に合わせたソリューションを作成し、複雑な同時シナリオでパフォーマンスを最適化し、Python の非同期機能を拡張できます。これらのカスタム実装は学習や特定のユースケースに最適ですが、Python の組み込み asyncio ライブラリは高度に最適化されており、ほとんどのシナリオで頼りになるはずであることに注意してください。コーディングを楽しんでください!


私たちの作品

私たちの作品をぜひチェックしてください:

インベスターセントラル | スマートな暮らし | エポックとエコー | 不可解な謎 | ヒンドゥーヴァ | エリート開発者 | JS スクール


私たちは中程度です

Tech Koala Insights | エポックズ&エコーズワールド | インベスター・セントラル・メディア | 不可解な謎 中 | 科学とエポックミディアム | 現代ヒンドゥーヴァ

以上がマスター Python コルーチン: 強力な同時実行アプリ用のカスタム非同期ツールを作成するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:dev.to
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート