Python 標準ライブラリは、対応するマルチスレッド/マルチプロセスコードを記述するためのスレッド化モジュールとマルチプロセッシングモジュールを提供しますが、プロジェクトが一定の規模に達すると、プロセスまたはスレッドの作成/破棄が頻繁に発生します。現時点では、スペースと時間を交換するために独自のスレッド プール/プロセス プールを作成する必要があります。しかし、Python 3.2 以降、標準ライブラリは concurrent.futures モジュールを提供します。これは、ThreadPoolExecutor と ProcessPoolExecutor の 2 つのクラスを提供します。これは、スレッドとマルチプロセッシングのさらなる抽象化を実現し、スレッドの作成に役立ちます。 /Process プールは直接サポートを提供します。
concurrent.futures モジュールは Executor に基づいており、Executor は 抽象クラス であり、直接使用することはできません。ただし、提供される 2 つのサブクラス ThreadPoolExecutor と ProcessPoolExecutor は、名前が示すように、それぞれスレッド プール コードとプロセス プール コードの作成に使用されます。対応するタスクをスレッド プール/プロセス プールに直接配置でき、デッドロックを心配するためにキューを維持する必要はありません。スレッド プール/プロセス プールが自動的にスケジュールを設定します。
将来この概念は、Javajsプログラミングを経験している友人にはよく知られていると思いますこれは、将来完成する操作として理解できます。従来のプログラミング モードでは、たとえば、queue.get を操作すると、結果が返されるのを待つ前にブロックが発生し、CPU を解放して他の作業を行うことができなくなります。Future の導入により、待機中に他の操作を完了できるようになります。 。 Python の非同期 IO については、この記事を読んだ後に私の Python 同時プログラミング コルーチン/非同期 IO を参照してください。
追記: まだ Python2.x に固執している場合は、まずfutures モジュールをinstallしてください。
pip install futures
# example1.py from concurrent.futures import ThreadPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ThreadPoolExecutor(max_workers=2) # 创建一个最大可容纳2个task的线程池 future1 = pool.submit(return_future_result, ("hello")) # 往线程池里面加入一个task future2 = pool.submit(return_future_result, ("world")) # 往线程池里面加入一个task print(future1.done()) # 判断task1是否结束 time.sleep(3) print(future2.done()) # 判断task2是否结束 print(future1.result()) # 查看task1返回的结果 print(future2.result()) # 查看task2返回的结果
submit メソッドを使用してスレッド プールにタスクを追加し、submit は Future オブジェクト を返します。Future オブジェクトは、将来に完了する操作として単純に理解できます。最初の print ステートメントでは、メインスレッドを一時停止するために time.sleep(3) を使用したため、time.sleep(2) が原因で future1 が完了していないことは明らかです。したがって、2 番目の print ステートメントに関しては、次のようになります。スレッド プール ここでのタスクはすべて完了しました。
ziwenxie :: ~ » python example1.py False True hello world # 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行 ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8361 7557 8361 3 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8362 0 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8363 0 3 19:45 pts/0 00:00:00 python example1.py
api はスレッド プールとまったく同じなので、詳しく説明しません。
# example2.py from concurrent.futures import ProcessPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ProcessPoolExecutor(max_workers=2) future1 = pool.submit(return_future_result, ("hello")) future2 = pool.submit(return_future_result, ("world")) print(future1.done()) time.sleep(3) print(future2.done()) print(future1.result()) print(future2.result())
ziwenxie :: ~ » python example2.py False True hello world ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8560 7557 8560 3 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8563 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8564 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8561 8560 8561 0 1 19:53 pts/0 00:00:00 python example2.py ziwenxie 8562 8560 8562 0 1 19:53 pts/0 00:00:00 python example2.py
# example3.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
as_completedはURLSリスト要素の順番で返されません。
ziwenxie :: ~ » python example3.py 'http://example.com/' page is 1270 byte 'https://api.github.com/' page is 2039 bytes 'http://httpbin.org' page is 12150 bytes
# example4.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url): with urllib.request.urlopen(url, timeout=60) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: for url, data in zip(URLS, executor.map(load_url, URLS)): print('%r page is %d bytes' % (url, len(data)))
map は URLS リストの要素を順番に返します ことがわかり、記述されたコードはより簡潔で直感的です。特定の内容に応じていずれかを選択できます。ニーズ。
ziwenxie :: ~ » python example4.py 'http://httpbin.org' page is 12150 bytes 'http://example.com/' page is 1270 bytes 'https://api.github.com/' page is 2039 bytes
(セット) が含まれており、1 つは完了済み (completed)、もう 1 つは未完了 (uncompleted) です。 wait メソッドを使用する利点の 1 つは、FIRST_COMPLETED、FIRST_EXCEPTION、および ALL_COMPLETE という 3 つのパラメーターを自由に使用できることです。 次の例で 3 つのパラメータの違いを見てみましょう
from concurrent.futures import ThreadPoolExecutor, wait, as_completed from time import sleep from random import randint def return_after_random_secs(num): sleep(randint(1, 5)) return "Return of {}".format(num) pool = ThreadPoolExecutor(5) futures = [] for x in range(5): futures.append(pool.submit(return_after_random_secs, x)) print(wait(futures)) # print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
デフォルトの ALL_COMPLETED が使用されている場合、プログラムはスレッド プール内のすべてのタスクが完了するまでブロックされます。
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f0b06c9bc88 state=finished returned str>, <Future at 0x7f0b06cbaa90 state=finished returned str>, <Future at 0x7f0b06373898 state=finished returned str>, <Future at 0x7f0b06352ba8 state=finished returned str>, <Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())
FIRST_COMPLETED パラメータを使用すると、プログラムはスレッド プール内のすべてのタスクが完了するまで待機しません。
りー
以上がPython 同時プログラミングにおけるスレッド プール/プロセス プールの詳細な紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。