Python 同時プログラミング スレッド プール/プロセス プール

巴扎黑
リリース: 2017-03-18 11:37:10
オリジナル
1855 人が閲覧しました

はじめに

Python 標準ライブラリは、対応するマルチスレッド/マルチプロセス コードを作成するためのスレッド化モジュールとマルチプロセッシング モジュールを提供します。ただし、プロジェクトが一定の規模に達すると、プロセスまたはスレッドの頻繁な作成/破棄は非常にリソースがかかります。現時点では、独自のスレッド プール/プロセス プールを作成するには、スペースと時間を引き換えにします。しかし、Python 3.2 以降、標準ライブラリは concurrent.futures モジュールを提供します。これは、ThreadPoolExecutor と ProcessPoolExecutor の 2 つのクラスを提供し、スレッドとマルチプロセッシングのさらなる抽象化を実現し、スレッド プール/プロセス プールの作成を容易にします。サポートが提供されます。

Executor と Future

concurrent.futures モジュールは Executor に基づいており、Executor は抽象クラスであるため、直接使用することはできません。ただし、提供される 2 つのサブクラス ThreadPoolExecutor と ProcessPoolExecutor は、名前が示すように、それぞれスレッド プール コードとプロセス プール コードの作成に使用されます。対応するタスクをスレッド プール/プロセス プールに直接配置でき、デッドロックを心配するためにキューを維持する必要はありません。スレッド プール/プロセス プールが自動的にスケジュールを設定します。

この概念は、Java や Nodejs のプログラミング経験のある友人には馴染みのあるものだと思います これは、従来のプログラミング モデルの基礎です。私たちの例のように、queue.get を操作すると、結果が返されるのを待つ前にブロックが発生し、Future の導入により、待機中に他の操作を完了することができます。 Python の非同期 IO については、この記事を読んだ後に私の Python 同時プログラミング コルーチン/非同期 IO を参照してください。

追記: まだ Python2.x にこだわりたい場合は、最初に futures モジュールをインストールしてください。

pip install futures
ログイン後にコピー
スレッドプール/プロセスプールを操作するにはsubmitを使用します

まずは以下のコードを通してスレッドプールの概念を理解しましょう

# example1.py
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ThreadPoolExecutor(max_workers=2)  # 创建一个最大可容纳2个task的线程池
future1 = pool.submit(return_future_result, ("hello"))  # 往线程池里面加入一个task
future2 = pool.submit(return_future_result, ("world"))  # 往线程池里面加入一个task
print(future1.done())  # 判断task1是否结束
time.sleep(3)
print(future2.done())  # 判断task2是否结束
print(future1.result())  # 查看task1返回的结果
print(future2.result())  # 查看task2返回的结果
ログイン後にコピー

実行結果をもとに分析してみましょう。

submit

メソッドを使用してスレッド プールにタスクを追加し、submit は Future オブジェクト を返します。Future オブジェクトは、将来に完了する操作として単純に理解できます。最初の print ステートメントでは、メインスレッドを一時停止するために time.sleep(3) を使用したため、time.sleep(2) が原因で future1 が完了していないことは明らかです。したがって、2 番目の print ステートメントに関しては、次のようになります。スレッド プール ここでのタスクはすべて完了しました。

ziwenxie :: ~ » python example1.py
False
True
hello
world
# 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8361  7557  8361  3    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8362  0    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8363  0    3 19:45 pts/0    00:00:00 python example1.py
ログイン後にコピー
上記のコードをプロセス プールの形式に書き直すこともできます。API とスレッド プールはまったく同じなので、詳しく説明しません。

# example2.py
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(return_future_result, ("hello"))
future2 = pool.submit(return_future_result, ("world"))
print(future1.done())
time.sleep(3)
print(future2.done())
print(future1.result())
print(future2.result())
ログイン後にコピー

以下は実行結果です

ziwenxie :: ~ » python example2.py
False
True
hello
world
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8560  7557  8560  3    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8563  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8564  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8561  8560  8561  0    1 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8562  8560  8562  0    1 19:53 pts/0    00:00:00 python example2.py
ログイン後にコピー

map/waitを使用してスレッドプール/プロセスプールを操作します

Exectuorはsubmitに加えて、組み込みのmapの使用法と同様のmapメソッドも提供します。以下に 2 つの例を使用して、2 つの違いを比較してみましょう。

submitオペレーションの使い方の復習

# example3.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
ログイン後にコピー

実行結果からわかるように、

as_completedはURLSリスト要素の順番で返されません

ziwenxie :: ~ » python example3.py
'http://example.com/' page is 1270 byte
'https://api.github.com/' page is 2039 bytes
'http://httpbin.org' page is 12150 bytes
ログイン後にコピー
map を使用する

# example4.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data)))
ログイン後にコピー

実行結果から、

map は URLS リストの要素を順番に返します

ことがわかり、記述されたコードはより簡潔で直感的です。特定の内容に応じていずれかを選択できます。ニーズ。

ziwenxie :: ~ » python example4.py
'http://httpbin.org' page is 12150 bytes
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes
ログイン後にコピー
3 番目のオプションは wait です

wait メソッドはタプル (タプル) を返します。タプルには 2 つのセット (セット) が含まれており、1 つは完了済み (completed)、もう 1 つは未完了 (uncompleted) です。 wait メソッドを使用する利点の 1 つは、FIRST_COMPLETED、FIRST_EXCEPTION、および ALL_COMPLETE という 3 つのパラメーターを自由に使用できることです。

次の例で 3 つのパラメータの違いを見てみましょう

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_random_secs, x))
print(wait(futures))
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
ログイン後にコピー

デフォルトの ALL_COMPLETED が使用されている場合、プログラムはスレッド プール内のすべてのタスクが完了するまでブロックされます。

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f0b06c9bc88 state=finished returned str>,
<Future at 0x7f0b06cbaa90 state=finished returned str>,
<Future at 0x7f0b06373898 state=finished returned str>,
<Future at 0x7f0b06352ba8 state=finished returned str>,
<Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())
ログイン後にコピー

FIRST_COMPLETED パラメータを使用すると、プログラムはスレッド プール内のすべてのタスクが完了するまで待機しません。

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f84109edb00 state=finished returned str>,
<Future at 0x7f840e2e9320 state=finished returned str>,
<Future at 0x7f840f25ccc0 state=finished returned str>},
not_done={<Future at 0x7f840e2e9ba8 state=running>,
<Future at 0x7f840e2e9940 state=running>})
ログイン後にコピー

思考問題

multiprocessing.pool(ThreadPool) と ProcessPollExecutor(ThreadPoolExecutor) の実行効率の差を比較する小さなプログラムを書き、上記の Future に基づいてなぜそのような結果が生じるのかを考えてください。

以上がPython 同時プログラミング スレッド プール/プロセス プールの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート