Examples of multi-process and multi-threading in Python (1)

零下一度
Release: 2017-06-01 10:02:44
Original
1714 people have browsed it

一、背景

  最近在Azkaban的测试工作中,需要在测试环境下模拟线上的调度场景进行稳定性测试。故而重操python旧业,通过python编写脚本来构造类似线上的调度场景。在脚本编写过程中,碰到这样一个需求:要在测试环境创建10000个作业流。

  最开始的想法是在一个azkaban project下循环调用10000次create job接口(每个Flow只包含一个job)。由于azkaban它本身没有增加/删除作业流的接口,所有的作业流修改、增加、删除其实都是通过重新上传项目zip包实现的,相应地每次调猛犸前端的create job接口,实际上是在猛犸端对zip包的内容进行了重新的整合后再重新上传zip包到azkaban,整个过程可以拆解成如下过程:解压zip包获得zip包内容,变更zip包内的文件内容,重新打包zip包,上传到azkaban。因此,随着循环次数越往后,zip包包含的内容会越多,接口执行一次的时间就越长。实践发现,第一次调该接口的时间大致不到1秒,到循环1000次的时候接口调用一次的时间就达到了将近3秒。因此,如果指望一个循环10000次来构造该场景,显然要耗费巨大的时间。

  在此背景下, 自然而然地就想到用多进程/多线程的方式来处理该问题。

二、“多任务”的操作系统基础

  大家都知道,操作系统可以同时运行多个任务。比如你一边听音乐,一边聊IM,一边写博客等。现在的cpu大都是多核的,但即使是过去的单核cpu也是支持多任务并行执行。

  单核cpu执行多任务的原理:操作系统交替轮流地执行各个任务。先让任务1执行0.01秒,然后切换到任务2执行0.01秒,再切换到任务3执行0.01秒...这样往复地执行下去。由于cpu的执行速度非常快,所以使用者的主观感受就是这些任务在并行地执行。

  多核cpu执行多任务的原理:由于实际应用中,任务的数量往往远超过cpu的核数,所以操作系统实际上是把这些多任务轮流地调度到每个核心上执行。

  对于操作系统来说,一个应用就是一个进程。比如打开一个浏览器,它是一个进程;打开一个记事本,它是一个进程。每个进程有它特定的进程号。他们共享系统的内存资源。进程是操作系统分配资源的最小单位

  而对于每一个进程而言,比如一个视频播放器,它必须同时播放视频和音频,就至少需要同时运行两个“子任务”,进程内的这些子任务就是通过线程来完成。线程是最小的执行单元。一个进程它可以包含多个线程,这些线程相互独立,同时又共享进程所拥有的资源。

三、Python多进程编程

  1. multiprocessing

  multiprocessing是Python提供的一个跨平台的多进程模块,通过它可以很方便地编写多进程程序,在不同的平台(Unix/Linux, Windows)都可以执行。

  下面就是使用multiprocessing编写多进程程序的代码:  

#!/usr/bin/python# -*- coding: utf-8 -*author = 'zni.feng'import  sys
reload (sys)
sys.setdefaultencoding('utf-8')from multiprocessing import Processimport osimport time#子进程fundef child_projcess_fun(name):    print 'Child process %s with processId %s starts.' % (name, os.getpid())
    time.sleep(3)    print 'Child process %s with processId %s ends.' % (name, os.getpid())if name == "main":    print 'Parent processId is: %s.' % os.getpid()
    p = Process(target = child_projcess_fun, args=('zni',))    print 'Process starts'
    p.start() #开始进程
    p.join() #等待子进程结束后再继续往下执行
    print 'Process ends.'
Copy after login

程序的输出:

Parent processId is: 11076.
Process starts
Child process zni with processId 11077 starts.
Child process zni with processId 11077 ends.
Process ends.
[Finished in 3.1s]
Copy after login

  2. Pool

  某些情况下,我们希望批量创建多个子进程,或者给定子进程数的上限,避免无限地消耗系统的资源。通过Pool(进程池)的方式,就可以完成这项工作,下面是使用Pool的代码:

 1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import  sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7  8 from multiprocessing import Pool 9 import os, time10 11 def child_process_test(name, sleep_time):12     print 'Child process %s with processId %s starts.' % (name, os.getpid())13     time.sleep(sleep_time)14     print 'Child process %s with processId %s ends.' % (name, os.getpid())15 16 if name == "main":17     print 'Parent processId is: %s.' % os.getpid()18     p = Pool()  #进程池默认大小是cpu的核数19     #p = Pool(10) #生成一个容量为10的进程池,即最大同时执行10个子进程20     for i in range(5):21         p.apply_async(child_process_test, args=('zni_'+str(i), i+1,)) #p.apply_async向进程池提交目标请求22 23     print 'Child processes are running.'24     p.close()25     p.join() #用来等待进程池中的所有子进程结束再向下执行代码,必须在p.close()或者p.terminate()之后执行26     print 'All Processes end.'
