Rumah > pembangunan bahagian belakang > Tutorial Python > Master Python Coroutines: Cipta Alat Async Tersuai untuk Apl Serentak Berkuasa

Master Python Coroutines: Cipta Alat Async Tersuai untuk Apl Serentak Berkuasa

DDD
Lepaskan: 2024-11-29 12:18:14
asal
1061 orang telah melayarinya

Master Python Coroutines: Create Custom Async Tools for Powerful Concurrent Apps

Coroutine dalam Python ialah alat yang berkuasa untuk menulis kod tak segerak. Mereka telah merevolusikan cara kami mengendalikan operasi serentak, menjadikannya lebih mudah untuk membina aplikasi berskala dan cekap. Saya telah menghabiskan banyak masa bekerja dengan coroutine dan saya teruja untuk berkongsi beberapa cerapan tentang mencipta primitif tak segerak tersuai.

Mari kita mulakan dengan perkara asas. Coroutine ialah fungsi khas yang boleh dijeda dan disambung semula, membolehkan kerja berbilang tugas secara kooperatif. Mereka adalah asas sintaks async/menunggu Python. Apabila anda mentakrifkan coroutine, anda pada asasnya mencipta fungsi yang boleh menghasilkan kawalan kembali kepada gelung acara, membenarkan tugasan lain dijalankan.

Untuk mencipta objek tersuai yang boleh ditunggu, anda perlu melaksanakan kaedah menunggu. Kaedah ini harus mengembalikan iterator. Berikut ialah contoh mudah:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42
Salin selepas log masuk
Salin selepas log masuk

Kelas CustomAwaitable ini boleh digunakan dengan kata kunci await, sama seperti awaitable terbina dalam. Apabila ditunggu, ia menghasilkan kawalan sekali, kemudian mengembalikan nilainya.

Tetapi bagaimana jika kita mahu mencipta primitif tak segerak yang lebih kompleks? Mari kita lihat melaksanakan semafor tersuai. Semaphore digunakan untuk mengawal akses kepada sumber yang dikongsi oleh berbilang coroutine:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())
Salin selepas log masuk
Salin selepas log masuk

Kelas CustomSemaphore ini melaksanakan kaedah peroleh dan keluarkan, serta protokol pengurus konteks async (aenter dan aexit). Ia membenarkan maksimum dua coroutine untuk memperoleh semaphore secara serentak.

Sekarang, mari kita bincangkan tentang mencipta gelung acara yang cekap. Walaupun asyncio Python menyediakan pelaksanaan gelung acara yang mantap, mungkin terdapat kes di mana anda memerlukan yang tersuai. Berikut ialah contoh asas gelung acara tersuai:

import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()

        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        try:
            if self._done:
                return
            result = self._coro.send(None)
            if isinstance(result, SleepHandle):
                result._task = self
                self._loop.call_soon(result._wake_up)
            else:
                self._loop.call_soon(self._step)
        except StopIteration as e:
            self.set_result(e.value)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        self._start_time = time.time()

    def _wake_up(self):
        if time.time() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

async def sleep(duration):
    return SleepHandle(duration)

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)
Salin selepas log masuk

Gelung acara tersuai ini melaksanakan fungsi asas seperti menjalankan tugas, mengendalikan coroutine dan juga fungsi tidur yang ringkas. Ia tidak sekaya ciri seperti gelung acara terbina dalam Python, tetapi ia menunjukkan konsep teras.

Salah satu cabaran dalam menulis kod tak segerak ialah mengurus keutamaan tugas. Walaupun asyncio Python tidak menyediakan baris gilir keutamaan terbina dalam untuk tugasan, kami boleh melaksanakan tugas kami sendiri:

import asyncio
import heapq

