この記事では主にPythonのマルチプロセスとプロセスプール(Processing library)の詳しい解説を紹介していますので、必要な方は参考にしてください
環境:win7+python2.7
私は以前からマルチプロセスやマルチスレッドを学びたいと思っていましたが、基礎知識と簡単な概要を見ただけでは、それをどのように応用するのか理解できませんでした。少し前に、github でクローラプロジェクトを目にしました。マルチプロセスとマルチスレッド関連のコンテンツを調べました。次に、関連するナレッジ ポイントといくつかのアプリケーションを記録として書き留めます
まず、プロセスとは何かについて説明します。コンピュータ上のプログラムのアクティビティ。プログラムが実行されると、プロセスがシステム プロセスとユーザー プロセスに分けられます。実行状態のオペレーティング システム自体と、ユーザーが開始したすべてのプロセスはユーザー プロセスです。プロセスは、オペレーティング システムがリソースを割り当てる単位です。
直感的に言えば、タスク マネージャーで system とマークされているユーザー名がシステム プロセスであり、管理者とマークされているものがユーザー プロセスです。さらに、net は netro、lcacal サービスはローカル サービスです。百科事典にありますので、こちらを参照してください。 いくつかの労力を節約する必要があります。そうしないと、元に戻すことができません
1. マルチプロセッシングの簡単な使い方
図に示すように、マルチプロセッシングには複数の機能があります。ここでは、私が現時点で知っていることだけを話します。
プロセスの作成:Process(target=メインの実行関数、name=カスタムプロセス名はオプション、args= (パラメータ))メソッド:
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
def fun1(t):
print 'this is fun1',time.ctime()
time.sleep(t)
print 'fun1 finish',time.ctime()
def fun2(t):
print 'this is fun2',time.ctime()
time.sleep(t)
print 'fun2 finish',time.ctime()
if name == 'main':
a=time.time()
p1=Process(target=fun1,args=(4,))
p2 = Process(target=fun2, args=(6,))
p1.start()
p2.start()
p1.join()
p2.join()
b=time.time()
print 'finish',b-a
this is fun2 Mon Jun 05 13:48:04 2017 this is fun1 Mon Jun 05 13:48:04 2017 fun1 finish Mon Jun 05 13:48:08 2017 fun2 finish Mon Jun 05 13:48:10 2017 finish 6.20300006866 Process finished with exit code 0
start() と join() が異なる位置にある場合に何が起こるかを見てみましょう
# -*- coding:utf-8 -*- from multiprocessing import Process import time def fun1(t): print 'this is fun1',time.ctime() time.sleep(t) print 'fun1 finish',time.ctime() def fun2(t): print 'this is fun2',time.ctime() time.sleep(t) print 'fun2 finish',time.ctime() if name == 'main': a=time.time() p1=Process(target=fun1,args=(4,)) p2 = Process(target=fun2, args=(6,)) p1.start() p1.join() p2.start() p2.join() b=time.time() print 'finish',b-a
結果:
this is fun1 Mon Jun 05 14:19:28 2017 fun1 finish Mon Jun 05 14:19:32 2017 this is fun2 Mon Jun 05 14:19:32 2017 fun2 finish Mon Jun 05 14:19:38 2017 finish 10.1229999065 Process finished with exit code 0
見て、fun1 関数が最初に実行されます。 、それが完了してから fun2 が実行され、次に print 'finish' が実行されます。つまり、最初にプロセス p1 が実行され、次にプロセス p2 が実行されます。 これで、join() の魅力に達したと感じます。 join() をコメントアウトして、再度何が起こるかを確認します
# -*- coding:utf-8 -*- from multiprocessing import Process import time def fun1(t): print 'this is fun1',time.ctime() time.sleep(t) print 'fun1 finish',time.ctime() def fun2(t): print 'this is fun2',time.ctime() time.sleep(t) print 'fun2 finish',time.ctime() if name == 'main': a=time.time() p1=Process(target=fun1,args=(4,)) p2 = Process(target=fun2, args=(6,)) p1.start() p2.start() p1.join() #p2.join() b=time.time() print 'finish',b-a
結果:
this is fun1 Mon Jun 05 14:23:57 2017 this is fun2 Mon Jun 05 14:23:58 2017 fun1 finish Mon Jun 05 14:24:01 2017 finish 4.05900001526 fun2 finish Mon Jun 05 14:24:04 2017 Process finished with exit code 0
今回は fun1 の実行が終了しています (p1 プロセスが join() を使用しているため、メイン プログラムは p1 の実行が終了するのを待ってから、次のプログラムを実行します)ステップ)、メインプロセスの print 'finish' を実行し続け、最後に fun2 の実行が完了した後に終了します
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
def fun1(t):
print 'this is fun1',time.ctime()
time.sleep(t)
print 'fun1 finish',time.ctime()
def fun2(t):
print 'this is fun2',time.ctime()
time.sleep(t)
print 'fun2 finish',time.ctime()
if name == 'main':
a=time.time()
p1=Process(name='fun1进程',target=fun1,args=(4,))
p2 = Process(name='fun2进程',target=fun2, args=(6,))
p1.daemon=True
p2.daemon = True
p1.start()
p2.start()
p1.join()
print p1,p2
print '进程1:',p1.is_alive(),'进程2:',p2.is_alive()
#p2.join()
b=time.time()
print 'finish',b-a
this is fun2 Mon Jun 05 14:43:49 2017 this is fun1 Mon Jun 05 14:43:49 2017 fun1 finish Mon Jun 05 14:43:53 2017 <Process(fun1进程, stopped daemon)> <Process(fun2进程, started daemon)> 进程1: False 进程2: True finish 4.06500005722 Process finished with exit code 0
name がプロセスに名前を付けることであることがわかります。 print 'process 1:', p1.is_alive(), 'process 2:', p2.is_alive() という文に到達すると、p1 プロセスは終了します (戻り値)。 False) の場合、p2 プロセスは引き続き実行されます (True を返します)。ただし、p2 は join() を使用しないため、daemon=Ture が使用されるため、親プロセスは終了後に自動的に終了します。 p2 プロセスが終了する前にプログラム全体が強制終了されます。 .
3.run()run() Process がターゲット関数を指定しない場合、デフォルトで run() 関数がプログラムの実行に使用されます。
# -*- coding:utf-8 -*- from multiprocessing import Process import time def fun1(t): print 'this is fun1',time.ctime() time.sleep(t) print 'fun1 finish',time.ctime() def fun2(t): print 'this is fun2',time.ctime() time.sleep(t) print 'fun2 finish',time.ctime() if name == 'main': a = time.time() p=Process() p.start() p.join() b = time.time() print 'finish', b - a
finish 0.0840001106262
目的関数にはパラメーターがありません:
# -*- coding:utf-8 -*- from multiprocessing import Process import time def fun1(): print 'this is fun1',time.ctime() time.sleep(2) print 'fun1 finish',time.ctime() def fun2(t): print 'this is fun2',time.ctime() time.sleep(t) print 'fun2 finish',time.ctime() if name == 'main': a = time.time() p=Process() p.run=fun1 p.start() p.join() b = time.time() print 'finish', b - a
this is fun1 Mon Jun 05 16:34:41 2017 fun1 finish Mon Jun 05 16:34:43 2017 finish 2.11500000954 Process finished with exit code 0
# -*- coding:utf-8 -*- from multiprocessing import Process import time def fun1(t): print 'this is fun1',time.ctime() time.sleep(t) print 'fun1 finish',time.ctime() def fun2(t): print 'this is fun2',time.ctime() time.sleep(t) print 'fun2 finish',time.ctime() if name == 'main': a = time.time() p=Process() p.run=fun1(2) p.start() p.join() b = time.time() print 'finish', b - a
this is fun1 Mon Jun 05 16:36:27 2017 fun1 finish Mon Jun 05 16:36:29 2017 Process Process-1: Traceback (most recent call last): File "E:\Anaconda2\lib\multiprocessing\process.py", line 258, in _bootstrap self.run() TypeError: 'NoneType' object is not callable finish 2.0529999733 Process finished with exit code 0
2. プロセスプール
对于需要使用几个甚至十几个进程时,我们使用Process还是比较方便的,但是如果要成百上千个进程,用Process显然太笨了,multiprocessing提供了Pool类,即现在要讲的进程池,能够将众多进程放在一起,设置一个运行进程上限,每次只运行设置的进程数,等有进程结束,再添加新的进程
Pool(processes =num):设置运行进程数,当一个进程运行完,会添加新的进程进去
apply_async(函数,(参数)):非阻塞,其中参数是tulpe类型,
apply(函数,(参数)):阻塞
close():关闭pool,不能再添加新的任务
terminate():结束运行的进程,不再处理未完成的任务
join():和Process介绍的作用一样, 但要在close或terminate之后使用。
1.单个进程池
# -*- coding:utf-8 -*- from multiprocessing import Pool import time def fun1(t): print 'this is fun1',time.ctime() time.sleep(t) print 'fun1 finish',time.ctime() def fun2(t): print 'this is fun2',time.ctime() time.sleep(t) print 'fun2 finish',time.ctime() if name == 'main': a=time.time() pool = Pool(processes =3) # 可以同时跑3个进程 for i in range(3,8): pool.apply_async(fun1,(i,)) pool.close() pool.join() b=time.time() print 'finish',b-a
结果:
this is fun1 Mon Jun 05 15:15:38 2017 this is fun1 Mon Jun 05 15:15:38 2017 this is fun1 Mon Jun 05 15:15:38 2017 fun1 finish Mon Jun 05 15:15:41 2017 this is fun1 Mon Jun 05 15:15:41 2017 fun1 finish Mon Jun 05 15:15:42 2017 this is fun1 Mon Jun 05 15:15:42 2017 fun1 finish Mon Jun 05 15:15:43 2017 fun1 finish Mon Jun 05 15:15:47 2017 fun1 finish Mon Jun 05 15:15:49 2017 finish 11.1370000839 Process finished with exit code 0
从上面的结果可以看到,设置了3个运行进程上限,15:15:38这个时间同时开始三个进程,当第一个进程结束时(参数为3秒那个进程),会添加新的进程,如此循环,直至进程池运行完再执行主进程语句b=time.time() print 'finish',b-a .这里用到非阻塞apply_async(),再来对比下阻塞apply()
# -*- coding:utf-8 -*- from multiprocessing import Pool import time def fun1(t): print 'this is fun1',time.ctime() time.sleep(t) print 'fun1 finish',time.ctime() def fun2(t): print 'this is fun2',time.ctime() time.sleep(t) print 'fun2 finish',time.ctime() if name == 'main': a=time.time() pool = Pool(processes =3) # 可以同时跑3个进程 for i in range(3,8): pool.apply(fun1,(i,)) pool.close() pool.join() b=time.time() print 'finish',b-a
结果:
this is fun1 Mon Jun 05 15:59:26 2017 fun1 finish Mon Jun 05 15:59:29 2017 this is fun1 Mon Jun 05 15:59:29 2017 fun1 finish Mon Jun 05 15:59:33 2017 this is fun1 Mon Jun 05 15:59:33 2017 fun1 finish Mon Jun 05 15:59:38 2017 this is fun1 Mon Jun 05 15:59:38 2017 fun1 finish Mon Jun 05 15:59:44 2017 this is fun1 Mon Jun 05 15:59:44 2017 fun1 finish Mon Jun 05 15:59:51 2017 finish 25.1610000134 Process finished with exit code 0
可以看到,阻塞是当一个进程结束后,再进行下一个进程,一般我们都用非阻塞apply_async()
2.多个进程池
上面是使用单个进程池的,对于多个进程池,我们可以用for循环,直接看代码
# -*- coding:utf-8 -*- from multiprocessing import Pool import time def fun1(t): print 'this is fun1',time.ctime() time.sleep(t) print 'fun1 finish',time.ctime() def fun2(t): print 'this is fun2',time.ctime() time.sleep(t) print 'fun2 finish',time.ctime() if name == 'main': a=time.time() pool = Pool(processes =3) # 可以同时跑3个进程 for fun in [fun1,fun2]: for i in range(3,8): pool.apply_async(fun,(i,)) pool.close() pool.join() b=time.time() print 'finish',b-a
结果:
this is fun1 Mon Jun 05 16:04:38 2017 this is fun1 Mon Jun 05 16:04:38 2017 this is fun1 Mon Jun 05 16:04:38 2017 fun1 finish Mon Jun 05 16:04:41 2017 this is fun1 Mon Jun 05 16:04:41 2017 fun1 finish Mon Jun 05 16:04:42 2017 this is fun1 Mon Jun 05 16:04:42 2017 fun1 finish Mon Jun 05 16:04:43 2017 this is fun2 Mon Jun 05 16:04:43 2017 fun2 finish Mon Jun 05 16:04:46 2017 this is fun2 Mon Jun 05 16:04:46 2017 fun1 finish Mon Jun 05 16:04:47 2017 this is fun2 Mon Jun 05 16:04:47 2017 fun1 finish Mon Jun 05 16:04:49 2017 this is fun2 Mon Jun 05 16:04:49 2017 fun2 finish Mon Jun 05 16:04:50 2017 this is fun2 Mon Jun 05 16:04:50 2017 fun2 finish Mon Jun 05 16:04:52 2017 fun2 finish Mon Jun 05 16:04:55 2017 fun2 finish Mon Jun 05 16:04:57 2017 finish 19.1670000553 Process finished with exit code 0
看到了,在fun1运行完接着运行fun2.
另外对于没有参数的情况,就直接 pool.apply_async(funtion),无需写上参数.
在学习编写程序过程,曾遇到不用if _name_ == '_main_':而直接运行程序,这样结果会出错,经查询,在Windows上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的if _name_ == ‘_main_' :语句的下面,才能正常使用Windows下的进程模块。Unix/Linux下则不需要。原因有人这么说:在执行的時候,由于你写的 py 会被当成module 读进执行。所以,一定要判断自身是否为 _main_。也就是要:
if name == ‘main' : # do something.
这里我自己还搞不清楚,期待以后能够理解
以上がPythonでのマルチプロセスとプロセスプール(処理ライブラリ)のサンプルコードの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。