How to use multiprocessing to implement inter-process communication in Python?
王林
Release: 2023-05-08 21:31:06
1. Why should we master inter-process communication?
Python's multi-threaded code is constrained by the GIL (global interpreter lock), so CPU-bound work cannot be sped up by multiple CPU cores. The multi-process approach bypasses the GIL, takes advantage of multiple CPUs, and can significantly improve program performance. However, inter-process communication then has to be considered. Unlike a thread, a process has its own independent memory space, so global variables cannot be used to pass data between processes.
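A minimal sketch of the point about global variables, assuming a hypothetical module-level variable `counter` and helper names `bump_and_report` and `run_global_demo` that are not part of the original article. The child changes its own copy of the global and reports it back through a queue, while the parent's copy stays untouched.

```python
import multiprocessing

counter = 0  # module-level global; every process works on its own copy


def bump_and_report(q):
    """Runs in the child: change the child's copy and report its value."""
    global counter
    counter += 100
    q.put(counter)


def run_global_demo():
    """Return (parent's view of counter, child's view of counter)."""
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=bump_and_report, args=(q,))
    p.start()
    child_view = q.get()  # read the child's report before joining
    p.join()
    return counter, child_view


if __name__ == "__main__":
    parent_view, child_view = run_global_demo()
    print("parent sees:", parent_view)
    print("child saw:", child_view)
```

The only way the parent learns the child's value here is through the queue, which is why some explicit communication channel is needed.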
Real projects often involve compute-intensive or real-time tasks, and sometimes a large amount of data, such as images or large objects, has to be transferred between processes. If the data is transferred through file serialization or a network interface, it is difficult to meet real-time requirements, and bringing in Redis or a third-party message queue such as Kafka or RabbitMQ complicates the system.
The Python multiprocessing module itself provides several efficient inter-process communication methods, such as message passing, synchronization primitives, and shared memory.
Understanding the various methods of Python inter-process communication, together with their safety mechanisms, can help greatly improve program performance.
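2. Introduction to various communication methods between processes
The main methods of inter-process communication discussed here are message passing (Pipe, Queue), synchronization (Lock, Event), and shared memory, which can also be used through a Manager.
About memory safety of inter-process communication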
Memory safety here refers to the fact that variables shared between multiple processes can end up in an abnormal state, for example when several processes compete for them at the same time or when one is destroyed unexpectedly. The Queue, Pipe, Lock, and Event objects provided by the multiprocessing module already implement safety mechanisms for inter-process communication. With shared memory, you have to track and destroy the shared variables yourself in code; otherwise they may be contended for or not destroyed properly, causing system errors. Unless the developer is very clear about how shared memory behaves, it is not recommended to use shared memory directly, but rather through the Manager.
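As a rough illustration of the contention problem described above, the following sketch guards a shared counter with a Lock. The names `add_many` and `run_locked_counter` are hypothetical, and `RawValue` is used on purpose because it carries no lock of its own, so the explicit `Lock` is what keeps the increments from overwriting each other.

```python
from multiprocessing import Lock, Process, RawValue


def add_many(total, lock, n):
    """Increment the shared counter n times, one locked step at a time."""
    for _ in range(n):
        with lock:  # only one process may read-modify-write at a time
            total.value += 1


def run_locked_counter(workers=4, n=1000):
    total = RawValue("i", 0)  # C int in shared memory, no built-in lock
    lock = Lock()
    procs = [Process(target=add_many, args=(total, lock, n))
             for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return total.value


if __name__ == "__main__":
    print(run_locked_counter())
```

Without the `with lock:` line, the final value could be lower than `workers * n`, because `total.value += 1` is a read followed by a write.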
The memory manager: Manager
multiprocessing provides the Manager class, which handles the memory safety issues of process communication in a uniform way. Various kinds of shared data, including list, dict, Queue, Lock, Event, and shared memory, can be created through the manager, which tracks and destroys them uniformly.
3. Message mechanism communication
1) Pipe
The Pipe communication method is similar to a simple socket channel: both ends can send and receive messages.
Pipe object construction method:
parent_conn, child_conn = Pipe(duplex=True/False)
Parameter description
duplex=True, the pipeline is two-way communication
duplex=False, the pipeline is one-way communication, only child_conn can send messages, and parent_conn can only receive messages.
Sample code:
from multiprocessing import Process, Pipe

def myfunction(conn):
    # Child process: send one message through its end of the pipe, then close it
    conn.send(['hi!! I am Python'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=myfunction, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()
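The sample above sends in one direction only. A small sketch of two-way use with `duplex=True` could look like the following, where the parent sends numbers and the child answers with their squares. The names `square_server` and `run_duplex_demo`, and the use of `None` as a stop signal, are assumptions made for illustration.

```python
from multiprocessing import Pipe, Process


def square_server(conn):
    """Child: answer each received number with its square until None arrives."""
    while True:
        item = conn.recv()
        if item is None:  # stop signal chosen for this sketch
            break
        conn.send(item * item)
    conn.close()


def run_duplex_demo(numbers=(2, 3, 4)):
    parent_conn, child_conn = Pipe(duplex=True)
    p = Process(target=square_server, args=(child_conn,))
    p.start()
    results = []
    for n in numbers:
        parent_conn.send(n)                 # request
        results.append(parent_conn.recv())  # reply on the same connection
    parent_conn.send(None)
    p.join()
    parent_conn.close()
    return results


if __name__ == "__main__":
    print(run_duplex_demo())
```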
2) Message queue: Queue
The Queue class of multiprocessing is modeled on the queue.Queue class of the Python standard library. It makes it easy to transfer data between producers and consumers, and it implements the necessary locking internally, so it can be used safely from several processes.
The standard library queue module provides a total of 3 types of queues. Note that these classes are meant for threads within a single process; multiprocessing.Queue itself is a FIFO queue.
(1) FIFO queue, first in first out
class queue.Queue(maxsize=0)
(2) LIFO queue, last in first out, effectively a stack
class queue.LifoQueue(maxsize=0)
(3) Priority queue, the entry with the lowest priority value is retrieved first
class queue.PriorityQueue(maxsize=0)
The main methods of the multiprocessing.Queue class:
Method          Description
queue.qsize()   Return the queue length
queue.full()    Return True if the queue is full, otherwise False
queue.empty()   Return True if the queue is empty, otherwise False
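import multiprocessing
import random

def producer(numbers, q):
    # Put every even number on the queue; stop early once the queue is full
    for x in numbers:
        if x % 2 == 0:
            if q.full():
                print("queue is full")
                break
            q.put(x)
            print(f"put {x} in queue by producer")
    return None

def consumer(q):
    # Drain the queue; empty() is only a snapshot, which is tolerable here
    # because the producer has already finished when the consumer starts
    while not q.empty():
        print(f"take data {q.get()} from queue by consumer")
    return None

if __name__ == "__main__":
    # Create a queue object with a maximum length of 5
    qu = multiprocessing.Queue(maxsize=5)
    # Create the producer child process and pass it the queue; this process writes
    p5 = multiprocessing.Process(
        name="producer-1",
        target=producer,
        args=([random.randint(1, 100) for i in range(0, 10)], qu)
    )
    p5.start()
    p5.join()
    # Create the consumer child process and pass it the queue; this process reads from the queue
    p6 = multiprocessing.Process(
        name="consumer-1",
        target=consumer,
        args=(qu,)
    )
    p6.start()
    p6.join()
    # qsize() may raise NotImplementedError on platforms such as macOS
    print(qu.qsize())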
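The example above runs the producer to completion before the consumer starts. A variant in which both run at the same time might look like the sketch below. It relies on a sentinel value instead of `empty()` to signal the end of the data. The names `SENTINEL`, `run_pipeline`, and the second `results` queue are assumptions for illustration, and the consumer here multiplies each item by 10 simply to have something observable.

```python
import multiprocessing

SENTINEL = None  # end-of-data marker chosen for this sketch


def producer(numbers, q):
    """Put every even number on the queue, then signal the end."""
    for x in numbers:
        if x % 2 == 0:
            q.put(x)  # blocks while the bounded queue is full
    q.put(SENTINEL)


def consumer(q, results):
    """Process items until the sentinel arrives, then pass it on."""
    while True:
        item = q.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        results.put(item * 10)
    results.put(SENTINEL)


def run_pipeline(numbers):
    work = multiprocessing.Queue(maxsize=5)
    results = multiprocessing.Queue()
    procs = [
        multiprocessing.Process(target=producer, args=(list(numbers), work)),
        multiprocessing.Process(target=consumer, args=(work, results)),
    ]
    for p in procs:
        p.start()
    collected = []
    while True:  # drain the results before joining the children
        item = results.get()
        if item is SENTINEL:
            break
        collected.append(item)
    for p in procs:
        p.join()
    return collected


if __name__ == "__main__":
    print(run_pipeline(range(1, 11)))
```

Blocking `put()` and `get()` calls replace the `full()` and `empty()` checks, so no item is dropped when the queue is temporarily full.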
>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55]) # Modify multiple at once
>>> buffer[4] = 100 # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5]) # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy' # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5]) # Access via shm_a
b'howdy'
>>> shm_b.close() # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink() # Call unlink only once to release the shared memory
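The session above attaches to the block from within the same interpreter. A sketch of the same idea across two real processes is shown below, with the block name passed to the child and cleanup placed in `finally` clauses. The function names `fill_block` and `run_shared_block` and the fill pattern are hypothetical.

```python
from multiprocessing import Process, shared_memory


def fill_block(name, size):
    """Child: attach to the existing block by name and write into it."""
    shm = shared_memory.SharedMemory(name=name)
    try:
        for i in range(size):
            shm.buf[i] = i * 2  # each value must fit in one byte
    finally:
        shm.close()  # detach only; the creator is responsible for unlink()


def run_shared_block(size=8):
    shm = shared_memory.SharedMemory(create=True, size=size)
    try:
        p = Process(target=fill_block, args=(shm.name, size))
        p.start()
        p.join()
        return list(shm.buf[:size])  # copy the bytes out before closing
    finally:
        shm.close()
        shm.unlink()  # release the block exactly once


if __name__ == "__main__":
    print(run_shared_block())
```

Only the short block name travels between the processes; the data itself is written and read in place.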
>>> from multiprocessing import Process
>>> from multiprocessing.managers import SharedMemoryManager
>>> # do_work(sl, start, stop) is a worker function that the caller must define
>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl
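The snippet above leaves `do_work` undefined. One possible, purely hypothetical definition is given below so that the pattern can be run end to end: each worker squares the entries in its own half of the list, and the parent sums the result. `run_partitioned_sum` is likewise an illustrative name.

```python
from multiprocessing import Process
from multiprocessing.managers import SharedMemoryManager


def do_work(sl, start, stop):
    """Hypothetical worker: square every entry of sl in [start, stop)."""
    for i in range(start, stop):
        sl[i] = sl[i] * sl[i]


def run_partitioned_sum(n=20):
    # The manager releases the shared block when the with-block exits
    with SharedMemoryManager() as smm:
        sl = smm.ShareableList(range(n))
        half = n // 2
        p1 = Process(target=do_work, args=(sl, 0, half))
        p2 = Process(target=do_work, args=(sl, half, n))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        return sum(sl)  # read the results while the block still exists


if __name__ == "__main__":
    print(run_partitioned_sum())
```

Because the two workers touch disjoint index ranges, no lock is needed in this sketch.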
from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))  # prints 7
        print(maths.mul(7, 8))  # prints 56
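Besides a custom manager like the one above, the ready-made Manager described earlier can hand out shared dict and list objects directly. The following is a small sketch under that assumption. The names `record` and `run_manager_demo` are hypothetical, and the list is sorted before returning because the order in which the children append is not guaranteed.

```python
from multiprocessing import Manager, Process


def record(shared_dict, shared_list, key):
    """Child: store a result in the managed dict and log the key in the list."""
    shared_dict[key] = key * key
    shared_list.append(key)


def run_manager_demo(keys=(1, 2, 3)):
    # The manager process owns the data; it is shut down when the block exits
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()
        procs = [Process(target=record, args=(shared_dict, shared_list, k))
                 for k in keys]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Copy into plain Python objects before the manager goes away
        return shared_dict.copy(), sorted(list(shared_list))


if __name__ == "__main__":
    print(run_manager_demo())
```

The children work through proxy objects, so no explicit tracking or cleanup of shared memory appears in the user code.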