Pythonのマルチスレッドとマルチプロセスの詳細な仕組み

WBOY
リリース: 2023-04-15 20:07:01
転載
1373 人が閲覧しました

Pythonのマルチスレッドとマルチプロセスの詳細な仕組み


スレッドとプロセスの違い

プロセス (プロセス) とスレッド (スレッド) は、オペレーティング システムの基本概念です。 、しかし、それらは比較的抽象的で理解しにくいです。マルチプロセスとマルチスレッドに関して、教科書で最も古典的な文章は「プロセスはリソース割り当ての最小単位であり、スレッドは CPU スケジューリングの最小単位です。」です。スレッドは、プログラム内の単一の連続した制御フローです。プロセス内の比較的独立したスケジュール可能な実行単位。システムが独立して CPU をスケジュールし、割り当てるための基本単位です。実行中のプログラムのスケジューリング単位を指します。複数のスレッドを同時に実行して、1 つのプログラム内で異なるタスクを完了することをマルチスレッドと呼びます。

Pythonのマルチスレッドとマルチプロセスの詳細な仕組み

#プロセスとスレッドの違い

プロセスは、リソース割り当ての基本単位です。プロセスに関連するすべてのリソースはプロセス制御ブロック PCB に記録されます。プロセスがこれらのリソースを所有しているか、使用していることを示します。さらに、プロセスはプロセッサをプリエンプトするスケジューリング単位でもあり、完全な仮想アドレス空間を持っています。プロセスがスケジュールされると、異なるプロセスは異なる仮想アドレス空間を持ち、同じプロセス内の異なるスレッドは同じアドレス空間を共有します。

プロセスに対応するスレッドは、リソースの割り当てとは関係なく、特定のプロセスに属し、そのプロセスのリソースをプロセス内の他のスレッドと共有します。スレッドは、関連するスタック (システム スタックまたはユーザー スタック) レジスタとスレッド制御テーブル TCB のみで構成されます。レジスタはスレッド内のローカル変数を格納するために使用できますが、他のスレッドに関連する変数を格納することはできません。

通常、プロセスには複数のスレッドを含めることができ、プロセスが所有するリソースを利用できます。スレッドを導入したオペレーティング システムでは、通常、プロセスがリソース割り当ての基本単位とみなされ、スレッドが独立動作および独立スケジューリングの基本単位とみなされます。

スレッドはプロセスよりも小さく、基本的にシステム リソースを所有しないため、スレッドをスケジュールするためのオーバーヘッドがはるかに小さくなり、システム内の複数のプログラム間の同時実行の度合いをより効率的に高めることができます。システムリソースの使用率とスループットが向上します。

したがって、近年発売された汎用オペレーティング システムでは、システムの同時実行性をさらに向上させるためにスレッドが導入されており、それが現代のオペレーティング システムの重要な指標と見なされています。

Pythonのマルチスレッドとマルチプロセスの詳細な仕組み

スレッドとプロセスの違いは、次の 4 つの点に要約できます。

  • アドレス空間とその他のリソース (オープン ファイルなど) : プロセスは相互に対話します。独立しており、同じプロセスのスレッド間で共有されます。プロセス内のスレッドは、他のプロセスには表示されません。
  • 通信: プロセス間通信 IPC、スレッドはプロセス データ セグメント (グローバル変数など) を直接読み書きして通信できます。データの一貫性を確保するには、プロセス同期と相互排他手段の支援が必要です。
  • スケジューリングと切り替え: スレッド コンテキストの切り替えは、プロセス コンテキストの切り替えよりもはるかに高速です。
  • マルチスレッド OS では、プロセスは実行可能なエンティティではありません。

#マルチプロセスとマルチスレッドの比較

##データの共有と同期##スレッド優位性##作成、破棄、切り替えプログラミングとデバッグプロセスは相互に影響しません

#比較の次元

複数のプロセス

複数のスレッド

概要

データ共有は複雑ですが、同期はシンプルです

データ共有はシンプルですが、同期は複雑です

それぞれ独自の長所と短所があります

メモリ、CPU

多くのメモリを消費し、複雑なスイッチングが発生し、低い CPU 使用率

#低いメモリ使用量、シンプルなスイッチング、高い CPU 使用率

複雑、遅い

シンプル、速い

スレッド占有率が優れている

簡単なプログラミングと簡単なデバッグ

# #プログラミングは複雑で、デバッグも複雑

#プロセスの優位性

##信頼性

#1 つのスレッドがハングアップすると、プロセス全体がハングアップします

#プロセスが優位です

分散型

マルチコアおよびマルチマシンに適しており、複数のマシンに簡単に拡張できます

次の用途に適していますマルチコア

