Python 標準ライブラリは、対応するマルチスレッド/マルチプロセス コードを作成するためのスレッド化モジュールとマルチプロセッシング モジュールを提供します。ただし、プロジェクトが一定の規模に達すると、プロセスまたはスレッドの頻繁な作成/破棄は非常にリソースがかかります。現時点では、独自のスレッド プール/プロセス プールを作成するには、スペースと時間を引き換えにします。しかし、Python 3.2 以降、標準ライブラリは concurrent.futures モジュールを提供します。これは、ThreadPoolExecutor と ProcessPoolExecutor の 2 つのクラスを提供し、スレッドとマルチプロセッシングのさらなる抽象化を実現し、スレッド プール/プロセス プールの作成を容易にします。サポートが提供されます。
concurrent.futures モジュールは Executor に基づいており、Executor は抽象クラスであるため、直接使用することはできません。ただし、提供される 2 つのサブクラス ThreadPoolExecutor と ProcessPoolExecutor は、名前が示すように、それぞれスレッド プール コードとプロセス プール コードの作成に使用されます。対応するタスクをスレッド プール/プロセス プールに直接配置でき、デッドロックを心配するためにキューを維持する必要はありません。スレッド プール/プロセス プールが自動的にスケジュールを設定します。
この概念は、Java や Nodejs のプログラミング経験のある友人には馴染みのあるものだと思います これは、従来のプログラミング モデルの基礎です。私たちの例のように、queue.get を操作すると、結果が返されるのを待つ前にブロックが発生し、Future の導入により、待機中に他の操作を完了することができます。 Python の非同期 IO については、この記事を読んだ後に私の Python 同時プログラミング コルーチン/非同期 IO を参照してください。
追記: まだ Python2.x にこだわりたい場合は、最初に futures モジュールをインストールしてください。pip install futures
# example1.py from concurrent.futures import ThreadPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ThreadPoolExecutor(max_workers=2) # 创建一个最大可容纳2个task的线程池 future1 = pool.submit(return_future_result, ("hello")) # 往线程池里面加入一个task future2 = pool.submit(return_future_result, ("world")) # 往线程池里面加入一个task print(future1.done()) # 判断task1是否结束 time.sleep(3) print(future2.done()) # 判断task2是否结束 print(future1.result()) # 查看task1返回的结果 print(future2.result()) # 查看task2返回的结果
実行結果をもとに分析してみましょう。
submit メソッドを使用してスレッド プールにタスクを追加し、submit は Future オブジェクト を返します。Future オブジェクトは、将来に完了する操作として単純に理解できます。最初の print ステートメントでは、メインスレッドを一時停止するために time.sleep(3) を使用したため、time.sleep(2) が原因で future1 が完了していないことは明らかです。したがって、2 番目の print ステートメントに関しては、次のようになります。スレッド プール ここでのタスクはすべて完了しました。 ziwenxie :: ~ » python example1.py
False
True
hello
world
# 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie 8361 7557 8361 3 3 19:45 pts/0 00:00:00 python example1.py
ziwenxie 8361 7557 8362 0 3 19:45 pts/0 00:00:00 python example1.py
ziwenxie 8361 7557 8363 0 3 19:45 pts/0 00:00:00 python example1.py
# example2.py from concurrent.futures import ProcessPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ProcessPoolExecutor(max_workers=2) future1 = pool.submit(return_future_result, ("hello")) future2 = pool.submit(return_future_result, ("world")) print(future1.done()) time.sleep(3) print(future2.done()) print(future1.result()) print(future2.result())
以下は実行結果です
ziwenxie :: ~ » python example2.py False True hello world ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8560 7557 8560 3 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8563 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8564 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8561 8560 8561 0 1 19:53 pts/0 00:00:00 python example2.py ziwenxie 8562 8560 8562 0 1 19:53 pts/0 00:00:00 python example2.py
map/waitを使用してスレッドプール/プロセスプールを操作します
submitオペレーションの使い方の復習
# example3.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
。 ziwenxie :: ~ » python example3.py
'http://example.com/' page is 1270 byte
'https://api.github.com/' page is 2039 bytes
'http://httpbin.org' page is 12150 bytes
# example4.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url): with urllib.request.urlopen(url, timeout=60) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: for url, data in zip(URLS, executor.map(load_url, URLS)): print('%r page is %d bytes' % (url, len(data)))
ことがわかり、記述されたコードはより簡潔で直感的です。特定の内容に応じていずれかを選択できます。ニーズ。 ziwenxie :: ~ » python example4.py
'http://httpbin.org' page is 12150 bytes
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes
次の例で 3 つのパラメータの違いを見てみましょう
from concurrent.futures import ThreadPoolExecutor, wait, as_completed from time import sleep from random import randint def return_after_random_secs(num): sleep(randint(1, 5)) return "Return of {}".format(num) pool = ThreadPoolExecutor(5) futures = [] for x in range(5): futures.append(pool.submit(return_after_random_secs, x)) print(wait(futures)) # print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
デフォルトの ALL_COMPLETED が使用されている場合、プログラムはスレッド プール内のすべてのタスクが完了するまでブロックされます。
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f0b06c9bc88 state=finished returned str>, <Future at 0x7f0b06cbaa90 state=finished returned str>, <Future at 0x7f0b06373898 state=finished returned str>, <Future at 0x7f0b06352ba8 state=finished returned str>, <Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())
FIRST_COMPLETED パラメータを使用すると、プログラムはスレッド プール内のすべてのタスクが完了するまで待機しません。
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f84109edb00 state=finished returned str>, <Future at 0x7f840e2e9320 state=finished returned str>, <Future at 0x7f840f25ccc0 state=finished returned str>}, not_done={<Future at 0x7f840e2e9ba8 state=running>, <Future at 0x7f840e2e9940 state=running>})
思考問題
以上がPython 同時プログラミング スレッド プール/プロセス プールの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。