首頁 > 後端開發 > Python教學 > 使用Python進行並發編程

使用Python進行並發編程

黄舟
發布: 2016-12-16 11:52:59
原創
1426 人瀏覽過

讓電腦程式並發的運作是一個經常被討論的話題,今天我想討論一下Python下的各種並發方式。

並發方式

線程(Thread)

多線程幾乎是每一個程式猿在使用每一種語言時都會首先想到用於解決並發的工具(JS程式設計師請迴避),使用多執行緒可以迴避的利用CPU資源(Python例外)。然而多執行緒所帶來的程式的複雜度也不可避免,尤其是對競爭資源的同步問題。

然而在python中由於使用了全局解釋鎖(GIL)的原因,程式碼並不能同時在多核心上並發的運行,也就是說,Python的多執行緒不能並發,很多人會發現使用多執行緒來改進自己的Python程式碼後,程式的運作效率卻下降了,這是多麼蛋痛的一件事呀!如果想了解更多細節,推薦閱讀這篇文章。實際上使用多執行緒的程式設計模型是很困難的,程式設計師很容易犯錯,這並不是程式設計師的錯誤,因為並行思維是反人類的,我們大多數人的思維是串列(精神分裂不討論) ,而且馮諾依曼設計的電腦架構也是以順序執行為基礎的。所以如果你總是不能把你的多線程程式搞定,恭喜你,你是個思維正常的程式猿:)

Python提供兩組線程的接口,一組是thread模組,提供基礎的,低等級(Low Level)接口,使用Function作為執行緒的運行體。還有一組是threading模組,提供更容易使用的基於對象的接口(類似於java),可以繼承Thread對象來實現線程,還提供了其它一些線程相關的對象,例如Timer,Lock

使用thread模組的例

1

2

3

4

5

   

import thread

   

import thread

   

import thread

 unction"""

    PRint 'Worker'

thread.start_new_thread(worker)

   

使用threading模組的例子

1

25 

import threading

def worker():

    """thread worker function"""

    print 'Worker'

t = threading.Thread(target=worker)

t = threading.Thread(target=worker)

t = threading.遠

或Java Style

1

2

3

4

5

6

7

8

9

er(threading.Thread):

    def __init__(self) :

        pass

    def run():

        

      

t = worker()

t.start()

   

   

進程(Process)

由於前文提到的全局解釋鎖的問題,Python下比較好的並行方式是使用多進程,這樣可以非常有效的使用CPU資源,並實現真正意義上的並發。當然,進程的開銷比線程要大,也就是說如果你要創建數量驚人的並發進程的話,需要考慮一下你的機器是不是有一顆強大的心。

Python的mutliprocess模組和threading具有類似的介面。

1

2

3

4

5

6

7

  

def worker():

    """thread worker function"""

    print 'Worker'

p = Process(target=worker)

p.start()

所以執行緒之間的通訊是非常容易的,然而進程之間的通訊就要複雜一些了。常見的進程間通訊有,管道,訊息佇列,Socket介面(TCP/IP)等等。

Python的mutliprocess模組提供了封裝好的管道和佇列,可以方便的在進程間傳遞訊息。

Python進程間的同步使用鎖,這一點喝線程是一樣的。

另外,Python也提供了一個進程池Pool對象,可以方便的管理和控制執行緒。

遠端分散式主機 (Distributed Node)

隨著大數據時代的到臨,摩爾定理在單機上似乎已經失去了效果,數據的計算和處理需要分佈式的計算機網絡來運行,程序並行的運行在多個主機節點上,已經是現在的軟體架構所必需考慮的問題。

遠端主機間的進程間通訊有幾種常見的方式

TCP/IP

TCP/IP是所有遠端通訊的基礎,然而API比較低級別,使用起來比較繁瑣,所以一般不會考慮

遠端方法呼叫Remote Function Call

RPC是早期的遠端進程間通訊的手段。 Python下有一個開源的實作RPyC

遠端物件 Remote Object

遠端物件是更高層級的封裝,程式可以想操作本地物件一樣去操作一個遠端物件在本地的代理。遠端物件最廣為使用的規範CORBA,CORBA最大的好處是可以在不同語言和平台中進行通訊。當讓不用的語言和平台還有一些各自的遠端物件實現,例如Java的RMI,MS的DCOM

Python的開源實現,有許多對遠端物件的支援

Dopy

Fnorb (CORBA)

omniORB (CORBA)

Pyro

YAMI

訊息隊列Message Queue

比起RPC或遠端對象,訊息是一種更靈活的通訊手段,且有常見的支援機制

ZeroMQ

Kafka

AWS SQS + BOTO

在遠端主機上執行並發和本地的多進程並沒有非常大的差異,都需要解決進程間通訊的問題。當然遠端進程的管理和協調比起本地更複雜。

Python下有許多開源的框架來支援分散式的並發,提供有效的管理手段包括:

Celery

Celery是一個非常成熟的Python分散式框架,可以在分散式的系統中,異步的執行任務,並提供有效的管理和調度功能。參考這裡

SCOOP

SCOOP (Scalable COncurrent Operations in Python)提供簡單易用的分散式呼叫接口,使用Future介面來進行並發。

Dispy

相比起Celery和SCOOP,Dispy提供更為輕量級的分散式並行服務

PP

PP (Parallel Python)是另外一個輕量級的Python並行服務, 參考這裡yn

Asyncoro是另一個利用Generator實現分散式並發的Python框架,

當然還有許多其它的系統,我沒有一一列出

另外,許多的分散式系統多提供了對Python介面的支持,例如Spark

偽線程(Pseudo-Thread)

還有一種並發手段並不常見,我們可以稱之為偽線程,就是看上去像是線程,使用的接口類似線程接口,但是實際使用非線程的方式,對應的執行緒開銷也不存的。

greenlet

greenlet提供輕量的coroutines來支援進程內的並發。

greenlet是Stackless的副產品,使用tasklet來支援一中被稱為微線程(mirco-thread)的技術,這裡是一個使用greenlet的偽線程的例子

1

2

33

4

5

6

7

8

9

10

11

   

from greenlet import greenlet

  

def test1 ():

    print 12

    gr2.switch()

    print 34

 56

    gr1.switch()

    print 78

      

gr1 = greenlet( test1)

gr2 = greenlet(test2)

gr1.switch()

   

 

12

56

34

   

偽線程gr1 switch會列印12,然後呼叫gr2 switch得到56,然後switch回到gr1,列印34,然後偽線程gr1結束,程式退出,所以78永遠不會被列印。透過這個例子我們可以看出,使用偽線程,我們可以有效的控製程式的執行流程,但是偽線程並不存在真正意義上的並發。

eventlet,gevent和concurence都是基於greenlet提供並發的。

eventlet http://eventlet.net/

eventlet是一個提供網路呼叫並發的Python函式庫,使用者可以以非阻塞的方式來呼叫阻塞的IO操作。

1

2

3

4

5

6

7

8

6

7

   

import eventlet

from eventlet.green import urllib2

  

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

🎎 fetch(url):

    return urllib2.urlopen(url).read()

  

pool = eventlet.GreenPool()

  

for body in pool.imap(fetch, urls):

    

執行結果如下

1

2

3

   

('got body', 17629)

('got body', 17629)

   

eventlet為了支援generator的操作對urllib2做了修改,介面和urllib2是一致的。這裡的GreenPool和Python的Pool介面一致。

gevent

gevent和eventlet類似,關於它們的差異大家可以參考這篇文章

1

2

3

import gevent

urls gevent import socket

urls = ['www.google.com', 'www.example.com', 'www.python.org']

jobs url in urls]

gevent.joinall(jobs, timeout=2)

  

print [job.value fo​​r job in bs]

1

   

[ '206.169.145.226', '93.184.216.34', '23.235.39.223']

 concurence是另一個利用greenlet提供網路並發的開源庫,我沒有用過,大家可以自己試試看。

實戰運用

通常需要用到並發的場合有兩種,一種是計算密集型,也就是說你的程式需要大量的CPU資源;另一種是IO密集型,程式可能有大量的讀寫入操作,包括讀寫文件,收發網路請求等等。

計算密集型

對應計算密集型的應用,我們選用著名的蒙特卡羅演算法來計算PI值。基本原理如下

蒙特卡羅演算法利用統計學原理來模擬計算圓周率,在一個正方形中,一個隨機的點落在1/4圓的區域(紅色點)的機率與其面積成正比。也就此機率 p = Pi * R*R /4  : R* R , 其中R是正方形的邊長,圓的半徑。也就是說該機率是圓周率的1/4, 利用這個結論,只要我們模擬出點落在四分之一圓上的機率就可以知道圓周率了,為了得到這個機率,我們可以透過大量的實驗,也就是產生大量的點,看看這個點在哪個區域,然後統計出結果。

基本演算法如下:

1

2

3

4

5

  

def test(tries):

    return sum(hypot(random(), random())

   

這裡test方法做了n(tries)次試驗,並返回落在四分之一圓中的點的個數。判斷方法是檢查該點到圓心的距離,如果小於R則是在圓上。

通過大量的並發,我們可以快速的運行多次試驗,試驗的次數越多,結果越接近真實的圓周率。 使用Python進行並發編程

這裡給出不同並發方法的程式碼

非並發

我們先在單線程,但進程運行,看看效能如何

1

2

1

2 6

7

8

9

10

11

12

   

from math import hypot

from random import random

import eventlet

import time

  

def test(tries):

    returnsum(hypot):

    return11(hypot().

  

def calcPi (nbFutures, tries):

    ts = time.time()

    result = map(test, [tries] * nbFutures)🠎🠎🠎p含o. * sum(result) / float(nbFutures * tries)

    span = time.time() - ts

    print "time spend ", span

    re審查

   

多執行緒thread

為了使用執行緒池,我們用multiprocessing的dummy包,它是對多執行緒的一個封裝。注意這裡程式碼雖然一個字的沒有提到線程,但它千真萬確是多線程。

透過測試我們開(jing)心(ya)的發現,果然不出所料,當線程池為1是,它的運行結果和沒有並發時一樣,當我們把線程池數字設置為5時,耗時幾乎是沒有並發的2倍,我的測試數據從5秒到9秒。所以對於計算密集型的任務,還是放棄多執行緒吧。

1

2

3

4

5

6

7

8

6

7

13

14

15

16

17

18

19

20

21

   

from multiproces. ort hypot

from random import random

import time

  

def test( tries):

    return sum(hypot(random(), random())   

,Pamot​​. )

    p = Pool(1)

    result = p.map(test, [tries] * nbFutures)

    span = time.time() - ts

    print "time spend ", span

    return ret

  

    print("pi = {}".format(calcPi(3000 , 4000)))

   

 

多進程multiprocess

理論上對於計算密集型進程的任務,使用多進程並發比較合適,在以下進程的例子中,池的​​規模化池的大小可以看到對結果的影響,當進程池設定為1時,和多執行緒的結果所需的時間類似,因為這時候並不存在並發;當設定為2時,回應時間有了明顯的改進,是之前沒有並發的一半;然而繼續擴大進程池對效能影響並不大,甚至有所下降,也許我的Apple Air的CPU只有兩個核心?

當心,如果你設定一個非常大的進程池,你會遇到 Resource temporarily unavailable的錯誤,系統並不能支援創建太多的進程,畢竟資源是有限的。

1

2

3

4

5

6

7

8

6

7

13

14

15

16

17

18

19

20

   

from multiprocessing import Pool

from multiprocessing import Pool

from multiprocessing import Pool

from multiprocessing import

from random import random

import time

  

def test(tries):

return sum(hypot(random(), random())   

def calcPi(nbFutures, = Pool( 5)

    result = p.map(test, [tries] * nbFutures)

    ret = 4. * sum(result)    ret = 4. * sum(result)    ret = 4. * sum(result)    . time.time() - ts

    print "time spend ", span

    return ret

  

if __name__ == '__main__':

  

 

gevent (偽線程)

不論是gevent還是eventlet,因為不存在實際的並發,回應時間和沒有並發區別不大,這個和測試結果一致。

1

2

3

4

5

6

7

8

6

7

13

14

15

16

17

18

   

import gevent

from math import hypot

from 5om

def test(tries):

    return sum(hypot(random(), random ())

  

def calcPi(nbFutures, tries):

   for t in [tries] * nbFutures]

    gevent.joinall(jobs, timeout=2)

    ret = 4. * sum([job.valueFu  .

    span = time.time() - ts

    列印「時間花費」,且span

    

 

eventlet (α線程)

1

2

3

4

5

6

710

14

15

16

17

18

19

 

從數學導入hypot

隨機導入隨機導入隨機

導入eventlet

hypot(random(), 隨機())   

def calcPi(nbFutures, attempts):

    ts    結果 = 池。 imap(測試,[嘗試]*nbfutures)

ret =4。

    return ret

  

print calcPi(3000,4000)

 95的定義,Open在Python3中提供了Future介面。

在塔的SCOOP環境下(單機,4個配置Worker),吞吐量的效能有提高,但不如單一進程池配置的多進程。

1

2

3

4

5

6

7

18

13

14

15

16

17

18

19

   

來自數學導入hypot

def測試(嘗試):

    返回總和( hypot(隨機(),隨機())  

def calcPi(nbFutures, attempts):

  test , [tries] * nbFutures)

    ret = 4. * sum(expr) / float(nbFutures * tries) ,span

    返回ret

  

if __name__ == 「__main__」:

    print("pi = {}".format(calcPi(3000, 4000))))))))))

任務代碼

1

2

3

4

5

6

7

8

 Celery

  

來自數學導入hypot

隨機導入隨機

   

app = Celery('任務',後端='amqp',經紀人='amqp://guest@localhost //')

app.conf.CELERY_RESULT_BACKEND /results.sqlite'

   

@app。任務

def測試(嘗試):

    返回sum(hypot(隨機(),隨機())

 

14

15

   

來自芹菜進口組

從任務導入測試

  

導入時間

  

def calcPi(nbFutures, attempts): (test.s(tries) for i 在x 範圍內(nbFutures))().get()

      

    ret = 4. * sum 

    列印「時間花費”, span

    返回ret

  

打印calcPi(3000, 4000)

   

使用Celery做並發的測試結果出乎意料(環境是單機,4frefork的並發,訊息broker是rabbitMQ),是所有測試案例裡最糟糕的,回應時間是沒有並發的5~6倍。這也許是因為控制協調的開銷太大。對於這樣的計算任務,Celery也許不是一個好的選擇。

asyncoro

Asyncoro的測試結果和非併發保持一致。

1

2

3

4

5

6

7

8

6

7

13

14

15

16

17

18

19

   

import asyncoro

  

import time

  

def test(tries):

    yield sum(hypot( random(), random())   

  

def calcPi 和 coros = [ asyncoro.Coro( test,t) for t in [tries] * nbFutures]

    ret = 4. * sum([job.value() for span = time.time() - ts

    print "time spend ", span

    return ret

  

print calc/30000,4000)

IO密集的任務是另一個常見的用例,例如網路WEB伺服器就是一個例子,每秒鐘能處理多少個請求時WEB伺服器的重要指標。

我們以網頁讀取為最簡單的例子

1

2

3

4

5

6

4

5

11

12

13

14

15

16

17

   

from math import

  

urls = ['http://www.google.com' , 'http://www.example.com', 'http://www.python.org']

  

def test(url):

    return urllib2.openurl(url).read(

def testIO(nbF​​utures):

    ts = time.time()

    map(test, - ts

    print "time spend ", span

  

testIO(10)

   

在不同並發庫下的程式碼,由於比較類似,我就不一一列出。大家可以參考計算密集型中程式碼做參考。

透過測試我們可以發現,對於IO密集型的任務,使用多線程,或者是多進程都可以有效的提高程序的效率,而使用偽線程性能提升非常顯著,eventlet比沒有並發的情況下,響應時間從9秒提高到0.03秒。同時eventlet/gevent提供了非阻塞的非同步呼叫模式,非常方便。這裡推薦使用線程或偽線程,因為在回應時間類似的情況下,線程和偽線程消耗的資源更少。

總結

Python提供了不同的並發方式,對應於不同的場景,我們需要選擇不同的方式進行並發。選擇適合的方式,不但要對方法的原理有所了解,還應該做一些測試和試驗,數據才是你做選擇的最好參考。

 以上就是使用Python進行並發程式設計的內容,更多相關文章請關注PHP中文網(www.php.cn)!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
最新問題
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板