プロセスの優位性

要約すると、プロセスとスレッドは電車や車両にたとえることもできます。

  • スレッドはプロセスの下を移動します (単純な車両は走行できません)
  • プロセスには複数のプロセスを含めることができます
  • 異なるプロセス間でデータを共有するのが難しい(駅の乗り換えなど、ある列車に乗っている乗客が別の列車に乗り換えるのが難しい)
  • 同じプロセス内の異なるスレッド間でデータを簡単に共有できます (車両 A を車両 B に変更するのは簡単です)
  • プロセスはスレッドよりも多くのコンピューター リソースを消費します (複数の列車を使用すると、複数の車両より多くのリソースを消費します)
  • プロセスは相互に影響しません。1 つのスレッドがハングアップすると、プロセス全体がハングアップします (ある列車は別の列車に影響を与えませんが、列車の中間車両が火災になった場合、その列車のすべての車両に影響します)
  • このプロセスは複数のマシンに拡張でき、最大複数のコアに適しています (異なる列車が複数の線路で走行でき、同じ列車の車両が異なる線路で走行することはできません)。
  • プロセスによって使用されるメモリ アドレスはロックされる可能性があります。つまり、スレッドが共有メモリを使用する場合、他のスレッドはそのメモリを使用する前に共有メモリの終了を待つ必要があります。 (例:電車のトイレ) ・「Mutex(ミューテックス)」
  • プロセスが使用するメモリアドレスにより使用制限が可能(例:電車内のレストラン、最大人数のみ)入ることができますが、満員の場合は、入る前に誰かが出てくるまでドアで待つ必要があります) - 「セマフォ」

Python グローバル インタープリター ロック GIL

グローバル インタープリター ロック (英語: Global Interpreter Lock、GIL (略称 GIL) は Python の機能ではなく、Python パーサー (CPython) を実装する際に導入された概念です。 CPython はほとんどの環境でデフォルトの Python 実行環境であるためです。したがって、多くの人の概念では、CPython は Python であり、GIL は Python 言語の欠陥であると当然のことと考えています。では、CPython 実装における GIL とは何でしょうか?公式の説明を見てみましょう:

CPython インタープリタが一度に 1 つのスレッドだけが Python バイトコードを実行することを保証するために使用されるメカニズムです。これにより、オブジェクト モデル (重要な組み込みコードを含む) を作成することで CPython の実装が簡素化されます。 dict などの型で) 同時アクセスに対して暗黙的に安全です。インタープリター全体をロックすると、マルチプロセッサ マシンによって提供される並列処理の多くが犠牲になりますが、インタープリターのマルチスレッド化が容易になります。

Pythonコードの実行は、Python 仮想マシン (インタプリタ メイン ループ、CPython バージョンとも呼ばれます) によって制御されます。Python は元々、インタプリタのメイン ループで同時に実行されるスレッドが 1 つだけ、つまり、常に 1 つだけであるように設計されました。 thread インタプリタ内で実行されるスレッド。 Python 仮想マシンへのアクセスは、グローバル インタープリター ロック (GIL) によって制御され、一度に 1 つのスレッドのみが実行されるようになります。

Pythonのマルチスレッドとマルチプロセスの詳細な仕組み

GIL の利点は何ですか?簡単に言えば、シングルスレッド環境では高速であり、C ライブラリと組み合わせるとより便利であり、スレッド セーフティの問題を考慮する必要はありません。これは、初期の Python の最も一般的なアプリケーション シナリオであり利点でもありました。さらに、GIL の設計により CPython の実装が簡素化され、辞書などの主要な組み込み型を含むオブジェクト モデルが暗黙的に同時にアクセスできるようになります。グローバル インタプリタをロックすると、マルチスレッド サポートの実装が容易になりますが、マルチプロセッサ ホストの並列コンピューティング機能も失われます。

マルチスレッド環境では、Python 仮想マシンは次のように実行されます。

  1. GIL のセットアップ
  2. 実行するスレッドに切り替えます
  3. 指定された数のバイトコード命令が実行されるまで、またはスレッドが積極的に制御を放棄するまで実行します (sleep(0) を呼び出すことができます)
  4. スレッドをスリープ状態に設定します
  5. GIL のロックを解除します
  6. もう一度上記の手順をすべて繰り返します

Pythonのマルチスレッドとマルチプロセスの詳細な仕組み

Python3.2 より前の GIL のリリース ロジックは、現在のスレッドが IO 操作またはティック カウントに遭遇するというものでした。 100 に達しました (ティックは Python 自体とみなすことができます。特に GIL に使用されるカウンターは、リリースごとにゼロにリセットされます。このカウントは、リリース前に sys.setcheckinterval を通じて調整できます)。コンピューティング集約型スレッドは GIL を解放した直後に GIL を申請し、通常は他のスレッドがスケジュールを完了する前に GIL を再取得するため、コンピューティング集約型スレッドは非常に短時間で GIL を取得します。 GIL は、スレッドの実行が終了するまで、長時間占有されます。

