Python's standard library provides the threading and multiprocessing modules for writing multi-threaded and multi-process code. Once a project reaches a certain scale, however, frequently creating and destroying threads or processes consumes a lot of resources, and we end up writing our own thread pool or process pool to trade space for time. Starting with Python 3.2, the standard library includes the concurrent.futures module, which provides two classes, ThreadPoolExecutor and ProcessPoolExecutor. They are a further abstraction over threading and multiprocessing and give direct support for writing thread pools and process pools.
The foundation of the concurrent.futures module is Executor. Executor is an abstract class and cannot be used directly, but its two subclasses, ThreadPoolExecutor and ProcessPoolExecutor, are very useful. As the names suggest, they create thread pools and process pools respectively. We can put tasks directly into the pool, with no need to maintain a Queue of our own or worry about the deadlocks that come with it; the pool schedules the tasks for us automatically.
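To make the relationship between Executor and its subclasses concrete, here is a minimal sketch. The helper names square and square_all are hypothetical and exist only for illustration. It shows that the base class does not implement submit, while code written against the shared interface works with a concrete pool.

```python
from concurrent.futures import Executor, ThreadPoolExecutor

def square(n):
    """Hypothetical task used only for illustration."""
    return n * n

def square_all(executor, numbers):
    """Works with any Executor subclass because it only uses the shared map() API."""
    return list(executor.map(square, numbers))

# The base class defines the interface but leaves submit() unimplemented.
try:
    Executor().submit(square, 2)
    base_class_usable = True
except NotImplementedError:
    base_class_usable = False

# A concrete subclass can be passed to the same helper.
with ThreadPoolExecutor(max_workers=2) as pool:
    squares = square_all(pool, [1, 2, 3])

print(base_class_usable, squares)
```

Because square_all only relies on the Executor interface, a ProcessPoolExecutor could be passed in the same way, provided the script is protected by an if __name__ == "__main__": guard.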
Future: anyone with Java or Node.js programming experience will be familiar with this concept. You can think of a Future as an operation that will complete at some point in the future; it is the basis of asynchronous programming. In the traditional programming model, a call such as queue.get blocks until the result is returned, and the calling thread cannot be freed to do other things in the meantime. A Future lets us get on with other work while waiting. For asynchronous I/O in Python, you can refer to my article on Python concurrent programming with coroutines/asynchronous I/O after reading this one.
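The idea of doing other work while a result is pending can be sketched as follows. The function slow_square and the 0.2 second delay are assumptions made up for this illustration; the point is only that submit returns at once and the main thread blocks only when it asks for the result.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_square(n):
    """Hypothetical slow task: pretend to wait on I/O, then return n squared."""
    time.sleep(0.2)
    return n * n

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_square, 7)  # returns immediately with a Future
    # The main thread is free to do other work while the task runs.
    other_work = sum(range(1000))
    answer = future.result()  # blocks only here, until the result is ready

print(other_work, answer)
```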
P.S. If you are still on Python 2.x, please install the futures module first:
pip install futures
Let's first get a feel for how a thread pool works through the following code:
# example1.py
from concurrent.futures import ThreadPoolExecutor
import time

def return_future_result(message):
    time.sleep(2)
    return message

pool = ThreadPoolExecutor(max_workers=2)  # create a thread pool with at most 2 worker threads
future1 = pool.submit(return_future_result, "hello")  # add a task to the thread pool
future2 = pool.submit(return_future_result, "world")  # add a task to the thread pool
print(future1.done())  # check whether task 1 has finished
time.sleep(3)
print(future2.done())  # check whether task 2 has finished
print(future1.result())  # get the result returned by task 1
print(future2.result())  # get the result returned by task 2
Let's analyze the program based on its output. We use the submit method to add a task to the thread pool, and submit returns a Future object, which can be thought of simply as an operation that will complete in the future. At the first print statement, future1 has clearly not finished yet because of the time.sleep(2) inside the task. We then pause the main thread with time.sleep(3), so by the time the second print statement runs, all tasks in the thread pool have finished.
ziwenxie :: ~ » python example1.py
False
True
hello
world

# While the program above is running, the ps command shows three threads running in the background at the same time
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie 8361 7557 8361 3 3 19:45 pts/0 00:00:00 python example1.py
ziwenxie 8361 7557 8362 0 3 19:45 pts/0 00:00:00 python example1.py
ziwenxie 8361 7557 8363 0 3 19:45 pts/0 00:00:00 python example1.py
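Note that max_workers limits how many tasks run at the same time, not how many can be submitted. The following is a minimal sketch of that behaviour; the task short_task and the 0.3 second delay are assumptions chosen for illustration. Three tasks are submitted to a pool of two workers, so the third has to wait in the pool's internal queue until a worker becomes free.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def short_task(label):
    """Hypothetical task: sleep briefly, then return its label."""
    time.sleep(0.3)
    return label

start = time.monotonic()
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(short_task, name) for name in ("a", "b", "c")]
    # Only two workers exist, so the third task cannot have started yet.
    third_running_at_start = futures[2].running()
    results = [f.result() for f in futures]
elapsed = time.monotonic() - start

print(third_running_at_start, results, round(elapsed, 1))
```

The total time should come out at roughly two rounds of 0.3 seconds rather than one, because the third task only starts after one of the first two has finished.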
We can also rewrite the code above to use a process pool. The API is exactly the same as for the thread pool, so I won't repeat the explanation.
# example2.py
from concurrent.futures import ProcessPoolExecutor
import time

def return_future_result(message):
    time.sleep(2)
    return message

# The guard is required on platforms that start worker processes by spawning (e.g. Windows)
if __name__ == "__main__":
    pool = ProcessPoolExecutor(max_workers=2)
    future1 = pool.submit(return_future_result, "hello")
    future2 = pool.submit(return_future_result, "world")
    print(future1.done())
    time.sleep(3)
    print(future2.done())
    print(future1.result())
    print(future2.result())
The output is as follows:
ziwenxie :: ~ » python example2.py
False
True
hello
world

ziwenxie :: ~ » ps -eLf | grep python
ziwenxie 8560 7557 8560 3 3 19:53 pts/0 00:00:00 python example2.py
ziwenxie 8560 7557 8563 0 3 19:53 pts/0 00:00:00 python example2.py
ziwenxie 8560 7557 8564 0 3 19:53 pts/0 00:00:00 python example2.py
ziwenxie 8561 8560 8561 0 1 19:53 pts/0 00:00:00 python example2.py
ziwenxie 8562 8560 8562 0 1 19:53 pts/0 00:00:00 python example2.py
In addition to submit, Executor also provides a map method, which is used much like the built-in map. Let's compare the two approaches through two examples: the first uses submit together with as_completed, the second uses map.
# example3.py
import concurrent.futures
import urllib.request

URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
As the output shows, as_completed yields the futures in the order in which they finish, not in the order of the elements in the URLS list.
ziwenxie :: ~ » python example3.py
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes
'http://httpbin.org' page is 12150 bytes
# example4.py
import concurrent.futures
import urllib.request

URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data)))
As the output shows, map returns the results in the order of the elements in the URLS list, and the code is more concise and intuitive. You can choose either approach according to your specific needs.
ziwenxie :: ~ » python example4.py
'http://httpbin.org' page is 12150 bytes
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes
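The ordering difference between the two approaches can also be reproduced without network access. The sketch below is an illustration under assumed values: the function wait_then_return and the delays are made up, and fixed sleep times stand in for pages that take different amounts of time to load.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def wait_then_return(delay):
    """Hypothetical task: sleep for `delay` seconds and return the delay."""
    time.sleep(delay)
    return delay

delays = [0.6, 0.1, 0.3]

with ThreadPoolExecutor(max_workers=3) as executor:
    # submit + as_completed: results arrive as each task finishes
    futures = [executor.submit(wait_then_return, d) for d in delays]
    completion_order = [f.result() for f in as_completed(futures)]
    # map: results arrive in the order of the input list
    input_order = list(executor.map(wait_then_return, delays))

print(completion_order)
print(input_order)
```

With delays this far apart, the first list should come out shortest delay first, while the second always matches the order of delays.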
The wait function returns a named tuple containing two sets: done (the futures that have completed) and not_done (those that have not). One advantage of wait is that it gives us more freedom over when to stop waiting. Its return_when parameter accepts one of three values: FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETED. The default is ALL_COMPLETED.
Let's look at how these values differ through the following example:
# example5.py
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint

def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)

pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_random_secs, x))

print(wait(futures))
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
If the default ALL_COMPLETED is used, the program will block until all tasks in the thread pool are completed.
ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f0b06c9bc88 state=finished returned str>,
<Future at 0x7f0b06cbaa90 state=finished returned str>,
<Future at 0x7f0b06373898 state=finished returned str>,
<Future at 0x7f0b06352ba8 state=finished returned str>,
<Future at 0x7f0b06373b00 state=finished returned str>},
not_done=set())
If FIRST_COMPLETED is used instead, the program does not wait for all tasks in the thread pool to complete; wait returns as soon as any task has finished.
ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f84109edb00 state=finished returned str>,
<Future at 0x7f840e2e9320 state=finished returned str>,
<Future at 0x7f840f25ccc0 state=finished returned str>},
not_done={<Future at 0x7f840e2e9ba8 state=running>,
<Future at 0x7f840e2e9940 state=running>})
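The third value, FIRST_EXCEPTION, is not exercised by the example above. Here is a minimal sketch of how it behaves; the tasks fail_fast and slow_ok are hypothetical and written only for this illustration. One task raises immediately while the other is still sleeping, so wait returns early with the failed future in done and the slow one in not_done.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_EXCEPTION
import time

def fail_fast():
    """Hypothetical task that fails right away."""
    raise ValueError("simulated failure")

def slow_ok():
    """Hypothetical task that succeeds after a short delay."""
    time.sleep(0.5)
    return "ok"

with ThreadPoolExecutor(max_workers=2) as pool:
    failing = pool.submit(fail_fast)
    slow = pool.submit(slow_ok)
    # Returns as soon as any future finishes by raising an exception.
    done, not_done = wait([failing, slow], return_when=FIRST_EXCEPTION)
    failed_first = failing in done and slow in not_done

print(failed_first)
print(type(failing.exception()).__name__)
print(slow.result())
```

If no future raises, FIRST_EXCEPTION behaves like ALL_COMPLETED and wait blocks until everything has finished.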