Codebeispiele zum Schreiben nicht blockierender Programme mit dem Twisted-Framework von Python

高洛峰
Freigeben: 2017-02-03 16:33:30
Original
1278 Leute haben es durchsucht

Schauen wir uns zuerst einen Code an:

# ~*~ Twisted - A Python tale ~*~
 
from time import sleep
 
# Hello, I'm a developer and I mainly setup Wordpress.
def install_wordpress(customer):
  # Our hosting company Threads Ltd. is bad. I start installation and...
  print "Start installation for", customer
  # ...then wait till the installation finishes successfully. It is
  # boring and I'm spending most of my time waiting while consuming
  # resources (memory and some CPU cycles). It's because the process
  # is *blocking*.
  sleep(3)
  print "All done for", customer
 
# I do this all day long for our customers
def developer_day(customers):
  for customer in customers:
    install_wordpress(customer)
 
developer_day(["Bill", "Elon", "Steve", "Mark"])
Nach dem Login kopieren

Führen Sie ihn aus, das Ergebnis ist wie folgt:

   
$ ./deferreds.py 1
Nach dem Login kopieren
------ Running example 1 ------
Start installation for Bill
All done for Bill
Start installation
...
* Elapsed time: 12.03 seconds
Nach dem Login kopieren

Das ist eine Folge von Ausführungscode. Bei vier Verbrauchern dauert die Installation für eine Person 3 Sekunden, bei vier Personen also 12 Sekunden. Das ist nicht sehr zufriedenstellend, also schauen Sie sich das zweite Beispiel mit Threads an:

import threading
 
# The company grew. We now have many customers and I can't handle the
# workload. We are now 5 developers doing exactly the same thing.
def developers_day(customers):
  # But we now have to synchronize... a.k.a. bureaucracy
  lock = threading.Lock()
  #
  def dev_day(id):
    print "Goodmorning from developer", id
    # Yuck - I hate locks...
    lock.acquire()
    while customers:
      customer = customers.pop(0)
      lock.release()
      # My Python is less readable
      install_wordpress(customer)
      lock.acquire()
    lock.release()
    print "Bye from developer", id
  # We go to work in the morning
  devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)]
  [dev.start() for dev in devs]
  # We leave for the evening
  [dev.join() for dev in devs]
 
# We now get more done in the same time but our dev process got more
# complex. As we grew we spend more time managing queues than doing dev
# work. We even had occasional deadlocks when processes got extremely
# complex. The fact is that we are still mostly pressing buttons and
# waiting but now we also spend some time in meetings.
developers_day(["Customer %d" % i for i in xrange(15)])
Nach dem Login kopieren

Führen Sie es aus:

 $ ./deferreds.py 2
Nach dem Login kopieren
------ Running example 2 ------
Goodmorning from developer 0Goodmorning from developer
1Start installation forGoodmorning from developer 2
Goodmorning from developer 3Customer 0
...
from developerCustomer 13 3Bye from developer 2
* Elapsed time: 9.02 seconds
Nach dem Login kopieren

Diesmal ist ein Codestück, das parallel unter Verwendung von 5 Arbeitsthreads ausgeführt wird. 15 Verbraucher, die jeweils 3 Sekunden benötigen, bedeuten insgesamt 45 Sekunden, aber die Verwendung von 5 Threads zur parallelen Ausführung dauert nur insgesamt 9 Sekunden. Dieser Code ist etwas komplex und ein großer Teil des Codes wird zur Verwaltung der Parallelität verwendet, anstatt sich auf Algorithmen oder Geschäftslogik zu konzentrieren. Zudem sieht die Ausgabe des Programms sehr gemischt und schwer lesbar aus. Selbst einfacher Multithread-Code ist schwer gut zu schreiben, daher wechseln wir zur Verwendung von Twisted:

# For years we thought this was all there was... We kept hiring more
# developers, more managers and buying servers. We were trying harder
# optimising processes and fire-fighting while getting mediocre
# performance in return. Till luckily one day our hosting
# company decided to increase their fees and we decided to
# switch to Twisted Ltd.!
 
from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet import task
 
# Twisted has a slightly different approach
def schedule_install(customer):
  # They are calling us back when a Wordpress installation completes.
  # They connected the caller recognition system with our CRM and
  # we know exactly what a call is about and what has to be done next.
  #
  # We now design processes of what has to happen on certain events.
  def schedule_install_wordpress():
      def on_done():
        print "Callback: Finished installation for", customer
    print "Scheduling: Installation for", customer
    return task.deferLater(reactor, 3, on_done)
  #
  def all_done(_):
    print "All done for", customer
  #
  # For each customer, we schedule these processes on the CRM
  # and that
  # is all our chief-Twisted developer has to do
  d = schedule_install_wordpress()
  d.addCallback(all_done)
  #
  return d
 
