ホームページ > バックエンド開発 > Python チュートリアル > Pythonのキューとマルチプロセス

Pythonのキューとマルチプロセス

高洛峰
リリース: 2017-02-25 10:10:02
オリジナル
1350 人が閲覧しました

最近、複数の仮想マシンでタスクを実行する必要があるプロジェクトに遭遇したので、他の人の以前のプロジェクトのコードを参照してマルチプロセスを使用して処理したため、Pythonのマルチプロセスについてオンラインで調べました

1 . まずは Queue (Queue オブジェクト) について話しましょう

Queue は Python の標準ライブラリで、直接インポートして参照することができます。 以前勉強していたときに、有名な「eat first, pull first」と「eat first,」を聞きました。 「最初に嘔吐します」、これが実際にここで言及されているキューです。キューを構築するときに、その容量を定義できます。過負荷にしないでください。食べすぎるとエラーが報告されます。構築時にこれを書き込まないと、または 1 未満の数値を書き込むと、無限を意味します。 ')

q.put(4)

q.put(['yan ','xing'])

キュー内の値を取得get()

デフォルトのキューは最初にありますまずは

>>> q.get()'yang'>> q.get()

4

>>> ', 'xing']

キューが空の場合、再度 get を使用するとブロックされるため、キューを取得する場合は、通常、

get_nowait() メソッドが Empty をスローします。空のキューから値を取得する場合は例外です

したがって、より一般的な方法は、キューが空かどうかを最初に判断することです


キューで一般的に使用されるメソッド


。 () キューのサイズを返す
Queue.empty() キューが空の場合はTrueを返し、それ以外の場合はFalse
Queue.full() If キューが満杯の場合はTrueを返し、それ以外の場合はFalse

Queue.get([ block[, timeout]]) キューを取得、タイムアウト待機時間

Queue.get_nowait() は Queue.get(False)

ノンブロッキング Queue.put(item) キューに書き込み、タイムアウト待機時間

Queue と同等です。 put_nowait(item) は Queue.put(item, False) と同等です

2. マルチプロセスでサブプロセスの概念を使用します

マルチプロセスインポートプロセスから

プロセスプロセスを通じてサブプロセスを構築できます

p = Process(target=fun,args=(args))

次に、p.start() を使用して子プロセスを開始します

次に、p.join() メソッドを使用して、親プロセスを実行する前に子プロセスの実行を終了させますプロセス

from multiprocessing import Process
import os
 
# 子进程要执行的代码
def run_proc(name):
 print 'Run child process %s (%s)...' % (name, os.getpid())
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Process(target=run_proc, args=('test',))
 print 'Process will start.'
 p.start()
 p.join()
 print 'Process end.'
ログイン後にコピー

3. マルチプロセスでプールを使用する

複数の子プロセスが必要な場合は、マルチプロセスインポートプールからプロセスプール(プール)を使用して管理

することを検討できます。

プール内にサブプロセスを作成する方法はプロセスとは異なり、

Pythonのキューとマルチプロセスp.apply_async(func,args=(args))によって実装されます。コンピューターの CPU の数 (私の場合など) このコンピューターには 4 つの CPU が搭載されています。この場合、サブプロセス task0、task1、task2、および task3 は、前のプロセスが終了した後にのみ開始できます。実際に上記のプログラムを実行した結果は、上図の1、2、3の処理に従い、最初に1を出力し、3秒後に2を出力し、3秒後に3を出力するコードのp.close()です。プロセスを追加した後、Pool オブジェクトの join() メソッドを呼び出すと、すべての子プロセスが実行を完了するまで待機してから、join() を呼び出す必要があります。 close() を呼び出すと、新しいプロセスを追加できません。

その時点でインスタンスプールのプロセス数を定義することもできます上記のコードでp=Pool(5)の場合、すべてのサブプロセスを同時に実行できます

3.複数のサブプロセス間

複数のサブプロセス間の通信には、最初のステップで説明したキューの使用が必要です。たとえば、次の要件がある場合、1 つのサブプロセスがキューにデータを書き込み、別のプロセスがデータを受け取ります。キューから、

from multiprocessing import Pool
import os, time
 
def long_time_task(name):
 print 'Run task %s (%s)...' % (name, os.getpid())
 start = time.time()
 time.sleep(3)
 end = time.time()
 print 'Task %s runs %0.2f seconds.' % (name, (end - start))
 
if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Pool()
 for i in range(5):
  p.apply_async(long_time_task, args=(i,))
 print 'Waiting for all subprocesses done...'
 p.close()
 p.join()
 print 'All subprocesses done.'
ログイン後にコピー

4. 上記のコードに関するいくつかの興味深い質問 Pythonのキューとマルチプロセス

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value
  q.put(value)
  time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(True)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break

if __name__=='__main__':
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 pw = Process(target=write, args=(q,))
 pr = Process(target=read, args=(q,))
 # 启动子进程pw,写入:
 pw.start() 
 # 等待pw结束:
 pw.join()
 # 启动子进程pr,读取:
 pr.start()
 pr.join()
 # pr进程里是死循环,无法等待其结束,只能强行终止:
 print
 print '所有数据都写入并且读完'
ログイン後にコピー

main 関数が上記のサンプルのように記述されている場合、私が本来望んでいるのは、キューを作成し、それをパラメーターとして渡します。ただし、エラー

RuntimeError: Queue object should not be available only through infection

を確認すると、キュー オブジェクトは共有できないということになります。親プロセスと子プロセスの間で通信します。プロセスプールでキューを使用したい場合は、multiprocess の Manager クラスを使用する必要があります

if __name__=='__main__': 
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'
ログイン後にコピー

このようにして、キューオブジェクトは親プロセス間で通信できます。プールを使用しない場合は、Manager クラスを後で拡張できます。

关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁

#coding:gbk
 
from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random
 
# 写数据进程执行的代码:
def write(q,lock):
 lock.acquire() #加上锁
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value  
  q.put(value)  
 lock.release() #释放锁 
 
# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(False)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break
 
if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 lock = manager.Lock() #初始化一把锁
 p = Pool()
 pw = p.apply_async(write,args=(q,lock)) 
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()
 
 print
 print '所有数据都写入并且读完'
ログイン後にコピー

更多Pythonのキューとマルチプロセス相关文章请关注PHP中文网!

関連ラベル:
ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート