Coroutines in Python are a powerful tool for writing asynchronous code. They've revolutionized how we handle concurrent operations, making it easier to build scalable and efficient applications. I've spent a lot of time working with coroutines, and I'm excited to share some insights on creating custom asynchronous primitives.
Let's start with the basics. Coroutines are special functions that can be paused and resumed, allowing for cooperative multitasking. They're the foundation of Python's async/await syntax. When you define a coroutine, you're essentially creating a function that can yield control back to the event loop, allowing other tasks to run.
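To make the pause-and-resume behavior concrete, here is a minimal sketch that drives a coroutine by hand, without any event loop. Pause, two_steps, and drive are hypothetical names used only for illustration; a real event loop does essentially what drive does, with scheduling added on top.

```python
class Pause:
    """Hypothetical awaitable that suspends the awaiting coroutine exactly once."""

    def __await__(self):
        yield "paused"  # whatever is yielded here surfaces from coro.send()


async def two_steps(log):
    log.append("step 1")
    await Pause()          # control returns to whoever is driving the coroutine
    log.append("step 2")
    return "finished"


def drive(coro):
    """Run a coroutine to completion by hand, collecting everything it yields."""
    yielded = []
    while True:
        try:
            yielded.append(coro.send(None))  # resume until the next suspension
        except StopIteration as stop:
            return yielded, stop.value       # the return value rides on StopIteration


log = []
yielded, result = drive(two_steps(log))
print(log, yielded, result)
```

The coroutine stops at the await, hands "paused" to the driver, and only continues when send is called again.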
To create a custom awaitable object, you need to implement the __await__ method. This method must return an iterator. Here's a simple example:
class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42
This CustomAwaitable class can be used with the await keyword, just like built-in awaitables. When awaited, it yields control once, then returns its value.
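In practice, a custom awaitable often wraps a future rather than yielding directly. The sketch below shows that variant under stated assumptions: Deferred and resolve are hypothetical names, and the object must be created while an event loop is running.

```python
import asyncio


class Deferred:
    """Hypothetical one-shot result holder that can be awaited."""

    def __init__(self):
        # Assumption: constructed inside a running event loop.
        self._future = asyncio.get_running_loop().create_future()

    def resolve(self, value):
        if not self._future.done():
            self._future.set_result(value)

    def __await__(self):
        # Delegate to the future's iterator so the event loop knows when to resume.
        return self._future.__await__()


async def demo():
    deferred = Deferred()
    # Resolve the value a little later, from a plain callback.
    asyncio.get_running_loop().call_later(0.01, deferred.resolve, "ready")
    return await deferred


print(asyncio.run(demo()))
```

Delegating to the future keeps the awaitable compatible with asyncio's scheduling, since the loop already knows how to wait on futures.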
But what if we want to create more complex asynchronous primitives? Let's look at implementing a custom semaphore. Semaphores are used to control access to a shared resource by multiple coroutines:
import asyncio
from collections import deque

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = deque()

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        # Wake the longest-waiting coroutine, skipping waiters that were cancelled.
        while self._waiters:
            fut = self._waiters.popleft()
            if not fut.done():
                fut.set_result(None)
                break

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
        print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())
This CustomSemaphore class implements the acquire and release methods, as well as the async context manager protocol (__aenter__ and __aexit__). In the example it is created with a value of 2, so at most two coroutines can hold the semaphore at the same time.
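One way to check that a semaphore really enforces its limit is to record the peak number of coroutines inside the guarded section. The sketch below uses the built-in asyncio.Semaphore, which offers the same acquire/release and async with interface; measure_peak is a hypothetical helper name.

```python
import asyncio


async def measure_peak(limit, workers):
    """Return the highest number of workers that held the semaphore at once."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def worker():
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulate work while holding the semaphore
            active -= 1

    await asyncio.gather(*(worker() for _ in range(workers)))
    return peak


print(asyncio.run(measure_peak(limit=2, workers=5)))  # 2
```

The same measurement can be pointed at a custom semaphore to confirm it behaves like the built-in one.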
Now, let's talk about creating efficient event loops. While Python's asyncio provides a robust event loop implementation, there might be cases where you need a custom one. Here's a basic example of a custom event loop:
import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        self._stopping = False
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()
        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        if self._done:
            return
        try:
            result = self._coro.send(None)
        except StopIteration as e:
            self.set_result(e.value)
            return
        if isinstance(result, SleepHandle):
            result._task = self
            self._loop.call_soon(result._wake_up)
        else:
            self._loop.call_soon(self._step)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        self._start_time = time.time()

    def __await__(self):
        # Hand this handle up to Task._step, which schedules the wake-up check.
        yield self

    def _wake_up(self):
        # Re-queues itself until the deadline passes (simple, but it busy-waits).
        if time.time() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

async def sleep(duration):
    # Awaiting the handle (rather than returning it) is what suspends the task.
    await SleepHandle(duration)

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)
This custom event loop implements basic functionality like running tasks, handling coroutines, and even a simple sleep function. It's not as feature-rich as Python's built-in event loop, but it demonstrates the core concepts.
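A loop that handles sleeping by re-queuing a check until the deadline passes keeps the CPU busy while it waits. A common alternative is to keep sleeping coroutines in a heap ordered by deadline and block the thread until the earliest one is due. The sketch below illustrates that idea in isolation; Sleep, run_all, and nap are hypothetical names, and the runner is deliberately minimal (no cancellation, no error handling).

```python
import heapq
import time
from collections import deque


class Sleep:
    """Hypothetical awaitable asking the runner to resume us after `delay` seconds."""

    def __init__(self, delay):
        self.delay = delay

    def __await__(self):
        yield self


def run_all(*coros):
    """Run coroutines to completion; return their results in argument order."""
    ready = deque(enumerate(coros))
    timers = []  # heap of (deadline, index, coroutine); index is unique per coroutine
    results = [None] * len(coros)
    while ready or timers:
        if not ready:
            # Nothing runnable: block until the earliest timer is due.
            deadline, index, waiting = heapq.heappop(timers)
            time.sleep(max(0.0, deadline - time.monotonic()))
            ready.append((index, waiting))
        index, current = ready.popleft()
        try:
            request = current.send(None)
        except StopIteration as stop:
            results[index] = stop.value
            continue
        if isinstance(request, Sleep):
            heapq.heappush(timers, (time.monotonic() + request.delay, index, current))
        else:
            ready.append((index, current))
    return results


async def nap(name, delay, log):
    await Sleep(delay)
    log.append(name)
    return name


log = []
results = run_all(nap("slow", 0.05, log), nap("fast", 0.01, log))
print(log, results)
```

Here the coroutine with the shorter delay finishes first, while the results list still follows the order in which the coroutines were passed in.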
One of the challenges in writing asynchronous code is managing task priorities. asyncio's event loop runs ready callbacks in the order they were scheduled and has no notion of task priority, but we can implement our own scheduling:
import asyncio
import heapq
import itertools

class PriorityEventLoop(asyncio.AbstractEventLoop):
    """Toy loop with a virtual clock: the earliest-scheduled callback runs first."""

    def __init__(self):
        self._ready = []               # heap of (when, sequence, handle)
        self._seq = itertools.count()  # tie-breaker, since Handle objects are not orderable
        self._stopping = False
        self._running = False
        self._clock = 0.0

    def call_at(self, when, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        heapq.heappush(self._ready, (when, next(self._seq), handle))
        return handle

    def call_later(self, delay, callback, *args, context=None):
        return self.call_at(self._clock + delay, callback, *args, context=context)

    def call_soon(self, callback, *args, context=None):
        return self.call_at(self._clock, callback, *args, context=context)

    def time(self):
        return self._clock

    def get_debug(self):
        return False  # asyncio.Future and asyncio.Handle query this

    def stop(self):
        self._stopping = True

    def is_running(self):
        return self._running

    def run_forever(self):
        self._stopping = False
        self._running = True
        asyncio._set_running_loop(self)  # lets asyncio.sleep() find this loop
        try:
            while self._ready and not self._stopping:
                self._run_once()
        finally:
            asyncio._set_running_loop(None)
            self._running = False

    def _run_once(self):
        when, _, handle = heapq.heappop(self._ready)
        self._clock = max(self._clock, when)  # jump the virtual clock forward
        if not handle.cancelled():
            handle._run()

    def create_future(self):
        return asyncio.Future(loop=self)

    def create_task(self, coro):
        return asyncio.Task(coro, loop=self)

    def run_until_complete(self, future):
        task = asyncio.ensure_future(future, loop=self)
        task.add_done_callback(lambda _: self.stop())
        self.run_forever()
        if not task.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return task.result()

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def main():
    loop = asyncio.get_running_loop()
    loop.call_later(0.1, loop.create_task, low_priority_task())
    loop.call_later(0, loop.create_task, high_priority_task())
    await asyncio.sleep(3)

# asyncio.run() would create the default loop, so drive the custom loop directly.
loop = PriorityEventLoop()
loop.run_until_complete(main())
This PriorityEventLoop keeps scheduled callbacks in a heap ordered by their scheduled time, so the callback with the earliest time always runs first. Delays therefore act as priorities: the smaller the delay, the sooner the callback runs.
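When the thing to prioritize is units of work rather than loop callbacks, a lighter option is to leave the event loop alone and feed a worker from asyncio.PriorityQueue, which hands out the lowest-numbered entry first. The sketch below assumes jobs are (priority, name) pairs; run_by_priority is a hypothetical helper.

```python
import asyncio


async def run_by_priority(jobs):
    """Process (priority, name) pairs, lowest number first; return processing order."""
    queue = asyncio.PriorityQueue()
    for priority, name in jobs:
        queue.put_nowait((priority, name))

    order = []

    async def worker():
        while True:
            _priority, name = await queue.get()
            order.append(name)
            await asyncio.sleep(0)  # stand-in for real asynchronous work
            queue.task_done()

    task = asyncio.create_task(worker())
    await queue.join()  # wait until every queued job has been processed
    task.cancel()       # the worker loops forever, so stop it explicitly
    await asyncio.gather(task, return_exceptions=True)
    return order


jobs = [(5, "cleanup"), (1, "alert"), (3, "report")]
print(asyncio.run(run_by_priority(jobs)))  # ['alert', 'report', 'cleanup']
```

This keeps the standard, well-tested loop in place and moves the priority decision into application code.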
Handling cancellation gracefully is another important aspect of working with coroutines. Here's an example of how to implement cancellable tasks:
import asyncio

async def cancellable_operation():
    try:
        print("Operation started")
        await asyncio.sleep(5)
        print("Operation completed")
    except asyncio.CancelledError:
        print("Operation was cancelled")
        # Perform any necessary cleanup
        raise  # Re-raise the CancelledError

async def main():
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())
In this example, the cancellable_operation catches the CancelledError, performs any necessary cleanup, and then re-raises the exception. This allows for graceful handling of cancellation while still propagating the cancellation status.
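Cancellation also happens implicitly when a timeout expires. The sketch below uses asyncio.wait_for, which cancels the wrapped coroutine on timeout, and records the order in which the handlers run; fetch_with_cleanup and the events list are hypothetical names for illustration.

```python
import asyncio


async def fetch_with_cleanup(events):
    try:
        await asyncio.sleep(1)  # stand-in for a slow operation
        events.append("completed")
    except asyncio.CancelledError:
        events.append("cancelled")
        raise  # keep propagating so the caller sees the cancellation
    finally:
        events.append("cleanup")  # runs on success, error, and cancellation


async def main():
    events = []
    try:
        await asyncio.wait_for(fetch_with_cleanup(events), timeout=0.01)
    except asyncio.TimeoutError:
        events.append("timed out")
    return events


print(asyncio.run(main()))  # ['cancelled', 'cleanup', 'timed out']
```

Putting cleanup in a finally block means it runs whether the operation finishes, fails, or is cancelled.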
Let's explore implementing custom async iterators. These are useful for creating sequences that can be iterated over asynchronously:
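A minimal sketch of such an iterator follows. The AsyncRange name comes from the text; its (start, stop, step) signature, modeled on the built-in range, and the asyncio.sleep(0) pause are assumptions made for illustration.

```python
import asyncio


class AsyncRange:
    """Sketch of an async counterpart to range()."""

    def __init__(self, start, stop, step=1):
        self._values = iter(range(start, stop, step))

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            value = next(self._values)
        except StopIteration:
            # Async iterators signal exhaustion with StopAsyncIteration.
            raise StopAsyncIteration from None
        await asyncio.sleep(0)  # give other tasks a chance to run between items
        return value


async def collect():
    return [value async for value in AsyncRange(0, 5, 2)]


print(asyncio.run(collect()))  # [0, 2, 4]
```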
This AsyncRange class implements the async iterator protocol (__aiter__ and __anext__), allowing it to be used in async for loops.
Finally, let's look at implementing custom async context managers. These are useful for managing resources that need to be acquired and released asynchronously:
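A minimal sketch of such a context manager follows. The AsyncResource name comes from the text; the name and log parameters and the asyncio.sleep(0) calls standing in for real I/O are assumptions made for illustration.

```python
import asyncio


class AsyncResource:
    """Sketch of a resource that is acquired and released asynchronously."""

    def __init__(self, name, log):
        self.name = name
        self._log = log

    async def __aenter__(self):
        await asyncio.sleep(0)  # stand-in for opening a connection
        self._log.append(f"acquire {self.name}")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0)  # stand-in for closing the connection
        self._log.append(f"release {self.name}")
        return False  # do not suppress exceptions raised inside the block


async def main():
    log = []
    try:
        async with AsyncResource("db", log) as resource:
            log.append(f"using {resource.name}")
            raise ValueError("boom")
    except ValueError:
        log.append("error propagated")
    return log


print(asyncio.run(main()))
# ['acquire db', 'using db', 'release db', 'error propagated']
```

The release step runs even though the block raised, which is the main reason to prefer async with over manual acquire and release calls.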
This AsyncResource class implements the __aenter__ and __aexit__ methods, allowing it to be used with the async with statement.
In conclusion, Python's coroutine system provides a powerful foundation for building custom asynchronous primitives. By understanding the underlying mechanisms and protocols, you can create tailored solutions for specific asynchronous challenges, optimize performance in complex concurrent scenarios, and extend Python's async capabilities. Remember, while these custom implementations are great for learning and specific use cases, Python's built-in asyncio library is highly optimized and should be your go-to for most scenarios. Happy coding!