Python のコルーチンは、非同期コードを作成するための強力なツールです。これらは同時操作の処理方法に革命をもたらし、スケーラブルで効率的なアプリケーションの構築を容易にしました。私はコルーチンの使用に多くの時間を費やしてきました。カスタムの非同期プリミティブの作成に関する洞察を共有できることを嬉しく思います。
基本から始めましょう。コルーチンは、一時停止および再開が可能な特別な関数であり、協調的なマルチタスクを可能にします。これらは Python の async/await 構文の基礎です。コルーチンを定義すると、基本的に、制御をイベント ループに戻し、他のタスクを実行できるようにする関数を作成することになります。
カスタムの待機可能オブジェクトを作成するには、await メソッドを実装する必要があります。このメソッドはイテレータを返す必要があります。簡単な例を次に示します:
class CustomAwaitable: def __init__(self, value): self.value = value def __await__(self): yield return self.value async def use_custom_awaitable(): result = await CustomAwaitable(42) print(result) # Output: 42
この CustomAwaitable クラスは、組み込みの awaitable と同様に、await キーワードとともに使用できます。待機中は、制御を一度譲り、その後その値を返します。
しかし、より複雑な非同期プリミティブを作成したい場合はどうすればよいでしょうか?カスタム セマフォの実装を見てみましょう。セマフォは、複数のコルーチンによる共有リソースへのアクセスを制御するために使用されます:
import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value <= 0: fut = asyncio.get_running_loop().create_future() self._waiters.append(fut) await fut self._value -= 1 def release(self): self._value += 1 if self._waiters: asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None) async def __aenter__(self): await self.acquire() return self async def __aexit__(self, exc_type, exc, tb): self.release() async def worker(semaphore, num): async with semaphore: print(f"Worker {num} acquired the semaphore") await asyncio.sleep(1) print(f"Worker {num} released the semaphore") async def main(): semaphore = CustomSemaphore(2) tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main())
この CustomSemaphore クラスは、取得メソッドと解放メソッド、および非同期コンテキスト マネージャー プロトコル (aenter および aexit) を実装します。最大 2 つのコルーチンが同時にセマフォを取得できるようになります。
ここで、効率的なイベント ループの作成について話しましょう。 Python の asyncio は堅牢なイベント ループ実装を提供しますが、カスタムの実装が必要になる場合もあります。カスタム イベント ループの基本的な例を次に示します:
import time from collections import deque class CustomEventLoop: def __init__(self): self._ready = deque() self._stopping = False def call_soon(self, callback, *args): self._ready.append((callback, args)) def run_forever(self): while not self._stopping: self._run_once() def _run_once(self): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) def stop(self): self._stopping = True def run_until_complete(self, coro): def _done_callback(fut): self.stop() task = self.create_task(coro) task.add_done_callback(_done_callback) self.run_forever() return task.result() def create_task(self, coro): task = Task(coro, self) self.call_soon(task._step) return task class Task: def __init__(self, coro, loop): self._coro = coro self._loop = loop self._done = False self._result = None self._callbacks = [] def _step(self): try: if self._done: return result = self._coro.send(None) if isinstance(result, SleepHandle): result._task = self self._loop.call_soon(result._wake_up) else: self._loop.call_soon(self._step) except StopIteration as e: self.set_result(e.value) def set_result(self, result): self._result = result self._done = True for callback in self._callbacks: self._loop.call_soon(callback, self) def add_done_callback(self, callback): if self._done: self._loop.call_soon(callback, self) else: self._callbacks.append(callback) def result(self): if not self._done: raise RuntimeError('Task is not done') return self._result class SleepHandle: def __init__(self, duration): self._duration = duration self._task = None self._start_time = time.time() def _wake_up(self): if time.time() - self._start_time >= self._duration: self._task._loop.call_soon(self._task._step) else: self._task._loop.call_soon(self._wake_up) async def sleep(duration): return SleepHandle(duration) async def example(): print("Start") await sleep(1) print("After 1 second") await sleep(2) print("After 2 more seconds") return "Done" loop = CustomEventLoop() result = loop.run_until_complete(example()) print(result)
このカスタム イベント ループは、タスクの実行、コルーチンの処理、さらには単純なスリープ関数などの基本機能を実装します。 Python の組み込みイベント ループほど機能は豊富ではありませんが、中心となる概念を示しています。
非同期コードを記述する際の課題の 1 つは、タスクの優先順位を管理することです。 Python の asyncio はタスク用の組み込みの優先キューを提供しませんが、独自の優先キューを実装できます。
import asyncio import heapq class PriorityEventLoop(asyncio.AbstractEventLoop): def __init__(self): self._ready = [] self._stopping = False self._clock = 0 def call_at(self, when, callback, *args, context=None): handle = asyncio.Handle(callback, args, self, context) heapq.heappush(self._ready, (when, handle)) return handle def call_later(self, delay, callback, *args, context=None): return self.call_at(self._clock + delay, callback, *args, context=context) def call_soon(self, callback, *args, context=None): return self.call_at(self._clock, callback, *args, context=context) def time(self): return self._clock def stop(self): self._stopping = True def is_running(self): return not self._stopping def run_forever(self): while self._ready and not self._stopping: self._run_once() def _run_once(self): if not self._ready: return when, handle = heapq.heappop(self._ready) self._clock = when handle._run() def create_task(self, coro): return asyncio.Task(coro, loop=self) def run_until_complete(self, future): asyncio.futures._chain_future(future, self.create_future()) self.run_forever() if not future.done(): raise RuntimeError('Event loop stopped before Future completed.') return future.result() def create_future(self): return asyncio.Future(loop=self) async def low_priority_task(): print("Low priority task started") await asyncio.sleep(2) print("Low priority task finished") async def high_priority_task(): print("High priority task started") await asyncio.sleep(1) print("High priority task finished") async def main(): loop = asyncio.get_event_loop() loop.call_later(0.1, loop.create_task, low_priority_task()) loop.call_later(0, loop.create_task, high_priority_task()) await asyncio.sleep(3) asyncio.run(main())
この PriorityEventLoop はヒープ キューを使用して、スケジュールされた実行時間に基づいてタスクを管理します。さまざまな遅延を伴うタスクをスケジュールすることで、優先順位を割り当てることができます。
キャンセルを適切に処理することは、コルーチンを操作する際のもう 1 つの重要な側面です。以下は、キャンセル可能なタスクを実装する方法の例です:
import asyncio async def cancellable_operation(): try: print("Operation started") await asyncio.sleep(5) print("Operation completed") except asyncio.CancelledError: print("Operation was cancelled") # Perform any necessary cleanup raise # Re-raise the CancelledError async def main(): task = asyncio.create_task(cancellable_operation()) await asyncio.sleep(2) task.cancel() try: await task except asyncio.CancelledError: print("Main: task was cancelled") asyncio.run(main())
この例では、cancellable_operation が CancelledError をキャッチし、必要なクリーンアップを実行してから、例外を再発生させます。これにより、キャンセル ステータスを伝達しながら、キャンセルを適切に処理できます。
カスタムの非同期イテレータの実装を見てみましょう。これらは、非同期で反復できるシーケンスを作成するのに役立ちます:
class CustomAwaitable: def __init__(self, value): self.value = value def __await__(self): yield return self.value async def use_custom_awaitable(): result = await CustomAwaitable(42) print(result) # Output: 42
この AsyncRange クラスは、非同期反復子プロトコルを実装し、非同期 for ループで使用できるようにします。
最後に、カスタムの非同期コンテキスト マネージャーの実装を見てみましょう。これらは、非同期で取得および解放する必要があるリソースを管理するのに役立ちます:
import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value <= 0: fut = asyncio.get_running_loop().create_future() self._waiters.append(fut) await fut self._value -= 1 def release(self): self._value += 1 if self._waiters: asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None) async def __aenter__(self): await self.acquire() return self async def __aexit__(self, exc_type, exc, tb): self.release() async def worker(semaphore, num): async with semaphore: print(f"Worker {num} acquired the semaphore") await asyncio.sleep(1) print(f"Worker {num} released the semaphore") async def main(): semaphore = CustomSemaphore(2) tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main())
この AsyncResource クラスは、aenter メソッドと aexit メソッドを実装し、async with ステートメントで使用できるようにします。
結論として、Python のコルーチン システムは、カスタムの非同期プリミティブを構築するための強力な基盤を提供します。基礎となるメカニズムとプロトコルを理解することで、特定の非同期の課題に合わせたソリューションを作成し、複雑な同時シナリオでパフォーマンスを最適化し、Python の非同期機能を拡張できます。これらのカスタム実装は学習や特定のユースケースに最適ですが、Python の組み込み asyncio ライブラリは高度に最適化されており、ほとんどのシナリオで頼りになるはずであることに注意してください。コーディングを楽しんでください!
私たちの作品をぜひチェックしてください:
インベスターセントラル | スマートな暮らし | エポックとエコー | 不可解な謎 | ヒンドゥーヴァ | エリート開発者 | JS スクール
Tech Koala Insights | エポックズ&エコーズワールド | インベスター・セントラル・メディア | 不可解な謎 中 | 科学とエポックミディアム | 現代ヒンドゥーヴァ
以上がマスター Python コルーチン: 強力な同時実行アプリ用のカスタム非同期ツールを作成するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。