This article introduces Python's PoolExecutor for concurrency, with examples. I hope you find it helpful.
Multi-threading and multi-processing can handle conventional concurrency requirements, but steps such as start and join cannot be skipped, and more complex requirements usually need one or two queues as well.
As requirements grow more complex, without good design and functional abstraction, the more code there is, the harder it becomes to debug.
For tasks that need to run concurrently but do not have strict real-time requirements, we can use the PoolExecutor classes in the concurrent.futures package.
The package provides two executors: the thread pool executor ThreadPoolExecutor and the process pool executor ProcessPoolExecutor. Both expose the same API.
The main point of a pool is reuse: threads or processes are created once and then used many times over their life cycle. This reduces the overhead of creating threads and processes and improves program performance. Reuse is not a hard rule, but it is the main reason programmers use pools in their applications.
A pool holds only a fixed number of threads/processes, specified by max_workers.
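As a quick illustration, the two executors are constructed and used in exactly the same way; only the class name changes. This is a minimal sketch (the function square is made up for illustration; submit and result are explained below):

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):
    # Hypothetical task used only for illustration.
    return x * x

if __name__ == "__main__":
    # Same construction and usage; only the executor class changes.
    with ThreadPoolExecutor(max_workers=4) as executor:
        print(executor.submit(square, 3).result())   # 9

    with ProcessPoolExecutor(max_workers=4) as executor:
        print(executor.submit(square, 3).result())   # 9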
A task is submitted to the executor's task queue through executor.submit, which returns a Future object.
Future is a common concurrency design pattern.
A Future object represents a result that is not yet ready (not yet completed). The result can be retrieved at some point in the "future", once it is ready.
Tasks are scheduled to be executed among various workers.
But note that once a task starts executing, it occupies its worker until the execution completes! If there are not enough workers, other tasks keep waiting. Therefore, PoolExecutor is not suitable for real-time tasks.
import concurrent.futures
import time
from itertools import count

number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def evaluate_item(x):
    for i in count(x):  # count is an infinite iterator, it keeps incrementing forever.
        print(f"{x} - {i}")
        time.sleep(0.01)

if __name__ == "__main__":
    start_time_2 = time.time()
    # Using `with` automatically calls executor.shutdown(wait=True) when leaving
    # this block, releasing the executor's resources.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Submit the 10 tasks to the executor and collect the futures.
        futures = [executor.submit(evaluate_item, item) for item in number_list]
        # as_completed waits for the futures to finish.
        # As soon as a future finishes, as_completed yields it immediately,
        # so the future returned each time is always the first one to finish,
        # rather than waiting for task 1, then task 2, ...
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in " + str(time.time() - start_time_2), "seconds")
In the code above, evaluate_item never returns, so the five tasks for items 1, 2, 3, 4 and 5 occupy all the workers forever, while the five tasks for items 6, 7, 8, 9 and 10 wait forever!
API details
The concurrent.futures API consists of three parts:
PoolExecutor: the API shared by the two executors
Constructor: the main parameter is max_workers, which specifies the pool size (the number of workers)
submit(fn, *args, **kwargs): Submit the task function fn to the executor; args and kwargs are the arguments fn requires.
Returns a Future for retrieving the result
map(func, *iterables, timeout=None, chunksize=1): When the task function is the same and only the arguments differ, this method can be used instead of submit. The i-th call to func receives the i-th element of each iterable as its arguments.
Returns an iterator over the results (not Future objects), in the order of the inputs; a sketch follows this list
shutdown(wait=True): Shut down the executor. Usually the with statement is used so that it is shut down automatically.
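Here is a small sketch of map (the function double is made up for illustration):

from concurrent.futures import ThreadPoolExecutor

def double(x):
    # Hypothetical task used only for illustration.
    return x * 2

with ThreadPoolExecutor(max_workers=3) as executor:
    # map returns results (not futures), in the order of the inputs.
    for result in executor.map(double, [1, 2, 3, 4, 5]):
        print(result)  # 2, 4, 6, 8, 10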
Future: after a task is submitted to the executor, a Future is returned
future.result(timeout=None): The most commonly used method. Returns the result of the task; if the task has not finished yet, this method waits until it does!
timeout specifies the timeout period. If it is None, there is no timeout limit.
exception(timeout=None): Returns the exception raised by the task. Like result(), it also waits for the task to finish.
cancel(): Cancel this task
add_done_callback(fn): After the future completes, fn(future) is called; see the sketch after this list.
running(): Whether it is running
done(): Whether the future has ended, boolean
...For details, please refer to the official documentation
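As a small sketch of these Future methods (the names risky_divide and on_done are made up for illustration):

import concurrent.futures

def risky_divide(a, b):
    # Hypothetical task that may raise, used only for illustration.
    return a / b

def on_done(future):
    # add_done_callback passes the finished future to this function.
    print("callback fired, done =", future.done())

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    ok = executor.submit(risky_divide, 10, 2)
    bad = executor.submit(risky_divide, 10, 0)
    ok.add_done_callback(on_done)

    print(ok.result(timeout=5))   # 5.0 -- blocks until the task finishes
    print(bad.exception())        # returns the ZeroDivisionError instead of raising it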
Module-level utility functions
concurrent.futures.as_completed(fs, timeout=None): Wait for the futures in fs (an iterable of futures) to complete
As soon as a future in fs completes, this function yields that future immediately.
So the future returned each time is always the first one to finish, rather than waiting for task 1 first, then task 2...
It is usually used as: for future in as_completed(fs):
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED): Wait until the condition specified by return_when occurs, or until the timeout expires
return_when has three options: ALL_COMPLETED (all futures in fs have completed), FIRST_COMPLETED (any future in fs has completed) and FIRST_EXCEPTION (a task raises an exception); a sketch follows
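A minimal sketch of wait (the function slow_task is made up for illustration):

import concurrent.futures
import time

def slow_task(seconds):
    # Hypothetical task that just sleeps, used only for illustration.
    time.sleep(seconds)
    return seconds

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_task, s) for s in (0.3, 0.1, 0.2)]
    # Block until at least one future finishes; ALL_COMPLETED and
    # FIRST_EXCEPTION are used the same way.
    done, not_done = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED
    )
    print(len(done), "finished,", len(not_done), "still pending")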
Future Design Pattern
The key characteristic of PoolExecutor is that it uses the Future design pattern, which turns task execution and result retrieval into an asynchronous process.
We first put tasks into the task queue through submit/map, and at that point they already start executing! Then, when we need a result, we fetch it through the future, or handle it directly with add_done_callback(fn).
The tasks execute in other workers, so the main process/thread is not blocked and can do other things. This approach is called asynchronous programming.
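A small sketch of this asynchronous style: submit a task, register a callback, and keep working in the main thread while the worker runs (the names compute and handle_result are made up for illustration):

import concurrent.futures
import time

def compute(x):
    # Hypothetical long-running task, used only for illustration.
    time.sleep(0.5)
    return x * x

def handle_result(future):
    print("result ready:", future.result())

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(compute, 7)
    future.add_done_callback(handle_result)  # runs when the task finishes
    # The main thread is not blocked here and can keep doing other work.
    print("main thread keeps working...")
# Leaving the with-block calls shutdown(wait=True), so the program still
# waits for the task to finish before exiting.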
Aside
concurrent.futures is a higher-level wrapper: ProcessPoolExecutor is built on the multiprocessing machinery and ThreadPoolExecutor on threading, so it can be slightly slower than driving threads/processes (or multiprocessing.Pool) directly. In exchange, it provides a more convenient and concise API.