首页 后端开发 Python教程 使用Python进行并发编程

使用Python进行并发编程

Dec 16, 2016 am 11:52 AM

让计算机程序并发的运行是一个经常被讨论的话题,今天我想讨论一下Python下的各种并发方式。

并发方式

线程(Thread)

多线程几乎是每一个程序猿在使用每一种语言时都会首先想到用于解决并发的工具(JS程序员请回避),使用多线程可以有效的利用CPU资源(Python例外)。然而多线程所带来的程序的复杂度也不可避免,尤其是对竞争资源的同步问题。

然而在python中由于使用了全局解释锁(GIL)的原因,代码并不能同时在多核上并发的运行,也就是说,Python的多线程不能并发,很多人会发现使用多线程来改进自己的Python代码后,程序的运行效率却下降了,这是多么蛋疼的一件事呀!如果想了解更多细节,推荐阅读这篇文章。实际上使用多线程的编程模型是很困难的,程序员很容易犯错,这并不是程序员的错误,因为并行思维是反人类的,我们大多数人的思维是串行(精神分裂不讨论),而且冯诺依曼设计的计算机架构也是以顺序执行为基础的。所以如果你总是不能把你的多线程程序搞定,恭喜你,你是个思维正常的程序猿:)

Python提供两组线程的接口,一组是thread模块,提供基础的,低等级(Low Level)接口,使用Function作为线程的运行体。还有一组是threading模块,提供更容易使用的基于对象的接口(类似于java),可以继承Thread对象来实现线程,还提供了其它一些线程相关的对象,例如Timer,Lock

使用thread模块的例子

1

2

3

4

5

   

import thread

def worker():

    """thread worker function"""

    PRint 'Worker'

thread.start_new_thread(worker)

   

使用threading模块的例子

1

2

3

4

5

6

   

import threading

def worker():

    """thread worker function"""

    print 'Worker'

t = threading.Thread(target=worker)

t.start()

   

或者Java Style

1

2

3

4

5

6

7

8

9

10

   

import threading

class worker(threading.Thread):

    def __init__(self):

        pass

    def run():

        """thread worker function"""

        print 'Worker'

      

t = worker()

t.start()

   

进程 (Process)

由于前文提到的全局解释锁的问题,Python下比较好的并行方式是使用多进程,这样可以非常有效的使用CPU资源,并实现真正意义上的并发。当然,进程的开销比线程要大,也就是说如果你要创建数量惊人的并发进程的话,需要考虑一下你的机器是不是有一颗强大的心。

Python的mutliprocess模块和threading具有类似的接口。

1

2

3

4

5

6

7

8

   

from multiprocessing import Process

  

def worker():

    """thread worker function"""

    print 'Worker'

p = Process(target=worker)

p.start()

p.join()

   

由于线程共享相同的地址空间和内存,所以线程之间的通信是非常容易的,然而进程之间的通信就要复杂一些了。常见的进程间通信有,管道,消息队列,Socket接口(TCP/IP)等等。

Python的mutliprocess模块提供了封装好的管道和队列,可以方便的在进程间传递消息。

Python进程间的同步使用锁,这一点喝线程是一样的。

另外,Python还提供了进程池Pool对象,可以方便的管理和控制线程。

远程分布式主机 (Distributed Node)

随着大数据时代的到临,摩尔定理在单机上似乎已经失去了效果,数据的计算和处理需要分布式的计算机网络来运行,程序并行的运行在多个主机节点上,已经是现在的软件架构所必需考虑的问题。

远程主机间的进程间通信有几种常见的方式

TCP/IP

TCP/IP是所有远程通信的基础,然而API比较低级别,使用起来比较繁琐,所以一般不会考虑

远程方法调用 Remote Function Call

RPC是早期的远程进程间通信的手段。Python下有一个开源的实现RPyC

远程对象 Remote Object

远程对象是更高级别的封装,程序可以想操作本地对象一样去操作一个远程对象在本地的代理。远程对象最广为使用的规范CORBA,CORBA最大的好处是可以在不同语言和平台中进行通信。当让不用的语言和平台还有一些各自的远程对象实现,例如Java的RMI,MS的DCOM

