Apakah fungsi asyncio.task yang biasa digunakan dalam perpustakaan Python Asyncio?

王林
Lepaskan: 2023-05-12 19:49:04
ke hadapan
1563 orang telah melayarinya

0. Asas

Dalam "Python Asyncio Scheduling Principle", dua unit penjadualan asas Asyncio diperkenalkan, Handler dan TimeHandler, ia hanya boleh dipanggil oleh loop.call_xx Fungsi, Pembangun tidak mengetahui kewujudan mereka di permukaan Mereka dan loop.call_xx tergolong dalam fungsi asas gelung acara, tetapi operasi ini adalah operasi tunggal dan pembangun perlu menulis kod mereka sendiri untuk menyambungkan operasi mereka secara bersiri. Dalam "The Role of Python's Waitable Objects in Asyncio", diperkenalkan bahawa pemula rantai coroutine asyncio.Task boleh berinteraksi dengan gelung acara melalui loop.call_soon, dan menyambungkan objek yang boleh menunggu dalam keseluruhan rantai coroutine dan mengatur yang boleh menunggu. objek lari. Walau bagaimanapun, untuk loop.call_at dan loop.call_later, pembangun masih perlu menggunakan asyncio.Future untuk menyambungkan hasil pelaksanaan Timehandler dengan asyncio.Task Contohnya, pelaksanaan kod tidur selama satu saat:

rreee

Dalam kod, asyncio.Future melaksanakan fungsi seperti bekas dan akan menerima pelbagai keadaan dan menyegerakkan keadaannya sendiri kepada asyncio.Task yang menguruskan rantai coroutine semasa, supaya asyncio.Task boleh mengurus jenis operasi lain. Prinsip semua fungsi fungsi dalam modul asyncio.tasks adalah serupa Parameter yang mereka terima pada dasarnya adalah objek yang boleh ditunggu, dan kemudian asyncio.Futurte digunakan sebagai bekas untuk menyegerakkan keadaan antara pemanggil dan objek yang boleh ditunggu digunakan melalui lain Beberapa kaedah menyegerakkan status asyncio.Task kepada objek yang boleh ditunggu.

1. Sleep--asyncio.sleep

asyncio.sleep ialah kaedah biasa yang melaluinya pembangun boleh membiarkan coroutine tidur untuk masa yang ditetapkan Ia juga sangat Mudah, kod sumbernya adalah seperti berikut:

import asyncio


async def main():
    loop = asyncio.get_event_loop()
    f = asyncio.Future()

    def _on_complete():
        f.set_result(True)

    loop.call_later(1, _on_complete)
    return await f


if __name__ == "__main__":
    import time
    s_t = time.time()
    asyncio.run(main())
    print(time.time() - s_t)
Salin selepas log masuk

Melalui kod sumber, anda boleh mendapati bahawa apabila masa tidur yang ditetapkan bersamaan dengan kurang daripada 0, sleep hanya melaksanakan yield dan tidak melaksanakan logik lain apabila nilai lebih besar daripada Pada 0, objek Future akan dibuat, dan kemudian ia akan menunggu sehingga objek Future dikawal oleh loop.call_later sebelum mengembalikan nilai hasil.

Perlu diingatkan bahawa apabila asyncio.sleep ialah 0, sleep melaksanakan yield supaya Task.__step boleh melihat dan melepaskan kawalan Ini adalah cara terkecil untuk melepaskan kawalan arus coroutine. Kaedah yang betul, jadi apabila kita menulis fungsi yang melibatkan banyak CPU atau mengambil masa yang lama, kita boleh melepaskan kawalan secara aktif melalui asyncio.sleep(0), seperti berikut:

@types.coroutine
def __sleep0():
    yield


async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()
Salin selepas log masuk

Dalam contoh ini, setiap gelung Beri kawalan 100 kali untuk mengurangkan kesan pada coroutine lain.

2. Pembatalan Shield--asyncio.shield

asyncio.shield boleh melindungi objek yang boleh ditunggu daripada dibatalkan atau menghalang pembatalan pada rantai coroutine daripada disebarkan ke asyncio.shield terurus A objek boleh tunggu, tetapi memanggil kaedah cancel objek boleh tunggu masih boleh membatalkan operasi objek boleh tunggu, seperti yang ditunjukkan dalam contoh berikut:

import asyncio

async def demo() -> None:
    for index, i in enumerate(range(10000)):
        if index % 100 == 0:
            await asyncio.sleep(0)
        ...  # 假设这里的代码占用过多的CPU时间
Salin selepas log masuk

