The Python standard library provides the threading and multiprocessing modules for writing multi-threaded and multi-process code. However, once a project reaches a certain scale, frequently creating and destroying threads or processes becomes very resource intensive, and at that point we have to write our own thread pool or process pool, trading space for time. Since Python 3.2, though, the standard library has included the concurrent.futures module. It provides two classes, ThreadPoolExecutor and ProcessPoolExecutor, which are a further abstraction over threading and multiprocessing and give direct support for writing thread pools and process pools.
The foundation of the concurrent.futures module is Executor. Executor is an abstract class and cannot be used directly, but its two subclasses, ThreadPoolExecutor and ProcessPoolExecutor, are very useful. As the names suggest, they are used to create thread pools and process pools respectively. We can hand tasks directly to the thread pool or process pool without maintaining a Queue ourselves or worrying about deadlocks; the pool schedules the tasks for us automatically.
Anyone with programming experience in Java or Node.js will be familiar with the concept of a Future. You can think of it as an operation that will complete at some point in the future, and it is the basis of asynchronous programming. In the traditional programming model, a call such as queue.get blocks until the result is returned, and the CPU cannot be freed up to do anything else. Future lets us carry out other operations while we wait. For asynchronous IO in Python, see my article on Python concurrent programming with coroutines/asynchronous IO after reading this one.
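To make the idea of "doing other things while waiting" concrete, here is a minimal sketch. The function slow_square and the 0.2 second delay are hypothetical stand-ins for a slow operation; they are not part of the examples that follow.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def slow_square(n):
    """Hypothetical stand-in for a slow operation such as an I/O call."""
    time.sleep(0.2)
    return n * n


callback_results = []

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_square, 7)  # returns a Future immediately

    # Ask to be notified when the task finishes instead of blocking on it.
    future.add_done_callback(lambda f: callback_results.append(f.result()))

    # The main thread stays free to do other work while the task runs.
    ticks = 0
    while not future.done():
        ticks += 1
        time.sleep(0.01)

    result = future.result()  # the task has finished, so this returns at once

print(result, ticks, callback_results)
```

The main thread keeps counting while the worker sleeps, and only calls result() once the Future reports that it is done.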
P.S. If you are still on Python 2.x, install the futures module first:
pip install futures
Let's first get a feel for the thread pool through the following code:
# example1.py
from concurrent.futures import ThreadPoolExecutor
import time

def return_future_result(message):
    time.sleep(2)
    return message

pool = ThreadPoolExecutor(max_workers=2)  # Create a thread pool with at most 2 worker threads
future1 = pool.submit(return_future_result, "hello")  # Add a task to the thread pool
future2 = pool.submit(return_future_result, "world")  # Add another task to the thread pool
print(future1.done())  # Check whether task 1 has finished
time.sleep(3)
print(future2.done())  # Check whether task 2 has finished
print(future1.result())  # Get the result returned by task 1
print(future2.result())  # Get the result returned by task 2
Let's analyze the output. The submit method adds a task to the thread pool and returns a Future object, which can be thought of as an operation that will complete in the future. At the first print statement, future1 has clearly not finished yet because of the time.sleep(2) inside the task. We then pause the main thread with time.sleep(3), so by the time we reach the second print statement every task in the thread pool has finished.
ziwenxie :: ~ » python example1.py
False
True
hello
world
# While the program above is running, the ps command shows three threads running in the background at the same time
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie 8361 7557 8361 3 3 19:45 pts/0 00:00:00 python example1.py
ziwenxie 8361 7557 8362 0 3 19:45 pts/0 00:00:00 python example1.py
ziwenxie 8361 7557 8363 0 3 19:45 pts/0 00:00:00 python example1.py
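The three threads in the ps listing are the main thread plus the two pool workers. The same thing can be observed from inside Python; the sketch below assumes a hypothetical helper, report_thread, and a shorter sleep than example1 so that it runs quickly.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time


def report_thread(message):
    """Hypothetical task: return the message and the name of the worker thread."""
    time.sleep(0.2)  # keep the first worker busy so the pool needs a second one
    return message, threading.current_thread().name


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(report_thread, m) for m in ("hello", "world")]
    results = [f.result() for f in futures]

# Collect the distinct worker thread names that ran the tasks.
worker_names = {name for _, name in results}

print(results)
print("main thread:", threading.main_thread().name)
```

Each task reports a different worker thread name, and neither of them is the main thread.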
The code above can also be rewritten to use a process pool. The API is exactly the same as for the thread pool, so I won't go into detail.
# example2.py
from concurrent.futures import ProcessPoolExecutor
import time

def return_future_result(message):
    time.sleep(2)
    return message

if __name__ == "__main__":  # Needed on platforms that spawn worker processes (e.g. Windows)
    pool = ProcessPoolExecutor(max_workers=2)
    future1 = pool.submit(return_future_result, "hello")
    future2 = pool.submit(return_future_result, "world")
    print(future1.done())
    time.sleep(3)
    print(future2.done())
    print(future1.result())
    print(future2.result())
The following is the output:
ziwenxie :: ~ » python example2.py
False
True
hello
world
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie 8560 7557 8560 3 3 19:53 pts/0 00:00:00 python example2.py
ziwenxie 8560 7557 8563 0 3 19:53 pts/0 00:00:00 python example2.py
ziwenxie 8560 7557 8564 0 3 19:53 pts/0 00:00:00 python example2.py
ziwenxie 8561 8560 8561 0 1 19:53 pts/0 00:00:00 python example2.py
ziwenxie 8562 8560 8562 0 1 19:53 pts/0 00:00:00 python example2.py
In addition to submit, Executor also provides a map method, which is used much like the built-in map. Let's compare the two approaches through two examples.
# example3.py
import concurrent.futures
import urllib.request

URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
As the output shows, as_completed does not return results in the order of the elements in the URLS list.
ziwenxie :: ~ » python example3.py
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes
'http://httpbin.org' page is 12150 bytes
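The try/except around future.result() in example3 matters because an exception raised inside a task is stored in its Future and re-raised when result() is called. The following sketch shows this without touching the network; parse_number and its inputs are hypothetical and chosen only so that one task fails.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def parse_number(text):
    """Hypothetical task that raises ValueError for non-numeric input."""
    return int(text)


inputs = ["1", "two", "3"]
outcomes = {}

with ThreadPoolExecutor(max_workers=3) as executor:
    future_to_input = {executor.submit(parse_number, s): s for s in inputs}
    for future in as_completed(future_to_input):
        source = future_to_input[future]
        try:
            outcomes[source] = future.result()  # re-raises the task's exception
        except ValueError as exc:
            outcomes[source] = "failed: %s" % type(exc).__name__

print(outcomes)
```

The failing task does not affect the others: each Future carries either its own result or its own exception.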
# example4.py
import concurrent.futures
import urllib.request

URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data)))
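As the output shows, map returns results in the order of the elements in the URLS list, and the resulting code is more concise and easier to read. We can pick either approach depending on what we need.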
ziwenxie :: ~ » python example4.py
'http://httpbin.org' page is 12150 bytes
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes
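The ordering difference between map and as_completed can also be reproduced offline. This is a sketch under stated assumptions: sleep_then_return and the DELAYS values are hypothetical, with sleeps standing in for network latency.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

DELAYS = [0.5, 0.1, 0.3]  # hypothetical per-task durations in seconds


def sleep_then_return(delay):
    """Hypothetical task whose duration equals its argument."""
    time.sleep(delay)
    return delay


with ThreadPoolExecutor(max_workers=3) as executor:
    # map yields results in the order of the input iterable.
    map_order = list(executor.map(sleep_then_return, DELAYS))

    # as_completed yields futures as they finish, so shorter tasks come first.
    futures = [executor.submit(sleep_then_return, d) for d in DELAYS]
    completion_order = [f.result() for f in as_completed(futures)]

print(map_order)
print(completion_order)
```

Here map_order matches DELAYS, while completion_order normally comes back sorted from the shortest delay to the longest, since it depends on when each task actually finishes.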
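The wait method returns a tuple containing two sets: one holds the completed futures and the other holds the uncompleted ones. One advantage of wait is the extra flexibility it gives us: its return_when argument accepts one of three values, FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETED, and defaults to ALL_COMPLETED.
Let's look at the difference between the three options through the following example: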
# example5.py
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint

def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)

pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_random_secs, x))

print(wait(futures))
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
With the default ALL_COMPLETED, the program blocks until every task in the thread pool has finished.
ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
With FIRST_COMPLETED, the program does not wait for every task in the thread pool to finish.
ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
not_done={
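Because the example above uses random sleep times, its output varies from run to run. A sketch with fixed, hypothetical delays makes the two return_when behaviors easier to compare; sleep_then_return and the delay values are assumptions made for illustration.

```python
from concurrent.futures import (
    ThreadPoolExecutor,
    wait,
    FIRST_COMPLETED,
    ALL_COMPLETED,
)
import time


def sleep_then_return(delay):
    """Hypothetical task whose duration equals its argument."""
    time.sleep(delay)
    return delay


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(sleep_then_return, d) for d in (0.1, 0.6, 0.8)]

    # Returns as soon as at least one future has finished.
    first = wait(futures, return_when=FIRST_COMPLETED)
    first_done, first_pending = len(first.done), len(first.not_done)

    # Blocks until every future has finished (the default behavior).
    final = wait(futures, return_when=ALL_COMPLETED)

print(first_done, first_pending)
print(len(final.done), len(final.not_done))
```

The first call returns while some futures are still in not_done; after the second call, not_done is empty.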
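Exercise: write a small program that compares the execution efficiency of multiprocessing.pool (ThreadPool) with that of ProcessPoolExecutor (ThreadPoolExecutor), and, with the Future concept described above in mind, think about why the results turn out the way they do.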
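As a possible starting point for this comparison, the sketch below times the thread-based variants only. The work function, the item count and the worker count are all hypothetical choices; the measured times will differ between machines, and the process-based variants are left to the reader.

```python
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.pool import ThreadPool
import time


def work(n):
    """Hypothetical cheap task; swap in real work when experimenting."""
    return n * n


def time_call(fn):
    """Run fn() and return its result together with the elapsed seconds."""
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start


def run_thread_pool(items, workers=4):
    with ThreadPool(workers) as pool:
        return pool.map(work, items)


def run_executor(items, workers=4):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(work, items))


items = range(10000)
pool_result, pool_secs = time_call(lambda: run_thread_pool(items))
exec_result, exec_secs = time_call(lambda: run_executor(items))

print("ThreadPool:         %.4f s" % pool_secs)
print("ThreadPoolExecutor: %.4f s" % exec_secs)
```

Both runs produce the same results, so any difference in the timings comes from how each pool hands out and tracks the tasks.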