行程

Python 中的多線程其實不是真正的多線程,如果想要充分地使用多核心 CPU 的資源,在 Python 中大部分情況都需要使用多進程。 Python 提供了非常好用的多進程套件 multiprocessing,只需要定義一個函數,Python 會完成其他所有事情。借助這個套件,可以輕鬆完成從單一進程到並發執行的轉換。 multiprocessing 支援子進程、通訊和共享資料、執行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等元件。

1、類別Process

建立流程的類別:Process([group [, target [, name [, args [, kwargs]]]]])

target 表示呼叫物件

args 表示呼叫物件的位置參數元組

#kwargs表示呼叫物件的字典

name為別名

group實質上不使用

下面看一個創建函數並將其作為多個進程的例子:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval, name):
    print(name + '【start】')
    time.sleep(interval)
    print(name + '【end】')
if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker, args=(2, '两点水1'))
    p2 = multiprocessing.Process(target=worker, args=(3, '两点水2'))
    p3 = multiprocessing.Process(target=worker, args=(4, '两点水3'))
    p1.start()
    p2.start()
    p3.start()
    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")

輸出的結果:

多進程輸出結果

2、把進程創建成類

當然我們也可以把進程創建成一個類,如下面的例子,當進程p 呼叫start() 時,自動調用run() 方法。

# -*- coding: UTF-8 -*-
import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval
    def run(self):
        n = 5
        while n > 0:
            print("当前时间: {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1
if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()

輸出結果如下:

建立進程類別

#3、daemon 屬性

想知道daemon 屬性有什麼用,看下面兩個例子吧,一個加了daemon 屬性,一個沒有加,對比輸出的結果:

沒有加deamon 屬性的例子:

# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval):
    print('工作开始时间:{0}'.format(time.ctime()))
    time.sleep(interval)
    print('工作结果时间:{0}'.format(time.ctime()))
if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print('【EMD】')

輸出結果:

【EMD】
工作开始时间:Mon Oct  9 17:47:06 2017
工作结果时间:Mon Oct  9 17:47:09 2017

在上面範例中,進程p 新增daemon 屬性:

# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval):
    print('工作开始时间:{0}'.format(time.ctime()))
    time.sleep(interval)
    print('工作结果时间:{0}'.format(time.ctime()))
if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    print('【EMD】')

輸出結果:

【EMD】

根據輸出結果可見,如果在子進程中新增了daemon 屬性,那麼當主進程結束的時候,子進程也會跟著結束。所以沒有列印子進程的資訊。

4、join 方法

結合上面的範例繼續,如果我們想要讓子執行緒執行完該怎麼做呢?

那麼我們可以用到 join 方法,join 方法的主要作用是:阻塞目前進程,直到呼叫 join 方法的那個進程執行完,然後再繼續執行目前進程。

因此看下加了join 方法的例子:

import multiprocessing
import time
def worker(interval):
    print('工作开始时间:{0}'.format(time.ctime()))
    time.sleep(interval)
    print('工作结果时间:{0}'.format(time.ctime()))
if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    p.join()
    print('【EMD】')

輸出的結果:

工作开始时间:Tue Oct 10 11:30:08 2017
工作结果时间:Tue Oct 10 11:30:11 2017
【EMD】

5、Pool

如果需要很多的子進程,我們就需要一個一個的去創建嗎?

當然不用,我們可以使用進程池的方法批次建立子進程。

範例如下:

# -*- coding: UTF-8 -*-
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
    print('进程的名称:{0} ;进程的PID: {1} '.format(name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('进程 {0} 运行了 {1} 秒'.format(name, (end - start)))
if __name__ == '__main__':
    print('主进程的 PID:{0}'.format(os.getpid()))
    p = Pool(4)
    for i in range(6):
        p.apply_async(long_time_task, args=(i,))
    p.close()
    # 等待所有子进程结束后在关闭主进程
    p.join()
    print('【End】')

輸出的結果如下:

主进程的 PID:7256
进程的名称:0 ;进程的PID: 1492
进程的名称:1 ;进程的PID: 12232
进程的名称:2 ;进程的PID: 4332
进程的名称:3 ;进程的PID: 11604
进程 2 运行了 0.6500370502471924 秒
进程的名称:4 ;进程的PID: 4332
进程 1 运行了 1.0830621719360352 秒
进程的名称:5 ;进程的PID: 12232
进程 5 运行了 0.029001712799072266 秒
进程 4 运行了 0.9720554351806641 秒
进程 0 运行了 2.3181326389312744 秒
进程 3 运行了 2.5331451892852783 秒
【End】

這裡有一點要注意: Pool 物件呼叫join() 方法會等待所有子程序執行完畢,呼叫join() 之前必須先呼叫close() ,呼叫close() 之後就不能繼續加入新的Process 了。

請注意輸出的結果,子進程0,1,2,3是立刻執行的,而子進程4 要等待前面某個子進程完成後才執行,這是因為Pool 的預設大小在我的電腦上是4,因此,最多同時執行4 個行程。這是 Pool 有意設計的限制,並不是作業系統的限制。如果改成:

p = Pool(5)

就可以同時跑 5 個進程。

6、進程間通訊

Process 之間一定是需要通訊的,作業系統提供了許多機制來實現進程間的通訊。 Python 的 multiprocessing 模組包裝了底層的機制,提供了Queue、Pipes 等多種方式來交換資料。

以 Queue 為例,在父進程中建立兩個子進程,一個往 Queue 寫數據,一個從 Queue 讀取資料:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from multiprocessing import Process, Queue
import os, time, random
def write(q):
    # 写数据进程
    print('写进程的PID:{0}'.format(os.getpid()))
    for value in ['两点水', '三点水', '四点水']:
        print('写进 Queue 的值为:{0}'.format(value))
        q.put(value)
        time.sleep(random.random())
def read(q):
    # 读取数据进程
    print('读进程的PID:{0}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('从 Queue 读取的值为:{0}'.format(value))
if __name__ == '__main__':
    # 父进程创建 Queue,并传给各个子进程
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程 pw
    pw.start()
    # 启动子进程pr
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr 进程里是死循环,无法等待其结束,只能强行终止
    pr.terminate()

輸出的結果為:

读进程的PID:13208
写进程的PID:10864
写进 Queue 的值为:两点水
从 Queue 读取的值为:两点水
写进 Queue 的值为:三点水
从 Queue 读取的值为:三点水
写进 Queue 的值为:四点水
从 Queue 读取的值为:四点水
繼續學習