Future 是一個容器,可以保存計算結果或計算期間發生的錯誤。建立 future 時,它以 PENDING 狀態開始。該庫不打算手動建立此對象,除非出於測試目的。
import concurrent.futures as futures f = futures.Future() assert(f._result is None) assert(f._exception is None) assert(f._state == 'PENDING')
PENDING 狀態表示使用者請求的計算已註冊到執行緒池中並放入佇列中,但尚未被任何執行緒拾取執行。一旦空閒執行緒從佇列中取得任務(回呼),Future 就會轉換為 RUNNING 狀態。 Future 只能在處於 PENDING 狀態時被取消。因此,在 PENDING 和 RUNNING 狀態之間存在一個時間窗口,在此期間可以取消請求的計算。
import concurrent.futures as futures def should_cancel_pending_future(): f = futures.Future() assert(f._state == 'PENDING') assert(f.cancel()) assert(f._state == 'CANCELLED') def should_not_cancel_running_future(): f = futures.Future() f.set_running_or_notify_cancel() assert(f._state == 'RUNNING') assert(not f.cancel()) def cancel_is_idempotent(): f = futures.Future() assert(f.cancel()) assert(f.cancel()) should_cancel_pending_future() should_not_cancel_running_future() cancel_is_idempotent()
執行緒池中請求的操作可以完成計算值或導致錯誤。無論結果如何,未來都會過渡到 FINISHED 狀態。然後結果或錯誤將儲存在相應的欄位中。
import concurrent.futures as futures def future_completed_with_result(): f = futures.Future() f.set_result('foo') assert(f._state == 'FINISHED') assert(f._result == 'foo') assert(f._exception is None) def future_completed_with_exception(): f = futures.Future() f.set_exception(NameError()) assert(f._state == 'FINISHED') assert(f._result is None) assert(isinstance(f._exception, NameError)) future_completed_with_result() future_completed_with_exception()
要檢索計算結果,請使用 result 方法。如果計算尚未完成,此方法將阻塞目前執行緒(從中呼叫結果),直到計算完成或等待逾時。
如果計算成功完成且沒有錯誤,則 result 方法會傳回計算值。
import concurrent.futures as futures import time import threading f = futures.Future() def target(): time.sleep(1) f.set_result('foo') threading.Thread(target=target).start() assert(f.result() == 'foo')
如果計算過程中發生異常,結果將引發此異常。
import concurrent.futures as futures import time import threading f = futures.Future() def target(): time.sleep(1) f.set_exception(NameError) threading.Thread(target=target).start() try: f.result() raise Exception() except NameError: assert(True)
如果方法在等待時逾時,則會引發 TimeoutError。
import concurrent.futures as futures f = futures.Future() try: f.result(1) raise Exception() except TimeoutError: assert(f._result is None) assert(f._exception is None)
嘗試取得已取消的計算結果將引發 CancelledError。
import concurrent.futures as futures f = futures.Future() assert(f.cancel()) try: f.result() raise Exception() except futures.CancelledError: assert(True)
在開發過程中,需要在執行緒池上執行N次運算並等待其完成是很常見的。為了實現這一點,該庫提供了等待函數。有幾個等待策略:FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETED。
所有等待策略的共同點是,如果傳遞給 wait 方法的 future 已經完成,則無論選擇何種策略,都會傳回傳遞的 future 的集合。無論它們是如何完成的,無論是有錯誤、結果還是被取消,都無關緊要。
import concurrent.futures as futures def test(return_when): f1, f2, f3 = futures.Future(), futures.Future(), futures.Future() f1.cancel() f1.set_running_or_notify_cancel() # required f2.set_result('foo') f3.set_exception(NameError) r = futures.wait([f1, f2, f3], return_when=return_when) assert(len(r.done) == 3) assert(len(r.not_done) == 0) for return_when in [futures.ALL_COMPLETED, futures.FIRST_EXCEPTION, futures.FIRST_COMPLETED]: test(return_when)
ALL_COMPLETED 策略保證等待所有傳遞的 future 完成,或在逾時後退出,並收集截至該時刻完成的 future,這可能是不完整的。
import concurrent.futures as futures f = futures.Future() assert(f._result is None) assert(f._exception is None) assert(f._state == 'PENDING')
FIRST_COMPLETED 策略保證會傳回至少有一個已完成的 future 的集合,或在逾時的情況下傳回空集合。 此策略並不表示傳回的集合不能包含多個元素。
import concurrent.futures as futures def should_cancel_pending_future(): f = futures.Future() assert(f._state == 'PENDING') assert(f.cancel()) assert(f._state == 'CANCELLED') def should_not_cancel_running_future(): f = futures.Future() f.set_running_or_notify_cancel() assert(f._state == 'RUNNING') assert(not f.cancel()) def cancel_is_idempotent(): f = futures.Future() assert(f.cancel()) assert(f.cancel()) should_cancel_pending_future() should_not_cancel_running_future() cancel_is_idempotent()
如果其中一個計算完成時發生錯誤,FIRST_EXCEPTION 策略會中斷等待。如果沒有發生異常,行為與 ALL_COMPLETED 未來相同。
import concurrent.futures as futures def future_completed_with_result(): f = futures.Future() f.set_result('foo') assert(f._state == 'FINISHED') assert(f._result == 'foo') assert(f._exception is None) def future_completed_with_exception(): f = futures.Future() f.set_exception(NameError()) assert(f._state == 'FINISHED') assert(f._result is None) assert(isinstance(f._exception, NameError)) future_completed_with_result() future_completed_with_exception()
該物件負責建立執行緒池。與該物件互動的主要方法是 Submit 方法。它允許在線程池中註冊計算。作為回應,傳回一個 Future 對象,用於監控計算狀態並取得最終結果。
屬性
以上是python 並發.futures的詳細內容。更多資訊請關注PHP中文網其他相關文章!