Python的开源实现,有许多对远程对象的支持

Dopy

Fnorb (CORBA)

ICE

omniORB (CORBA)

Pyro

YAMI

消息队列 Message Queue

比起RPC或者远程对象,消息是一种更为灵活的通信手段,常见的支持Python接口的消息机制有

RabbitMQ

ZeroMQ

Kafka

AWS SQS + BOTO

在远程主机上执行并发和本地的多进程并没有非常大的差异,都需要解决进程间通信的问题。当然对远程进程的管理和协调比起本地要复杂。

Python下有许多开源的框架来支持分布式的并发,提供有效的管理手段包括:

Celery

Celery是一个非常成熟的Python分布式框架,可以在分布式的系统中,异步的执行任务,并提供有效的管理和调度功能。参考这里

SCOOP

SCOOP (Scalable COncurrent Operations in Python)提供简单易用的分布式调用接口,使用Future接口来进行并发。

Dispy

相比起Celery和SCOOP,Dispy提供更为轻量级的分布式并行服务

PP

PP (Parallel Python)是另外一个轻量级的Python并行服务, 参考这里

Asyncoro

Asyncoro是另一个利用Generator实现分布式并发的Python框架,

当然还有许多其它的系统,我没有一一列出

另外,许多的分布式系统多提供了对Python接口的支持,例如Spark

伪线程 (Pseudo-Thread)

还有一种并发手段并不常见,我们可以称之为伪线程,就是看上去像是线程,使用的接口类似线程接口,但是实际使用非线程的方式,对应的线程开销也不存的。

greenlet

greenlet提供轻量级的coroutines来支持进程内的并发。

greenlet是Stackless的一个副产品,使用tasklet来支持一中被称之为微线程(mirco-thread)的技术,这里是一个使用greenlet的伪线程的例子

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

   

from greenlet import greenlet

  

def test1():

    print 12

    gr2.switch()

    print 34

      

def test2():

    print 56

    gr1.switch()

    print 78

      

gr1 = greenlet(test1)

gr2 = greenlet(test2)

gr1.switch()

   

运行以上程序得到如下结果:

1

2

3

   

12

56

34

   

伪线程gr1 switch会打印12,然后调用gr2 switch得到56,然后switch回到gr1,打印34,然后伪线程gr1结束,程序退出,所以78永远不会被打印。通过这个例子我们可以看出,使用伪线程,我们可以有效的控制程序的执行流程,但是伪线程并不存在真正意义上的并发。

eventlet,gevent和concurence都是基于greenlet提供并发的。

eventlet http://eventlet.net/

eventlet是一个提供网络调用并发的Python库,使用者可以以非阻塞的方式来调用阻塞的IO操作。

1

2

3

4

5

6

7

8

9

10

11

12

   

import eventlet

from eventlet.green import urllib2

  

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

  

def fetch(url):

    return urllib2.urlopen(url).read()

  

pool = eventlet.GreenPool()

  

for body in pool.imap(fetch, urls):

    print("got body", len(body))

   

 

执行结果如下

1

2

3

   

('got body', 17629)

('got body', 1270)

('got body', 46949)

   

eventlet为了支持generator的操作对urllib2做了修改,接口和urllib2是一致的。这里的GreenPool和Python的Pool接口一致。

gevent

gevent和eventlet类似,关于它们的差异大家可以参考这篇文章

1

2

3

4

5

6

7

   

import gevent

from gevent import socket

urls = ['www.google.com', 'www.example.com', 'www.python.org']

jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]

gevent.joinall(jobs, timeout=2)

  

print [job.value for job in jobs]

   

执行结果如下:

1

   

['206.169.145.226', '93.184.216.34', '23.235.39.223']

   

 

concurence https://github.com/concurrence/concurrence

concurence是另外一个利用greenlet提供网络并发的开源库,我没有用过,大家可以自己尝试一下。

实战运用

通常需要用到并发的场合有两种,一种是计算密集型,也就是说你的程序需要大量的CPU资源;另一种是IO密集型,程序可能有大量的读写操作,包括读写文件,收发网络请求等等。

计算密集型

