Detailed introduction to thread pool/process pool in Python concurrent programming

高洛峰
Release: 2017-03-17 17:38:52

Introduction

The Python standard library provides the threading and multiprocessing modules for writing multi-threaded and multi-process code. Once a project reaches a certain scale, however, frequently creating and destroying threads or processes consumes a lot of resources, and we end up writing our own thread pool or process pool to trade space for time. Starting with Python 3.2, the standard library provides the concurrent.futures module. It offers two classes, ThreadPoolExecutor and ProcessPoolExecutor, which are a further abstraction over threading and multiprocessing and give direct support for writing thread pools and process pools.

Executor and Future

The foundation of the concurrent.futures module is Executor. Executor is an abstract class and cannot be used directly, but its two subclasses, ThreadPoolExecutor and ProcessPoolExecutor, are very useful. As the names suggest, they create a thread pool and a process pool respectively. We can hand tasks directly to the pool without maintaining a Queue ourselves and worrying about the deadlocks that come with it; the pool schedules the tasks for us automatically.

Readers with Java or Node.js programming experience will already be familiar with the concept of a Future. You can think of it as an operation that will complete in the future, and it is the basis of asynchronous programming. In the traditional programming model, a call such as queue.get blocks until the result is returned, and the calling thread cannot do anything else in the meantime. A Future lets us carry on with other work while we wait. For asynchronous IO in Python, see my article on coroutines/asynchronous IO in Python concurrent programming after reading this one.
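To make the idea of "doing other work while waiting" concrete, here is a minimal sketch. The function slow_square and the variable names are hypothetical and only serve the illustration; the half-second sleep stands in for any slow operation.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_square(n):
    # Hypothetical stand-in for a slow operation (network call, disk read, ...)
    time.sleep(0.5)
    return n * n

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_square, 7)     # returns a Future immediately
    done_right_after_submit = future.done()  # the task is still sleeping here
    other_work = sum(range(1000))            # the main thread keeps working
    result = future.result()                 # blocks only at this point

print(done_right_after_submit, other_work, result)
```

The main thread only blocks when it calls result(), so anything placed between submit and result overlaps with the task running in the pool.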

P.S. If you are still on Python 2.x, please install the futures module first.

pip install futures

Use submit to operate thread pool/process pool

Let’s first understand the concept of thread pool through the following code

# example1.py
from concurrent.futures import ThreadPoolExecutor
import time

def return_future_result(message):
    time.sleep(2)
    return message

pool = ThreadPoolExecutor(max_workers=2)  # create a thread pool with at most 2 worker threads
future1 = pool.submit(return_future_result, "hello")  # add a task to the thread pool
future2 = pool.submit(return_future_result, "world")  # add another task to the thread pool
print(future1.done())  # check whether task1 has finished
time.sleep(3)
print(future2.done())  # check whether task2 has finished
print(future1.result())  # get the result returned by task1
print(future2.result())  # get the result returned by task2

Let's analyze this based on the output. We use the submit method to add a task to the thread pool, and submit returns a Future object, which can be understood simply as an operation that will complete in the future. At the first print statement, future1 has clearly not finished yet because of time.sleep(2). We then pause the main thread with time.sleep(3), so by the time we reach the second print statement, all tasks in the thread pool have completed.

ziwenxie :: ~ » python example1.py
False
True
hello
world
# While the program above is running, the ps command shows three threads running in the background at the same time
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8361  7557  8361  3    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8362  0    3 19:45 pts/0    00:00:00 python example1.py
ziwenxie      8361  7557  8363  0    3 19:45 pts/0    00:00:00 python example1.py
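Besides polling with done(), a Future can also be asked for its result with a time limit. The following is a small sketch of that pattern, reusing return_future_result from example1 with a shorter sleep; the timed_out and value variables are only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def return_future_result(message):
    time.sleep(1)
    return message

with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(return_future_result, "hello")
    try:
        # Wait at most 0.1 seconds; the task needs about 1 second
        future.result(timeout=0.1)
        timed_out = False
    except TimeoutError:
        timed_out = True
    # Without a timeout, result() blocks until the task has finished
    value = future.result()

print(timed_out, value)
```

A timeout on result() does not cancel the task. It only stops the caller from waiting, which is why the second result() call still gets the value.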

We can also rewrite the code above to use a process pool. The API is exactly the same as for the thread pool, so I won't repeat the explanation.

# example2.py
from concurrent.futures import ProcessPoolExecutor
import time

def return_future_result(message):
    time.sleep(2)
    return message

# The __main__ guard is needed on platforms that start worker
# processes by re-importing this module (for example Windows)
if __name__ == '__main__':
    pool = ProcessPoolExecutor(max_workers=2)
    future1 = pool.submit(return_future_result, "hello")
    future2 = pool.submit(return_future_result, "world")
    print(future1.done())
    time.sleep(3)
    print(future2.done())
    print(future1.result())
    print(future2.result())

The following are the running results

ziwenxie :: ~ » python example2.py
False
True
hello
world
ziwenxie :: ~ » ps -eLf | grep python
ziwenxie      8560  7557  8560  3    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8563  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8560  7557  8564  0    3 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8561  8560  8561  0    1 19:53 pts/0    00:00:00 python example2.py
ziwenxie      8562  8560  8562  0    1 19:53 pts/0    00:00:00 python example2.py

Use map/wait to operate the thread pool/process pool

In addition to submit, Executor also provides a map method, which is used much like the built-in map. Let's compare the two through two examples.

Review of using submit operation

# example3.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

As the output shows, as_completed does not yield results in the order of the URLS list; it yields each future as soon as it completes.

ziwenxie :: ~ » python example3.py
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes
'http://httpbin.org' page is 12150 bytes
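The completion-order behaviour can be reproduced without a network connection. The sketch below replaces the URL downloads with a hypothetical sleep_then_return function, so the fastest task is known in advance.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def sleep_then_return(seconds):
    # Hypothetical stand-in for a download that takes `seconds` to finish
    time.sleep(seconds)
    return seconds

delays = [0.6, 0.1, 0.3]
with ThreadPoolExecutor(max_workers=3) as executor:
    future_to_delay = {executor.submit(sleep_then_return, d): d for d in delays}
    # as_completed yields each future as soon as it finishes
    completion_order = [future.result() for future in as_completed(future_to_delay)]

print(completion_order)
```

With three workers all tasks start together, so the shortest delay finishes first regardless of its position in the list.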

Use map

# example4.py
import concurrent.futures
import urllib.request
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']
def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data)))

As the output shows, map returns results in the order of the URLS list, and the code is more concise and intuitive. Choose whichever fits your specific needs.

ziwenxie :: ~ » python example4.py
'http://httpbin.org' page is 12150 bytes
'http://example.com/' page is 1270 bytes
'https://api.github.com/' page is 2039 bytes
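The ordering guarantee of map can also be checked offline. This sketch uses a hypothetical sleep_then_return function in place of the downloads; the tasks finish in a different order than they were submitted, but map still hands the results back in input order.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def sleep_then_return(seconds):
    # Hypothetical stand-in for a download that takes `seconds` to finish
    time.sleep(seconds)
    return seconds

delays = [0.6, 0.1, 0.3]
with ThreadPoolExecutor(max_workers=3) as executor:
    # Results come back in the order of `delays`, not in completion order
    map_order = list(executor.map(sleep_then_return, delays))

print(map_order)
```

One trade-off to keep in mind: map gives no Future objects to inspect, and an exception raised inside a task is re-raised when its result is retrieved from the iterator. When per-task error handling matters, the submit/as_completed form shown earlier is the more flexible choice.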

The third option is wait

The wait method returns a named tuple containing two sets: done (the completed futures) and not_done (the uncompleted futures). One advantage of wait is the extra freedom it gives us. Its return_when parameter accepts one of three values: FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETED. The default is ALL_COMPLETED.
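As a small illustration of FIRST_EXCEPTION, consider the sketch below. The two task functions, fail_fast and slow_ok, are hypothetical and exist only to make one task fail early while the other is still running.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_EXCEPTION
import time

def fail_fast():
    # Hypothetical task that fails shortly after starting
    time.sleep(0.1)
    raise ValueError("boom")

def slow_ok():
    # Hypothetical task that succeeds, but only after a full second
    time.sleep(1.0)
    return "ok"

with ThreadPoolExecutor(max_workers=2) as pool:
    failing = pool.submit(fail_fast)
    slow = pool.submit(slow_ok)
    # Returns as soon as any future finishes by raising an exception
    done, not_done = wait([failing, slow], return_when=FIRST_EXCEPTION)
    failed_first = failing in done and slow in not_done
    error = failing.exception()  # the exception object, not re-raised

print(failed_first, repr(error))
```

If no task raises, FIRST_EXCEPTION behaves like ALL_COMPLETED and wait returns only once every future has finished.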

Let's look at how these values differ through the following example:

# example5.py
from concurrent.futures import ThreadPoolExecutor, wait
from time import sleep
from random import randint

def return_after_random_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)

pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_random_secs, x))
# Default return_when=ALL_COMPLETED: block until every task has finished
print(wait(futures))
# Alternative: return as soon as any task has finished
# print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))

If the default ALL_COMPLETED is used, the program will block until all tasks in the thread pool are completed.

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f0b06c9bc88 state=finished returned str>,
<Future at 0x7f0b06cbaa90 state=finished returned str>,
<Future at 0x7f0b06373898 state=finished returned str>,
<Future at 0x7f0b06352ba8 state=finished returned str>,
<Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())

If FIRST_COMPLETED is used, wait returns as soon as any task finishes, without waiting for all tasks in the thread pool to complete.

ziwenxie :: ~ » python example5.py
DoneAndNotDoneFutures(done={
<Future at 0x7f84109edb00 state=finished returned str>,
<Future at 0x7f840e2e9320 state=finished returned str>,
<Future at 0x7f840f25ccc0 state=finished returned str>},
not_done={<Future at 0x7f840e2e9ba8 state=running>,
<Future at 0x7f840e2e9940 state=running>})
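Because example5 sleeps for random durations, its output changes from run to run. The sketch below shows the same contrast with fixed delays, using a hypothetical sleep_then_return function and the constants exported by concurrent.futures instead of string literals.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time

def sleep_then_return(seconds):
    # Hypothetical task with a fixed, known duration
    time.sleep(seconds)
    return seconds

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(sleep_then_return, d) for d in (0.1, 0.8, 1.0)]
    # Returns after the 0.1 s task finishes; the other two are still running
    first = wait(futures, return_when=FIRST_COMPLETED)
    first_done = len(first.done)
    first_pending = len(first.not_done)
    # Blocks until all three tasks have finished
    everything = wait(futures, return_when=ALL_COMPLETED)

print(first_done, first_pending, len(everything.done), len(everything.not_done))
```

The futures left in not_done after the first call keep running in the pool, so they can be passed to wait again, as done here.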

source:php.cn