This article introduces Python's inter-process data sharing module, multiprocessing.Manager. I hope it serves as a useful reference for anyone who needs it.
During development I ran into situations where data needs to be shared between processes, so I studied multiprocessing.Manager, mainly using dict as an example to illustrate sharing between processes spawned from the same parent process.
Basic usage of a managed dict
```python
import multiprocessing

# 1. Create a Manager object
manager = multiprocessing.Manager()

# 2. Create a shared dict
temp_dict = manager.dict()

# 3. Define a test function
def test(idx, test_dict):
    test_dict[idx] = idx

# 4. Create a process pool to run the test
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)
```
That example is simple enough. Now let's look at another one:
```python
import multiprocessing

# 1. Create a Manager object
manager = multiprocessing.Manager()

# 2. Create a shared dict with a nested plain dict inside
temp_dict = manager.dict()
temp_dict['test'] = {}

# 3. Define a test function that writes into the nested dict
def test(idx, test_dict):
    test_dict['test'][idx] = idx

# 4. Create a process pool to run the test
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)
You can see that the output is strange: `{'test': {}}`.
If we modify the code slightly:
```python
import multiprocessing

# 1. Create a Manager object
manager = multiprocessing.Manager()

# 2. Create a shared dict with a nested plain dict inside
temp_dict = manager.dict()
temp_dict['test'] = {}

# 3. Define a test function: fetch, modify locally, write back
def test(idx, test_dict):
    row = test_dict['test']
    row[idx] = idx
    test_dict['test'] = row

# 4. Create a process pool to run the test
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict))
pool.close()
pool.join()
print(temp_dict)
```
then the output matches expectations.
To understand the reason behind this behavior, I skimmed the source code. The following pieces are key:
```python
def Manager():
    '''
    Returns a manager associated with a running server process

    The managers methods such as `Lock()`, `Condition()` and `Queue()`
    can be used to create shared objects.
    '''
    from multiprocessing.managers import SyncManager
    m = SyncManager()
    m.start()
    return m

...

def start(self, initializer=None, initargs=()):
    '''
    Spawn a server process for this manager object
    '''
    assert self._state.value == State.INITIAL

    if initializer is not None and not hasattr(initializer, '__call__'):
        raise TypeError('initializer must be a callable')

    # pipe over which we will retrieve address of server
    reader, writer = connection.Pipe(duplex=False)

    # spawn process which runs a server
    self._process = Process(
        target=type(self)._run_server,
        args=(self._registry, self._address, self._authkey,
              self._serializer, writer, initializer, initargs),
    )
    ident = ':'.join(str(i) for i in self._process._identity)
    self._process.name = type(self).__name__ + '-' + ident
    self._process.start()
    ...
```
As the code above shows, when we create a Manager object, the program actually starts a server in a separate process, and requests to this server are handled in a blocking, one-at-a-time fashion, which is how inter-process data safety is achieved.
My understanding is that operations from different processes are mutually exclusive: a process requests a piece of data from the server, modifies it, and sends it back; only then does the server handle requests from other processes.
Back to the strange behavior above. The operation `test_dict['test'][idx] = idx` does modify the data after pulling it from the server, but the modified copy is never sent back, so the data in temp_dict never changes at all. The second, working version effectively fetches the data from the server first and then transmits the modified data back, which explains the behavior.
Now suppose two processes request the same piece of data at the same time, each modifies its own copy, and both submit it back to the server. What happens? The data will, of course, end up corrupted. For this we need another object from the Manager: Lock(). It is not hard to understand: the Manager itself is a server, and both the dict and the lock come from that same server, so while one process holds the lock, other processes cannot touch the data, and the anomaly above naturally cannot occur.
Code example:
```python
import multiprocessing

# 1. Create a Manager object
manager = multiprocessing.Manager()

# 2. Create a shared dict and a shared lock
temp_dict = manager.dict()
lock = manager.Lock()
temp_dict['test'] = {}

# 3. Define a test function: lock around fetch-modify-write-back
def test(idx, test_dict, lock):
    lock.acquire()
    row = test_dict['test']
    row[idx] = idx
    test_dict['test'] = row
    lock.release()

# 4. Create a process pool to run the test
pool = multiprocessing.Pool(4)
for i in range(100):
    pool.apply_async(test, args=(i, temp_dict, lock))
pool.close()
pool.join()
print(temp_dict)
```
Don't create a new lock object inside each process; use the single shared lock object.