对应计算密集型的应用,我们选用著名的蒙特卡洛算法来计算PI值。基本原理如下

xiaijmhak2o50.jpg

蒙特卡洛算法利用统计学原理来模拟计算圆周率,在一个正方形中,一个随机的点落在1/4圆的区域(红色点)的概率与其面积成正比。也就该概率 p = Pi * R*R /4  : R* R , 其中R是正方形的边长,圆的半径。也就是说该概率是圆周率的1/4, 利用这个结论,只要我们模拟出点落在四分之一圆上的概率就可以知道圆周率了,为了得到这个概率,我们可以通过大量的实验,也就是生成大量的点,看看这个点在哪个区域,然后统计出结果。

基本算法如下:

1

2

3

4

5

   

from math import hypot

from random import random

  

def test(tries):

    return sum(hypot(random(), random()) < 1 for _ in range(tries))

   

这里test方法做了n(tries)次试验,返回落在四分之一圆中的点的个数。判断方法是检查该点到圆心的距离,如果小于R则是在圆上。

通过大量的并发,我们可以快速的运行多次试验,试验的次数越多,结果越接近真实的圆周率。

这里给出不同并发方法的程序代码

非并发

我们先在单线程,但进程运行,看看性能如何

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

   

from math import hypot

from random import random

import eventlet

import time

  

def test(tries):

    return sum(hypot(random(), random()) < 1 for _ in range(tries))

  

def calcPi(nbFutures, tries):

    ts = time.time()

    result = map(test, [tries] * nbFutures)

      

    ret = 4. * sum(result) / float(nbFutures * tries)

    span = time.time() - ts

    print "time spend ", span

    return ret

  

print calcPi(3000,4000)

   

多线程 thread

为了使用线程池,我们用multiprocessing的dummy包,它是对多线程的一个封装。注意这里代码虽然一个字的没有提到线程,但它千真万确是多线程。

通过测试我们开(jing)心(ya)的发现,果然不出所料,当线程池为1是,它的运行结果和没有并发时一样,当我们把线程池数字设置为5时,耗时几乎是没有并发的2倍,我的测试数据从5秒到9秒。所以对于计算密集型的任务,还是放弃多线程吧。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

   

from multiprocessing.dummy import Pool

  

from math import hypot

from random import random

import time

  

def test(tries):

    return sum(hypot(random(), random()) < 1 for _ in range(tries))

  

def calcPi(nbFutures, tries):

    ts = time.time()

    p = Pool(1)

    result = p.map(test, [tries] * nbFutures)

    ret = 4. * sum(result) / float(nbFutures * tries)

    span = time.time() - ts

    print "time spend ", span

    return ret

  

if __name__ == '__main__':

    p = Pool()

    print("pi = {}".format(calcPi(3000, 4000)))

   

 

多进程 multiprocess

理论上对于计算密集型的任务,使用多进程并发比较合适,在以下的例子中,进程池的规模设置为5,修改进程池的大小可以看到对结果的影响,当进程池设置为1时,和多线程的结果所需的时间类似,因为这时候并不存在并发;当设置为2时,响应时间有了明显的改进,是之前没有并发的一半;然而继续扩大进程池对性能影响并不大,甚至有所下降,也许我的Apple Air的CPU只有两个核?

当心,如果你设置一个非常大的进程池,你会遇到 Resource temporarily unavailable的错误,系统并不能支持创建太多的进程,毕竟资源是有限的。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

   

from multiprocessing import Pool

  

from math import hypot

from random import random

import time

  

def test(tries):

    return sum(hypot(random(), random()) < 1 for _ in range(tries))

  

def calcPi(nbFutures, tries):

    ts = time.time()

    p = Pool(5)

    result = p.map(test, [tries] * nbFutures)

    ret = 4. * sum(result) / float(nbFutures * tries)

    span = time.time() - ts

    print "time spend ", span

    return ret

  

if __name__ == '__main__':

    print("pi = {}".format(calcPi(3000, 4000)))

   

 

gevent (伪线程)

不论是gevent还是eventlet,因为不存在实际的并发,响应时间和没有并发区别不大,这个和测试结果一致。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

   