Python 3.2 は新しい GIL の使用を開始します。新しい GIL 実装は、固定タイムアウトを使用して、現在のスレッドにグローバル ロックを放棄するように指示します。現在のスレッドがこのロックを保持し、他のスレッドがこのロックを要求すると、現在のスレッドは 5 ミリ秒後に強制的にロックを解放します。この改善により、シングルコアの場合に単一のスレッドが GIL を長時間占有する状況が改善されます。

シングルコア CPU では、数百回の間隔チェックによりスレッドが切り替わります。マルチコア CPU では、深刻なスレッド スラッシングが発生します。 GIL ロックが解放されるたびに、スレッドがロックをめぐって競合し、スレッドが切り替わり、リソースが消費されます。単一コア配下に複数のスレッドがある場合、GIL が解放されるたびに、目覚めたスレッドが GIL ロックを取得できるため、シームレスに実行できますが、マルチコアでは、CPU0 が GIL を解放した後、他の CPU 上のスレッドが競合し、ただし、GIL は CPU0 によってすぐに取得されるため、他のいくつかの CPU で起動されたスレッドが起動し、スケジュールされる状態に入る前に切り替え時間まで待機することになり、スレッドのスラッシングが発生し、効率が低下します。

さらに、上記の実装メカニズムから、Python のマルチスレッドは、CPU を集中的に使用するコードよりも IO を集中的に使用するコードに適していると推測できます。

GIL への対策:

  • Python の上位バージョンを使用します (GIL メカニズムが最適化されています)
  • マルチスレッドの代わりにマルチプロセスを使用します (マルチプロセス GIL はありませんが、プロセス自体がより多くのリソースを消費します)
  • CPU 実行スレッドを指定します (アフィニティ モジュールを使用します)
  • Jython や IronPython などの GIL フリー インタープリターを使用します
  • 完全なマルチスレッドは IO 集中型のタスクにのみ使用されます
  • コルーチン (効率的なシングルスレッド モード、マイクロスレッドとも呼ばれ、通常はマルチプロセスと組み合わせて使用​​されます) を使用します
  • 主要コンポーネントに C/C を使用する Python 拡張機能として記述された ctypes を使用すると、Python プログラムが C 言語でコンパイルされたダイナミック リンク ライブラリのエクスポートされた関数を直接呼び出すことができます。 (GIL 制限を呼び出す nogil を使用)

Python のマルチプロセス パッケージ multiprocessing

Python のスレッド パッケージは主にマルチスレッド開発を使用しますが、GIL の存在により、多くのPython スレッドは実際にはマルチスレッドではないため、マルチコア CPU のリソースを最大限に活用するには、ほとんどの場合複数のプロセスを使用する必要があります。マルチプロセッシング パッケージは Python バージョン 2.6 で導入され、移行を容易にするためにスレッドによって提供される一連のインターフェイスを完全に複製します。唯一の違いは、複数のスレッドではなく複数のプロセスを使用することです。各プロセスには独自の独立した GIL があるため、プロセス間で GIL の競合は発生しません。

このマルチプロセスを使用すると、単一プロセスから同時実行への変換を簡単に完了できます。マルチプロセッシングは、サブプロセス、通信、データの共有をサポートし、さまざまな形式の同期を実行し、プロセス、キュー、パイプ、ロックなどのコンポーネントを提供します。

マルチプロセッシングの背景

マルチプロセッシングのもう 1 つの理由は、Python の GIL への対処に加えて、Windows オペレーティング システムと Linux/Unix システム間の不一致です。

Unix/Linux オペレーティング システムは、非常に特殊な fork() システム コールを提供します。通常の関数は 1 回呼び出されて 1 回戻りますが、fork() は 1 回呼び出されて 2 回戻ります。これは、オペレーティング システムが現在のプロセス (親プロセス) (子プロセス) を自動的にコピーし、親プロセスと子プロセスにそれぞれコピーするためです。 。 戻る。子プロセスは常に 0 を返し、親プロセスは子プロセスの ID を返します。この理由は、親プロセスが多くの子プロセスをフォークアウトできるため、親プロセスは各子プロセスの ID を記録する必要があり、子プロセスは親プロセスの ID を取得するために getpid() を呼び出すだけでよいためです。

Python の os モジュールは、fork を含む一般的なシステム コールをカプセル化し、Python プログラムで子プロセスを簡単に作成できます。

