這篇文章帶給大家的內容是關於python中多進程的詳細介紹(程式碼範例),有一定的參考價值,有需要的朋友可以參考一下,希望對你有幫助。
本節講學習Python的多進程。
多進程Multiprocessing
和多執行緒threading 類似, 他們都是在python 中用來並行
運算的. 不過既然有了threading, 為什麼Python 還要出一個multiprocessing 呢? 原因很簡單, 就是用來彌補threading 的一些劣勢, 比如在threading 教程中提到的GIL
.
使用multiprocessing 也非常簡單, 如果對threading 有一定了解的朋友, 你們的享受時間就到了. 因為python 把multiprocessing 和threading 的使用方法做的幾乎差不多. 這樣我們就更容易上手. 也更容易發揮你電腦多核心系統的威力了!
import multiprocessing as mp import threading as td def job(a,d): print('aaaaa') t1 = td.Thread(target=job,args=(1,2)) p1 = mp.Process(target=job,args=(1,2)) t1.start() p1.start() t1.join() p1.join()
從上面的使用比較程式碼可以看出,執行緒和進程的使用方法相似。
在運用時需要新增上一個定義main函數的語句
if __name__=='__main__':
完整的應用程式碼:
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_test.py @time: 18/8/26 01:12 """ import multiprocessing as mp def job(a, d): print a, d if __name__ == '__main__': p1 = mp.Process(target=job, args=(1, 2)) p1.start() p1.join()
執行環境要在terminal環境下,可能其他的編輯工具會出現運行結束後沒有列印結果,在terminal中的運行後打印的結果為:
➜ baseLearn python ./process/process_test.py 1 2 ➜ baseLearn
Queue的功能是將每個核或執行緒的運算結果放在隊中, 等到每個執行緒或核運行完畢後再從佇列中取出結果, 繼續載入運算。原因很簡單, 多執行緒呼叫的函數不能有傳回值, 所以使用Queue儲存多個執行緒運算的結果
#process_queue.py
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_queue.py @time: 18/8/26 01:12 """ import multiprocessing as mp # 定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果 def job(q): res = 0 for i in range(1000): res += i + i**2 + i**3 q.put(res) #queue if __name__ == '__main__': q = mp.Queue() p1 = mp.Process(target=job, args=(q,)) p2 = mp.Process(target=job, args=(q,)) # 分别启动、连接两个线程 p1.start() p2.start() p1.join() p2.join() # 上面是分两批处理的,所以这里分两批输出,将结果分别保存 res1 = q.get() res2 = q.get() print res1,res2
列印輸出結果:
➜ python ./process/process_queue.py 249833583000 249833583000
進程池
就是我們將要運行的東西,放到池子裡,Python會自行解決多進程的問題
。
首先import multiprocessing
和定義job()
import multiprocessing as mp def job(x): return x*x
然後我們定義一個Pool
pool = mp.Pool()
有了池子之後,就可以讓池子對應某一個函數,我們向池子裡丟資料,池子就會傳回函數傳回的值。 Pool
和先前的Process的
不同點是丟向Pool的函數有回傳值,而Process
的沒有回傳值。
接下來用map()
取得結果,在map()
中需要放入函數和需要迭代運算的值,然後它會自動分配給CPU核,回傳結果
res = pool.map(job, range(10))
讓我們來運行一下
def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) if __name__ == '__main__': multicore()
完成程式碼:
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_queue.py @time: 18/8/26 01:12 """ import multiprocessing as mp def job(x): return x*x # 注意这里的函数有return返回值 def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) if __name__ == '__main__': multicore()
執行結果:
➜ baseLearn python ./process/process_pool.py [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
我們怎麼知道Pool
是否真的呼叫了多個核心呢?我們可以把迭代次數增大些,然後打開CPU負載看下CPU運行情況
打開CPU負載(Mac):活動監視器> CPU > CPU負載(單擊即可)
Pool預設大小是CPU的核數,我們也可以透過在Pool
中傳入processes
參數即可自訂所需的核數
def multicore(): pool = mp.Pool(processes=3) # 定义CPU核数量为3 res = pool.map(job, range(10)) print(res)
Pool
除了map()
外,還有可以回傳結果的方式,那就是apply_async()
.
apply_async()
中只能傳遞一個值,它只會放入一個核進行運算,但是傳入值時要注意是可迭代的,所以在傳入值後需要加逗號, 同時需要用get()方法取得回傳值
def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) res = pool.apply_async(job, (2,)) # 用get获得结果 print(res.get())
運行結果;
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map() 4 # apply_async()
Pool
預設呼叫是CPU的核數,傳入processes參數可自訂CPU核數
map()
放入迭代參數,傳回多個結果
apply_async()
只能放入一組參數,並傳回一個結果,如果想得到map()的效果需要透過迭代
這節我們學習如何定義共享記憶體。 只有用共享記憶體才能讓CPU之間有交流
。
我們可以透過使用Value
資料儲存在一個共享的記憶體表中。
import multiprocessing as mp value1 = mp.Value('i', 0) value2 = mp.Value('d', 3.14)
其中d
和i
參數用來設定資料類型的,d
表示一個雙精浮點型別double,i
表示一個帶符號的整數
。
Type code | C Type | Python Type | Minimum size in bytes |
---|---|---|---|
'b' |
signed char | int | 1 |
'B' |
unsigned char | int | 1 |
'u' |
Py_UNICODE | Unicode character | 2 |
'h' |
signed short | int | 2 |
'H' |
unsigned short | int | 2 |
'i' |
signed int | int | 2 |
'I' |
unsigned int | int | 2 |
'l' |
signed long | int | 4 |
'L' |
unsigned long | int | 4 |
'q' |
signed long long | int | 8 |
'Q' |
unsigned long long | int | 8 |
'f' |
float | float | 4 |
'd' |
double | float | 8 |
在Python的 mutiprocessing
中,有还有一个Array
类,可以和共享内存交互,来实现在进程之间共享数据。
array = mp.Array('i', [1, 2, 3, 4])
这里的Array
和numpy中的不同,它只能是一维
的,不能是多维的。同样和Value
一样,需要定义数据形式,否则会报错。 我们会在后一节举例说明这两种的使用方法.
错误形式
array = mp.Array('i', [[1, 2], [3, 4]]) # 2维list """ TypeError: an integer is required """
让我们看看没有加进程锁时会产生什么样的结果。
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_no_lock.py @time: 18/8/26 09:22 """ import multiprocessing as mp import time def job(v, num): for _ in range(5): time.sleep(0.5) # 暂停0.5秒,让输出效果更明显 v.value += num # v.value获取共享变量值 print(v.value) def multicore(): v = mp.Value('i', 0) # 定义共享变量 p1 = mp.Process(target=job, args=(v, 1)) p2 = mp.Process(target=job, args=(v, 4)) # 设定不同的number看如何抢夺内存 p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()
在上面的代码中,我们定义了一个共享变量v
,两个进程都可以对它进行操作。 在job()中我们想让v
每隔0.1秒输出一次累加num
的结果,但是在两个进程p1
和p2
中设定了不同的累加值。所以接下来让我们来看下这两个进程是否会出现冲突。
结果打印:
➜ baseLearn python ./process/process_no_lock.py 1 5 9 9 13 13 17 17 18 18 ➜ baseLearn
我们可以看到,进程1和进程2在相互抢
着使用共享内存v
。
为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。
首先需要定义一个进程锁
l = mp.Lock() # 定义一个进程锁
然后将进程锁的信息传入各个进程中
p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入 p2 = mp.Process(target=job, args=(v,3,l))
在job()
中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占
def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.1) v.value += num # v.value获取共享内存 print(v.value) l.release() # 释放
全部代码:
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_lock.py @time: 18/8/26 09:22 """ import multiprocessing as mp import time def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.5) # 暂停0.5秒,让输出效果更明显 v.value += num # v.value获取共享变量值 print(v.value) l.release() # 释放 def multicore(): l = mp.Lock() # 定义一个进程锁 v = mp.Value('i', 0) # 定义共享变量 p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入 p2 = mp.Process(target=job, args=(v, 4, l)) # 设定不同的number看如何抢夺内存 p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()
运行一下,让我们看看是否还会出现抢占资源的情况:
结果打印:
➜ baseLearn python ./process/process_lock.py 1 2 3 4 5 9 13 17 21 25
显然,进程锁保证了进程p1
的完整运行,然后才进行了进程p2
的运行
相关推荐:
以上是python中多進程的詳細介紹(程式碼範例)的詳細內容。更多資訊請關注PHP中文網其他相關文章!