import gevent

from math import hypot

from random import random

import time

  

def test(tries):

    return sum(hypot(random(), random()) < 1 for _ in range(tries))

  

def calcPi(nbFutures, tries):

    ts = time.time()

    jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures]

    gevent.joinall(jobs, timeout=2)

    ret = 4. * sum([job.value for job in jobs]) / float(nbFutures * tries)

    span = time.time() - ts

    print "time spend ", span

    return ret

  

print calcPi(3000,4000)

   

 

eventlet (伪线程)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

   

from math import hypot

from random import random

import eventlet

import time

  

def test(tries):

    return sum(hypot(random(), random()) < 1 for _ in range(tries))

  

def calcPi(nbFutures, tries):

    ts = time.time()

    pool = eventlet.GreenPool()

    result = pool.imap(test, [tries] * nbFutures)

      

    ret = 4. * sum(result) / float(nbFutures * tries)

    span = time.time() - ts

    print "time spend ", span

    return ret

  

print calcPi(3000,4000)

   

 

SCOOP

SCOOP中的Future接口符合PEP-3148的定义,也就是在Python3中提供的Future接口。

在缺省的SCOOP配置环境下(单机,4个Worker),并发的性能有提高,但是不如两个进程池配置的多进程。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

   

from math import hypot

from random import random

from scoop import futures

  

import time

  

def test(tries):

    return sum(hypot(random(), random()) < 1 for _ in range(tries))

  

def calcPi(nbFutures, tries):

    ts = time.time()

    expr = futures.map(test, [tries] * nbFutures)

    ret = 4. * sum(expr) / float(nbFutures * tries)

    span = time.time() - ts

    print "time spend ", span

    return ret

  

if __name__ == "__main__":

    print("pi = {}".format(calcPi(3000, 4000)))

   

 

Celery

任务代码

1

2

3

4

5

6

7

8

9

10

11

   

from celery import Celery

  

from math import hypot

from random import random

   

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')

app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

   

@app.task

def test(tries):

    return sum(hypot(random(), random()) < 1 for _ in range(tries))

   

 

客户端代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

   

from celery import group

from tasks import test

  

import time

  

def calcPi(nbFutures, tries):

    ts = time.time()

    result = group(test.s(tries) for i in xrange(nbFutures))().get()

      

    ret = 4. * sum(result) / float(nbFutures * tries)

    span = time.time() - ts

    print "time spend ", span

    return ret

  

print calcPi(3000, 4000)

   

使用Celery做并发的测试结果出乎意料(环境是单机,4frefork的并发,消息broker是rabbitMQ),是所有测试用例里最糟糕的,响应时间是没有并发的5~6倍。这也许是因为控制协调的开销太大。对于这样的计算任务,Celery也许不是一个好的选择。

asyncoro

Asyncoro的测试结果和非并发保持一致。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

   

import asyncoro

  

from math import hypot

from random import random

import time

  

def test(tries):

    yield sum(hypot(random(), random()) < 1 for _ in range(tries))

  

  

def calcPi(nbFutures, tries):

    ts = time.time()

    coros = [ asyncoro.Coro(test,t) for t in [tries] * nbFutures]

    ret = 4. * sum([job.value() for job in coros]) / float(nbFutures * tries)

    span = time.time() - ts

    print "time spend ", span

    return ret

  

print calcPi(3000,4000)

   

IO密集型

IO密集型的任务是另一种常见的用例,例如网络WEB服务器就是一个例子,每秒钟能处理多少个请求时WEB服务器的重要指标。

我们就以网页读取作为最简单的例子

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

   

from math import hypot

import time

import urllib2

  

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

  

def test(url):

    return urllib2.urlopen(url).read()

  

def testIO(nbFutures):

    ts = time.time()

    map(test, urls * nbFutures)

  

    span = time.time() - ts

    print "time spend ", span

  

testIO(10)

   

在不同并发库下的代码,由于比较类似,我就不一一列出。大家可以参考计算密集型中代码做参考。