class PriorityEventLoop(asyncio.AbstractEventLoop):
    def __init__(self):
        self._ready = []
        self._stopping = False
        self._clock = 0

    def call_at(self, when, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        heapq.heappush(self._ready, (when, handle))
        return handle

    def call_later(self, delay, callback, *args, context=None):
        return self.call_at(self._clock + delay, callback, *args, context=context)

    def call_soon(self, callback, *args, context=None):
        return self.call_at(self._clock, callback, *args, context=context)

    def time(self):
        return self._clock

    def stop(self):
        self._stopping = True

    def is_running(self):
        return not self._stopping

    def run_forever(self):
        while self._ready and not self._stopping:
            self._run_once()

    def _run_once(self):
        if not self._ready:
            return
        when, handle = heapq.heappop(self._ready)
        self._clock = when
        handle._run()

    def create_task(self, coro):
        return asyncio.Task(coro, loop=self)

    def run_until_complete(self, future):
        asyncio.futures._chain_future(future, self.create_future())
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

    def create_future(self):
        return asyncio.Future(loop=self)

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def main():
    loop = asyncio.get_event_loop()
    loop.call_later(0.1, loop.create_task, low_priority_task())
    loop.call_later(0, loop.create_task, high_priority_task())
    await asyncio.sleep(3)

asyncio.run(main())
Salin selepas log masuk

PriorityEventLoop ini menggunakan baris gilir timbunan untuk mengurus tugasan berdasarkan masa pelaksanaan yang dijadualkan. Anda boleh menetapkan keutamaan dengan menjadualkan tugas dengan kelewatan yang berbeza.

Mengendalikan pembatalan dengan baik adalah satu lagi aspek penting dalam bekerja dengan coroutine. Berikut ialah contoh cara melaksanakan tugas yang boleh dibatalkan:

import asyncio

async def cancellable_operation():
    try:
        print("Operation started")
        await asyncio.sleep(5)
        print("Operation completed")
    except asyncio.CancelledError:
        print("Operation was cancelled")
        # Perform any necessary cleanup
        raise  # Re-raise the CancelledError

async def main():
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())
Salin selepas log masuk

Dalam contoh ini, cancelled_operation menangkap CancelledError, melakukan sebarang pembersihan yang diperlukan, dan kemudian menaikkan semula pengecualian. Ini membolehkan pengendalian pembatalan yang bijak sambil masih menyebarkan status pembatalan.

Mari kita terokai melaksanakan iterator async tersuai. Ini berguna untuk mencipta jujukan yang boleh diulang secara tidak segerak:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42
Salin selepas log masuk
Salin selepas log masuk

Kelas AsyncRange ini melaksanakan protokol lelaran async, membenarkan ia digunakan dalam async untuk gelung.

Akhir sekali, mari kita lihat melaksanakan pengurus konteks async tersuai. Ini berguna untuk mengurus sumber yang perlu diperoleh dan dikeluarkan secara tak segerak:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())
Salin selepas log masuk
Salin selepas log masuk

Kelas AsyncResource ini melaksanakan kaedah aenter dan aexit, membenarkan ia digunakan dengan penyataan async dengan.

Kesimpulannya, sistem coroutine Python menyediakan asas yang kuat untuk membina primitif tak segerak tersuai. Dengan memahami mekanisme dan protokol asas, anda boleh mencipta penyelesaian yang disesuaikan untuk cabaran tak segerak tertentu, mengoptimumkan prestasi dalam senario serentak yang kompleks dan memanjangkan keupayaan async Python. Ingat, walaupun pelaksanaan tersuai ini bagus untuk pembelajaran dan kes penggunaan khusus, perpustakaan asyncio terbina dalam Python sangat dioptimumkan dan harus menjadi pilihan anda untuk kebanyakan senario. Selamat mengekod!


Ciptaan Kami

Pastikan anda melihat ciptaan kami:

Pusat Pelabur | Hidup Pintar | Epos & Gema | Misteri Membingungkan | Hindutva | Pembangunan Elit | Sekolah JS


Kami berada di Medium

Tech Koala Insights | Dunia Epok & Gema | Medium Pusat Pelabur | Medium Misteri Membingungkan | Sains & Zaman Sederhana | Hindutva Moden

Atas ialah kandungan terperinci Master Python Coroutines: Cipta Alat Async Tersuai untuk Apl Serentak Berkuasa. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

sumber:dev.to
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan