Here is an example of a communication mechanism. We are all familiar with the word communication: for example, a person wants to call his girlfriend. Once the call is established, an implicit queue (note this term) is formed. The person then keeps passing information to his girlfriend through the conversation, and she keeps listening. (In most cases it is probably the other way around.)
The two of them can be compared to two processes. The "this person" process needs to send information to the "girlfriend" process, so it relies on a queue. Because the girlfriend only has to pick up information from the queue whenever it arrives, she can do other things in the meantime. In other words, communication between the two processes mainly relies on the queue.
This queue can support sending and receiving messages. "This person" is responsible for sending messages, while "girlfriend" is responsible for receiving messages.
Since the queue is the focus, let’s take a look at how to create the queue.
We again use the multiprocessing module and call its Queue class to create the queue.
Queue introduction: call Queue to create a queue. It takes a parameter, maxsize, that sets the maximum number of messages the queue can hold. If it is not passed, the length is unlimited by default. After instantiating a queue object, you operate on that object to put data in and take data out.
Function name | Introduction | Parameters | Return Value |
put | Put message into queue | message | None |
get | Get queue message | None | str |
put function introduction: passes data into the queue. It has one parameter, message, which is a string in this article (multiprocessing.Queue itself accepts any picklable object).
get function introduction: receives data from the queue. (This is a common scenario for json. Much data is transmitted as strings, and here both insertion into and retrieval from the queue use strings, so json is well suited to it.)
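As a quick check of the put and get calls described above, here is a minimal sketch that sends one JSON-encoded message through a bounded queue inside a single process. The helper name `round_trip` is made up for this illustration, and the size limit is passed through `maxsize`, the parameter name that `multiprocessing.Queue` uses.

```python
import json
import multiprocessing


def round_trip(message, maxsize=3):
    """Put one JSON-encoded message on a bounded queue and read it back."""
    q = multiprocessing.Queue(maxsize=maxsize)  # holds at most `maxsize` items
    q.put(json.dumps(message))                  # put() itself returns None
    raw = q.get(timeout=5)                      # get() returns the stored string
    return json.loads(raw)                      # decode back into a Python object


if __name__ == "__main__":
    print(round_trip({"message": "hello"}))
```

Serializing with json on the way in and decoding on the way out keeps every item on the queue a plain string, which matches the table above.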
Next, let's practice using queues.
Inter-process communication - queue demonstration case
The code example is as follows:
# coding:utf-8
import json
import multiprocessing


class Work(object):  # Define a Work class
    def __init__(self, queue):  # The constructor takes a queue object
        self.queue = queue

    def send(self, message):  # Send a message through the queue
        # Hidden bug: only the string case is checked. Values that json cannot
        # serialize (functions, classes, sets, ...) still raise an error.
        if not isinstance(message, str):  # Serialize non-string messages to JSON
            message = json.dumps(message)
        self.queue.put(message)  # Put the message on the queue

    def receive(self):  # Receiving is continuous, so it needs a while loop
        while True:
            result = self.queue.get()  # Block until a message arrives
            # The received message may not be valid JSON, so catch the error
            try:  # Valid JSON is decoded into res; anything else is used as-is
                res = json.loads(result)
            except (TypeError, ValueError):
                res = result
            print('Received message: {}'.format(res))


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    work = Work(queue)
    send = multiprocessing.Process(target=work.send, args=({'message': 'This is a test message'},))
    receive = multiprocessing.Process(target=work.receive)
    send.start()
    receive.start()
Exceptions encountered when using queues to establish inter-process communication
However, running this code raises an error: FileNotFoundError: [Errno 2] No such file or directory.
The error message means that a file was not found. When we call put() and get() on the queue, an invisible lock is added behind the scenes, which is the SemLock that appears in the traceback. We do not need to dig into the exact cause here; most likely the main process exits while the child processes still depend on the queue. Solving the problem is actually very simple.
Solution to the FileNotFoundError: [Errno 2] No such file or directory exception
In theory, the main process only needs to block on one of the two child processes, send or receive. But our receive child process is a while loop that never finishes, so we only need to call join() on the send child process.
PS: Although the error is gone, the program still does not exit normally.
Because our receive process is a while loop, we do not know when it has finished processing, and it never stops on its own. So we need to call terminate() on the receive process to end the receiving side.
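The two fixes described above, join() on the sender and terminate() on the receiver, might be combined roughly as follows. This is a sketch rather than the article's exact code: it uses module-level functions instead of the Work class, the name `run` is hypothetical, and the short sleep before terminate() is an assumption added so the receiver has time to print.

```python
import json
import multiprocessing
import time


def send(q, message):
    q.put(json.dumps(message))  # one message, then the process ends


def receive(q):
    while True:  # never returns on its own
        print("Received:", json.loads(q.get()))


def run():
    q = multiprocessing.Queue()
    sender = multiprocessing.Process(target=send, args=(q, {"message": "test"}))
    receiver = multiprocessing.Process(target=receive, args=(q,))
    sender.start()
    receiver.start()
    sender.join()         # keep the main process alive until the sender is done
    time.sleep(0.5)       # assumption: give the receiver a moment to print
    receiver.terminate()  # stop the endless receive loop
    receiver.join()       # wait for the terminated process to be cleaned up
    return receiver.is_alive()


if __name__ == "__main__":
    print("receiver still alive:", run())
```

Calling join() on the receiver after terminate() is optional here, but it makes the final is_alive() check reliable.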
Add data to the send function in batches
Create a new function that uses a for loop to simulate sending messages in batches.
Then start a separate process for this batch-sending function.
The sample code is as follows:
# coding:utf-8
import json
import time
import multiprocessing


class Work(object):  # Define a Work class
    def __init__(self, queue):  # The constructor takes a queue object
        self.queue = queue

    def send(self, message):  # Send a message through the queue
        # Hidden bug: only the string case is checked. Values that json cannot
        # serialize (functions, classes, sets, ...) still raise an error.
        if not isinstance(message, str):  # Serialize non-string messages to JSON
            message = json.dumps(message)
        self.queue.put(message)  # Put the message on the queue

    def send_all(self):  # Simulate batch sending with a for loop
        for i in range(20):
            self.queue.put('Loop {}: the message sent is {}'.format(i, i))
            time.sleep(1)

    def receive(self):  # Receiving is continuous, so it needs a while loop
        while True:
            result = self.queue.get()  # Block until a message arrives
            # The received message may not be valid JSON, so catch the error
            try:  # Valid JSON is decoded into res; anything else is used as-is
                res = json.loads(result)
            except (TypeError, ValueError):
                res = result
            print('Received message: {}'.format(res))


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    work = Work(queue)
    send = multiprocessing.Process(target=work.send, args=({'message': 'This is a test message'},))
    receive = multiprocessing.Process(target=work.receive)
    send_all = multiprocessing.Process(target=work.send_all,)
    send_all.start()  # send runs once and exits; send_all loops 20 times, so it runs longest and sends the most messages
    send.start()
    receive.start()
    # send.join()  # Joining send would let receive.terminate() run before the send_all loop has finished
    send_all.join()  # So we only need to join the longest-running process
    receive.terminate()
When this code runs, both the send and send_all processes deliver messages through the same instantiated Queue object, and the receive function prints the messages passed in by both processes.
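An alternative to working out which sender runs longest is to join every sender and then tell the receiver to stop. The sketch below swaps terminate() for a sentinel value, which is not the technique used in the article; `SENTINEL`, `send_batch`, and `iter_messages` are hypothetical names introduced only for this illustration.

```python
import multiprocessing

SENTINEL = None  # hypothetical "no more messages" marker, not from the article


def send_batch(q, name, count):
    for i in range(count):
        q.put("{} message {}".format(name, i))


def iter_messages(q):
    """Yield messages from the queue until the sentinel arrives."""
    while True:
        item = q.get()
        if item is SENTINEL:
            return
        yield item


def receive(q):
    for item in iter_messages(q):
        print("Received:", item)


if __name__ == "__main__":
    q = multiprocessing.Queue()
    senders = [
        multiprocessing.Process(target=send_batch, args=(q, "send", 1)),
        multiprocessing.Process(target=send_batch, args=(q, "send_all", 5)),
    ]
    receiver = multiprocessing.Process(target=receive, args=(q,))
    for p in senders + [receiver]:
        p.start()
    for p in senders:
        p.join()     # wait for every sender, not only the slowest one
    q.put(SENTINEL)  # tell the receiver that nothing more is coming
    receiver.join()  # the receiver returns on its own; no terminate() needed
```

The trade-off is that the receiver has to know about the sentinel, whereas terminate() works with any receive loop but can cut it off in the middle of a message.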
Summary
In this chapter, we used queues to achieve cross-process communication and practiced the basic queue operations. One end of the queue (the send end in our demonstration) adds information through the put method, and the other end obtains it through the get method; the two processes cooperate to achieve inter-process communication.
In addition to queues, processes can also communicate using pipes, semaphores, and shared memory. If you are interested, you can explore these methods on your own.
Other ways of inter-process communication - Supplement
Python provides a variety of ways to communicate between processes, including signals, pipes, message queues, semaphores, shared memory, sockets, etc.
In the multiprocessing module there are two main mechanisms: Queue and Pipe. Queue supports communication among multiple processes, while Pipe connects two processes.
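To illustrate the Pipe side of that comparison, here is a minimal sketch using `multiprocessing.Pipe`, which returns a pair of connection objects. The function name `reply_once` and the upper-casing reply are assumptions chosen only to make the two-way exchange visible.

```python
import multiprocessing


def reply_once(conn):
    """Read one message from the connection and answer in upper case."""
    message = conn.recv()
    conn.send(message.upper())


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()  # two connected endpoints
    worker = multiprocessing.Process(target=reply_once, args=(child_conn,))
    worker.start()
    parent_conn.send("ping")   # parent -> child
    print(parent_conn.recv())  # child -> parent, prints PING
    worker.join()
```

A Pipe has exactly two endpoints, so it suits a pair of processes; when several senders share one receiver, a Queue is usually the simpler choice.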
1. Pipes: divided into anonymous pipes and named pipes
Anonymous pipe: a fixed-size buffer is allocated in the kernel, and the program can write to it and read from it. It is generally used together with the fork function to implement communication between parent and child processes.
Named pipe: a fixed-size buffer is allocated in memory, and the program can write to it and read from it. Processes that are not related to each other can also communicate through it.
Features: byte-stream oriented; the life cycle follows the kernel; built-in synchronization and mutual exclusion; half-duplex, one-way communication, so two pipes are needed for two-way communication.
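The anonymous-pipe case can be sketched with `os.pipe` and `os.fork`. This assumes a Unix-like system, since `os.fork` is not available on Windows, and the function name `parent_reads_from_child` is hypothetical.

```python
import os


def parent_reads_from_child(payload: bytes) -> bytes:
    """Fork a child that writes `payload` into a pipe; the parent reads it."""
    read_fd, write_fd = os.pipe()  # kernel buffer with a read end and a write end
    pid = os.fork()
    if pid == 0:                   # child: writes only
        os.close(read_fd)
        os.write(write_fd, payload)
        os.close(write_fd)
        os._exit(0)                # leave immediately, skipping parent cleanup
    os.close(write_fd)             # parent: reads only
    chunks = []
    while True:
        chunk = os.read(read_fd, 1024)
        if not chunk:              # empty read: every write end is closed
            break
        chunks.append(chunk)
    os.close(read_fd)
    os.waitpid(pid, 0)             # reap the child
    return b"".join(chunks)


if __name__ == "__main__":
    print(parent_reads_from_child(b"hello from the child"))
```

Each side closes the end it does not use, which reflects the one-way nature of the pipe; a reply channel would need a second pipe.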
2. Message queue: a queue is established in the operating system kernel, containing multiple datagram elements. Multiple processes can access the queue through a specific handle. A message queue can be used to send data from one process to another. Each data block is considered to have a type, and the data blocks received by the receiving process can have different types. Message queues share the shortcomings of pipes: there is an upper limit on the length of each message, on the total number of bytes in each message queue, and on the total number of message queues in the system.
Features: the message queue can be thought of as a global linked list. Its nodes store the type and content of each datagram and are marked with the identifier of the message queue; the message queue allows one or more processes to write or read messages; the life cycle of the message queue follows the kernel; the message queue can achieve two-way communication.
3. Semaphore: a semaphore set (essentially an array) is created in the kernel. Each element of the array (a semaphore) starts at 1. The P operation decrements it by 1, and the V operation increments it by 1.
P(sv): If the value of sv is greater than zero, decrement it by 1; if its value is zero, suspend the execution of the process.
V(sv): If another process is suspended waiting for sv, let it resume running; if no process is suspended waiting for sv, add 1 to it.
When the P and V operations are both performed by the same process, they achieve mutual exclusion; when they are performed by different processes, they achieve synchronization.
Function: Protect critical resources
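In Python, the P and V operations correspond roughly to `acquire()` and `release()` on `multiprocessing.Semaphore`. The following sketch protects a shared counter with a semaphore whose initial value is 1; the helper name `add_many` and the choice of four workers are assumptions made for the illustration.

```python
import multiprocessing


def add_many(counter, sem, times):
    """Increment the shared counter `times` times, one holder at a time."""
    for _ in range(times):
        sem.acquire()       # P: decrement, or wait while the value is 0
        counter.value += 1  # critical section
        sem.release()       # V: increment, waking a waiting process if any


if __name__ == "__main__":
    sem = multiprocessing.Semaphore(1)                   # initial value 1
    counter = multiprocessing.Value("i", 0, lock=False)  # shared int, no built-in lock
    workers = [
        multiprocessing.Process(target=add_many, args=(counter, sem, 1000))
        for _ in range(4)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)
```

The counter is created with `lock=False` on purpose, so the semaphore is the only thing guarding the critical section.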
4. Shared memory: the same piece of physical memory is mapped into the virtual address spaces of different processes, so that different processes can share the same resource. Among inter-process communication methods, shared memory can be said to be the most useful and the fastest form of IPC.
Features: unlike other methods, which frequently switch and copy data between user mode and kernel mode, shared memory is read directly from memory; shared memory is a critical resource, so operations on it must be atomic, which can be ensured with a semaphore or a mutex.
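A small sketch of the idea, assuming Python 3.8 or later where `multiprocessing.shared_memory` is available. For brevity both handles live in one process, the second one attaching by name as another process would; the function name `write_then_read` is hypothetical, and no semaphore or mutex is included, so real concurrent use would still need one.

```python
from multiprocessing import shared_memory


def write_then_read(payload: bytes) -> bytes:
    """Write into a shared block and read it back through a second handle."""
    owner = shared_memory.SharedMemory(create=True, size=len(payload))
    try:
        owner.buf[:len(payload)] = payload  # write directly into the block
        # Attach by name, the way a different process would find the block.
        peer = shared_memory.SharedMemory(name=owner.name)
        try:
            return bytes(peer.buf[:len(payload)])  # copy out before closing
        finally:
            peer.close()
    finally:
        owner.close()
        owner.unlink()  # free the block once nobody needs it


if __name__ == "__main__":
    print(write_then_read(b"shared bytes"))
```

Only the block's name has to travel between processes, for example through a queue; the data itself stays in the shared block.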