讓電腦程式並發的運作是一個經常被討論的話題,今天我想討論一下Python下的各種並發方式。
並發方式
線程(Thread)
多線程幾乎是每一個程式猿在使用每一種語言時都會首先想到用於解決並發的工具(JS程式設計師請迴避),使用多執行緒可以迴避的利用CPU資源(Python例外)。然而多執行緒所帶來的程式的複雜度也不可避免,尤其是對競爭資源的同步問題。
然而在python中由於使用了全局解釋鎖(GIL)的原因,程式碼並不能同時在多核心上並發的運行,也就是說,Python的多執行緒不能並發,很多人會發現使用多執行緒來改進自己的Python程式碼後,程式的運作效率卻下降了,這是多麼蛋痛的一件事呀!如果想了解更多細節,推薦閱讀這篇文章。實際上使用多執行緒的程式設計模型是很困難的,程式設計師很容易犯錯,這並不是程式設計師的錯誤,因為並行思維是反人類的,我們大多數人的思維是串列(精神分裂不討論) ,而且馮諾依曼設計的電腦架構也是以順序執行為基礎的。所以如果你總是不能把你的多線程程式搞定,恭喜你,你是個思維正常的程式猿:)
Python提供兩組線程的接口,一組是thread模組,提供基礎的,低等級(Low Level)接口,使用Function作為執行緒的運行體。還有一組是threading模組,提供更容易使用的基於對象的接口(類似於java),可以繼承Thread對象來實現線程,還提供了其它一些線程相關的對象,例如Timer,Lock
使用thread模組的例
1
2
3
4
5
import thread
import thread
import thread
unction"""
PRint 'Worker'
thread.start_new_thread(worker)
使用threading模組的例子
1
25
import threading
def worker():
"""thread worker function"""
print 'Worker'
t = threading.Thread(target=worker)
t = threading.Thread(target=worker)
t = threading.遠
或Java Style 1 23456789er(threading.Thread):
def __init__(self) :
pass
def run():
t = worker()
t.start()
進程(Process)由於前文提到的全局解釋鎖的問題,Python下比較好的並行方式是使用多進程,這樣可以非常有效的使用CPU資源,並實現真正意義上的並發。當然,進程的開銷比線程要大,也就是說如果你要創建數量驚人的並發進程的話,需要考慮一下你的機器是不是有一顆強大的心。 Python的mutliprocess模組和threading具有類似的介面。 1234567
def worker(): """thread worker function""" print 'Worker'p = Process(target=worker)p.start()所以執行緒之間的通訊是非常容易的,然而進程之間的通訊就要複雜一些了。常見的進程間通訊有,管道,訊息佇列,Socket介面(TCP/IP)等等。 Python的mutliprocess模組提供了封裝好的管道和佇列,可以方便的在進程間傳遞訊息。 Python進程間的同步使用鎖,這一點喝線程是一樣的。 另外,Python也提供了一個進程池Pool對象,可以方便的管理和控制執行緒。 遠端分散式主機 (Distributed Node)
隨著大數據時代的到臨,摩爾定理在單機上似乎已經失去了效果,數據的計算和處理需要分佈式的計算機網絡來運行,程序並行的運行在多個主機節點上,已經是現在的軟體架構所必需考慮的問題。
遠端主機間的進程間通訊有幾種常見的方式
TCP/IP
TCP/IP是所有遠端通訊的基礎,然而API比較低級別,使用起來比較繁瑣,所以一般不會考慮
遠端方法呼叫Remote Function Call
RPC是早期的遠端進程間通訊的手段。 Python下有一個開源的實作RPyC
遠端物件 Remote Object
遠端物件是更高層級的封裝,程式可以想操作本地物件一樣去操作一個遠端物件在本地的代理。遠端物件最廣為使用的規範CORBA,CORBA最大的好處是可以在不同語言和平台中進行通訊。當讓不用的語言和平台還有一些各自的遠端物件實現,例如Java的RMI,MS的DCOM
Python的開源實現,有許多對遠端物件的支援
Dopy
Fnorb (CORBA)
omniORB (CORBA)
Pyro
YAMI
訊息隊列Message Queue
比起RPC或遠端對象,訊息是一種更靈活的通訊手段,且有常見的支援機制
ZeroMQ
Kafka
AWS SQS + BOTO
在遠端主機上執行並發和本地的多進程並沒有非常大的差異,都需要解決進程間通訊的問題。當然遠端進程的管理和協調比起本地更複雜。
Python下有許多開源的框架來支援分散式的並發,提供有效的管理手段包括:
Celery
Celery是一個非常成熟的Python分散式框架,可以在分散式的系統中,異步的執行任務,並提供有效的管理和調度功能。參考這裡
SCOOP
SCOOP (Scalable COncurrent Operations in Python)提供簡單易用的分散式呼叫接口,使用Future介面來進行並發。
Dispy
相比起Celery和SCOOP,Dispy提供更為輕量級的分散式並行服務
PP
PP (Parallel Python)是另外一個輕量級的Python並行服務, 參考這裡yn
Asyncoro是另一個利用Generator實現分散式並發的Python框架,
當然還有許多其它的系統,我沒有一一列出
另外,許多的分散式系統多提供了對Python介面的支持,例如Spark
偽線程(Pseudo-Thread)
還有一種並發手段並不常見,我們可以稱之為偽線程,就是看上去像是線程,使用的接口類似線程接口,但是實際使用非線程的方式,對應的執行緒開銷也不存的。
greenlet
greenlet提供輕量的coroutines來支援進程內的並發。
greenlet是Stackless的副產品,使用tasklet來支援一中被稱為微線程(mirco-thread)的技術,這裡是一個使用greenlet的偽線程的例子
1
2
33
4
5
6
7
8
9
10
11
from greenlet import greenlet def test1 (): print 12 gr2.switch() print 3456
gr1.switch()
print 78
gr1 = greenlet( test1)
gr2 = greenlet(test2)
gr1.switch()
12
56
34
偽線程gr1 switch會列印12,然後呼叫gr2 switch得到56,然後switch回到gr1,列印34,然後偽線程gr1結束,程式退出,所以78永遠不會被列印。透過這個例子我們可以看出,使用偽線程,我們可以有效的控製程式的執行流程,但是偽線程並不存在真正意義上的並發。
eventlet,gevent和concurence都是基於greenlet提供並發的。
eventlet http://eventlet.net/
eventlet是一個提供網路呼叫並發的Python函式庫,使用者可以以非阻塞的方式來呼叫阻塞的IO操作。
1
2
3
4
5
6
7
8
6
7
import eventletfrom eventlet.green import urllib2 urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']🎎 fetch(url):
return urllib2.urlopen(url).read()
pool = eventlet.GreenPool()
for body in pool.imap(fetch, urls):
執行結果如下
1
2
3
('got body', 17629)
('got body', 17629)
eventlet為了支援generator的操作對urllib2做了修改,介面和urllib2是一致的。這裡的GreenPool和Python的Pool介面一致。
gevent
gevent和eventlet類似,關於它們的差異大家可以參考這篇文章
1
2
3
import geventurls gevent import socketurls = ['www.google.com', 'www.example.com', 'www.python.org']jobs url in urls]gevent.joinall(jobs, timeout=2) print [job.value for job in bs]
1
[ '206.169.145.226', '93.184.216.34', '23.235.39.223']
concurence是另一個利用greenlet提供網路並發的開源庫,我沒有用過,大家可以自己試試看。
實戰運用
通常需要用到並發的場合有兩種,一種是計算密集型,也就是說你的程式需要大量的CPU資源;另一種是IO密集型,程式可能有大量的讀寫入操作,包括讀寫文件,收發網路請求等等。
計算密集型
對應計算密集型的應用,我們選用著名的蒙特卡羅演算法來計算PI值。基本原理如下
蒙特卡羅演算法利用統計學原理來模擬計算圓周率,在一個正方形中,一個隨機的點落在1/4圓的區域(紅色點)的機率與其面積成正比。也就此機率 p = Pi * R*R /4 : R* R , 其中R是正方形的邊長,圓的半徑。也就是說該機率是圓周率的1/4, 利用這個結論,只要我們模擬出點落在四分之一圓上的機率就可以知道圓周率了,為了得到這個機率,我們可以透過大量的實驗,也就是產生大量的點,看看這個點在哪個區域,然後統計出結果。 基本演算法如下: 12345
def test(tries):
return sum(hypot(random(), random())
這裡test方法做了n(tries)次試驗,並返回落在四分之一圓中的點的個數。判斷方法是檢查該點到圓心的距離,如果小於R則是在圓上。
通過大量的並發,我們可以快速的運行多次試驗,試驗的次數越多,結果越接近真實的圓周率。
這裡給出不同並發方法的程式碼非並發我們先在單線程,但進程運行,看看效能如何 12 12 6789101112我 from math import hypotfrom random import randomimport eventletimport time def test(tries): returnsum(hypot): return11(hypot().
def calcPi (nbFutures, tries):
ts = time.time()
result = map(test, [tries] * nbFutures)p含o. * sum(result) / float(nbFutures * tries)
span = time.time() - ts
print "time spend ", span
re審查
多執行緒thread為了使用執行緒池,我們用multiprocessing的dummy包,它是對多執行緒的一個封裝。注意這裡程式碼雖然一個字的沒有提到線程,但它千真萬確是多線程。透過測試我們開(jing)心(ya)的發現,果然不出所料,當線程池為1是,它的運行結果和沒有並發時一樣,當我們把線程池數字設置為5時,耗時幾乎是沒有並發的2倍,我的測試數據從5秒到9秒。所以對於計算密集型的任務,還是放棄多執行緒吧。
1
2
3
4
5
6
7
8
6
7
13141516 1718192021 from multiproces. ort hypotfrom random import randomimport time def test( tries): return sum(hypot(random(), random()),Pamot. )
p = Pool(1)
result = p.map(test, [tries] * nbFutures)
span = time.time() - ts print "time spend ", span return ret print("pi = {}".format(calcPi(3000 , 4000))) 多進程multiprocess理論上對於計算密集型進程的任務,使用多進程並發比較合適,在以下進程的例子中,池的規模化池的大小可以看到對結果的影響,當進程池設定為1時,和多執行緒的結果所需的時間類似,因為這時候並不存在並發;當設定為2時,回應時間有了明顯的改進,是之前沒有並發的一半;然而繼續擴大進程池對效能影響並不大,甚至有所下降,也許我的Apple Air的CPU只有兩個核心? 當心,如果你設定一個非常大的進程池,你會遇到 Resource temporarily unavailable的錯誤,系統並不能支援創建太多的進程,畢竟資源是有限的。 1234567867
13
14
15
16
17
18
19
20
from multiprocessing import Pool
from multiprocessing import Pool
from multiprocessing import Pool
from multiprocessing import
from random import randomimport time def test(tries): return sum(hypot(random(), random()) def calcPi(nbFutures, = Pool( 5) result = p.map(test, [tries] * nbFutures) ret = 4. * sum(result) ret = 4. * sum(result) ret = 4. * sum(result) . time.time() - ts print "time spend ", span return ret if __name__ == '__main__':
gevent (偽線程)
不論是gevent還是eventlet,因為不存在實際的並發,回應時間和沒有並發區別不大,這個和測試結果一致。
1
2
3
4
5
6
7
8
6
7
13141516 1718 import geventfrom math import hypotfrom 5omdef test(tries):
return sum(hypot(random(), random ())
def calcPi(nbFutures, tries):
for t in [tries] * nbFutures]
gevent.joinall(jobs, timeout=2)
ret = 4. * sum([job.valueFu .
span = time.time() - ts
列印「時間花費」,且span
eventlet (α線程)
1
2
3
4
5
6
710
14151617 1819 從數學導入hypot隨機導入隨機導入隨機導入eventlet hypot(random(), 隨機()) def calcPi(nbFutures, attempts): ts 結果 = 池。 imap(測試,[嘗試]*nbfutures)ret =4。 return ret print calcPi(3000,4000) 95的定義,Open在Python3中提供了Future介面。 在塔的SCOOP環境下(單機,4個配置Worker),吞吐量的效能有提高,但不如單一進程池配置的多進程。 12345 6718
13
14
15
16
17
18
19
來自數學導入hypot
def測試(嘗試): 返回總和( hypot(隨機(),隨機()) def calcPi(nbFutures, attempts):
test , [tries] * nbFutures)
ret = 4. * sum(expr) / float(nbFutures * tries) ,span
返回ret
if __name__ == 「__main__」:
print("pi = {}".format(calcPi(3000, 4000))))))))))
任務代碼 1 2 34 5678Celery
來自數學導入hypot
隨機導入隨機
app = Celery('任務',後端='amqp',經紀人='amqp://guest@localhost //')
app.conf.CELERY_RESULT_BACKEND /results.sqlite'
@app。任務
def測試(嘗試):
返回sum(hypot(隨機(),隨機())
14
15
來自芹菜進口組
從任務導入測試
導入時間
def calcPi(nbFutures, attempts): (test.s(tries) for i 在x 範圍內(nbFutures))().get()
ret = 4. * sum
列印「時間花費”, span
返回ret
打印calcPi(3000, 4000)
使用Celery做並發的測試結果出乎意料(環境是單機,4frefork的並發,訊息broker是rabbitMQ),是所有測試案例裡最糟糕的,回應時間是沒有並發的5~6倍。這也許是因為控制協調的開銷太大。對於這樣的計算任務,Celery也許不是一個好的選擇。
asyncoro
Asyncoro的測試結果和非併發保持一致。
1
2
3
4
5
6
7
8
6
7
13141516 171819 import asyncoroimport time def test(tries): yield sum(hypot( random(), random()) def calcPi 和 coros = [ asyncoro.Coro( test,t) for t in [tries] * nbFutures] ret = 4. * sum([job.value() for span = time.time() - ts print "time spend ", span return ret print calc/30000,4000) IO密集的任務是另一個常見的用例,例如網路WEB伺服器就是一個例子,每秒鐘能處理多少個請求時WEB伺服器的重要指標。 我們以網頁讀取為最簡單的例子 12345645
11 121314151617 from math import
urls = ['http://www.google.com' , 'http://www.example.com', 'http://www.python.org']
def test(url):
return urllib2.openurl(url).read(
def testIO(nbFutures):
ts = time.time()
map(test, - ts
print "time spend ", span
testIO(10)
在不同並發庫下的程式碼,由於比較類似,我就不一一列出。大家可以參考計算密集型中程式碼做參考。
透過測試我們可以發現,對於IO密集型的任務,使用多線程,或者是多進程都可以有效的提高程序的效率,而使用偽線程性能提升非常顯著,eventlet比沒有並發的情況下,響應時間從9秒提高到0.03秒。同時eventlet/gevent提供了非阻塞的非同步呼叫模式,非常方便。這裡推薦使用線程或偽線程,因為在回應時間類似的情況下,線程和偽線程消耗的資源更少。
總結
Python提供了不同的並發方式,對應於不同的場景,我們需要選擇不同的方式進行並發。選擇適合的方式,不但要對方法的原理有所了解,還應該做一些測試和試驗,數據才是你做選擇的最好參考。
以上就是使用Python進行並發程式設計的內容,更多相關文章請關注PHP中文網(www.php.cn)!