Copy after login

程序的输出:

Parent processId is: 5050.
Child processes are running.
Child process zni_0 with processId 5052 starts.
Child process zni_1 with processId 5053 starts.
Child process zni_2 with processId 5054 starts.
Child process zni_3 with processId 5055 starts.
Child process zni_0 with processId 5052 ends.
Child process zni_4 with processId 5052 starts.
Child process zni_1 with processId 5053 ends.
Child process zni_2 with processId 5054 ends.
Child process zni_3 with processId 5055 ends.
Child process zni_4 with processId 5052 ends.
All Processes end.
[Finished in 6.2s]
Copy after login

close()方法和terminate()方法的区别:

  close:关闭进程池,使之不能再添加新的进程。已经执行的进程会等待继续执行直到结束。

  terminate:强制终止线程池,正在执行的进程也会被强制终止。

  3. 进程间通信

  Python的multiprocessing模块提供了多种进程间通信的方式,如Queue、Pipe等。

  3.1 Queue、Lock

  Queue是multiprocessing提供的一个模块,它的数据结构就是"FIFO——first in first out"的队列,常用的方法有:put(object)入队;get()出队;empty()判断队列是否为空。

  Lock:当多个子进程对同一个queue执行写操作时,为了避免并发操作产生冲突,可以通过加锁的方式使得某个子进程对queue拥有唯一的写权限,其他子进程必须等待该锁释放后才能再开始执行写操作。

  下面就是使用Queue进行进程间通信的代码:在父进程里创建两个子进程,分别实现对queue的读和写操作

 1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import  sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 from multiprocessing import Process, Queue, Lock 8 import os, time, random 9 #写数据进程10 def write(q, lock, name):11     print 'Child Process %s starts' % name12     #获得锁13     lock.acquire()14     for value in ['A' , 'B', 'C']:15         print 'Put %s to queue...' % value16         q.put(value)17         time.sleep(random.random())18     #释放锁19     lock.release()20     print 'Child Process %s ends' % name21 22 #读数据进程23 def read(q, lock, name):24     print 'Child Process %s starts' % name25     while True: #持续地读取q中的数据26         value =q.get()27         print 'Get %s from queue.' % value28     print 'Child Process %s ends' % name29 30 if name == "main":31     #父进程创建queue,并共享给各个子进程32     q= Queue()33     #创建锁34     lock = Lock()35     #创建第一个“写”子进程36     pw = Process(target = write , args=(q, lock, 'WRITE', ))37     #创建“读”进程38     pr = Process(target = read, args=(q,lock, 'READ',))39     #启动子进程pw,写入:40     pw.start()41     #启动子进程pr,读取:42     pr.start()43     #等待pw结束:44     pw.join()45     #pr是个死循环,通过terminate杀死:46     pr.terminate()47     print 'Test finish.'
Copy after login

  程序的输出结果为:

Child Process WRITE starts
Put A to queue...
Child Process READ starts
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Child Process WRITE ends
Test finish.
[Finished in 2.0s]
Copy after login

  3.2 Pipe

  Pipe是另一种进程间通信的方式,俗称“管道”。它由两端组成,一端往管道里写入数据,另一端从管道里读取数据。
  下面就是使用Pipe通信的代码:

 1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 author = 'zni.feng' 4 import  sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 from multiprocessing import Process, Pipe 8 import os, time, random 9 10 #发送数据进程11 def send(child_pipe, name):12     print 'Child Process %s starts' % name13     child_pipe.send('This is Mr.Ni')14     child_pipe.close()15     time.sleep(random.random())16     print 'Child Process %s ends' % name17 18 #接收数据进程19 def recv(parent_pipe, name):20     print 'Child Process %s starts' % name21     print parent_pipe.recv()22     time.sleep(random.random())23     print 'Child Process %s ends' % name24 25 if name == "main":26     #创建管道27     parent,child = Pipe()28     #创建send进程29     ps = Process(target=send, args=(child, 'SEND'))30     #创建recv进程31     pr = Process(target=recv, args=(parent, 'RECEIVE'))32     #启动send进程33     ps.start()34     #等待send进程结束35     ps.join()36     #启动recv进程37     pr.start()38     #等待recv进程结束39     pr.join()40     print 'Test finish.'
Copy after login

  程序的输出结果如下:

Child Process SEND starts
Child Process SEND ends
Child Process RECEIVE starts
This is Mr.Ni
Child Process RECEIVE ends
Test finish.
[Finished in 1.8s]
Copy after login

【相关推荐】

1. Python中多进程与多线程实例(二)编程方法

2. Python中推荐使用多进程而不是多线程?分享推荐使用多进程的原因

3. python多进程快还是多线程快?

4. 关于Python进程、线程、协程详细介绍

5. Python 并发编程之线程池/进程池

The above is the detailed content of Examples of multi-process and multi-threading in Python (1). For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!