process

Multi-threading in Python is not really multi-threading. If you want to make full use of the resources of a multi-core CPU, you need to use multiple processes in most cases in Python. Python provides a very easy-to-use multiprocessing package. You only need to define a function and Python will do everything else. With the help of this package, the conversion from single process to concurrent execution can be easily accomplished. Multiprocessing supports sub-processes, communication and sharing data, performs different forms of synchronization, and provides components such as Process, Queue, Pipe, and Lock.

1. Class Process

Create a process class: Process([group [, target [, name [, args [, kwargs]]]]])

target represents the calling object

args represents the positional parameter tuple of the calling object

kwargs represents the dictionary of the calling object

name is an alias

group is essentially not used

Let’s look at an example of creating a function and using it as multiple processes:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval, name):
    print(name + '【start】')
    time.sleep(interval)
    print(name + '【end】')
if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker, args=(2, '两点水1'))
    p2 = multiprocessing.Process(target=worker, args=(3, '两点水2'))
    p3 = multiprocessing.Process(target=worker, args=(4, '两点水3'))
    p1.start()
    p2.start()
    p3.start()
    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")

Output results:

Multiple process output results

2. Create a process into a class

Of course we can also create a process into a class, as in the following example, when process p calls start(), it is automatically called run() method.

# -*- coding: UTF-8 -*-
import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval
    def run(self):
        n = 5
        while n > 0:
            print("当前时间: {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1
if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()

The output results are as follows:

Create process class

3. Daemon attribute

I want to know what the daemon attribute is used for. Take a look at the following two examples, one with the daemon attribute added and one without. Compare the output results:

Example without adding the deamon attribute:

# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval):
    print('工作开始时间:{0}'.format(time.ctime()))
    time.sleep(interval)
    print('工作结果时间:{0}'.format(time.ctime()))
if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print('【EMD】')

Output result:

【EMD】
工作开始时间:Mon Oct  9 17:47:06 2017
工作结果时间:Mon Oct  9 17:47:09 2017

In the above example, process p adds the daemon attribute:

# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval):
    print('工作开始时间:{0}'.format(time.ctime()))
    time.sleep(interval)
    print('工作结果时间:{0}'.format(time.ctime()))
if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    print('【EMD】')

Output result:

【EMD】

According to the output result, if the daemon attribute is added to the child process, then when the main process ends , the child process will also end. So no information about the child process is printed.

4. Join method

Continue with the above example, what should we do if we want the child thread to finish executing?

Then we can use the join method. The main function of the join method is to block the current process until the process that calls the join method is executed, and then continue to execute the current process.

So look at the example of adding the join method:

import multiprocessing
import time
def worker(interval):
    print('工作开始时间:{0}'.format(time.ctime()))
    time.sleep(interval)
    print('工作结果时间:{0}'.format(time.ctime()))
if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    p.join()
    print('【EMD】')

The output result:

工作开始时间:Tue Oct 10 11:30:08 2017
工作结果时间:Tue Oct 10 11:30:11 2017
【EMD】

5, Pool

If many child processes are needed, do we need to create them one by one?

Of course not, we can use the process pool method to create child processes in batches.

The example is as follows:

# -*- coding: UTF-8 -*-
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
    print('进程的名称:{0} ;进程的PID: {1} '.format(name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('进程 {0} 运行了 {1} 秒'.format(name, (end - start)))
if __name__ == '__main__':
    print('主进程的 PID:{0}'.format(os.getpid()))
    p = Pool(4)
    for i in range(6):
        p.apply_async(long_time_task, args=(i,))
    p.close()
    # 等待所有子进程结束后在关闭主进程
    p.join()
    print('【End】')

The output result is as follows:

主进程的 PID:7256
进程的名称:0 ;进程的PID: 1492
进程的名称:1 ;进程的PID: 12232
进程的名称:2 ;进程的PID: 4332
进程的名称:3 ;进程的PID: 11604
进程 2 运行了 0.6500370502471924 秒
进程的名称:4 ;进程的PID: 4332
进程 1 运行了 1.0830621719360352 秒
进程的名称:5 ;进程的PID: 12232
进程 5 运行了 0.029001712799072266 秒
进程 4 运行了 0.9720554351806641 秒
进程 0 运行了 2.3181326389312744 秒
进程 3 运行了 2.5331451892852783 秒
【End】

There is one thing to note here: The Pool object calling the join() method will wait for all child processes to execute Completed, you must call close() before calling join(). After calling close(), you cannot continue to add new Processes.

Please pay attention to the output results. Child processes 0, 1, 2, and 3 are executed immediately, while child process 4 has to wait for the completion of a previous child process before executing. This is because the default size of the Pool is in our On my computer it is 4, so at most 4 processes are executing simultaneously. This is an intentional design limitation of Pool, not a limitation of the operating system. If you change it to:

p = Pool(5)

, you can run 5 processes at the same time.

6. Inter-process communication

Processes definitely need to communicate. The operating system provides many mechanisms to achieve inter-process communication. Python's multiprocessing module wraps the underlying mechanism and provides multiple ways to exchange data, such as Queue and Pipes.

Take Queue as an example, create two child processes in the parent process, one writes data to Queue, and the other reads data from Queue:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from multiprocessing import Process, Queue
import os, time, random
def write(q):
    # 写数据进程
    print('写进程的PID:{0}'.format(os.getpid()))
    for value in ['两点水', '三点水', '四点水']:
        print('写进 Queue 的值为:{0}'.format(value))
        q.put(value)
        time.sleep(random.random())
def read(q):
    # 读取数据进程
    print('读进程的PID:{0}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('从 Queue 读取的值为:{0}'.format(value))
if __name__ == '__main__':
    # 父进程创建 Queue,并传给各个子进程
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程 pw
    pw.start()
    # 启动子进程pr
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr 进程里是死循环,无法等待其结束,只能强行终止
    pr.terminate()

The output result is:

读进程的PID:13208
写进程的PID:10864
写进 Queue 的值为:两点水
从 Queue 读取的值为:两点水
写进 Queue 的值为:三点水
从 Queue 读取的值为:三点水
写进 Queue 的值为:四点水
从 Queue 读取的值为:四点水
Continuing Learning
||
submitReset Code