di mana f1, f2 semuanya dicipta dalam fungsi main , dan kemudian dibalut oleh fungsi sub pada masa yang sama, dan jalankan secara tak segerak di latar belakang melalui asyncio.create_task dan kembalikan sub1 dan sub2 masing-masing sepadan dengan pelaksanaan status fungsi Future. Kemudian batalkan pelaksanaan sub dan f1 masing-masing, dan cetak sama ada sub2, f1, f2 dan sub1 adalah sub2 Anda boleh mencari done, f1, Status sub1 ialah sub2 (dibatalkan juga dianggap selesai), manakala done masih berjalan. f2

menyebut dalam artikel "The Role of Python's Waitable Objects in Asyncio" bahawa rantai coroutine diketuai oleh

, dan semua kejayaan dan pengecualian seterusnya akan disebarkan pada rantaian ini pada dasarnya adalah pengecualian. jadi ia juga boleh disebarkan pada rantai coroutine. Dan asyncio.Task Untuk mengelakkan objek yang boleh ditunggu yang sedang berjalan daripada menerima pengecualian daripada rantaian coroutine dan biarkan rantaian coroutine mengetahui hasil pelaksanaan objek yang boleh ditunggu, objek yang boleh ditunggu akan mula dijalankan dalam rantaian coroutine yang lain, dan kemudian coroutine baharu Rantaian akan dibuat. Bekas disambungkan ke rantai asal dan memberitahu bekas hasil apabila objek yang boleh ditunggu selesai

def shield(arg):
    # 如果是Coro,则需要包装成future
    inner = _ensure_future(arg)
    if inner.done():
        # 如果已经完成,就不需要被处理了
        return inner
    loop = futures._get_loop(inner)
    # 创建一个future容器
    outer = loop.create_future()

    def _inner_done_callback(inner):
        if outer.cancelled():
            if not inner.cancelled():
                # 如果容器已经被取消,而自己没被取消且已经完成,则手动获取下结果,方便被回收
                inner.exception()
            return

        if inner.cancelled():
            # 如果自己被取消,则把取消通过容器传播到协程链上
            outer.cancel()
        else:
            # 自己已经完成且容器未完成,把自己的结果或者异常通过替身传播到协程链上
            exc = inner.exception()
            if exc is not None:
                outer.set_exception(exc)
            else:
                outer.set_result(inner.result())


    def _outer_done_callback(outer):
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    # 添加回调,在执行成功或被取消时通知对方
    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
Salin selepas log masuk

通过源码可以发现shield被调用的时候(假设驱动调用shieldTask名为main.Task),会先通过_ensure_future辅助函数创建一个Task(other.Task)在后台异步运行可等待对象,驱动可等待对象的运行,由于是新的Task驱动着可等待对象的执行,所以main.Task的任何状态不会传播到当前的可等待对象。 接着创建一个Future容器,并在other.TaskFuture容器挂上完成的回调使他们在完成的时候都能通知到对方,最后返回Future容器给main.Task,使main.Task能够间接的知道可等待对象的运行结果,如下图:

Apakah fungsi asyncio.task yang biasa digunakan dalam perpustakaan Python Asyncio?

不过Future容器完成的回调只是把托管可等待对象的other.Task回调给移除了,导致main.Task的状态不会同步到other.Task中(图中Future通知可等待对象aws的通道是不通的),进而不会影响到托管的可等待对象。 而other.Task完成的回调会把任何状态同步到Future中,进而影响到main.Task

3.超时--asyncio.wait_for

asyncio.wait_for可以托管可等待对象,直到可等待对象完成,不过可等待对象在设定的时间内还没执行完成时会被直接取消执行并抛出asyncio.TimeoutError异常。 它的运行原理综合了上面的asyncio.shieldasyncio.sleep,它一样会为可等待对象创建一个Future容器,并在容器上挂了一个超时的回调和可等待对象执行结束的回调,接着就等待容器执行结束。 不过在了解asyncio.wait_for之前,先了解他用到的两个辅助函数_cancel_and_wait_release_waiter,他们的源码如下:

def _release_waiter(waiter, *args):
    if not waiter.done():
        waiter.set_result(None)

async def _cancel_and_wait(fut, loop):
    waiter = loop.create_future()
    cb = functools.partial(_release_waiter, waiter)
    fut.add_done_callback(cb)

    try:
        fut.cancel()
        await waiter
    finally:
        fut.remove_done_callback(cb)
Salin selepas log masuk

可以看出源码比较简单,他们的作用都是为了确保可等待对象能完全执行结束才返回,其中_release_waiter是确保可等待对象一定被设置为执行结束,而_cancel_and_wait是为了确保能等到可等待对象被取消且完整结束时才返回。

可等待对象的cancel方法可以认为是异步的,调用后需要等事件循环再次调用可等待对象时,可等待对象才会被取消。而_cancel_and_wait通过一个容器来规避这个问题,使取消这个操作变为同步的,这个方法在某些开发场景经常被使用,如果不是私有API就更好了。