# Yes, we don't need many developers anymore or any synchronization.
# ~~ Super-powered Twisted developer ~~
def twisted_developer_day(customers):
  print "Goodmorning from Twisted developer"
  #
  # Here's what has to be done today
  work = [schedule_install(customer) for customer in customers]
  # Turn off the lights when done
  join = defer.DeferredList(work)
  join.addCallback(lambda _: reactor.stop())
  #
  print "Bye from Twisted developer!"
# Even his day is particularly short!
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
 
# Reactor, our secretary uses the CRM and follows-up on events!
reactor.run()
Nach dem Login kopieren


Laufergebnisse:

------ Running example 3 ------
Goodmorning from Twisted developer
Scheduling: Installation for Customer 0
....
Scheduling: Installation for Customer 14
Bye from Twisted developer!
Callback: Finished installation for Customer 0
All done for Customer 0
Callback: Finished installation for Customer 1
All done for Customer 1
...
All done for Customer 14
* Elapsed time: 3.18 seconds
Nach dem Login kopieren


Dieses Mal erhalten wir perfekt ausgeführten Code und eine lesbare Ausgabe, ohne Threads zu verwenden. Wir haben 15 Verbraucher parallel verarbeitet, was bedeutet, dass die Ausführungszeit, die ursprünglich 45 Sekunden dauerte, innerhalb von 3 Sekunden abgeschlossen war. Der Trick besteht darin, dass wir alle blockierenden Aufrufe von sleep() durch die entsprechenden task.deferLater()- und Callback-Funktionen in Twisted ersetzen. Da die Abwicklung nun woanders stattfindet, können wir problemlos 15 Verbraucher gleichzeitig bedienen.
Die zuvor genannten Verarbeitungsvorgänge finden an anderer Stelle statt. Um es nun zu erklären: Arithmetische Operationen finden immer noch in der CPU statt, aber die CPU-Verarbeitungsgeschwindigkeit ist jetzt im Vergleich zu Festplatten- und Netzwerkoperationen sehr hoch. Das Einspeisen von Daten in die CPU oder das Senden von Daten von der CPU an den Speicher oder eine andere CPU nimmt daher die meiste Zeit in Anspruch. Um in diesem Bereich Zeit zu sparen, verwenden wir nicht blockierende Operationen. Beispielsweise verwendet task.deferLater() eine Callback-Funktion, die aktiviert wird, wenn die Daten übertragen wurden.
Ein weiterer sehr wichtiger Punkt sind die Nachrichten „Guten Morgen vom Twisted-Entwickler“ und „Tschüs vom Twisted-Entwickler“ in der Ausgabe! Diese beiden Informationen wurden bereits gedruckt, wenn die Ausführung des Codes beginnt. Wenn der Code bereits so früh ausgeführt wird, wann beginnt unsere Anwendung dann tatsächlich mit der Ausführung? Die Antwort ist, dass eine Twisted-Anwendung (einschließlich Scrapy) in „reactor.run()“ ausgeführt wird. Bevor diese Methode aufgerufen wird, muss jede Deferred-Kette, die in der Anwendung verwendet werden kann, bereit sein. Anschließend überwacht und aktiviert die Methode „reactor.run()“ die Rückruffunktion.
Beachten Sie, dass eine der Hauptregeln des Reaktors darin besteht, dass Sie jede Operation ausführen können, solange sie schnell genug ist und nicht blockiert.
Okay, es gibt keinen Teil des Codes, der zum Verwalten mehrerer Threads verwendet wird, aber diese Rückruffunktionen sehen immer noch etwas chaotisch aus. Es kann wie folgt geändert werden:

# Twisted gave us utilities that make our code way more readable!
@defer.inlineCallbacks
def inline_install(customer):
  print "Scheduling: Installation for", customer
  yield task.deferLater(reactor, 3, lambda: None)
  print "Callback: Finished installation for", customer
  print "All done for", customer
 
def twisted_developer_day(customers):
  ... same as previously but using inline_install() instead of schedule_install()
 
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
reactor.run()
Nach dem Login kopieren