import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
ログイン後にコピー

Linux、Unix、Mac で上記のコードを実行すると、結果は次のようになります。 ##

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
ログイン後にコピー

フォーク呼び出しを使用すると、プロセスが新しいタスクを受け取ると、子プロセスをコピーして新しいタスクを処理できます。一般的な Apache サーバーでは、親プロセスがポートでリッスンします。 http リクエストが行われると、新しい http リクエストを処理するために子プロセスがフォークされます。

Windows にはフォーク呼び出しがないため、上記のコードは Windows 上で実行できません。 Python はクロスプラットフォームであるため、当然、クロスプラットフォームのマルチプロセス サポートを提供する必要があります。マルチプロセッシング モジュールは、マルチプロセス モジュールのクロスプラットフォーム バージョンです。マルチプロセッシング モジュールは fork() 呼び出しをカプセル化するため、fork() の詳細に注意を払う必要はありません。 Windows にはフォーク呼び出しがないため、マルチプロセッシングはフォークの効果を「シミュレート」する必要があります。

マルチプロセッシングの共通コンポーネントと機能

Pythonのマルチスレッドとマルチプロセスの詳細な仕組み

管理プロセス モジュールの作成:

    プロセス (プロセスの作成に使用)
  • プール (プロセス プールの作成と管理に使用されます)
  • キュー (プロセス通信、リソース共有に使用されます)
  • 値、配列 (プロセス通信、リソース共有に使用されます)
  • Pipe (パイプ通信用)
  • Manager (リソース共有用)
同期サブプロセスモジュール:

    Condition (条件変数) )
  • イベント
  • ロック (ミューテックス)
  • RLock (再入可能ミューテックス (原因をブロックせずに同じプロセスが複数回取得できます)
  • セマフォ (セマフォ) )
各コンポーネントと関数の具体的な使用法を学習しましょう。

Process (プロセスの作成用)

マルチプロセッシング モジュールは、プロセスを表す Process クラスを提供します。プロセス オブジェクト。

マルチプロセッシングでは、各プロセスは Process クラスによって表されます。

構築メソッド: Process([group [, target [, name [, args [, kwargs]]]] ])

  • group:分组,实际上不使用,值始终为None
  • target:表示调用对象,即子进程要执行的任务,你可以传入方法名
  • name:为子进程设定名称
  • args:要传给target函数的位置参数,以元组方式进行传入。
  • kwargs:要传给target函数的字典参数,以字典方式进行传入。

实例方法:

  • start():启动进程,并调用该子进程中的p.run()
  • run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
  • terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
  • is_alive():返回进程是否在运行。如果p仍然运行,返回True
  • join([timeout]):进程同步,主进程等待子进程完成后再执行后面的代码。线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间(超过这个时间,父线程不再等待子线程,继续往下执行),需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

属性介绍:

  • daemon:默认值为False,如果设为True,代表p为后台运行的守护进程;当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程;必须在p.start()之前设置
  • name:进程的名称
  • pid:进程的pid
  • exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
  • authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

使用示例:(注意:在windows中Process()必须放到if name == ‘main’:下)

from multiprocessing import Process
import os
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
ログイン後にコピー

Pool(用于创建管理进程池)

Pythonのマルチスレッドとマルチプロセスの詳細な仕組み

Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

构造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • processes :要创建的进程数,如果省略,将默认使用cpu_count()返回的数量。
  • initializer:每个工作进程启动时要执行的可调用对象,默认为None。如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
  • initargs:是要传给initializer的参数组。
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
  • context: 用在制定工作进程启动时的上下文,一般使用Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。