接下来就可以通过wait_for的源码了解他的执行逻辑了,源码如下:

async def wait_for(fut, timeout):
    loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        # 当超时的值小于等于0时就意味着想马上得到结果
        
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            # 如果执行完成就返回可等待对象的数据
            return fut.result()
        # 取消可等待对象并等待
        await _cancel_and_wait(fut, loop=loop)
        # 如果被_cancel_and_wait取消,那么会抛出CancelledError异常,这时候把它转为超时异常
        try:
            return fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc

    # 初始化一个Future,只有在超时和完成时才会变为done
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        try:
            await waiter
        except exceptions.CancelledError:
            # 此时是asyncio.Task被取消,并把取消传播到waiter
            if fut.done():
                return fut.result()
            else:
                # 如果任务被取消了,那么需要确保任务没有被执行才返回
                fut.remove_done_callback(cb)
                await _cancel_and_wait(fut, loop=loop)
                raise
        # 计时结束或者是执行完毕的情况
        if fut.done():
            # 执行完毕,返回对应的值
            return fut.result()
        else:
            # 计时结束,清理资源,并抛出异常
            fut.remove_done_callback(cb)
            # 如果任务被取消了,那么需要确保任务没有被执行才返回
            await _cancel_and_wait(fut, loop=loop)
            # 如果被_cancel_and_wait取消,那么会抛出CancelledError异常,这时候把它转为超时异常
            try:
                return fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
    finally:
        timeout_handle.cancel()
Salin selepas log masuk

wait_for的源码为了兼容各种情况,代码复杂度比较高,同时超时参数小于等于0跟大于0的逻辑是一样的,分开写只是为了避免在小于等于0时创建了一些额外的对象,在精简了一些asyncio.Task传播异常给waiter的逻辑后,wait_for的执行逻辑如下图:

Apakah fungsi asyncio.task yang biasa digunakan dalam perpustakaan Python Asyncio?

fut为可等待对象,timeout为超时时间

可以看到wait_for的主要逻辑是先创建一个名为waiter的容器,接着通过loop.call_later指定在多少时间后释放容器,然后再通过ensure_future使另一个asyncio.Task来托管可等待对象,并安排执行完毕的时候释放容器,再等待waiter容器的执行直到被释放。当容器被释放的时候再判断可等待对象是否执行完毕,如果执行完毕了就直接返回,否则抛出超时异常。

4.简单的等待--wait

asyncio.wait用于等待一批可等待对象,当有一个可等待对象执行完成或者出现异常的时候才会返回数据(具体还是要看return_when指定的条件,默认为所有等待对象结束或取消时才返回),需要注意的是wait虽然支持timeout参数,但是在超时的试试不会取消可等待对象,也不会抛出超时的异常,只会把完成的可等待对象放在完成的集合,把未完成的可等待对象放在未完成的集合并返回,如下代码:

import asyncio


async def main():
    return await asyncio.wait(
        {asyncio.create_task(asyncio.sleep(1))},
        timeout=0.5
    )


if __name__ == "__main__":
    asyncio.run(main())
Salin selepas log masuk

这段代码可以正常的运作,不会抛出超时错,不过还要注意的是在后续版本中asyncio.wait只支持Task对象,如果想要传入的是coroFuture对象,则需要开发者自己手动转换。 wait的逻辑与wait_for类似,源码如下:

async def _wait(fs, timeout, return_when, loop):
    assert fs, &#39;Set of Futures is empty.&#39;
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        # 定义一个time handler,在timeout秒后通过`_release_waiter`完成.
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        # 每个可等待对象执行完成的回调
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and
             (not f.cancelled() and f.exception() is not None)
        ):
            # 如果所有任务执行完成,或者是第一个完成或者是第一个抛出异常时,
            # 意味着执行完成,需要取消time handler,并标记为完成
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)
    # 为每个可等待对象添加回调
    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        # 等待替身执行完成
        await waiter
    finally:
        # 取消time handler并移除回调(因为cancel是异步的)
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    # 处理并返回done和pending,其中done代表完成,pending代表执行中。
    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
Salin selepas log masuk

可以看到wait_for的复杂度没有wait高,而且可以看到asyncio.wait是等waiter这个容器执行完并移除可等待对象上面的_on_completion回调后才把可等待对象按照是否完成区分到donepending两个集合,这样的准确度比在_on_completion高一些,但是如果开发者在处理集合时触发一些异步操作也可能导致pending集合中的部分可等待对象变为完成的,如下代码:

import asyncio