通过测试我们可以发现,对于IO密集型的任务,使用多线程,或者是多进程都可以有效的提高程序的效率,而使用伪线程性能提升非常显著,eventlet比没有并发的情况下,响应时间从9秒提高到0.03秒。同时eventlet/gevent提供了非阻塞的异步调用模式,非常方便。这里推荐使用线程或者伪线程,因为在响应时间类似的情况下,线程和伪线程消耗的资源更少。

总结

Python提供了不同的并发方式,对应于不同的场景,我们需要选择不同的方式进行并发。选择合适的方式,不但要对该方法的原理有所了解,还应该做一些测试和试验,数据才是你做选择的最好参考。

 以上就是使用Python进行并发编程的内容,更多相关文章请关注PHP中文网(www.php.cn)!


本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

PHP和Python:解释了不同的范例 PHP和Python:解释了不同的范例 Apr 18, 2025 am 12:26 AM

PHP主要是过程式编程,但也支持面向对象编程(OOP);Python支持多种范式,包括OOP、函数式和过程式编程。PHP适合web开发,Python适用于多种应用,如数据分析和机器学习。

在PHP和Python之间进行选择:指南 在PHP和Python之间进行选择:指南 Apr 18, 2025 am 12:24 AM

PHP适合网页开发和快速原型开发,Python适用于数据科学和机器学习。1.PHP用于动态网页开发,语法简单,适合快速开发。2.Python语法简洁,适用于多领域,库生态系统强大。

Python vs. JavaScript:学习曲线和易用性 Python vs. JavaScript:学习曲线和易用性 Apr 16, 2025 am 12:12 AM

Python更适合初学者,学习曲线平缓,语法简洁;JavaScript适合前端开发,学习曲线较陡,语法灵活。1.Python语法直观,适用于数据科学和后端开发。2.JavaScript灵活,广泛用于前端和服务器端编程。

PHP和Python:深入了解他们的历史 PHP和Python:深入了解他们的历史 Apr 18, 2025 am 12:25 AM

PHP起源于1994年,由RasmusLerdorf开发,最初用于跟踪网站访问者,逐渐演变为服务器端脚本语言,广泛应用于网页开发。Python由GuidovanRossum于1980年代末开发,1991年首次发布,强调代码可读性和简洁性,适用于科学计算、数据分析等领域。

vs code 可以在 Windows 8 中运行吗 vs code 可以在 Windows 8 中运行吗 Apr 15, 2025 pm 07:24 PM

VS Code可以在Windows 8上运行,但体验可能不佳。首先确保系统已更新到最新补丁,然后下载与系统架构匹配的VS Code安装包,按照提示安装。安装后,注意某些扩展程序可能与Windows 8不兼容,需要寻找替代扩展或在虚拟机中使用更新的Windows系统。安装必要的扩展,检查是否正常工作。尽管VS Code在Windows 8上可行,但建议升级到更新的Windows系统以获得更好的开发体验和安全保障。

visual studio code 可以用于 python 吗 visual studio code 可以用于 python 吗 Apr 15, 2025 pm 08:18 PM

VS Code 可用于编写 Python,并提供许多功能,使其成为开发 Python 应用程序的理想工具。它允许用户:安装 Python 扩展,以获得代码补全、语法高亮和调试等功能。使用调试器逐步跟踪代码,查找和修复错误。集成 Git,进行版本控制。使用代码格式化工具,保持代码一致性。使用 Linting 工具,提前发现潜在问题。

vscode 扩展是否是恶意的 vscode 扩展是否是恶意的 Apr 15, 2025 pm 07:57 PM

VS Code 扩展存在恶意风险,例如隐藏恶意代码、利用漏洞、伪装成合法扩展。识别恶意扩展的方法包括:检查发布者、阅读评论、检查代码、谨慎安装。安全措施还包括:安全意识、良好习惯、定期更新和杀毒软件。

vscode怎么在终端运行程序 vscode怎么在终端运行程序 Apr 15, 2025 pm 06:42 PM

在 VS Code 中,可以通过以下步骤在终端运行程序:准备代码和打开集成终端确保代码目录与终端工作目录一致根据编程语言选择运行命令(如 Python 的 python your_file_name.py)检查是否成功运行并解决错误利用调试器提升调试效率

See all articles