「Python Asyncio スケジューリングの原則」では、Asyncio
の 2 つの基本スケジューリング ユニット、Handler
と TimeHandler## が紹介されています。 #、これらは
loop.call_xx 関数によってのみ呼び出すことができます。開発者は表面上でそれらの存在を知りません。これらと
loop.call_xx はイベント ループの基本関数に属しますただし、これらの操作はすべて 1 つの操作に属しており、開発者は操作を直列に接続する独自のコードを記述する必要があります。 「Asyncio における Python の待機可能オブジェクトの役割」では、コルーチン チェーン
asyncio.Task のイニシエーターが
loop.call_soon を通じてイベント ループと対話し、接続できることが紹介されています。コルーチン チェーン全体の待機可能オブジェクトをシリーズ化し、待機可能オブジェクトの実行をスケジュールします。ただし、
loop.call_at と
loop.call_later の場合、開発者は引き続き
asyncio.Future を使用して
Timehandler の実行結果を比較する必要があります。
asyncio.Task は、1 秒間のスリープのコード実装など、直列に接続されています。
import asyncio async def main(): loop = asyncio.get_event_loop() f = asyncio.Future() def _on_complete(): f.set_result(True) loop.call_later(1, _on_complete) return await f if __name__ == "__main__": import time s_t = time.time() asyncio.run(main()) print(time.time() - s_t)
asyncio.Future がコンテナーを実行します。この関数は、さまざまな状態を受け入れ、現在のコルーチン チェーンを管理する
asyncio.Task に自身の状態を同期するため、
asyncio.Task は他の種類の操作を管理できます。
asyncio.tasks モジュール内のすべての関数の原理は似ています。受け入れられるパラメータは基本的に待機可能なオブジェクトであり、呼び出しを同期するためのコンテナとして
asyncio.Futurte が使用されます。待機可能オブジェクトのステータスを使用して、他のメソッドを通じて
asyncio.Task のステータスを待機可能オブジェクトに同期することもできます。
asyncio.sleep は、開発者がコルーチンをスリープに簡単に設定できる一般的に使用されるメソッドです。ソース コードは次のとおりです:
@types.coroutine def __sleep0(): yield async def sleep(delay, result=None): """Coroutine that completes after a given time (in seconds).""" if delay <= 0: await __sleep0() return result loop = events.get_running_loop() future = loop.create_future() h = loop.call_later(delay, futures._set_result_unless_cancelled, future, result) try: return await future finally: h.cancel()
sleep は
yield のみを実行することがわかります。 、他のロジックは実行されず、値が 0 より大きい場合は
Future オブジェクトが作成され、その後
Future オブジェクトが ## になるまで待機します。 #loop.call_later
結果値は制御が完了した後にのみ返されます。
が 0 の場合、sleep
実行 yield
により Task.__step
が実行される可能性があることに注意してください。認識して制御を放棄することは、現在のコルーチンの制御を放棄する最小の方法であるため、大量の CPU を必要とする関数や長い時間がかかる関数を作成する場合は、asyncio を使用できます。次のように制御を放棄します:
import asyncio async def demo() -> None: for index, i in enumerate(range(10000)): if index % 100 == 0: await asyncio.sleep(0) ... # 假设这里的代码占用过多的CPU时间
この例では、ループ内で 100 回ごとに制御を放棄して、他のコルーチンへの影響を軽減します。
2. シールドのキャンセル -- asyncio.shieldasyncio.shield によって管理されますが、待機可能オブジェクトの
cancel メソッドを呼び出すと、次の例に示すように待機可能オブジェクトの操作をキャンセルできます。 # このうち、
f1、
f2
main 関数で作成され、同時に
sub 関数でラップされます。
asyncio.create_task を通過し、バックグラウンドで非同期に実行され、それぞれ sub1
と sub2
が返されます。2 つの Future
は実行ステータスに対応しますsub
関数の。次に、f1
と sub2
の実行をそれぞれキャンセルし、f1
,f2
,sub1
,## を置き換えます。 # sub2 が
done であるかどうかを出力すると、
f1、
sub1、
sub2 のステータスがすべてであることがわかります。
done (キャンセルも完了したとみなされます)、
f2 はまだ実行中です。
記事「Asyncio における Python の待機可能オブジェクトの役割」で述べたように、コルーチン チェーンは
asyncio.Task によって主導され、その後の成功と例外はすべて伝播されます。キャンセルは基本的に例外であるため、コルーチン チェーンに伝播することもできます。実行中の待機可能オブジェクトがコルーチン チェーンから例外を受け取るのを防ぎ、待機可能オブジェクトの実行結果をコルーチン チェーンに知らせるために、待機可能オブジェクトはまず別のコルーチン チェーンで実行され、次にコンテナーを作成して接続します。元のチェーンを呼び出し、待機可能オブジェクトが完了したら結果をコンテナーに伝え、コンテナーはその結果を元のコルーチン チェーンに伝播します。対応するソース コードは次のとおりです:
def shield(arg): # 如果是Coro,则需要包装成future inner = _ensure_future(arg) if inner.done(): # 如果已经完成,就不需要被处理了 return inner loop = futures._get_loop(inner) # 创建一个future容器 outer = loop.create_future() def _inner_done_callback(inner): if outer.cancelled(): if not inner.cancelled(): # 如果容器已经被取消,而自己没被取消且已经完成,则手动获取下结果,方便被回收 inner.exception() return if inner.cancelled(): # 如果自己被取消,则把取消通过容器传播到协程链上 outer.cancel() else: # 自己已经完成且容器未完成,把自己的结果或者异常通过替身传播到协程链上 exc = inner.exception() if exc is not None: outer.set_exception(exc) else: outer.set_result(inner.result()) def _outer_done_callback(outer): if not inner.done(): inner.remove_done_callback(_inner_done_callback) # 添加回调,在执行成功或被取消时通知对方 inner.add_done_callback(_inner_done_callback) outer.add_done_callback(_outer_done_callback) return outer
通过源码可以发现shield
被调用的时候(假设驱动调用shield
的Task
名为main.Task
),会先通过_ensure_future
辅助函数创建一个Task
(other.Task
)在后台异步运行可等待对象,驱动可等待对象的运行,由于是新的Task
驱动着可等待对象的执行,所以main.Task
的任何状态不会传播到当前的可等待对象。 接着创建一个Future
容器,并在other.Task
和Future
容器挂上完成的回调使他们在完成的时候都能通知到对方,最后返回Future
容器给main.Task
,使main.Task
能够间接的知道可等待对象的运行结果,如下图:
不过Future
容器完成的回调只是把托管可等待对象的other.Task
回调给移除了,导致main.Task
的状态不会同步到other.Task
中(图中Future
通知可等待对象aws
的通道是不通的),进而不会影响到托管的可等待对象。 而other.Task
完成的回调会把任何状态同步到Future
中,进而影响到main.Task
。
asyncio.wait_for
可以托管可等待对象,直到可等待对象完成,不过可等待对象在设定的时间内还没执行完成时会被直接取消执行并抛出asyncio.TimeoutError
异常。 它的运行原理综合了上面的asyncio.shield
和asyncio.sleep
,它一样会为可等待对象创建一个Future
容器,并在容器上挂了一个超时的回调和可等待对象执行结束的回调,接着就等待容器执行结束。 不过在了解asyncio.wait_for
之前,先了解他用到的两个辅助函数_cancel_and_wait
和_release_waiter
,他们的源码如下:
def _release_waiter(waiter, *args): if not waiter.done(): waiter.set_result(None) async def _cancel_and_wait(fut, loop): waiter = loop.create_future() cb = functools.partial(_release_waiter, waiter) fut.add_done_callback(cb) try: fut.cancel() await waiter finally: fut.remove_done_callback(cb)
可以看出源码比较简单,他们的作用都是为了确保可等待对象能完全执行结束才返回,其中_release_waiter
是确保可等待对象一定被设置为执行结束,而_cancel_and_wait
是为了确保能等到可等待对象被取消且完整结束时才返回。
可等待对象的cancel
方法可以认为是异步的,调用后需要等事件循环再次调用可等待对象时,可等待对象才会被取消。而_cancel_and_wait
通过一个容器来规避这个问题,使取消这个操作变为同步的,这个方法在某些开发场景经常被使用,如果不是私有API就更好了。
接下来就可以通过wait_for
的源码了解他的执行逻辑了,源码如下:
async def wait_for(fut, timeout): loop = events.get_running_loop() if timeout is None: return await fut if timeout <= 0: # 当超时的值小于等于0时就意味着想马上得到结果 fut = ensure_future(fut, loop=loop) if fut.done(): # 如果执行完成就返回可等待对象的数据 return fut.result() # 取消可等待对象并等待 await _cancel_and_wait(fut, loop=loop) # 如果被_cancel_and_wait取消,那么会抛出CancelledError异常,这时候把它转为超时异常 try: return fut.result() except exceptions.CancelledError as exc: raise exceptions.TimeoutError() from exc # 初始化一个Future,只有在超时和完成时才会变为done waiter = loop.create_future() timeout_handle = loop.call_later(timeout, _release_waiter, waiter) cb = functools.partial(_release_waiter, waiter) fut = ensure_future(fut, loop=loop) fut.add_done_callback(cb) try: try: await waiter except exceptions.CancelledError: # 此时是asyncio.Task被取消,并把取消传播到waiter if fut.done(): return fut.result() else: # 如果任务被取消了,那么需要确保任务没有被执行才返回 fut.remove_done_callback(cb) await _cancel_and_wait(fut, loop=loop) raise # 计时结束或者是执行完毕的情况 if fut.done(): # 执行完毕,返回对应的值 return fut.result() else: # 计时结束,清理资源,并抛出异常 fut.remove_done_callback(cb) # 如果任务被取消了,那么需要确保任务没有被执行才返回 await _cancel_and_wait(fut, loop=loop) # 如果被_cancel_and_wait取消,那么会抛出CancelledError异常,这时候把它转为超时异常 try: return fut.result() except exceptions.CancelledError as exc: raise exceptions.TimeoutError() from exc finally: timeout_handle.cancel()
wait_for
的源码为了兼容各种情况,代码复杂度比较高,同时超时参数小于等于0跟大于0的逻辑是一样的,分开写只是为了避免在小于等于0时创建了一些额外的对象,在精简了一些asyncio.Task
传播异常给waiter
的逻辑后,wait_for
的执行逻辑如下图:
fut为可等待对象,timeout为超时时间
可以看到wait_for
的主要逻辑是先创建一个名为waiter
的容器,接着通过loop.call_later
指定在多少时间后释放容器,然后再通过ensure_future
使另一个asyncio.Task
来托管可等待对象,并安排执行完毕的时候释放容器,再等待waiter
容器的执行直到被释放。当容器被释放的时候再判断可等待对象是否执行完毕,如果执行完毕了就直接返回,否则抛出超时异常。
asyncio.wait
用于等待一批可等待对象,当有一个可等待对象执行完成或者出现异常的时候才会返回数据(具体还是要看return_when
指定的条件,默认为所有等待对象结束或取消时才返回),需要注意的是wait
虽然支持timeout
参数,但是在超时的试试不会取消可等待对象,也不会抛出超时的异常,只会把完成的可等待对象放在完成的集合,把未完成的可等待对象放在未完成的集合并返回,如下代码:
import asyncio async def main(): return await asyncio.wait( {asyncio.create_task(asyncio.sleep(1))}, timeout=0.5 ) if __name__ == "__main__": asyncio.run(main())
这段代码可以正常的运作,不会抛出超时错,不过还要注意的是在后续版本中asyncio.wait
只支持Task
对象,如果想要传入的是coro
和Future
对象,则需要开发者自己手动转换。 wait
的逻辑与wait_for
类似,源码如下:
async def _wait(fs, timeout, return_when, loop): assert fs, 'Set of Futures is empty.' waiter = loop.create_future() timeout_handle = None if timeout is not None: # 定义一个time handler,在timeout秒后通过`_release_waiter`完成. timeout_handle = loop.call_later(timeout, _release_waiter, waiter) counter = len(fs) def _on_completion(f): # 每个可等待对象执行完成的回调 nonlocal counter counter -= 1 if (counter <= 0 or return_when == FIRST_COMPLETED or return_when == FIRST_EXCEPTION and (not f.cancelled() and f.exception() is not None) ): # 如果所有任务执行完成,或者是第一个完成或者是第一个抛出异常时, # 意味着执行完成,需要取消time handler,并标记为完成 if timeout_handle is not None: timeout_handle.cancel() if not waiter.done(): waiter.set_result(None) # 为每个可等待对象添加回调 for f in fs: f.add_done_callback(_on_completion) try: # 等待替身执行完成 await waiter finally: # 取消time handler并移除回调(因为cancel是异步的) if timeout_handle is not None: timeout_handle.cancel() for f in fs: f.remove_done_callback(_on_completion) # 处理并返回done和pending,其中done代表完成,pending代表执行中。 done, pending = set(), set() for f in fs: if f.done(): done.add(f) else: pending.add(f) return done, pending
可以看到wait_for
的复杂度没有wait
高,而且可以看到asyncio.wait
是等waiter
这个容器执行完并移除可等待对象上面的_on_completion
回调后才把可等待对象按照是否完成区分到done
和pending
两个集合,这样的准确度比在_on_completion
高一些,但是如果开发者在处理集合时触发一些异步操作也可能导致pending
集合中的部分可等待对象变为完成的,如下代码:
import asyncio async def main(): f_list = [asyncio.Future() for _ in range(10)] done, pending = await asyncio.wait(f_list, timeout=1) print(len(done), len(pending)) print([i for i in pending if i.done()]) f_list[1].set_result(True) print([i for i in pending if i.done()]) if __name__ == "__main__": asyncio.run(main()) # >>> 0 10 # >>> [] # >>> [<Future finished result=True>]
通过输出可以发现,在asyncio.wait
执行完毕后,pending
中的完成的元素只有0个,而在后续强制为其中的一个Future
设置数据后,pending
中完成的元素有1个了。
asyncio.wait
的机制是只要被触发就会返回,其他尚未完成的可等待对象需要开发者自己在处理,而asyncio.as_completed
可以确保每个可等待对象完成返回数据或者超时时抛出异常,使用方法如下:
import asyncio async def sub(i): await asyncio.sleep(i) return i async def main(): for f in asyncio.as_completed([sub(i) for i in range(5)], timeout=3): print(await f) if __name__ == "__main__": asyncio.run(main()) # >>> 0 # >>> 1 # >>> 2 # >>> Traceback (most recent call last): # File "/home/so1n/github/demo_project/demo.py", line 18, in <module> # asyncio.run(main()) # File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run # return loop.run_until_complete(main) # File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete # return future.result() # File "/home/so1n/github/demo_project/demo.py", line 14, in main # print(await f) # File "/usr/lib/python3.7/asyncio/tasks.py", line 532, in _wait_for_one # raise futures.TimeoutError # concurrent.futures._base.TimeoutError
该程序并发执行5个协程,其中执行最久的时间是5秒,而as_completed
设置的超时为3秒。通过输出可以发现,每当一个可等待对象执行结束时就会把数据抛出来,当超时则会抛出超时错误。为了能达每有一个可等待对象就返回一次数据的效果,as_completed
通过一个队列来维护数据的返回,它的源码如下:
def as_completed(fs, *, timeout=None): from .queues import Queue # Import here to avoid circular import problem. done = Queue() loop = events._get_event_loop() todo = {ensure_future(f, loop=loop) for f in set(fs)} timeout_handle = None def _on_timeout(): # 超时时调用,需要注意的是,失败时结果为空,所以要推送一个空的数据到队列中 # 在消费者发现元素为空时抛出错误 for f in todo: f.remove_done_callback(_on_completion) done.put_nowait(None) # Queue a dummy value for _wait_for_one(). todo.clear() # Can't do todo.remove(f) in the loop. def _on_completion(f): # 如果成功,就把Future推送到队列中,消费者可以通过Future获取到结果 if not todo: return # _on_timeout() was here first. todo.remove(f) done.put_nowait(f) if not todo and timeout_handle is not None: timeout_handle.cancel() async def _wait_for_one(): f = await done.get() if f is None: # 如果元素为空,则证明已经超时了,要抛出异常 raise exceptions.TimeoutError return f.result() for f in todo: f.add_done_callback(_on_completion) if todo and timeout is not None: timeout_handle = loop.call_later(timeout, _on_timeout) # 通过生成器语法返回协程函数,该协程函数可以获取最近完成的可等待对象的结果 for _ in range(len(todo)): yield _wait_for_one()
通过源码可以发现可等待对象就像生产者一样,执行结束的时候就会把结果投递给队列,同时as_completed
会迭代跟可等待对象的数量一样的_wait_for_one
协程函数,供开发者消费数据。不过需要注意的是as_completed
在超时的时候,并不会取消尚未完成的可等待对象,他们会变为不可控的状态,在某些时候会造成内存溢出,如下示例代码:
import asyncio import random async def sub(): # 一半的几率会被set一个值并返回,一半的几率会卡死 f = asyncio.Future() if random.choice([0, 1]) == 0: f.set_result(None) return await f async def main(): try: for f in asyncio.as_completed([sub() for i in range(5)], timeout=1): print(await f) except asyncio.TimeoutError: # 忽略超时 pass # 统计未完成的sub任务 cnt = 0 for i in asyncio.all_tasks(): if i._coro.__name__ == sub.__name__: cnt += 1 print("runing task by name sub:", cnt) if __name__ == "__main__": asyncio.run(main()) # >>> None # >>> None # >>> None # >>> runing task by name sub: 2
通过结果(由于采用随机,结果可能不一样)可以发现,sub
成功执行完成的数量有3个(输出None
),而在as_completed
触发超时后仍有两个sub
在执行中,这时的两个sub
成为无人管理的可等待对象,除非开发者通过asyncio.all_tasks
去找到他并清理掉,否则这几个可等待对象会一直伴随着程序运行,这很容易造成内存溢出。
以上がPython Asyncio ライブラリの asyncio.task の一般的に使用される関数は何ですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。