過去 2 日間で、Python のマルチプロセッシング モジュールをレビューしました。パイプとキューの 2 つの IPC メソッドについて説明しました。 ipc はプロセス間通信モードで、一般的に使用されるものの半分はソケット、rpc、パイプ、メッセージ キューです。
今日はまたパイプとキューをいじってみます。
コードは次のとおりです | |
#coding:utf-8 マルチプロセッシングをインポートします 輸入時間 def proc1(パイプ): True の間: xrange(10000) の私: print "%s を送信"%i パイプ.send(i) 時間.睡眠(1) def proc2(パイプ): True の間: print 'proc2 は次を受け取ります:',pipe.recv() 時間.睡眠(1) def proc3(パイプ): True の間: print 'proc3 は次を受け取ります:',pipe.recv() 時間.睡眠(1) #パイプを作る パイプ = multiprocessing.Pipe() プリントパイプ # パイプの終端を処理1に渡します p1 = multiprocessing.Process(target=proc1, args=(pipe[0],)) # パイプのもう一方の端をプロセス 2 に渡します p2 = multiprocessing.Process(target=proc2, args=(pipe[1],)) p1.start() p2.start() p1.join() p2.join() |
マルチプロセッシング パイプだけでなく、他のパイプ実装も、私があなたに渡してあなたがそれを受け取るか、あなたが来て私がそれを受け取るかのような 2 つのプロセス間の単なるゲームです。 もちろん二重状態にすることも可能です。
キューの場合、より多くのプロセスが参加できます。使用法は他のキューと同様です。
公式 Web サイトのドキュメントをご覧ください:
multiprocessing.Pipe([duplex])
パイプの端を表す Connection オブジェクトのペア (conn1、conn2) を返します。
#2 つのパイプ オブジェクト。これら 2 つのオブジェクトを使用して相互に通信します。
duplex が True (デフォルト) の場合、パイプは双方向です。 duplex が False の場合、パイプは単方向です。conn1 はメッセージの受信にのみ使用でき、conn2 はメッセージの送信にのみ使用できます。
クラス multiprocessing.Queue([maxsize])
パイプといくつかのロック/セマフォを使用して実装されたプロセス共有キューを返します。プロセスが最初にアイテムをキューに置くと、オブジェクトをバッファからパイプに転送するフィーダー スレッドが開始されます。
#キューの最大数
標準ライブラリの Queue モジュールからの通常の Queue.Empty 例外と Queue.Full 例外は、タイムアウトを通知するために発生します。
Queue は、task_done() と join() を除く、Queue.Queue のすべてのメソッドを実装します。
qsize()
マルチスレッド/マルチプロセッシングのセマンティクスのため、キューのおおよそのサイズを返します。
#キューサイズ
sem_getvalue() が実装されていない Mac OS X などの Unix プラットフォームでは、これにより NotImplementedError が発生する可能性があることに注意してください。
空()
キューが空の場合は True を返し、そうでない場合は False を返します。マルチスレッド/マルチプロセスのセマンティクスのため、これは信頼できません。
#穴が開いているかどうか。 空の場合は、True ステータスを返します。
いっぱい()
キューがいっぱいの場合は True を返し、そうでない場合は False を返します。マルチスレッド/マルチ処理のセマンティクスのため、これは信頼できません。
#キューのステータスがフルかどうか。
put(obj[, block[, timeout]])
obj をキューに入れます。オプションの引数 block が True (デフォルト) で、timeout が None (デフォルト) の場合、空きスロットが使用可能になるまで必要に応じてブロックされます。 timeout が正の数の場合は、最長 timeout 秒でブロックされます。その時間内に空きスロットがなかった場合は Queue.Full 例外が発生します。それ以外の場合 (ブロックが False)、空きスロットがすぐに利用できる場合は項目をキューに置き、それ以外の場合は Queue.Full 例外を発生させます (タイムアウトは無視されます)。場合)。
#それをキューに入れると、タイムアウトを追加できます。
put_nowait(obj)
put(obj, False) と同等です。
#ここは渋滞なし
get([ブロック[, タイムアウト]])
キューから項目を削除して返します。オプションの args block が True (デフォルト) で、timeout が None (デフォルト) の場合、項目が使用可能になるまで必要に応じてブロックされます。 timeout が正の数値の場合、最大タイムアウト秒数でブロックされます。それ以外の場合 (ブロックが False)、すぐに使用可能な項目がある場合は項目を返し、それ以外の場合は Queue.Empty 例外を発生させます (その場合、タイムアウトは無視されます)。
#getstatus
get_nowait()
get(False) と同等です。
#ブロックせずにキュー内のデータを取得します
Queue には、Queue.Queue にはない追加のメソッドがいくつかあります。これらのメソッドは、通常、ほとんどのコードには不要です。
閉じる()
現在のプロセスによってこれ以上データがこのキューに置かれないことを示します。これは、キューがガベージ コレクションされるときに自動的に呼び出されます。
#閉じて、現在のプロセスのリソースを保存します。
マルチプロセッシングキューの長さを 3 に設定しました。次に、4 番目のキューを配置すると、誰かがデータを取得して 1 つを削除するのを待っていることがわかります。その時点で、キューはブロックされ続ける可能性があります。 。 入力。 put_nowait() を使用すると、キューが制限を超えるとすぐにエラーが発生します。
/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.pyc in put_nowait(self, obj)
/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.pyc in put(self, obj, block, timeout)
以下はテスト コードの一部です。学生はデモを実行してその感触を得ることができます。
コードは次のとおりです | |
#coding:utf-8 OSをインポート マルチプロセッシングをインポートします 輸入時間 # ワーカーに手紙を書きます def inputQ(キュー): True の間: info = "プロセス ID %s: 時刻: %s"%(os.getpid(),int(time.time())) queue.put(情報) 時間.睡眠(1) #ワーカーをゲット def OutputQ(キュー,ロック): True の間: info = queue.get() #lock.acquire() print (str(os.getpid()) + '(get):' + info) #lock.release() 時間.睡眠(1) #==================== #メイン Record1 = [] # 入力プロセスを保存します Record2 = [] # 出力プロセスを保存します lock = multiprocessing.Lock() # 印刷が汚くなるのを防ぐため キュー = multiprocessing.Queue(3) # 入力プロセス 範囲内 (10) の場合: process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() Record1.append(処理) #出力プロセス 範囲内 (10) の場合: process = multiprocessing.Process(target=outputQ,args=(queue,lock)) process.start() Record2.append(処理) |
さて、パイプとキューの使用法について簡単に説明しましょう。 実は、当初は Python パイプについて話したかったのですが、Google で検索したところ、マルチプロセッシング パイプが見つかりました。パイプを書いた後、記事の内容が少なすぎると感じたので、追加のキューを追加しました。 。 。