Let’s look at a piece of code first:
# ~*~ Twisted - A Python tale ~*~ from time import sleep # Hello, I'm a developer and I mainly setup Wordpress. def install_wordpress(customer): # Our hosting company Threads Ltd. is bad. I start installation and... print "Start installation for", customer # ...then wait till the installation finishes successfully. It is # boring and I'm spending most of my time waiting while consuming # resources (memory and some CPU cycles). It's because the process # is *blocking*. sleep(3) print "All done for", customer # I do this all day long for our customers def developer_day(customers): for customer in customers: install_wordpress(customer) developer_day(["Bill", "Elon", "Steve", "Mark"])
Run it, the result is as follows:
$ ./deferreds.py 1
------ Running example 1 ------ Start installation for Bill All done for Bill Start installation ... * Elapsed time: 12.03 seconds
This is a piece of code that is executed sequentially. For four consumers, it takes 3 seconds to install for one person, so for four people it takes 12 seconds. This processing is not very satisfactory, so take a look at the second example using threads:
import threading # The company grew. We now have many customers and I can't handle the # workload. We are now 5 developers doing exactly the same thing. def developers_day(customers): # But we now have to synchronize... a.k.a. bureaucracy lock = threading.Lock() # def dev_day(id): print "Goodmorning from developer", id # Yuck - I hate locks... lock.acquire() while customers: customer = customers.pop(0) lock.release() # My Python is less readable install_wordpress(customer) lock.acquire() lock.release() print "Bye from developer", id # We go to work in the morning devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)] [dev.start() for dev in devs] # We leave for the evening [dev.join() for dev in devs] # We now get more done in the same time but our dev process got more # complex. As we grew we spend more time managing queues than doing dev # work. We even had occasional deadlocks when processes got extremely # complex. The fact is that we are still mostly pressing buttons and # waiting but now we also spend some time in meetings. developers_day(["Customer %d" % i for i in xrange(15)])
Run it:
$ ./deferreds.py 2
------ Running example 2 ------ Goodmorning from developer 0Goodmorning from developer 1Start installation forGoodmorning from developer 2 Goodmorning from developer 3Customer 0 ... from developerCustomer 13 3Bye from developer 2 * Elapsed time: 9.02 seconds
This time it is executed in parallel The code uses 5 worker threads. 15 consumers taking 3 seconds each means a total of 45 seconds, but using 5 threads to execute in parallel only takes a total of 9 seconds. This code is a bit complex, and a large part of the code is used to manage concurrency rather than focusing on algorithms or business logic. In addition, the program's output looks very mixed and difficult to read. Even simple multi-threaded code is difficult to write well, so we switch to using Twisted:
# For years we thought this was all there was... We kept hiring more # developers, more managers and buying servers. We were trying harder # optimising processes and fire-fighting while getting mediocre # performance in return. Till luckily one day our hosting # company decided to increase their fees and we decided to # switch to Twisted Ltd.! from twisted.internet import reactor from twisted.internet import defer from twisted.internet import task # Twisted has a slightly different approach def schedule_install(customer): # They are calling us back when a Wordpress installation completes. # They connected the caller recognition system with our CRM and # we know exactly what a call is about and what has to be done next. # # We now design processes of what has to happen on certain events. def schedule_install_wordpress(): def on_done(): print "Callback: Finished installation for", customer print "Scheduling: Installation for", customer return task.deferLater(reactor, 3, on_done) # def all_done(_): print "All done for", customer # # For each customer, we schedule these processes on the CRM # and that # is all our chief-Twisted developer has to do d = schedule_install_wordpress() d.addCallback(all_done) # return d # Yes, we don't need many developers anymore or any synchronization. # ~~ Super-powered Twisted developer ~~ def twisted_developer_day(customers): print "Goodmorning from Twisted developer" # # Here's what has to be done today work = [schedule_install(customer) for customer in customers] # Turn off the lights when done join = defer.DeferredList(work) join.addCallback(lambda _: reactor.stop()) # print "Bye from Twisted developer!" # Even his day is particularly short! twisted_developer_day(["Customer %d" % i for i in xrange(15)]) # Reactor, our secretary uses the CRM and follows-up on events! reactor.run()
------ Running example 3 ------ Goodmorning from Twisted developer Scheduling: Installation for Customer 0 .... Scheduling: Installation for Customer 14 Bye from Twisted developer! Callback: Finished installation for Customer 0 All done for Customer 0 Callback: Finished installation for Customer 1 All done for Customer 1 ... All done for Customer 14 * Elapsed time: 3.18 seconds
The processing operations mentioned earlier occur somewhere else. Now to explain, arithmetic operations still occur in the CPU, but the CPU processing speed is now very fast compared to disk and network operations. So feeding data to the CPU or sending data from the CPU to memory or another CPU takes most of the time. We use non-blocking operations to save time in this area. For example, task.deferLater() uses a callback function that will be activated when the data has been transferred.
Another very important point is the Goodmorning from Twisted developer and Bye from Twisted developer! information in the output. These two pieces of information have already been printed when the code starts executing. If the code executes to this point so early, when does our application actually start running? The answer is that for a Twisted application (including Scrapy) it is run in reactor.run(). Before calling this method, every Deferred chain that may be used in the application must be ready, and then the reactor.run() method will monitor and activate the callback function.
Note that one of the main rules of reactor is that you can perform any operation as long as it is fast enough and non-blocking.
Now, there is no part of the code that is used to manage multi-threads, but these callback functions still look a bit messy. It can be modified like this:
# Twisted gave us utilities that make our code way more readable! @defer.inlineCallbacks def inline_install(customer): print "Scheduling: Installation for", customer yield task.deferLater(reactor, 3, lambda: None) print "Callback: Finished installation for", customer print "All done for", customer def twisted_developer_day(customers): ... same as previously but using inline_install() instead of schedule_install() twisted_developer_day(["Customer %d" % i for i in xrange(15)]) reactor.run()
The only question now is, what if we don’t just have 15 consumers, but, for example, 10,000 consumers? This code will start 10,000 simultaneous execution sequences (such as HTTP requests, database writes, etc.). There may be nothing wrong with doing this, but it may also lead to various failures. In applications with huge concurrent requests, such as Scrapy, we often need to limit the number of concurrencies to an acceptable level. In the following example, we use task.Cooperator() to complete such a function. Scrapy also uses the same mechanism in its Item Pipeline to limit the number of concurrencies (ie, CONCURRENT_ITEMS setting):
@defer.inlineCallbacks def inline_install(customer): ... same as above # The new "problem" is that we have to manage all this concurrency to # avoid causing problems to others, but this is a nice problem to have. def twisted_developer_day(customers): print "Goodmorning from Twisted developer" work = (inline_install(customer) for customer in customers) # # We use the Cooperator mechanism to make the secretary not # service more than 5 customers simultaneously. coop = task.Cooperator() join = defer.DeferredList([coop.coiterate(work) for i in xrange(5)]) # join.addCallback(lambda _: reactor.stop()) print "Bye from Twisted developer!" twisted_developer_day(["Customer %d" % i for i in xrange(15)]) reactor.run() # We are now more lean than ever, our customers happy, our hosting # bills ridiculously low and our performance stellar. # ~*~ THE END ~*~
$ ./deferreds.py 5 ------ Running example 5 ------ Goodmorning from Twisted developer Bye from Twisted developer! Scheduling: Installation for Customer 0 ... Callback: Finished installation for Customer 4 All done for Customer 4 Scheduling: Installation for Customer 5 ... Callback: Finished installation for Customer 14 All done for Customer 14 * Elapsed time: 9.19 seconds
wisted defer.Deferred (from twisted.internet import defer) can return a deferred object.
***Change the synchronous function into asynchronous (return a Deferred)***
twisted's deferToThread (from twisted.internet.threads import deferToThread) also returns a deferred object, but the callback The function is processed in another thread, mainly used for database/file reading operations
.. # 代码片段 def dataReceived(self, data): now = int(time.time()) for ftype, data in self.fpcodec.feed(data): if ftype == 'oob': self.msg('OOB:', repr(data)) elif ftype == 0x81: # 对服务器请求的心跳应答(这个是解析 防疲劳驾驶仪,发给gps上位机的,然后上位机发给服务器的) self.msg('FP.PONG:', repr(data)) else: self.msg('TODO:', (ftype, data)) d = deferToThread(self.redis.zadd, "beier:fpstat:fps", now, self.devid) d.addCallback(self._doResult, extra)
##The complete example here is for your reference
# -*- coding: utf-8 -*- from twisted.internet import defer, reactor from twisted.internet.threads import deferToThread import functools import time # 耗时操作 这是一个同步阻塞函数 def mySleep(timeout): time.sleep(timeout) # 返回值相当于加进了callback里 return 3 def say(result): print "耗时操作结束了, 并把它返回的结果给我了", result # 用functools.partial包装一下, 传递参数进去 cb = functools.partial(mySleep, 3) d = deferToThread(cb) d.addCallback(say) print "你还没有结束我就执行了, 哈哈" reactor.run()
更多使用Python的Twisted框架编写非阻塞程序的代码示例相关文章请关注PHP中文网!