Das Laufergebnis ist das gleiche wie im vorherigen Beispiel. Dieser Code macht dasselbe wie das vorherige Beispiel, sieht jedoch prägnanter und klarer aus. Der inlineCallbacks-Generator kann einige Python-Mechanismen verwenden, um die Ausführung der Funktion inline_install() anzuhalten oder fortzusetzen. Die Funktion inline_install() wird zu einem verzögerten Objekt und wird für jeden Verbraucher parallel ausgeführt. Jedes Mal, wenn ein Yield auftritt, wird der Vorgang auf der aktuellen inline_install()-Instanz angehalten, bis das Deferred-Objekt des Yields abgeschlossen ist, und dann fortgesetzt.
Die einzige Frage ist jetzt: Was wäre, wenn wir nicht nur 15 Verbraucher hätten, sondern beispielsweise 10.000 Verbraucher? Dieser Code startet 10.000 gleichzeitige Ausführungssequenzen (z. B. HTTP-Anfragen, Datenbankschreibvorgänge usw.). Daran ist möglicherweise nichts auszusetzen, es kann jedoch auch zu verschiedenen Fehlern führen. Bei Anwendungen mit großen gleichzeitigen Anforderungen, wie etwa Scrapy, müssen wir die Anzahl der Parallelitäten häufig auf ein akzeptables Maß begrenzen. Im folgenden Beispiel verwenden wir task.Cooperator(), um eine solche Funktion abzuschließen. Scrapy verwendet auch den gleichen Mechanismus in seiner Item-Pipeline, um die Anzahl der Parallelitäten zu begrenzen (d. h. CONCURRENT_ITEMS-Einstellung):

@defer.inlineCallbacks
def inline_install(customer):
  ... same as above
 
# The new "problem" is that we have to manage all this concurrency to
# avoid causing problems to others, but this is a nice problem to have.
def twisted_developer_day(customers):
  print "Goodmorning from Twisted developer"
  work = (inline_install(customer) for customer in customers)
  #
  # We use the Cooperator mechanism to make the secretary not
  # service more than 5 customers simultaneously.
  coop = task.Cooperator()
  join = defer.DeferredList([coop.coiterate(work) for i in xrange(5)])
  #
  join.addCallback(lambda _: reactor.stop())
  print "Bye from Twisted developer!"
 
twisted_developer_day(["Customer %d" % i for i in xrange(15)])
reactor.run()
 
# We are now more lean than ever, our customers happy, our hosting
# bills ridiculously low and our performance stellar.
# ~*~ THE END ~*~
Nach dem Login kopieren


Running results ::

$ ./deferreds.py 5
------ Running example 5 ------
Goodmorning from Twisted developer
Bye from Twisted developer!
Scheduling: Installation for Customer 0
...
Callback: Finished installation for Customer 4
All done for Customer 4
Scheduling: Installation for Customer 5
...
Callback: Finished installation for Customer 14
All done for Customer 14
* Elapsed time: 9.19 seconds
Nach dem Login kopieren


Wie Sie der obigen Ausgabe entnehmen können, scheint es 5 Consumer-Verarbeitungsslots zu geben, wenn das Programm ausgeführt wird. Solange kein Slot freigegeben ist, kann mit der Verarbeitung der nächsten Verbraucheranfrage nicht begonnen werden. In diesem Beispiel beträgt die Verarbeitungszeit insgesamt 3 Sekunden, es sieht also so aus, als würde die Verarbeitung in 5er-Batches erfolgen. Die endgültige Leistung ist dieselbe wie bei der Verwendung von Threads, aber dieses Mal gibt es nur einen Thread, und der Code ist einfacher und es ist einfacher, korrekten Code zu schreiben.

PS: deferToThread ermöglicht nicht blockierende Synchronisierungsfunktionen
wisted defer.Deferred (von twisted.internet import defer) kann ein verzögertes Objekt zurückgeben.

Hinweis: deferToThread wird mit implementiert Threads, übermäßige Verwendung wird nicht empfohlen. Die Funktion wird in einem anderen Thread verarbeitet und hauptsächlich für Datenbank-/Dateilesevorgänge verwendet.

..
 
# 代码片段
 
  def dataReceived(self, data):
    now = int(time.time())
 
    for ftype, data in self.fpcodec.feed(data):
      if ftype == 'oob':
        self.msg('OOB:', repr(data))
      elif ftype == 0x81: # 对服务器请求的心跳应答(这个是解析 防疲劳驾驶仪,发给gps上位机的,然后上位机发给服务器的)
        self.msg('FP.PONG:', repr(data))
      else:
        self.msg('TODO:', (ftype, data))
      d = deferToThread(self.redis.zadd, "beier:fpstat:fps", now, self.devid)
      d.addCallback(self._doResult, extra)
Nach dem Login kopieren


Das vollständige Beispiel Hier ist als Referenz

# -*- coding: utf-8 -*-
 
from twisted.internet import defer, reactor
from twisted.internet.threads import deferToThread
 
import functools
import time
 
# 耗时操作 这是一个同步阻塞函数
def mySleep(timeout):
  time.sleep(timeout)
 
  # 返回值相当于加进了callback里
  return 3
 
def say(result):
  print "耗时操作结束了, 并把它返回的结果给我了", result
 
# 用functools.partial包装一下, 传递参数进去
cb = functools.partial(mySleep, 3)
d = deferToThread(cb)
d.addCallback(say)
 
print "你还没有结束我就执行了, 哈哈"
 
reactor.run()
Nach dem Login kopieren

更多使用Python的Twisted框架编写非阻塞程序的代码示例相关文章请关注PHP中文网!

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage