This article brings you a detailed introduction (code example) about multi-processing in python. It has certain reference value. Friends in need can refer to it. I hope it will be helpful to you.
This section talks about learning Python’s multi-process.
Multi-processMultiprocessing
is similar to multi-threading, they are both used in pythonParallel
But since there is threading, why does Python have a multiprocessing? The reason is very simple, it is to make up for some disadvantages of threading, such as GIL
.
Using multiprocessing is also very simple. If you have a certain understanding of threading, your time to enjoy is here. Because python makes the use of multiprocessing and threading almost the same. This makes it easier for us to get started. It is also easier to play. The power of your computer's multi-core system!
import multiprocessing as mp import threading as td def job(a,d): print('aaaaa') t1 = td.Thread(target=job,args=(1,2)) p1 = mp.Process(target=job,args=(1,2)) t1.start() p1.start() t1.join() p1.join()
As can be seen from the above usage comparison code, threads and processes are used in similar ways.
You need to add a statement defining the main function when using it
if __name__=='__main__':
Complete application code:
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_test.py @time: 18/8/26 01:12 """ import multiprocessing as mp def job(a, d): print a, d if __name__ == '__main__': p1 = mp.Process(target=job, args=(1, 2)) p1.start() p1.join()
The running environment must be in the terminal environment Under other editing tools, there may be no printed result after running. The result printed after running in the terminal is:
➜ baseLearn python ./process/process_test.py 1 2 ➜ baseLearn
Queue is Put the operation results of each core or thread in the queue, wait until each thread or core has finished running, then take the results out of the queue, and continue loading the operation. The reason is very simple. Functions called by multiple threads cannot have return values, so Queue is used to store the results of multiple thread operations
process_queue.py
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_queue.py @time: 18/8/26 01:12 """ import multiprocessing as mp # 定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果 def job(q): res = 0 for i in range(1000): res += i + i**2 + i**3 q.put(res) #queue if __name__ == '__main__': q = mp.Queue() p1 = mp.Process(target=job, args=(q,)) p2 = mp.Process(target=job, args=(q,)) # 分别启动、连接两个线程 p1.start() p2.start() p1.join() p2.join() # 上面是分两批处理的,所以这里分两批输出,将结果分别保存 res1 = q.get() res2 = q.get() print res1,res2
Print the output results:
➜ python ./process/process_queue.py 249833583000 249833583000
Process Pool
means that we put the things we want to run into the pool, Python will solve the problem of multiple processes by itself
.
First import multiprocessing
and define job()
import multiprocessing as mp def job(x): return x*x
Then we define a Pool
pool = mp.Pool()
After we have the pool, we can make the pool correspond to a certain function, and we throw data into the pool , the pool will return the value returned by the function. The difference between Pool
and the previous Process is that the
function thrown to the Pool has a return value , while the of Process
does not return value.
map() to get the result. In
map() you need to put the function and the value that needs to be iterated, and then it will be automatically allocated to the CPU Core, return result
res = pool.map(job, range(10))
def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) if __name__ == '__main__': multicore()
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_queue.py @time: 18/8/26 01:12 """ import multiprocessing as mp def job(x): return x*x # 注意这里的函数有return返回值 def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) if __name__ == '__main__': multicore()
➜ baseLearn python ./process/process_pool.py [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Pool actually calls multiple cores? We can increase the number of iterations, and then open the CPU load to see the CPU operation
processes parameter in
Pool
def multicore(): pool = mp.Pool(processes=3) # 定义CPU核数量为3 res = pool.map(job, range(10)) print(res)
PoolIn addition to
map(), there is also a way to return results, that is
apply_async() .
apply_async() can only pass one value, it will only put one core into the operation, but when passing in the value, you should pay attention to iterable, so in You need to add a comma after passing in the value, and you need to use the get() method to get the return value
def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) res = pool.apply_async(job, (2,)) # 用get获得结果 print(res.get())
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map() 4 # apply_async()
PoolThe default call is the number of CPU cores. You can customize the number of CPU cores by passing in the processes parameter.
map() Put in iteration Parameters, return multiple results
apply_async()You can only put a set of parameters and return one result. If you want to get the effect of map(), you need to iterate
Only shared memory can allow communication between CPUs.
Value.
import multiprocessing as mp value1 = mp.Value('i', 0) value2 = mp.Value('d', 3.14)
d and
i parameters are used to set the data type,
d represents a double precision floating point type,
i represents a signed
integer type .
Type code | C Type | Python Type | Minimum size in bytes |
---|---|---|---|
'b' |
signed char | int | 1 |
'B' |
unsigned char | int | 1 |
'u' |
Py_UNICODE | Unicode character | 2 |
'h' |
signed short | int | 2 |
'H' |
unsigned short | int | 2 |
'i' |
signed int | int | 2 |
'I' |
unsigned int | int | 2 |
'l' |
signed long | int | 4 |
'L' |
unsigned long | int | 4 |
'q' |
signed long long | int | 8 |
'Q' |
unsigned long long | int | 8 |
'f' |
float | float | 4 |
'd' |
double | float | 8 |
在Python的 mutiprocessing
中,有还有一个Array
类,可以和共享内存交互,来实现在进程之间共享数据。
array = mp.Array('i', [1, 2, 3, 4])
这里的Array
和numpy中的不同,它只能是一维
的,不能是多维的。同样和Value
一样,需要定义数据形式,否则会报错。 我们会在后一节举例说明这两种的使用方法.
错误形式
array = mp.Array('i', [[1, 2], [3, 4]]) # 2维list """ TypeError: an integer is required """
让我们看看没有加进程锁时会产生什么样的结果。
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_no_lock.py @time: 18/8/26 09:22 """ import multiprocessing as mp import time def job(v, num): for _ in range(5): time.sleep(0.5) # 暂停0.5秒,让输出效果更明显 v.value += num # v.value获取共享变量值 print(v.value) def multicore(): v = mp.Value('i', 0) # 定义共享变量 p1 = mp.Process(target=job, args=(v, 1)) p2 = mp.Process(target=job, args=(v, 4)) # 设定不同的number看如何抢夺内存 p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()
在上面的代码中,我们定义了一个共享变量v
,两个进程都可以对它进行操作。 在job()中我们想让v
每隔0.1秒输出一次累加num
的结果,但是在两个进程p1
和p2
中设定了不同的累加值。所以接下来让我们来看下这两个进程是否会出现冲突。
结果打印:
➜ baseLearn python ./process/process_no_lock.py 1 5 9 9 13 13 17 17 18 18 ➜ baseLearn
我们可以看到,进程1和进程2在相互抢
着使用共享内存v
。
为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。
首先需要定义一个进程锁
l = mp.Lock() # 定义一个进程锁
然后将进程锁的信息传入各个进程中
p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入 p2 = mp.Process(target=job, args=(v,3,l))
在job()
中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占
def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.1) v.value += num # v.value获取共享内存 print(v.value) l.release() # 释放
全部代码:
# -*- coding:utf-8 -*- """ @author: Corwien @file: process_lock.py @time: 18/8/26 09:22 """ import multiprocessing as mp import time def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.5) # 暂停0.5秒,让输出效果更明显 v.value += num # v.value获取共享变量值 print(v.value) l.release() # 释放 def multicore(): l = mp.Lock() # 定义一个进程锁 v = mp.Value('i', 0) # 定义共享变量 p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入 p2 = mp.Process(target=job, args=(v, 4, l)) # 设定不同的number看如何抢夺内存 p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()
运行一下,让我们看看是否还会出现抢占资源的情况:
结果打印:
➜ baseLearn python ./process/process_lock.py 1 2 3 4 5 9 13 17 21 25
显然,进程锁保证了进程p1
的完整运行,然后才进行了进程p2
的运行
相关推荐:
The above is the detailed content of Detailed introduction to multi-process in python (code example). For more information, please follow other related articles on the PHP Chinese website!