Python 標準ライブラリは、対応するマルチスレッド/マルチプロセス コードを作成するためのスレッド化モジュールとマルチプロセッシング モジュールを提供します。ただし、プロジェクトが一定の規模に達すると、プロセスまたはスレッドの頻繁な作成/破棄は非常にリソースがかかります。現時点では、独自のスレッド プール/プロセス プールを作成するには、スペースと時間を引き換えにします。ただし、Python 3.2 以降、標準ライブラリは concurrent.futures モジュールを提供します。このモジュールは、ThreadPoolExecutor と ProcessPoolExecutor の 2 つのクラスを提供します。これは、スレッドとマルチプロセッシングのさらなる抽象化を実現し、スレッド プール/プロセス プールの書き込みへの直接アクセスを提供します。サポート。
concurrent.futuresモジュールはExecutorに基づいており、Executorは抽象クラスであり、直接使用することはできません。ただし、提供される 2 つのサブクラス ThreadPoolExecutor と ProcessPoolExecutor は、名前が示すように、それぞれスレッド プール コードとプロセス プール コードを作成するために使用されます。対応するタスクをスレッド プール/プロセス プールに直接配置でき、デッドロックを心配するためにキューを維持する必要はありません。スレッド プール/プロセス プールが自動的にスケジュールを設定します。
この概念は、Java や Nodejs のプログラミング経験のある友人には馴染みのあるものだと思います これは、従来のプログラミング モデルの基礎です。私たちの例のように、queue.get を操作すると、結果が返されるのを待つ前にブロックが発生し、Future の導入により、待機中に他の操作を完了することができます。 Python の非同期 IO については、この記事を読んだ後に私の Python 同時プログラミング コルーチン/非同期 IO を参照してください。 追記: まだ Python2.x に固執している場合は、最初に futures モジュールをインストールしてください。
pipインストール先物
まず次のコードを通してスレッドプールの概念を理解しましょう
# example1.py
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
time。 sleep(2)
return message
pool = ThreadPoolExecutor(max_workers=2) # 最大 2 つのタスクを収容できるスレッド プールを作成します
future1 = pool.submit(return_future_result, ("hello")) # スレッド プールに移動 Add a task
future2 = pool.submit(return_future_result, ("world")) # スレッドプールにタスクを追加します
print(future1.done()) # task1 が終了したかどうかを判断します
time.sleep(3)
print( future2.done()) # task2が終了したかどうかを判定
print(future1.result()) # task1が返した結果を確認
print(future2.result()) # task2が返した結果を確認
をもとに分析してみましょう実行結果。 submit メソッドを使用してタスクをスレッド プールに追加し、submit は Future オブジェクトを返します。Future オブジェクトは、将来に完了する操作として単純に理解できます。最初の print ステートメントでは、メインスレッドを一時停止するために time.sleep(3) を使用したため、time.sleep(2) が原因で future1 が完了していないことは明らかです。したがって、2 番目の print ステートメントに関しては、次のようになります。スレッド プール ここでのタスクはすべて完了しました。
ziwenxie :: ~ » python example1.py
False
True
hello
world
# 上記のプログラムの実行中、ps コマンドを通じて、バックグラウンドで同時に 3 つのスレッドが実行されていることがわかります
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie 8361 7557 8361 3 3 19:45 pts/0 00:00:00 python example1.py
ziwenxie 8361 7557 8362 0 3 19:45 pts/0 00:00:00 python example1 .py
ziwenxie 8361 7557 8363 0 3 19:45 pts/0 00:00:00 python example1.py
上記のコードをプロセス プール形式に書き直すこともできます。API とスレッド プールはまったく同じなので、くどくどではないだろう。
# example2.py
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(return_ future_result , ("hello"))
future2 = pool.submit(return_future_result, ("world"))
print(future1.done())
time.sleep(3)
print(future2.done())
print ( future1.result())
print(future2.result())
以下は実行結果です
ziwenxie :: ~ » python example2.py
False
True
hello
world
ziwenxie :: ~ » ps - eLf | grep python
ziwenxie 8560 7557 8560 3 3 19:53 pts/0 00:00:00 python example2.py
ziwenxie 8560 7557 8563 0 3 19:53 pts/0 00: 00:00 python example2.py
ジウェンシー8560 7557 8564 0 3 19:53 ポイント/0 00:00:00 Python example2.py
ziwenxie 8561 8560 8561 0 1 19:53 ポイント/0 00:00:00 Python example2.py
ziwenxie 8562 60 0 1 19 :53 ポイント /0 00:00:00 python example2.py
送信に加えて、Exectuor は、ビルドされたメソッドと同様のマップ メソッドも提供します。以下では、2 つの例を使用して 2 つの違いを比較します。
# example3.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https:/ /api.github.com/']
defload_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# with ステートメントを使用できますスレッドが即座にクリーンアップされるようにするため
concurrent.futures.ThreadPoolExecutor(max_workers=3) を実行者として使用します:
# ロード操作を開始し、各 Future に URL をマークします
future_to_url = {executor.submit(load_url, url, 60): URL の URL の URL}
concurrent.futures.as_completed(future_to_url) の将来の場合:
url = future_to_url[future]
try:
data = future.result()
exc としての例外を除く:
print('%r が生成されました例外: %s' % (url, exc))
else:
print('%r ページは %d バイト' % (url, len(data)))
从运行結果可看出,as_completed不是按照
ziwenxie :: ~ » python example3.py
'http://example.com/' ページは 1270 バイトです
'https://api.github.com/' ページは 1270 バイトです2039 バイト
「http://httpbin.org」ページは 12150 バイトです
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org' , 'http://example.com/', 'https://api.github.com/']
defload_url(url):
urllib.request.urlopen(url, timeout=60) as conn:
return conn.read()
# with ステートメントを使用して、スレッドが即座にクリーンアップされるようにすることができます
concurrent.futures.ThreadPoolExecutor(max_workers=3) を実行者として使用します:
for url, data in zip(URLS, executor.map( load_url, URLS)):
print('%r ページは %d バイト' % (url, len(data)))
実行結果が表示されます、
map は按照 URLS 表元素の順序が返される、そして
ziwenxie :: ~ » python example4.py
'http://httpbin.org' ページは 12150 バイトです
'http:// example.com/' ページは 1270 バイトです
'https://api.github.com/' ページは 2039 バイトです
私は次の 3 つのパラメータの領域を確認します
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from randint import randint
def return_after_random_secs(番号):
sleep(randint(1, 5))
return "{}の戻り".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
futures.append(pool .submit(return_after_random_secs, x))
print(wait(futures))
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
如果採用默认的ALL_COMPLETED,プログラム会は回線池里面まで阻止する
ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
< ;Future at 0x7f0b06373898 state=finished returns str>,
如果採用FIRST_COMPLETEDパラメータ、
ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
not_done={
以上がPython 同時プログラミング スレッド プール/プロセス プールの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。