Vorhersage
In der Praxis scheinen wir die asynchrone Programmierung immer auf ähnliche Weise zu verwenden:
Auf Ereignisse achten
Führen Sie die entsprechende Rückruffunktion aus, wenn das Ereignis eintritt
Rückruf abgeschlossen (neue Ereignisse können generiert und zur Abhörwarteschlange hinzugefügt werden)
Zurück zu 1, auf Ereignisse warten
So nennen wir einen solchen asynchronen Modus Reaktormodus, z Beispiel in der iOS-Entwicklung: Das Konzept der Run-Schleife ist der Reactor-Schleife sehr ähnlich. Sobald ein UI-Ereignis auftritt, kann auch der entsprechende Ereignisverarbeitungscode generiert werden GCD und andere Methoden zur Ausführung an den Hauptthread senden.
Das obige Bild ist die Darstellung des Reactor-Modus von Boost. Das Design von Twisted basiert auf diesem Reactor-Modus und wartet kontinuierlich auf Ereignisse Verarbeitung von Ereignissen.
from twisted.internet import reactor reactor.run()
reactor ist ein Singleton-Objekt im Twisted-Programm.
reactor
reactor ist ein Ereignismanager, der zum Registrieren und Aufheben der Registrierung von Ereignissen, zum Ausführen von Ereignisschleifen und zum Aufrufen von Rückruffunktionen beim Eintreten von Ereignissen verwendet wird. Es gibt mehrere Schlussfolgerungen zum Reaktor:
Twisted Reactor kann nur durch den Aufruf von „reactor.run()“ gestartet werden.
Der Reaktorkreislauf läuft in dem Prozess, von dem er ausgeht, also im Hauptprozess.
Sobald es gestartet ist, läuft es weiter. Der Reaktor steht unter der Kontrolle des Programms (oder insbesondere unter der Kontrolle eines Threads, der ihn gestartet hat).
Reaktorschleife verbraucht keine CPU-Ressourcen.
Es ist nicht notwendig, einen Reaktor explizit zu erstellen, sondern ihn einfach zu importieren.
Letztes muss klar erklärt werden. In Twisted ist der Reaktor ein Singleton (dh der Singleton-Modus), das heißt, es kann nur einen Reaktor in einem Programm geben, und solange Sie ihn einführen, wird entsprechend einer erstellt. Die oben vorgestellte Methode ist die von Twisted verwendete Standardmethode. Natürlich verfügt Twisted über andere Methoden, die den Reaktor einführen können. Beispielsweise können Sie zum Pollen den Systemaufruf in „twisted.internet.pollreactor“ anstelle der Methode „select“ verwenden.
Wenn Sie andere Reaktoren verwenden, müssen Sie diese vor der Einführung von Twisted.internet.reactor installieren. So installieren Sie pollreactor:
from twisted.internet import pollreactor pollreactor.install()
Wenn Sie keine anderen speziellen Reaktoren installieren und Twisted.internet.reactor einführen, installiert Twisted den Standardreaktor entsprechend dem Betriebssystem . Aus diesem Grund ist es üblich, den Reaktor nicht im Modul der obersten Ebene einzuführen, um die Installation des Standardreaktors zu vermeiden, sondern ihn in dem Bereich zu installieren, in dem Sie den Reaktor verwenden möchten.
Das Folgende ist eine Neufassung des obigen Programms mit Pollreactor:
from twited.internet import pollreactor pollreactor.install() from twisted.internet import reactor reactor.run()
Wie implementiert Reactor einen Singleton? Werfen wir einen Blick darauf, was der Importreaktor von Twisted.Internet macht, und Sie werden es verstehen.
Das Folgende ist Teil des Codes von twisted/internet/reactor.py:
# twisted/internet/reactor.py import sys del sys.modules['twisted.internet.reactor'] from twisted.internet import default default.install()
Hinweis: Alle in Python in den Speicher geladenen Module werden in sys platziert .modules , ein globales Wörterbuch. Beim Importieren eines Moduls wird zunächst geprüft, ob das Modul in dieser Liste geladen wurde. Beim Laden wird lediglich der Modulname zum Namensraum des Moduls hinzugefügt, das den Import aufruft. Wenn es nicht geladen ist, durchsuchen Sie die Moduldatei anhand des Modulnamens im Verzeichnis sys.path. Laden Sie das Modul nach dem Finden in den Speicher, fügen Sie es zu sys.modules hinzu und importieren Sie den Namen in den aktuellen Namespace.
Wenn wir den Twisted.internet-Importreaktor zum ersten Mal ausführen, wird der Code in Reactor.py ausgeführt und der Standardreaktor installiert, da in sys.modules kein Twisted.internet.reactor vorhanden ist . Danach wird Twisted.internet.reactor in sys.modules beim Import direkt in den aktuellen Namespace importiert, da das Modul bereits in sys.modules vorhanden ist.
Standardinstallation:
# twisted/internet/default.py def _getInstallFunction(platform): """ Return a function to install the reactor most suited for the given platform. @param platform: The platform for which to select a reactor. @type platform: L{twisted.python.runtime.Platform} @return: A zero-argument callable which will install the selected reactor. """ try: if platform.isLinux(): try: from twisted.internet.epollreactor import install except ImportError: from twisted.internet.pollreactor import install elif platform.getType() == 'posix' and not platform.isMacOSX(): from twisted.internet.pollreactor import install else: from twisted.internet.selectreactor import install except ImportError: from twisted.internet.selectreactor import install return install install = _getInstallFunction(platform)
Standardmäßig wird natürlich die entsprechende Installation entsprechend der Plattform durchgeführt. Unter Linux wird zuerst Epollreactor verwendet. Wenn der Kernel dies nicht unterstützt, kann nur Pollreactor verwendet werden. Die Mac-Plattform verwendet Pollreactor und Windows verwendet Selectreactor. Die Implementierung jeder Installation ist ähnlich. Hier extrahieren wir die Installation im Selectreactor, um einen Blick darauf zu werfen.
# twisted/internet/selectreactor.py: def install(): """Configure the twisted mainloop to be run using the select() reactor. """ # 单例 reactor = SelectReactor() from twisted.internet.main import installReactor installReactor(reactor) # twisted/internet/main.py: def installReactor(reactor): """ Install reactor C{reactor}. @param reactor: An object that provides one or more IReactor* interfaces. """ # this stuff should be common to all reactors. import twisted.internet import sys if 'twisted.internet.reactor' in sys.modules: raise error.ReactorAlreadyInstalledError("reactor already installed") twisted.internet.reactor = reactor sys.modules['twisted.internet.reactor'] = reactor
Fügen Sie in installReactor den Schlüssel „twisted.internet.reactor“ zu sys.modules hinzu, und der Wert ist der bei der Installation erstellte Singleton-Reaktor. Wenn Sie den Reaktor in Zukunft verwenden möchten, importieren Sie diesen Singleton.
SelectReactor # twisted/internet/selectreactor.py @implementer(IReactorFDSet) class SelectReactor(posixbase.PosixReactorBase, _extraBase)
Implementierer bedeutet, dass SelectReactor die Methode der IReactorFDSet-Schnittstelle verwendet, bei der es sich um die Schnittstellenimplementierung in Python handelt kann mal vorbeischauen.
Die IReactorFDSet-Schnittstelle bietet hauptsächlich Methoden zum Abrufen, Hinzufügen, Löschen und für andere Vorgänge an Deskriptoren. Sie können die Bedeutung dieser Methoden allein anhand ihrer Namen erkennen, daher habe ich keine Kommentare hinzugefügt.
# twisted/internet/interfaces.py class IReactorFDSet(Interface): def addReader(reader): def addWriter(writer): def removeReader(reader): def removeWriter(writer): def removeAll(): def getReaders(): def getWriters(): reactor.listenTCP()
reactor.listenTCP() im Beispiel registriert ein Listening-Ereignis, bei dem es sich um eine Methode in der übergeordneten Klasse PosixReactorBase handelt.
# twisted/internet/posixbase.py @implementer(IReactorTCP, IReactorUDP, IReactorMulticast) class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin, ReactorBase): def listenTCP(self, port, factory, backlog=50, interface=''): p = tcp.Port(port, factory, backlog, interface, self) p.startListening() return p # twisted/internet/tcp.py @implementer(interfaces.IListeningPort) class Port(base.BasePort, _SocketCloser): def __init__(self, port, factory, backlog=50, interface='', reactor=None): """Initialize with a numeric port to listen on. """ base.BasePort.__init__(self, reactor=reactor) self.port = port self.factory = factory self.backlog = backlog if abstract.isIPv6Address(interface): self.addressFamily = socket.AF_INET6 self._addressType = address.IPv6Address self.interface = interface ... def startListening(self): """Create and bind my socket, and begin listening on it. 创建并绑定套接字,开始监听。 This is called on unserialization, and must be called after creating a server to begin listening on the specified port. """ if self._preexistingSocket is None: # Create a new socket and make it listen try: # 创建套接字 skt = self.createInternetSocket() if self.addressFamily == socket.AF_INET6: addr = _resolveIPv6(self.interface, self.port) else: addr = (self.interface, self.port) # 绑定 skt.bind(addr) except socket.error as le: raise CannotListenError(self.interface, self.port, le) # 监听 skt.listen(self.backlog) else: # Re-use the externally specified socket skt = self._preexistingSocket self._preexistingSocket = None # Avoid shutting it down at the end. self._shouldShutdown = False # Make sure that if we listened on port 0, we update that to # reflect what the OS actually assigned us. self._realPortNumber = skt.getsockname()[1] log.msg("%s starting on %s" % ( self._getLogPrefix(self.factory), self._realPortNumber)) # The order of the next 5 lines is kind of bizarre. If no one # can explain it, perhaps we should re-arrange them. self.factory.doStart() self.connected = True self.socket = skt self.fileno = self.socket.fileno self.numberAccepts = 100 # startReading调用reactor的addReader方法将Port加入读集合 self.startReading()
整个逻辑很简单,和正常的server端一样,创建套接字、绑定、监听。不同的是将套接字的描述符添加到了reactor的读集合。那么假如有了client连接过来的话,reactor会监控到,然后触发事件处理程序。
reacotr.run()事件主循环
# twisted/internet/posixbase.py @implementer(IReactorTCP, IReactorUDP, IReactorMulticast) class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin, ReactorBase) # twisted/internet/base.py class _SignalReactorMixin(object): def startRunning(self, installSignalHandlers=True): """ PosixReactorBase的父类_SignalReactorMixin和ReactorBase都有该函数,但是 _SignalReactorMixin在前,安装mro顺序的话,会先调用_SignalReactorMixin中的。 """ self._installSignalHandlers = installSignalHandlers ReactorBase.startRunning(self) def run(self, installSignalHandlers=True): self.startRunning(installSignalHandlers=installSignalHandlers) self.mainLoop() def mainLoop(self): while self._started: try: while self._started: # Advance simulation time in delayed event # processors. self.runUntilCurrent() t2 = self.timeout() t = self.running and t2 # doIteration是关键,select,poll,epool实现各有不同 self.doIteration(t) except: log.msg("Unexpected error in main loop.") log.err() else: log.msg('Main loop terminated.')
mianLoop就是最终的主循环了,在循环中,调用doIteration方法监控读写描述符的集合,一旦发现有描述符准备好读写,就会调用相应的事件处理程序。
# twisted/internet/selectreactor.py @implementer(IReactorFDSet) class SelectReactor(posixbase.PosixReactorBase, _extraBase): def __init__(self): """ Initialize file descriptor tracking dictionaries and the base class. """ self._reads = set() self._writes = set() posixbase.PosixReactorBase.__init__(self) def doSelect(self, timeout): """ Run one iteration of the I/O monitor loop. This will run all selectables who had input or output readiness waiting for them. """ try: # 调用select方法监控读写集合,返回准备好读写的描述符 r, w, ignored = _select(self._reads, self._writes, [], timeout) except ValueError: # Possibly a file descriptor has gone negative? self._preenDescriptors() return except TypeError: # Something *totally* invalid (object w/o fileno, non-integral # result) was passed log.err() self._preenDescriptors() return except (select.error, socket.error, IOError) as se: # select(2) encountered an error, perhaps while calling the fileno() # method of a socket. (Python 2.6 socket.error is an IOError # subclass, but on Python 2.5 and earlier it is not.) if se.args[0] in (0, 2): # windows does this if it got an empty list if (not self._reads) and (not self._writes): return else: raise elif se.args[0] == EINTR: return elif se.args[0] == EBADF: self._preenDescriptors() return else: # OK, I really don't know what's going on. Blow up. raise _drdw = self._doReadOrWrite _logrun = log.callWithLogger for selectables, method, fdset in ((r, "doRead", self._reads), (w,"doWrite", self._writes)): for selectable in selectables: # if this was disconnected in another thread, kill it. # ^^^^ --- what the !@#*? serious! -exarkun if selectable not in fdset: continue # This for pausing input when we're not ready for more. # 调用_doReadOrWrite方法 _logrun(selectable, _drdw, selectable, method) doIteration = doSelect def _doReadOrWrite(self, selectable, method): try: # 调用method,doRead或者是doWrite, # 这里的selectable可能是我们监听的tcp.Port why = getattr(selectable, method)() except: why = sys.exc_info()[1] log.err() if why: self._disconnectSelectable(selectable, why, method=="doRead")
那么假如客户端有连接请求了,就会调用读集合中tcp.Port的doRead方法。
# twisted/internet/tcp.py @implementer(interfaces.IListeningPort) class Port(base.BasePort, _SocketCloser): def doRead(self): """Called when my socket is ready for reading. 当套接字准备好读的时候调用 This accepts a connection and calls self.protocol() to handle the wire-level protocol. """ try: if platformType == "posix": numAccepts = self.numberAccepts else: numAccepts = 1 for i in range(numAccepts): if self.disconnecting: return try: # 调用accept skt, addr = self.socket.accept() except socket.error as e: if e.args[0] in (EWOULDBLOCK, EAGAIN): self.numberAccepts = i break elif e.args[0] == EPERM: continue elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED): log.msg("Could not accept new connection (%s)" % ( errorcode[e.args[0]],)) break raise fdesc._setCloseOnExec(skt.fileno()) protocol = self.factory.buildProtocol(self._buildAddr(addr)) if protocol is None: skt.close() continue s = self.sessionno self.sessionno = s+1 # transport初始化的过程中,会将自身假如到reactor的读集合中,那么当它准备 # 好读的时候,就可以调用它的doRead方法读取客户端发过来的数据了 transport = self.transport(skt, protocol, addr, self, s, self.reactor) protocol.makeConnection(transport) else: self.numberAccepts = self.numberAccepts+20 except: log.deferr()
doRead方法中,调用accept产生了用于接收客户端数据的套接字,将套接字与transport绑定,然后把transport加入到reactor的读集合。当客户端有数据到来时,就会调用transport的doRead方法进行数据读取了。
Connection是Server(transport实例的类)的父类,它实现了doRead方法。
# twisted/internet/tcp.py @implementer(interfaces.ITCPTransport, interfaces.ISystemHandle) class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser, _AbortingMixin): def doRead(self): try: # 接收数据 data = self.socket.recv(self.bufferSize) except socket.error as se: if se.args[0] == EWOULDBLOCK: return else: return main.CONNECTION_LOST return self._dataReceived(data) def _dataReceived(self, data): if not data: return main.CONNECTION_DONE # 调用我们自定义protocol的dataReceived方法处理数据 rval = self.protocol.dataReceived(data) if rval is not None: offender = self.protocol.dataReceived warningFormat = ( 'Returning a value other than None from %(fqpn)s is ' 'deprecated since %(version)s.') warningString = deprecate.getDeprecationWarningString( offender, versions.Version('Twisted', 11, 0, 0), format=warningFormat) deprecate.warnAboutFunction(offender, warningString) return rval
_dataReceived中调用了示例中我们自定义的EchoProtocol的dataReceived方法处理数据。
至此,一个简单的流程,从创建监听事件,到接收客户端数据就此结束了。
更多Detaillierte Erläuterung der Verwendung des Reactor Event Managers im Twisted-Framework von Python相关文章请关注PHP中文网!