Schauen wir uns zuerst einen Code an:
# ~*~ Twisted - A Python tale ~*~ from time import sleep # Hello, I'm a developer and I mainly setup Wordpress. def install_wordpress(customer): # Our hosting company Threads Ltd. is bad. I start installation and... print "Start installation for", customer # ...then wait till the installation finishes successfully. It is # boring and I'm spending most of my time waiting while consuming # resources (memory and some CPU cycles). It's because the process # is *blocking*. sleep(3) print "All done for", customer # I do this all day long for our customers def developer_day(customers): for customer in customers: install_wordpress(customer) developer_day(["Bill", "Elon", "Steve", "Mark"])
Führen Sie ihn aus, das Ergebnis ist wie folgt:
$ ./deferreds.py 1
------ Running example 1 ------ Start installation for Bill All done for Bill Start installation ... * Elapsed time: 12.03 seconds
Das ist eine Folge von Ausführungscode. Bei vier Verbrauchern dauert die Installation für eine Person 3 Sekunden, bei vier Personen also 12 Sekunden. Das ist nicht sehr zufriedenstellend, also schauen Sie sich das zweite Beispiel mit Threads an:
import threading # The company grew. We now have many customers and I can't handle the # workload. We are now 5 developers doing exactly the same thing. def developers_day(customers): # But we now have to synchronize... a.k.a. bureaucracy lock = threading.Lock() # def dev_day(id): print "Goodmorning from developer", id # Yuck - I hate locks... lock.acquire() while customers: customer = customers.pop(0) lock.release() # My Python is less readable install_wordpress(customer) lock.acquire() lock.release() print "Bye from developer", id # We go to work in the morning devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)] [dev.start() for dev in devs] # We leave for the evening [dev.join() for dev in devs] # We now get more done in the same time but our dev process got more # complex. As we grew we spend more time managing queues than doing dev # work. We even had occasional deadlocks when processes got extremely # complex. The fact is that we are still mostly pressing buttons and # waiting but now we also spend some time in meetings. developers_day(["Customer %d" % i for i in xrange(15)])
Führen Sie es aus:
$ ./deferreds.py 2
------ Running example 2 ------ Goodmorning from developer 0Goodmorning from developer 1Start installation forGoodmorning from developer 2 Goodmorning from developer 3Customer 0 ... from developerCustomer 13 3Bye from developer 2 * Elapsed time: 9.02 seconds
Diesmal ist ein Codestück, das parallel unter Verwendung von 5 Arbeitsthreads ausgeführt wird. 15 Verbraucher, die jeweils 3 Sekunden benötigen, bedeuten insgesamt 45 Sekunden, aber die Verwendung von 5 Threads zur parallelen Ausführung dauert nur insgesamt 9 Sekunden. Dieser Code ist etwas komplex und ein großer Teil des Codes wird zur Verwaltung der Parallelität verwendet, anstatt sich auf Algorithmen oder Geschäftslogik zu konzentrieren. Zudem sieht die Ausgabe des Programms sehr gemischt und schwer lesbar aus. Selbst einfacher Multithread-Code ist schwer gut zu schreiben, daher wechseln wir zur Verwendung von Twisted:
# For years we thought this was all there was... We kept hiring more # developers, more managers and buying servers. We were trying harder # optimising processes and fire-fighting while getting mediocre # performance in return. Till luckily one day our hosting # company decided to increase their fees and we decided to # switch to Twisted Ltd.! from twisted.internet import reactor from twisted.internet import defer from twisted.internet import task # Twisted has a slightly different approach def schedule_install(customer): # They are calling us back when a Wordpress installation completes. # They connected the caller recognition system with our CRM and # we know exactly what a call is about and what has to be done next. # # We now design processes of what has to happen on certain events. def schedule_install_wordpress(): def on_done(): print "Callback: Finished installation for", customer print "Scheduling: Installation for", customer return task.deferLater(reactor, 3, on_done) # def all_done(_): print "All done for", customer # # For each customer, we schedule these processes on the CRM # and that # is all our chief-Twisted developer has to do d = schedule_install_wordpress() d.addCallback(all_done) # return d # Yes, we don't need many developers anymore or any synchronization. # ~~ Super-powered Twisted developer ~~ def twisted_developer_day(customers): print "Goodmorning from Twisted developer" # # Here's what has to be done today work = [schedule_install(customer) for customer in customers] # Turn off the lights when done join = defer.DeferredList(work) join.addCallback(lambda _: reactor.stop()) # print "Bye from Twisted developer!" # Even his day is particularly short! twisted_developer_day(["Customer %d" % i for i in xrange(15)]) # Reactor, our secretary uses the CRM and follows-up on events! reactor.run()
Laufergebnisse:
------ Running example 3 ------ Goodmorning from Twisted developer Scheduling: Installation for Customer 0 .... Scheduling: Installation for Customer 14 Bye from Twisted developer! Callback: Finished installation for Customer 0 All done for Customer 0 Callback: Finished installation for Customer 1 All done for Customer 1 ... All done for Customer 14 * Elapsed time: 3.18 seconds
Dieses Mal erhalten wir perfekt ausgeführten Code und eine lesbare Ausgabe, ohne Threads zu verwenden. Wir haben 15 Verbraucher parallel verarbeitet, was bedeutet, dass die Ausführungszeit, die ursprünglich 45 Sekunden dauerte, innerhalb von 3 Sekunden abgeschlossen war. Der Trick besteht darin, dass wir alle blockierenden Aufrufe von sleep() durch die entsprechenden task.deferLater()- und Callback-Funktionen in Twisted ersetzen. Da die Abwicklung nun woanders stattfindet, können wir problemlos 15 Verbraucher gleichzeitig bedienen.
Die zuvor genannten Verarbeitungsvorgänge finden an anderer Stelle statt. Um es nun zu erklären: Arithmetische Operationen finden immer noch in der CPU statt, aber die CPU-Verarbeitungsgeschwindigkeit ist jetzt im Vergleich zu Festplatten- und Netzwerkoperationen sehr hoch. Das Einspeisen von Daten in die CPU oder das Senden von Daten von der CPU an den Speicher oder eine andere CPU nimmt daher die meiste Zeit in Anspruch. Um in diesem Bereich Zeit zu sparen, verwenden wir nicht blockierende Operationen. Beispielsweise verwendet task.deferLater() eine Callback-Funktion, die aktiviert wird, wenn die Daten übertragen wurden.
Ein weiterer sehr wichtiger Punkt sind die Nachrichten „Guten Morgen vom Twisted-Entwickler“ und „Tschüs vom Twisted-Entwickler“ in der Ausgabe! Diese beiden Informationen wurden bereits gedruckt, wenn die Ausführung des Codes beginnt. Wenn der Code bereits so früh ausgeführt wird, wann beginnt unsere Anwendung dann tatsächlich mit der Ausführung? Die Antwort ist, dass eine Twisted-Anwendung (einschließlich Scrapy) in „reactor.run()“ ausgeführt wird. Bevor diese Methode aufgerufen wird, muss jede Deferred-Kette, die in der Anwendung verwendet werden kann, bereit sein. Anschließend überwacht und aktiviert die Methode „reactor.run()“ die Rückruffunktion.
Beachten Sie, dass eine der Hauptregeln des Reaktors darin besteht, dass Sie jede Operation ausführen können, solange sie schnell genug ist und nicht blockiert.
Okay, es gibt keinen Teil des Codes, der zum Verwalten mehrerer Threads verwendet wird, aber diese Rückruffunktionen sehen immer noch etwas chaotisch aus. Es kann wie folgt geändert werden:
# Twisted gave us utilities that make our code way more readable! @defer.inlineCallbacks def inline_install(customer): print "Scheduling: Installation for", customer yield task.deferLater(reactor, 3, lambda: None) print "Callback: Finished installation for", customer print "All done for", customer def twisted_developer_day(customers): ... same as previously but using inline_install() instead of schedule_install() twisted_developer_day(["Customer %d" % i for i in xrange(15)]) reactor.run()
Das Laufergebnis ist das gleiche wie im vorherigen Beispiel. Dieser Code macht dasselbe wie das vorherige Beispiel, sieht jedoch prägnanter und klarer aus. Der inlineCallbacks-Generator kann einige Python-Mechanismen verwenden, um die Ausführung der Funktion inline_install() anzuhalten oder fortzusetzen. Die Funktion inline_install() wird zu einem verzögerten Objekt und wird für jeden Verbraucher parallel ausgeführt. Jedes Mal, wenn ein Yield auftritt, wird der Vorgang auf der aktuellen inline_install()-Instanz angehalten, bis das Deferred-Objekt des Yields abgeschlossen ist, und dann fortgesetzt.
Die einzige Frage ist jetzt: Was wäre, wenn wir nicht nur 15 Verbraucher hätten, sondern beispielsweise 10.000 Verbraucher? Dieser Code startet 10.000 gleichzeitige Ausführungssequenzen (z. B. HTTP-Anfragen, Datenbankschreibvorgänge usw.). Daran ist möglicherweise nichts auszusetzen, es kann jedoch auch zu verschiedenen Fehlern führen. Bei Anwendungen mit großen gleichzeitigen Anforderungen, wie etwa Scrapy, müssen wir die Anzahl der Parallelitäten häufig auf ein akzeptables Maß begrenzen. Im folgenden Beispiel verwenden wir task.Cooperator(), um eine solche Funktion abzuschließen. Scrapy verwendet auch den gleichen Mechanismus in seiner Item-Pipeline, um die Anzahl der Parallelitäten zu begrenzen (d. h. CONCURRENT_ITEMS-Einstellung):
@defer.inlineCallbacks def inline_install(customer): ... same as above # The new "problem" is that we have to manage all this concurrency to # avoid causing problems to others, but this is a nice problem to have. def twisted_developer_day(customers): print "Goodmorning from Twisted developer" work = (inline_install(customer) for customer in customers) # # We use the Cooperator mechanism to make the secretary not # service more than 5 customers simultaneously. coop = task.Cooperator() join = defer.DeferredList([coop.coiterate(work) for i in xrange(5)]) # join.addCallback(lambda _: reactor.stop()) print "Bye from Twisted developer!" twisted_developer_day(["Customer %d" % i for i in xrange(15)]) reactor.run() # We are now more lean than ever, our customers happy, our hosting # bills ridiculously low and our performance stellar. # ~*~ THE END ~*~
Running results ::
$ ./deferreds.py 5 ------ Running example 5 ------ Goodmorning from Twisted developer Bye from Twisted developer! Scheduling: Installation for Customer 0 ... Callback: Finished installation for Customer 4 All done for Customer 4 Scheduling: Installation for Customer 5 ... Callback: Finished installation for Customer 14 All done for Customer 14 * Elapsed time: 9.19 seconds
Wie Sie der obigen Ausgabe entnehmen können, scheint es 5 Consumer-Verarbeitungsslots zu geben, wenn das Programm ausgeführt wird. Solange kein Slot freigegeben ist, kann mit der Verarbeitung der nächsten Verbraucheranfrage nicht begonnen werden. In diesem Beispiel beträgt die Verarbeitungszeit insgesamt 3 Sekunden, es sieht also so aus, als würde die Verarbeitung in 5er-Batches erfolgen. Die endgültige Leistung ist dieselbe wie bei der Verwendung von Threads, aber dieses Mal gibt es nur einen Thread, und der Code ist einfacher und es ist einfacher, korrekten Code zu schreiben.
PS: deferToThread ermöglicht nicht blockierende Synchronisierungsfunktionen
wisted defer.Deferred (von twisted.internet import defer) kann ein verzögertes Objekt zurückgeben.
Hinweis: deferToThread wird mit implementiert Threads, übermäßige Verwendung wird nicht empfohlen. Die Funktion wird in einem anderen Thread verarbeitet und hauptsächlich für Datenbank-/Dateilesevorgänge verwendet.
.. # 代码片段 def dataReceived(self, data): now = int(time.time()) for ftype, data in self.fpcodec.feed(data): if ftype == 'oob': self.msg('OOB:', repr(data)) elif ftype == 0x81: # 对服务器请求的心跳应答(这个是解析 防疲劳驾驶仪,发给gps上位机的,然后上位机发给服务器的) self.msg('FP.PONG:', repr(data)) else: self.msg('TODO:', (ftype, data)) d = deferToThread(self.redis.zadd, "beier:fpstat:fps", now, self.devid) d.addCallback(self._doResult, extra)
Das vollständige Beispiel Hier ist als Referenz
# -*- coding: utf-8 -*- from twisted.internet import defer, reactor from twisted.internet.threads import deferToThread import functools import time # 耗时操作 这是一个同步阻塞函数 def mySleep(timeout): time.sleep(timeout) # 返回值相当于加进了callback里 return 3 def say(result): print "耗时操作结束了, 并把它返回的结果给我了", result # 用functools.partial包装一下, 传递参数进去 cb = functools.partial(mySleep, 3) d = deferToThread(cb) d.addCallback(say) print "你还没有结束我就执行了, 哈哈" reactor.run()
更多使用Python的Twisted框架编写非阻塞程序的代码示例相关文章请关注PHP中文网!