async def main():
    f_list = [asyncio.Future() for _ in range(10)]
    done, pending = await asyncio.wait(f_list, timeout=1)
    print(len(done), len(pending))
    print([i for i in pending if i.done()])
    f_list[1].set_result(True)
    print([i for i in pending if i.done()])


if __name__ == "__main__":
    asyncio.run(main())
# >>> 0 10
# >>> []
# >>> [<Future finished result=True>]
Salin selepas log masuk

通过输出可以发现,在asyncio.wait执行完毕后,pending中的完成的元素只有0个,而在后续强制为其中的一个Future设置数据后,pending中完成的元素有1个了。

5.迭代可等待对象的完成--asyncio.as_completed

asyncio.wait的机制是只要被触发就会返回,其他尚未完成的可等待对象需要开发者自己在处理,而asyncio.as_completed可以确保每个可等待对象完成返回数据或者超时时抛出异常,使用方法如下:

import asyncio


async def sub(i):
    await asyncio.sleep(i)
    return i


async def main():
    for f in asyncio.as_completed([sub(i) for i in range(5)], timeout=3):
        print(await f)


if __name__ == "__main__":
    asyncio.run(main())
# >>> 0
# >>> 1
# >>> 2
# >>> Traceback (most recent call last):
#       File "/home/so1n/github/demo_project/demo.py", line 18, in <module>
#         asyncio.run(main())
#       File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
#         return loop.run_until_complete(main)
#       File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
#         return future.result()
#       File "/home/so1n/github/demo_project/demo.py", line 14, in main
#         print(await f)
#       File "/usr/lib/python3.7/asyncio/tasks.py", line 532, in _wait_for_one
#         raise futures.TimeoutError
#     concurrent.futures._base.TimeoutError
Salin selepas log masuk

该程序并发执行5个协程,其中执行最久的时间是5秒,而as_completed设置的超时为3秒。通过输出可以发现,每当一个可等待对象执行结束时就会把数据抛出来,当超时则会抛出超时错误。为了能达每有一个可等待对象就返回一次数据的效果,as_completed通过一个队列来维护数据的返回,它的源码如下:

def as_completed(fs, *, timeout=None):
    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue()

    loop = events._get_event_loop()
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    timeout_handle = None

    def _on_timeout():
        # 超时时调用,需要注意的是,失败时结果为空,所以要推送一个空的数据到队列中
        # 在消费者发现元素为空时抛出错误
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can&#39;t do todo.remove(f) in the loop.

    def _on_completion(f):
        # 如果成功,就把Future推送到队列中,消费者可以通过Future获取到结果
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    async def _wait_for_one():
        f = await done.get()
        if f is None:
            # 如果元素为空,则证明已经超时了,要抛出异常
            raise exceptions.TimeoutError
        return f.result()

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # 通过生成器语法返回协程函数,该协程函数可以获取最近完成的可等待对象的结果
    for _ in range(len(todo)):
        yield _wait_for_one()
Salin selepas log masuk

通过源码可以发现可等待对象就像生产者一样,执行结束的时候就会把结果投递给队列,同时as_completed会迭代跟可等待对象的数量一样的_wait_for_one协程函数,供开发者消费数据。不过需要注意的是as_completed在超时的时候,并不会取消尚未完成的可等待对象,他们会变为不可控的状态,在某些时候会造成内存溢出,如下示例代码:

import asyncio
import random


async def sub():
    # 一半的几率会被set一个值并返回,一半的几率会卡死
    f = asyncio.Future()
    if random.choice([0, 1]) == 0:
        f.set_result(None)
    return await f


async def main():
    try:
        for f in asyncio.as_completed([sub() for i in range(5)], timeout=1):
            print(await f)
    except asyncio.TimeoutError:
        # 忽略超时
        pass
    # 统计未完成的sub任务
    cnt = 0
    for i in asyncio.all_tasks():
        if i._coro.__name__ == sub.__name__:
            cnt += 1
    print("runing task by name sub:", cnt)


if __name__ == "__main__":
    asyncio.run(main())
# >>> None
# >>> None
# >>> None
# >>> runing task by name sub: 2
Salin selepas log masuk

通过结果(由于采用随机,结果可能不一样)可以发现,sub成功执行完成的数量有3个(输出None),而在as_completed触发超时后仍有两个sub在执行中,这时的两个sub成为无人管理的可等待对象,除非开发者通过asyncio.all_tasks去找到他并清理掉,否则这几个可等待对象会一直伴随着程序运行,这很容易造成内存溢出。

Atas ialah kandungan terperinci Apakah fungsi asyncio.task yang biasa digunakan dalam perpustakaan Python Asyncio?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:yisu.com
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan
Tentang kita Penafian Sitemap
Laman web PHP Cina:Latihan PHP dalam talian kebajikan awam,Bantu pelajar PHP berkembang dengan cepat!