实例方法:

  • apply(func[, args[, kwargs]]):在一个池工作进程中执行func(args,*kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()。它是阻塞的。apply很少使用
  • apply_async(func[, arg[, kwds={}[, callback=None]]]):在一个池工作进程中执行func(args,*kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。它是非阻塞。
  • map(func, iterable[, chunksize=None]):Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
  • map_async(func, iterable[, chunksize=None]):map_async与map的关系同apply与apply_async
  • imap():imap 与 map的区别是,map是当所有的进程都已经执行完了,并将结果返回了,imap()则是立即返回一个iterable可迭代对象。
  • imap_unordered():不保证返回的结果顺序与进程添加的顺序一致。
  • close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成。
  • join():等待所有工作进程退出。此方法只能在close()或teminate()之后调用,让其不再接受新的Process。
  • terminate():结束工作进程,不再处理未处理的任务。

方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法:

  • get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
  • ready():如果调用完成,返回True
  • successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
  • wait([timeout]):等待结果变为可用。
  • terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
<span  style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># -*- coding:utf-8 -*-</span><br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Queue(用于进程通信,资源共享)</span><br><span  style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># Pool+map</span><br><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">from</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">multiprocessing</span> <span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">import</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Pool</span><br><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">def</span> <span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">test</span>(<span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>):<br><span  style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">print</span>(<span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>)<br><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">if</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">__name__</span> <span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">==</span> <span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">"__main__"</span>:<br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">lists</span> <span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">=</span> <span  style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">range</span>(<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">100</span>)<br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span> <span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">=</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Pool</span>(<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">8</span>)<br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">map</span>(<span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">test</span>, <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">lists</span>)<br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">close</span>()<br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">join</span>()<br>
ログイン後にコピー
<span  style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># -*- coding:utf-8 -*-</span><br><span  style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># 异步进程池(非阻塞)</span><br><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">from</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">multiprocessing</span> <span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">import</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Pool</span><br><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">def</span> <span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">test</span>(<span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>):<br><span  style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">print</span>(<span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>)<br><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">if</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">__name__</span> <span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">==</span> <span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">"__main__"</span>:<br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span> <span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">=</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Pool</span>(<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">8</span>)<br><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">for</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span> <span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">in</span> <span  style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">range</span>(<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">100</span>):<br><span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">'''</span><br><span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">For循环中执行步骤:</span><br><span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">(1)循环遍历,将100个子进程添加到进程池(相对父进程会阻塞)</span><br><span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">(2)每次执行8个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)</span><br><span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">apply_async为异步进程池写法。异步指的是启动子进程的过程,与父进程本身的执行(print)是异步的,而For循环中往进程池添加子进程的过程,与父进程本身的执行却是同步的。</span><br><span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">'''</span><br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">apply_async</span>(<span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">test</span>, <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">args</span><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">=</span>(<span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>,))<span  style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># 维持执行的进程总数为8,当一个进程执行完后启动一个新进程.</span><br><span  style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">print</span>(<span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">"test"</span>)<br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">close</span>()<br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">join</span>()<br>
ログイン後にコピー
<span  style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># -*- coding:utf-8 -*-</span><br><span  style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># 异步进程池(非阻塞)</span><br><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">from</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">multiprocessing</span> <span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">import</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Pool</span><br><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">def</span> <span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">test</span>(<span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>):<br><span  style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">print</span>(<span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>)<br><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">if</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">__name__</span> <span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">==</span> <span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">"__main__"</span>:<br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span> <span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">=</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">Pool</span>(<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">8</span>)<br><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">for</span> <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span> <span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">in</span> <span  style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">range</span>(<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">100</span>):<br><span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">'''</span><br><span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">实际测试发现,for循环内部执行步骤:</span><br><span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">(1)遍历100个可迭代对象,往进程池放一个子进程</span><br><span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">(2)执行这个子进程,等子进程执行完毕,再往进程池放一个子进程,再执行。(同时只执行一个子进程)</span><br><span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">for循环执行完毕,再执行print函数。</span><br><span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">'''</span><br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">apply</span>(<span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">test</span>, <span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">args</span><span  style="color: rgb(215, 58, 73); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">=</span>(<span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">i</span>,))<span  style="color: rgb(106, 115, 125); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);"># 维持执行的进程总数为8,当一个进程执行完后启动一个新进程.</span><br><span  style="color: rgb(111, 66, 193); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">print</span>(<span  style="color: rgb(102, 153, 0); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">"test"</span>)<br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">close</span>()<br><span  style="color: rgb(89, 89, 89); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">pool</span>.<span  style="color: rgb(0, 92, 197); margin: 0px; padding: 0px; background: none 0% 0% / auto repeat scroll padding-box border-box rgba(0, 0, 0, 0);">join</span>()<br>
ログイン後にコピー

Queue(用于进程通信,资源共享)

在使用多进程的过程中,最好不要使用共享资源。普通的全局变量是不能被子进程所共享的,只有通过Multiprocessing组件构造的数据结构可以被共享。

Queue是用来创建进程间资源共享的队列的类,使用Queue可以达到多进程间数据传递的功能(缺点:只适用Process类,不能在Pool进程池中使用)。

构造方法:Queue([maxsize])

  • maxsize是队列中允许最大项数,省略则无大小限制。

实例方法:

  • put():用以插入数据到队列。put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
  • get():可以从队列读取并且删除一个元素。get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。若不希望在empty的时候抛出异常,令blocked为True或者参数全部置空即可。
  • get_nowait():同q.get(False)
  • put_nowait():同q.put(False)
  • empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
  • full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
  • qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

使用示例:

from multiprocessing import Process, Queue
import os, time, random
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__ == "__main__":
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()# 等待pw结束
pr.terminate()# pr进程里是死循环,无法等待其结束,只能强行终止
ログイン後にコピー

JoinableQueue就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

构造方法:JoinableQueue([maxsize])

  • maxsize:队列中允许最大项数,省略则无大小限制。

实例方法

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:

  • task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
  • join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

使用示例:

# -*- coding:utf-8 -*-
from multiprocessing import Process, JoinableQueue
import time, random
def consumer(q):
while True:
res = q.get()
print('消费者拿到了 %s' % res)
q.task_done()
def producer(seq, q):
for item in seq:
time.sleep(random.randrange(1,2))
q.put(item)
print('生产者做好了 %s' % item)
q.join()
if __name__ == "__main__":
q = JoinableQueue()
seq = ('产品%s' % i for i in range(5))
p = Process(target=consumer, args=(q,))
p.daemon = True# 设置为守护进程,在主线程停止时p也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
p.start()
producer(seq, q)
print('主线程')
ログイン後にコピー

Value,Array(用于进程通信,资源共享)

multiprocessing 中Value和Array的实现原理都是在共享内存中创建ctypes()对象来达到共享数据的目的,两者实现方法大同小异,只是选用不同的ctypes数据类型而已。

Value

构造方法:Value((typecode_or_type, args[, lock])

  • typecode_or_type:定义ctypes()对象的类型,可以传Type code或 C Type,具体对照表见下文。
  • args:传递给typecode_or_type构造函数的参数
  • lock:默认为True,创建一个互斥锁来限制对Value对象的访问,如果传入一个锁,如Lock或RLock的实例,将用于同步。如果传入False,Value的实例就不会被锁保护,它将不是进程安全的。

typecode_or_type支持的类型:

| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'` | signed char| int | 1 |
| `'B'` | unsigned char| int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long| int | 4 |
| `'L'` | unsigned long| int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float| float | 4 |
| `'d'` | double | float | 8 |
ログイン後にコピー

参考地址:https://docs.python.org/3/library/array.html

Array

构造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])

  • typecode_or_type:同上
  • size_or_initializer:如果它是一个整数,那么它确定数组的长度,并且数组将被初始化为零。否则,size_or_initializer是用于初始化数组的序列,其长度决定数组的长度。
  • kwds:传递给typecode_or_type构造函数的参数
  • lock:同上

使用示例:

import multiprocessing
def f(n, a):
n.value = 3.14
a[0] = 5
if __name__ == '__main__':
num = multiprocessing.Value('d', 0.0)
arr = multiprocessing.Array('i', range(10))
p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
ログイン後にコピー

注意:Value和Array只适用于Process类。

Pipe(用于管道通信)

多进程还有一种数据传递方式叫管道原理和 Queue相同。Pipe可以在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。

构造方法:Pipe([duplex])

  • dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

实例方法:

  • send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
  • recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
  • close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
  • fileno():返回连接使用的整数文件描述符
  • poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
  • recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
  • send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收
  • recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

使用示例:

from multiprocessing import Process, Pipe
import time
# 子进程执行方法
def f(Subconn):
time.sleep(1)
Subconn.send("吃了吗")
print("来自父亲的问候:", Subconn.recv())
Subconn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe()# 创建管道两端
p = Process(target=f, args=(child_conn,))# 创建子进程
p.start()
print("来自儿子的问候:", parent_conn.recv())
parent_conn.send("嗯")
ログイン後にコピー

Manager(用于资源共享)

Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。Manager模块常与Pool模块一起使用。

Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

管理器是独立运行的子进程,其中存在真实的对象,并以服务器的形式运行,其他进程通过使用代理访问共享对象,这些代理作为客户端运行。Manager()是BaseManager的子类,返回一个启动的SyncManager()实例,可用于创建共享对象并返回访问这些共享对象的代理。

BaseManager,创建管理器服务器的基类

构造方法:BaseManager([address[, authkey]])

  • address:(hostname,port),指定服务器的网址地址,默认为简单分配一个空闲的端口
  • authkey:连接到服务器的客户端的身份验证,默认为current_process().authkey的值

实例方法:

  • start([initializer[, initargs]]):启动一个单独的子进程,并在该子进程中启动管理器服务器
  • get_server():获取服务器对象
  • connect():连接管理器对象
  • shutdown():关闭管理器对象,只能在调用了start()方法之后调用

实例属性:

  • address:只读属性,管理器服务器正在使用的地址

SyncManager,以下类型均不是进程安全的,需要加锁..

实例方法:

  • Array(self,*args,**kwds)
  • BoundedSemaphore(self,*args,**kwds)
  • Condition(self,*args,**kwds)
  • Event(self,*args,**kwds)
  • JoinableQueue(self,*args,**kwds)
  • Lock(self,*args,**kwds)
  • Namespace(self,*args,**kwds)
  • Pool(self,*args,**kwds)
  • Queue(self,*args,**kwds)
  • RLock(self,*args,**kwds)
  • Semaphore(self,*args,**kwds)
  • Value(self,*args,**kwds)
  • dict(self,*args,**kwds)
  • list(self,*args,**kwds)

使用示例:

import multiprocessing
def f(x, arr, l, d, n):
x.value = 3.14
arr[0] = 5
l.append('Hello')
d[1] = 2
n.a = 10
if __name__ == '__main__':
server = multiprocessing.Manager()
x = server.Value('d', 0.0)
arr = server.Array('i', range(10))
l = server.list()
# 子进程执行方法
def f(Subconn):
time.sleep(1)
Subconn.send("吃了吗")
print("来自父亲的问候:", Subconn.recv())
print(x.value)
print(arr)
print(l)
print(d)
print(n)
ログイン後にコピー

同步子进程模块

Lock(互斥锁)

Lock锁的作用是当多个进程需要访问共享资源的时候,避免访问的冲突。加锁保证了多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,牺牲了速度但保证了数据安全。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。

构造方法:Lock()

实例方法:

  • acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
  • release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。

使用示例:

from multiprocessing import Process, Lock
def l(lock, num):
lock.acquire()
print("Hello Num: %s" % (num))
lock.release()
if __name__ == '__main__':
lock = Lock()# 这个一定要定义为全局
for num in range(20):
Process(target=l, args=(lock, num)).start()
ログイン後にコピー

RLock(可重入的互斥锁(同一个进程可以多次获得它,同时不会造成阻塞)

RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。

构造方法:RLock()

实例方法:

  • acquire([timeout]):同Lock
  • release(): 同Lock

Semaphore(信号量)

信号量是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识,而且只有当占用信号量的线程数超过信号量时线程才阻塞。这允许了多个线程可以同时访问相同的代码区。比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁。

构造方法:Semaphore([value])

  • value:设定信号量,默认值为1

实例方法:

  • acquire([timeout]):同Lock
  • release(): 同Lock

使用示例:

from multiprocessing import Process, Semaphore
import time, random
def go_wc(sem, user):
sem.acquire()
print('%s 占到一个茅坑' % user)
time.sleep(random.randint(0, 3))
sem.release()
print(user, 'OK')
if __name__ == '__main__':
sem = Semaphore(2)
p_l = []
for i in range(5):
p = Process(target=go_wc, args=(sem, 'user%s' % i,))
p.start()
p_l.append(p)
for i in p_l:
i.join()
ログイン後にコピー

Condition(条件变量)

可以把Condition理解为一把高级的锁,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。Condition在内部维护一个锁对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与锁的acquire, release方法一致,其实它只是简单的调用内部锁对象的对应的方法而已。Condition还提供了其他的一些方法。

构造方法:Condition([lock/rlock])

  • 可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。

实例方法:

  • acquire([timeout]):首先进行acquire,然后判断一些条件。如果条件不满足则wait
  • release():释放 Lock
  • wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。处于wait状态的线程接到通知后会重新判断条件。
  • notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
  • notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

使用示例:

import multiprocessing
import time
def stage_1(cond):
"""perform first stage of work,
then notify stage_2 to continue
"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
print('{} done and ready for stage 2'.format(name))
cond.notify_all()
def stage_2(cond):
"""wait for the condition telling us stage_1 is done"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
cond.wait()
print('{} running'.format(name))
if __name__ == '__main__':
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name='s1',
 target=stage_1,
 args=(condition,))
s2_clients = [
multiprocessing.Process(
name='stage_2[{}]'.format(i),
target=stage_2,
args=(condition,),
)
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()
ログイン後にコピー

Event(事件)

Event内部包含了一个标志位,初始的时候为false。可以使用set()来将其设置为true;或者使用clear()将其从新设置为false;可以使用is_set()来检查标志位的状态;另一个最重要的函数就是wait(timeout=None),用来阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数理解返回。

使用示例:

import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')
ログイン後にコピー

其他内容

multiprocessing.dummy 模块与 multiprocessing 模块的区别:dummy 模块是多线程,而 multiprocessing 是多进程, api 都是通用的。所有可以很方便将代码在多线程和多进程之间切换。multiprocessing.dummy通常在IO场景可以尝试使用,比如使用如下方式引入线程池。

from multiprocessing.dummy import Pool as ThreadPool
ログイン後にコピー

multiprocessing.dummy与早期的threading,不同的点好像是在多多核CPU下,只绑定了一个核心(具体未考证)。

参考文档:

Python并发之concurrent.futures

Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码。从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。concurrent.futures基础模块是executor和future。

Executor

Executor是一个抽象类,它不能被直接使用。它为具体的异步执行定义了一些基本的方法。ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。

ThreadPoolExecutor对象

ThreadPoolExecutor类是Executor子类,使用线程池执行异步调用。

class concurrent.futures.ThreadPoolExecutor(max_workers)
ログイン後にコピー

使用max_workers数目的线程池执行异步调用。

ProcessPoolExecutor对象

ThreadPoolExecutor类是Executor子类,使用进程池执行异步调用。

class concurrent.futures.ProcessPoolExecutor(max_workers=None)
ログイン後にコピー

使用max_workers数目的进程池执行异步调用,如果max_workers为None则使用机器的处理器数目(如4核机器max_worker配置为None时,则使用4个进程进行异步并发)。

submit()方法

Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future对象代表的就是给定的调用。

Executor.submit(fn, *args, **kwargs)

  • fn:需要异步执行的函数
  • *args, **kwargs:fn参数

使用示例:

from concurrent import futures
def test(num):
import time
return time.ctime(), num
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test, 1)
print(future.result())
ログイン後にコピー

map()方法

除了submit,Exectuor还为我们提供了map方法,这个方法返回一个map(func, *iterables)迭代器,迭代器中的回调执行返回的结果有序的。

Executor.map(func, *iterables, timeout=None)

  • func:需要异步执行的函数
  • *iterables:可迭代对象,如列表等。每一次func执行,都会从iterables中取参数。
  • timeout:设置每次异步操作的超时时间,timeout的值可以是int或float,如果操作超时,会返回raisesTimeoutError;如果不指定timeout参数,则不设置超时间。

使用示例:

from concurrent import futures
def test(num):
import time
return time.ctime(), num
data = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for future in executor.map(test, data):
print(future)
ログイン後にコピー

shutdown()方法

释放系统资源,在Executor.submit()或 Executor.map()等异步操作后调用。使用with语句可以避免显式调用此方法。

Executor.shutdown(wait=True)

Future

Future可以理解为一个在未来完成的操作,这是异步编程的基础。通常情况下,我们执行io操作,访问url时(如下)在等待结果返回之前会产生阻塞,cpu不能做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

Future类封装了可调用的异步执行。Future 实例通过 Executor.submit()方法创建。

  • cancel():试图取消调用。如果调用当前正在执行,并且不能被取消,那么该方法将返回False,否则调用将被取消,方法将返回True。
  • cancelled():如果成功取消调用,返回True。
  • running():如果调用当前正在执行并且不能被取消,返回True。
  • done():如果调用成功地取消或结束了,返回True。
  • result(timeout=None):返回调用返回的值。如果调用还没有完成,那么这个方法将等待超时秒。如果调用在超时秒内没有完成,那么就会有一个Futures.TimeoutError将报出。timeout可以是一个整形或者浮点型数值,如果timeout不指定或者为None,等待时间无限。如果futures在完成之前被取消了,那么 CancelledError 将会报出。
  • exception(timeout=None):返回调用抛出的异常,如果调用还未完成,该方法会等待timeout指定的时长,如果该时长后调用还未完成,就会报出超时错误futures.TimeoutError。timeout可以是一个整形或者浮点型数值,如果timeout不指定或者为None,等待时间无限。如果futures在完成之前被取消了,那么 CancelledError 将会报出。如果调用完成并且无异常报出,返回None.
  • add_done_callback(fn):将可调用fn捆绑到future上,当Future被取消或者结束运行,fn作为future的唯一参数将会被调用。如果future已经运行完成或者取消,fn将会被立即调用。
  • wait(fs, timeout=None, return_when=ALL_COMPLETED)
  • 等待fs提供的 Future 实例(possibly created by different Executor instances) 运行结束。返回一个命名的2元集合,分表代表已完成的和未完成的
  • return_when 表明什么时候函数应该返回。它的值必须是一下值之
  • FIRST_COMPLETED :函数在任何future结束或者取消的时候返回。
  • FIRST_EXCEPTION :函数在任何future因为异常结束的时候返回,如果没有future报错,效果等于
  • ALL_COMPLETED :函数在所有future结束后才会返回。
  • as_completed(fs, timeout=None):参数是一个 Future 实例列表,返回值是一个迭代器,在运行结束后产出 Future实例 。

使用示例:

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_5_secs(num):
sleep(randint(1, 5))
return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_5_secs, x))
print(1)
for x in as_completed(futures):
print(x.result())
print(2)
ログイン後にコピー

参考链接:

以上がPythonのマルチスレッドとマルチプロセスの詳細な仕組みの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:51cto.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート