Running programs concurrently is an often-discussed topic. Today I want to discuss the various concurrency methods available in Python.
Concurrency method
Thread
Multi-threading is the tool that almost every programmer thinks of first, in whatever language (JS programmers, please step aside). Using multiple threads can make effective use of CPU resources (except in Python). However, the program complexity that multi-threading brings is unavoidable, especially the problem of synchronizing access to contended resources.
However, because Python uses a Global Interpreter Lock (GIL), Python code cannot run on multiple cores at the same time. In other words, Python's threads are concurrent but not parallel. Many people find that after adding multi-threading to speed up their Python code, the program actually runs slower. What a pain in the ass! If you want to know more details, I recommend reading this article. In fact, the multi-threaded programming model is very difficult to use, and programmers can easily make mistakes. This is not the programmer's fault, because parallel thinking is unnatural for humans: most of us think serially (setting schizophrenia aside), and the computer architecture designed by von Neumann is also based on sequential execution. So if you can't always get your multi-threaded program right, congratulations, you are a normal-thinking programmer :)
Python provides two sets of thread interfaces. One is the thread module, which provides a basic, low-level interface and uses a function as the body of the thread. The other is the threading module, which provides an easier-to-use, object-based interface (similar to Java): you can subclass the Thread object to implement a thread, and it also provides other thread-related objects such as Timer and Lock.
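To make the synchronization problem mentioned above concrete, here is a minimal sketch of protecting a shared counter with threading.Lock. It is written for Python 3 (unlike the Python 2 listings in this article), and the names run_counter, num_threads and increments are my own choices for illustration.

```python
import threading

def run_counter(num_threads=4, increments=10000):
    """Increment a shared counter from several threads, guarded by a Lock."""
    counter = {"value": 0}       # shared, mutable state
    lock = threading.Lock()

    def worker():
        for _ in range(increments):
            # Only one thread at a time may perform the read-modify-write.
            with lock:
                counter["value"] += 1

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter["value"]

print(run_counter())
```

Because every increment happens while the lock is held, the final value is always num_threads * increments. Without the lock, the read-modify-write steps of different threads may interleave and updates can be lost.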
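Example using the thread module
import thread
import time

def worker():
    """thread worker function"""
    print 'Worker'

# start_new_thread requires the argument tuple, even when it is empty
thread.start_new_thread(worker, ())
# give the worker time to run before the main thread exits
time.sleep(1)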
Example using the threading module
import threading

def worker():
    """thread worker function"""
    print 'Worker'

t = threading.Thread(target=worker)
t.start()
Or, Java style
import threading

class Worker(threading.Thread):
    def __init__(self):
        # the base class must be initialized before start() is called
        threading.Thread.__init__(self)

    def run(self):
        """thread worker function"""
        print 'Worker'

t = Worker()
t.start()
Process
Because of the Global Interpreter Lock problem mentioned above, a better way to run in parallel in Python is to use multiple processes, which can use CPU resources very effectively and achieve true parallelism. Of course, the overhead of a process is greater than that of a thread, which means that if you want to create an alarming number of concurrent processes, you need to consider whether your machine has a strong heart.
Python's multiprocessing module has an interface similar to threading.
from multiprocessing import Process

def worker():
    """process worker function"""
    print 'Worker'

p = Process(target=worker)
p.start()
p.join()
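The worker above only prints. Since separate processes do not share memory, a result has to be sent back to the parent explicitly, for example through a multiprocessing.Queue. The following is a minimal Python 3 sketch; square_worker and run_workers are hypothetical names used only for this illustration.

```python
import multiprocessing

def square_worker(n, out_queue):
    """Compute n * n and send the result back to the parent via the queue."""
    out_queue.put((n, n * n))

def run_workers(numbers):
    """Start one process per number and collect the results from a Queue."""
    out_queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=square_worker, args=(n, out_queue))
             for n in numbers]
    for p in procs:
        p.start()
    # Drain the queue before joining; the timeout avoids blocking forever
    # if a child process failed to start.
    results = dict(out_queue.get(timeout=30) for _ in procs)
    for p in procs:
        p.join()
    return results

if __name__ == "__main__":
    print(run_workers([1, 2, 3]))
```

The `if __name__ == "__main__":` guard matters on platforms where child processes are started by re-importing the main module; without it, each child would try to start workers of its own.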
Since threads share the same address space and memory, communication between threads is very easy, but communication between processes is more complicated. Common inter-process communication mechanisms include pipes, message queues, and sockets (TCP/IP).
Python's multiprocessing module provides encapsulated pipes and queues, which make it easy to pass messages between processes.
Synchronization between Python processes uses locks, the same as with threads.
In addition, Python provides a process pool object, Pool, which makes it easy to manage and control worker processes.
Remote distributed host (Distributed Node)
With the advent of the big data era, Moore's Law seems to have lost its effect on a single machine. Data calculation and processing require a distributed computer network, with programs running in parallel on multiple host nodes. This is an issue that today's software architecture must take into account.
There are several common methods of inter-process communication between remote hosts:
TCP/IP
TCP/IP is the basis of all remote communication. However, its API is relatively low-level and cumbersome to use, so it is generally not considered.
Remote Procedure Call (RPC)
RPC is an early means of remote inter-process communication. RPyC is an open source implementation for Python.
Remote Object
A remote object is a higher-level encapsulation. The program can operate on the local proxy of a remote object in the same way as on a local object. CORBA is the most widely used specification for remote objects, and its biggest benefit is that it works across different languages and platforms. Individual languages and platforms also have their own remote object implementations, such as Java's RMI and Microsoft's DCOM.
There are many open source Python implementations that support remote objects:
Dopy
Fnorb (CORBA)
ICE
omniORB (CORBA)
Pyro
YAMI
Message Queue
Compared with RPC or remote objects, messages are a more flexible communication method. Common message mechanisms that support Python interfaces are
RabbitMQ
ZeroMQ
Kafka
AWS SQS + BOTO
There is no big difference between executing concurrency on the remote host and local multi-process, and both need to solve the problem of inter-process communication. Of course, the management and coordination of remote processes are more complicated than local ones.
There are many open source frameworks under Python to support distributed concurrency and provide effective management methods, including:
Celery
Celery is a very mature Python distributed framework that can execute tasks asynchronously in a distributed system and provides effective management and scheduling functions. See here for reference.
SCOOP
SCOOP (Scalable COncurrent Operations in Python) provides a simple and easy-to-use distributed calling interface, using the Future interface for concurrency.
Dispy
Compared with Celery and SCOOP, Dispy provides a more lightweight distributed parallel service.
PP
PP (Parallel Python) is another lightweight Python parallel service. See here for reference.
Asyncoro
Asyncoro is another Python framework that uses generators to achieve distributed concurrency.
Of course there are many other systems; I have not listed them all.
In addition, many distributed systems, such as Spark, provide Python interfaces.
Pseudo-Thread
There is another, less common concurrency method that we can call a pseudo-thread. It looks like a thread and uses an interface similar to the thread interface, but it is not actually implemented with threads, so the corresponding thread overhead does not exist either.
greenlet
greenlet provides lightweight coroutines to support in-process concurrency.
greenlet is a by-product of Stackless, which uses tasklets to support a technique called micro-threads. Here is an example of pseudo-threading using greenlet:
from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34

def test2():
    print 56
    gr1.switch()
    print 78

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
Run the above program to get the following results:
12
56
34
Switching to pseudo-thread gr1 prints 12, then the call to gr2.switch() prints 56, then control switches back to gr1, which prints 34. At that point gr1 ends and the program exits, so 78 is never printed. This example shows that pseudo-threads let us control the execution flow of the program precisely, but they provide no real parallelism.
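The same cooperative hand-off can be imitated without any third-party library. The sketch below swaps greenlet for plain Python generators (a different technique, used here only to show the control flow), and it is written for Python 3. The scheduler run_until_first_finishes is a hypothetical helper of my own, not part of greenlet.

```python
def test1(log):
    log.append(12)
    yield            # hand control back, like gr2.switch() in the greenlet example
    log.append(34)

def test2(log):
    log.append(56)
    yield            # hand control back, like gr1.switch()
    log.append(78)   # never reached: the scheduler stops first

def run_until_first_finishes(tasks):
    """Round-robin over generator-based tasks; stop as soon as one finishes."""
    while True:
        for task in tasks:
            try:
                next(task)
            except StopIteration:
                return

log = []
run_until_first_finishes([test1(log), test2(log)])
print(log)
```

As in the greenlet version, only one task runs at any moment and each task decides when to give up control, which is why 78 is never recorded.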
eventlet, gevent and concurrence all provide concurrency based on greenlet.
eventlet http://eventlet.net/
eventlet is a Python library that provides network call concurrency. Users can call blocking IO operations in a non-blocking manner.
import eventlet
from eventlet.green import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def fetch(url):
    return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()
for body in pool.imap(fetch, urls):
    print("got body", len(body))
The execution results are as follows:
('got body', 17629)
('got body', 1270)
('got body', 46949)
eventlet modified urllib2 in order to support generator operations, while keeping the interface consistent with urllib2. The GreenPool used here has an interface consistent with Python's Pool.
gevent
gevent is similar to eventlet. You can refer to this article about their differences.
import gevent
from gevent import socket
urls = ['www.google.com', 'www.example.com', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=2)
print [job.value for job in jobs]
The execution result is as follows:
['206.169.145.226', '93.184.216.34', '23.235.39.223']
concurrence https://github.com/concurrence/concurrence
concurrence is another open source library that provides network concurrency based on greenlet. I have never used it, so you can try it yourself.
Practical Application
There are usually two situations where concurrency is needed. One is computationally intensive work, which means your program needs a lot of CPU resources. The other is IO-intensive work, where the program performs a large number of read and write operations, including reading and writing files and sending and receiving network requests.
Computing intensive
For computationally intensive applications, we choose the famous Monte Carlo algorithm to calculate the value of pi. The basic principle is as follows.
The Monte Carlo algorithm uses statistical principles to simulate and calculate pi. In a square, the probability of a random point falling in the quarter-circle area (the red points) is proportional to that area. That is, the probability is p = (Pi * R * R / 4) / (R * R) = Pi / 4, where R is both the side length of the square and the radius of the circle. In other words, the probability is one quarter of pi. Using this conclusion, as long as we estimate the probability of a point falling in the quarter circle, we know pi. To get this probability, we run a large number of experiments: generate a large number of points, see which area each point falls in, and tally the results. The basic algorithm is as follows:
from math import hypot
from random import random

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))
Here the test function runs n (tries) trials and returns the number of points that fall in the quarter circle. The check is the distance from the point to the center of the circle: if it is less than R, the point is inside the circle.
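To make the arithmetic concrete: the fraction of hits approximates Pi / 4, so multiplying it by 4 gives the estimate. Below is a minimal single-process sketch in Python 3. The names count_hits and estimate_pi and the fixed seed are my own additions for reproducibility, not part of the original test code.

```python
import math
import random

def count_hits(tries, rng):
    """Count random points in the unit square that fall inside the quarter circle."""
    return sum(math.hypot(rng.random(), rng.random()) < 1 for _ in range(tries))

def estimate_pi(tries, seed=0):
    """Estimate pi as 4 * (hits / tries), using a privately seeded generator."""
    rng = random.Random(seed)
    return 4.0 * count_hits(tries, rng) / tries

estimate = estimate_pi(200000)
print(estimate, abs(estimate - math.pi))
```

The error shrinks only with the square root of the number of trials, which is why the benchmarks in this article need millions of points and why the work is worth spreading across cores.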
With a large amount of concurrency, we can quickly run multiple tests. The more tests we run, the closer the results will be to the true pi.
Here are the program codes for different concurrency methods
Non-concurrency
We first run in a single thread and a single process to see how it performs.
from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    result = map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000, 4000)
Multi-threading
In order to use a thread pool, we use the dummy package of multiprocessing, which is a wrapper around multi-threading. Note that although the code here does not mention threads at all, it is definitely multi-threaded.
Through testing, we found that, as expected, when the thread pool size is 1 the running time is the same as without concurrency. When we set the thread pool size to 5, it takes almost twice as long as without concurrency: in my test the time went from 5 seconds to 9 seconds. So for computationally intensive tasks, it is better to give up on multi-threading.
from multiprocessing.dummy import Pool
from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(1)  # change to Pool(5) to see the slowdown
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    print("pi = {}".format(calcPi(3000, 4000)))
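The claim that multiprocessing.dummy is really thread-based can be checked directly. The following Python 3 sketch asks every task which process and thread it is running in; where_am_i and inspect_pool are hypothetical helper names chosen for this illustration.

```python
import os
import threading
from multiprocessing.dummy import Pool  # thread-based, same API as multiprocessing.Pool

def where_am_i(_):
    """Report the process id and thread name of whoever runs this task."""
    return os.getpid(), threading.current_thread().name

def inspect_pool(size=5, tasks=20):
    """Run some tasks on a dummy Pool and return their (pid, thread name) pairs."""
    pool = Pool(size)
    try:
        return pool.map(where_am_i, range(tasks))
    finally:
        pool.close()
        pool.join()

results = inspect_pool()
print({pid for pid, _ in results})            # a single process id
print(sorted({name for _, name in results}))  # names of the pool's worker threads
```

All tasks report the same process id as the main program, so they share one interpreter and one GIL, which is consistent with the lack of speed-up measured above.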
Multi-process
In theory, for computationally intensive tasks it is more appropriate to use multi-process concurrency. In the following example, the size of the process pool is set to 5. By changing the size of the process pool you can see its effect on the results. When the process pool is set to 1, the time required is similar to the multi-threading result, because there is no concurrency at this point. When it is set to 2, the response time improves significantly, to about half of the time without concurrency. However, continuing to enlarge the process pool has little further effect on performance and may even reduce it. Maybe my MacBook Air's CPU only has two cores?
Be careful: if you set up a very large process pool, you will run into "Resource temporarily unavailable" errors. The system cannot support creating too many processes. After all, resources are limited.
from multiprocessing import Pool
from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(5)
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    print("pi = {}".format(calcPi(3000, 4000)))
gevent (pseudo-thread)
Whether you use gevent or eventlet, there is no actual parallelism, so the response time is not much different from running without concurrency. This is consistent with the test results.
import gevent
from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures]
    gevent.joinall(jobs, timeout=2)
    ret = 4. * sum([job.value for job in jobs]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000, 4000)
eventlet (pseudo-thread)
from math import hypot
from random import random
import eventlet
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    pool = eventlet.GreenPool()
    result = pool.imap(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000, 4000)
SCOOP
The Future interface in SCOOP conforms to the definition in PEP 3148, which is the Future interface provided in Python 3.
Under the default SCOOP configuration (single machine, 4 workers), concurrency does improve performance, but not as much as multi-processing configured with a process pool of two.
from math import hypot
from random import random
from scoop import futures
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    expr = futures.map(test, [tries] * nbFutures)
    ret = 4. * sum(expr) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == "__main__":
    print("pi = {}".format(calcPi(3000, 4000)))
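Since SCOOP's futures follow PEP 3148, the same map-style call can be sketched with the standard library's concurrent.futures module in Python 3. This sketch uses ThreadPoolExecutor purely to show the shape of the interface, not for speed; the helpers square, map_with_futures and submit_one are hypothetical names for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def square(n):
    return n * n

def map_with_futures(func, values, max_workers=4):
    """Apply func to values through a PEP 3148 executor; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(func, values))

def submit_one(func, value):
    """Submit a single call and wait for its Future to resolve."""
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func, value)
        return future.result()

print(map_with_futures(square, range(5)))
print(submit_one(square, 7))
```

For CPU-bound work such as the pi calculation, the thread-based executor would hit the same GIL limit described earlier; concurrent.futures also offers ProcessPoolExecutor with the same interface.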
Celery
Task code
from celery import Celery
from math import hypot
from random import random

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

@app.task
def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))
Client code
from celery import group
from tasks import test
import time

def calcPi(nbFutures, tries):
    ts = time.time()
    result = group(test.s(tries) for i in xrange(nbFutures))().get()
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000, 4000)
The results of the concurrency test using Celery were unexpected (the environment was a single machine, a prefork concurrency of 4, and RabbitMQ as the message broker). It was the worst of all the test cases: the response time was 5 to 6 times that without concurrency. This may be because the overhead of control and coordination is too high. For computing tasks like this, Celery may not be a good choice.
asyncoro
The test results for asyncoro are about the same as without concurrency.
import asyncoro
from math import hypot
from random import random
import time

def test(tries):
    yield sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    coros = [asyncoro.Coro(test, t) for t in [tries] * nbFutures]
    ret = 4. * sum([job.value() for job in coros]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000, 4000)
IO-intensive
IO-intensive tasks are another common use case. A web server is one example: how many requests it can handle per second is an important indicator of a web server.
Let's take reading web pages as the simplest example.
import time
import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def test(url):
    return urllib2.urlopen(url).read()

def testIO(nbFutures):
    ts = time.time()
    map(test, urls * nbFutures)
    span = time.time() - ts
    print "time spend ", span

testIO(10)
The code for the different concurrency libraries is quite similar, so I won't list each version. You can refer to the computationally intensive code above.
Through testing, we find that for IO-intensive tasks, using multiple threads or multiple processes effectively improves the efficiency of the program. The improvement from pseudo-threads is very significant: with eventlet, the response time dropped from 9 seconds without concurrency to 0.03 seconds. In addition, eventlet and gevent provide a non-blocking asynchronous calling mode, which is very convenient. I recommend using threads or pseudo-threads here, because they consume fewer resources for a similar response time.
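The effect is easy to reproduce without touching the network. In the Python 3 sketch below, time.sleep stands in for a blocking network read (an assumption made so the example is self-contained), and fake_fetch, timed and the example.invalid URLs are all made up for illustration.

```python
import time
from multiprocessing.dummy import Pool  # thread pool

def fake_fetch(url, delay=0.05):
    """Stand-in for a blocking network read: sleep, then return a fake body."""
    time.sleep(delay)  # sleeping releases the GIL, like waiting on a socket
    return "body of " + url

def timed(map_func, urls):
    """Run fake_fetch over urls with the given map function and time it."""
    start = time.time()
    bodies = list(map_func(fake_fetch, urls))
    return bodies, time.time() - start

urls = ["http://example.invalid/page%d" % i for i in range(10)]

serial_bodies, serial_time = timed(map, urls)
pool = Pool(10)
pooled_bodies, pooled_time = timed(pool.map, urls)
pool.close()
pool.join()

print("serial %.2fs, thread pool %.2fs" % (serial_time, pooled_time))
```

While one thread waits, the others are free to run, so the pooled version takes roughly one delay instead of ten. This is the opposite of the computationally intensive case, where the threads spend their time competing for the GIL.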
Summary
Python provides different concurrency methods, and different scenarios call for different ones. To choose the appropriate method, you must not only understand the principles behind it but also run some tests and experiments. Data is the best basis for making your choice.