1. reactor
The core of Twisted is the reactor, and any discussion of the reactor inevitably touches on synchronous/asynchronous and blocking/non-blocking. In the conceptual introduction in Dave's first chapter, the boundary between synchronous and asynchronous is somewhat fuzzy; for synchronous/asynchronous and blocking/non-blocking, please refer to the Zhihu discussion. As for the proactor and reactor patterns, a recommended blog post gives a more detailed introduction.
As far as network IO in the reactor pattern is concerned, it should be considered synchronous IO rather than asynchronous IO. The core of the asynchrony described in Dave's first chapter is that a task explicitly gives up control instead of being stopped at an arbitrary point by the operating system. The programmer must organize each task as a sequence of small steps that are completed in alternation with other tasks. Therefore, if one task uses the output of another, the dependent task (the one that receives the output) must be designed to accept it as a series of pieces or fragments rather than all at once.
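The idea of tasks that explicitly give up control after each small step can be sketched with plain Python generators. This is only an illustration and has nothing to do with how Twisted is implemented; the names count_task and run_round_robin are made up for this sketch.

```python
from collections import deque


def count_task(name, steps, log):
    """A task split into small steps; each yield hands control back."""
    for i in range(steps):
        log.append(f"{name}:{i}")
        yield  # explicitly give up control after one small step


def run_round_robin(tasks):
    """Run generator tasks in turn until all of them are finished."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)          # run the task up to its next yield
        except StopIteration:
            continue            # task finished, drop it
        queue.append(task)      # not finished, schedule it again


log = []
run_round_robin([count_task("a", 2, log), count_task("b", 3, log)])
print(log)  # ['a:0', 'b:0', 'a:1', 'b:1', 'b:2']
```

The two tasks interleave because each one stops voluntarily at its yield, not because anything interrupts it.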
Explicitly and proactively giving up control of a task is somewhat similar to the way coroutines work, and the reactor can be regarded as the scheduler of those coroutines. The reactor is an event loop. We register the events we are interested in (such as a socket becoming readable/writable) and their handlers (such as performing the read or write) with the reactor, and the reactor calls our handler back when the event occurs. When the handler finishes, it is as if the coroutine had suspended itself (yield): control returns to the reactor's event loop, which waits for the next event and calls back again. The reactor itself contains a Synchronous Event Demultiplexer, which can be implemented with select/epoll and similar mechanisms. Of course, events in the Twisted reactor are not necessarily triggered by IO; they can also be triggered by other mechanisms such as timers.
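The register-then-call-back loop described above can be sketched with the standard library's selectors module. This is a toy model under stated assumptions, not Twisted's reactor: the class MiniReactor and its method names are invented here, and it only handles read events.

```python
import selectors
import socket


class MiniReactor:
    """Toy event loop: register (socket, handler) pairs, then dispatch."""

    def __init__(self):
        # The synchronous event demultiplexer (select/epoll/kqueue)
        self.selector = selectors.DefaultSelector()
        self.running = False

    def add_reader(self, sock, handler):
        self.selector.register(sock, selectors.EVENT_READ, handler)

    def remove_reader(self, sock):
        self.selector.unregister(sock)

    def stop(self):
        self.running = False

    def run(self):
        self.running = True
        while self.running:
            # Block until at least one registered socket is readable
            for key, _events in self.selector.select():
                key.data(key.fileobj)  # call back the registered handler
        self.selector.close()


mini = MiniReactor()
received = []
left, right = socket.socketpair()
right.setblocking(False)


def on_readable(sock):
    received.append(sock.recv(1024))
    mini.remove_reader(sock)
    mini.stop()


mini.add_reader(right, on_readable)
left.sendall(b"hello")
mini.run()
left.close()
right.close()
print(received)  # [b'hello']
```

The handler returns quickly and control goes back to the loop, which is the same discipline the real reactor expects from its callbacks.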
The Twisted reactor does not require us to register events and callback functions by hand; instead this is done through polymorphism (subclassing specific classes or implementing the interfaces of interest, then handing the object to the reactor). Regarding the Twisted reactor, there are several things to note:
twisted.internet.reactor is a singleton: each program can have only one reactor;
Finish the work in a reactor callback as quickly as possible and do not perform blocking tasks there. The reactor is essentially single-threaded, and user callback code runs in the same context as Twisted's own code, so blocking in any one callback blocks the reactor's entire event loop;
The reactor keeps running until it is stopped explicitly with reactor.stop(), and calling reactor.stop() generally means that the application is ending.
The example below uses the reactor directly, without Twisted's high-level abstractions:
    # Example 1: using Twisted's low-level API
    import errno
    import socket

    from twisted.internet import main, reactor
    from twisted.internet.interfaces import IReadDescriptor
    from zope.interface import implementer


    @implementer(IReadDescriptor)
    class MySocket(object):

        def __init__(self, address):
            # Connect to the server (blocking), then switch to non-blocking mode
            self.address = address
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect(address)
            self.sock.setblocking(False)

            # tell the Twisted reactor to monitor this socket for reading
            reactor.addReader(self)

        # Interface: tells the reactor which socket descriptor to monitor
        def fileno(self):
            try:
                return self.sock.fileno()
            except socket.error:
                return -1

        # Interface: prefix used by Twisted's logging for this descriptor
        def logPrefix(self):
            return 'MySocket'

        # Interface: called back when the connection is lost
        def connectionLost(self, reason):
            reactor.removeReader(self)
            self.sock.close()
            # When the application needs to terminate, call:
            # reactor.stop()

        # Interface: called when the socket descriptor has data to read
        def doRead(self):
            data = b''
            # Read as much data as possible
            while True:
                try:
                    bytesread = self.sock.recv(1024)
                    if not bytesread:
                        break
                    data += bytesread
                except socket.error as e:
                    if e.args[0] == errno.EWOULDBLOCK:
                        break
                    return main.CONNECTION_LOST

            if not data:
                return main.CONNECTION_DONE
            # Parse the protocol and process the data here
            print(data)

Example 1 shows the essence of the Twisted reactor clearly: add a descriptor to monitor, wait for readable/writable events, call back a function when an event arrives, and go back to waiting once the callback has finished.
Note:
We provide the interface the reactor requires by declaring that MySocket implements IReadDescriptor (fileno, doRead, connectionLost and logPrefix). Twisted interfaces are zope.interface declarations, so they are declared with @implementer rather than used as a base class.
reactor.addReader adds the MySocket object to the set of objects the reactor monitors for reading.
main.CONNECTION_LOST and main.CONNECTION_DONE are values predefined by Twisted. By returning them from doRead we can control the next callback to a certain extent (similar to simulating an event): the reactor stops monitoring the descriptor and calls connectionLost.
But the MySocket class above is not good enough. The main disadvantages are:
We need to read the data ourselves, rather than the framework reading it for us and handling exceptions
Network IO and data processing are mixed together and not separated
3. twisted abstraction
Twisted builds a higher level of abstraction on top of the reactor. For a network connection, Twisted defines the following three concepts:
Transports: the network connection layer, responsible only for the connection itself and for reading/writing bytes
Protocols: the protocol layer, which implements the network protocol of the business logic and converts the byte stream into the data the application needs
Protocol Factories: the protocol factory, responsible for creating Protocols; each network connection has its own Protocol object (because the protocol parsing state has to be kept per connection)
These Twisted concepts are very similar to the ranch network framework in Erlang, which also abstracts Transports and Protocols. When a new network connection arrives, ranch automatically creates the Transport and the Protocol. The Protocol is a module that implements the ranch_protocol behaviour and is passed in by the user when starting ranch. When the Protocol is initialized it receives the Transport for that connection, so inside the Protocol we can parse and process the byte stream according to our own protocol and send data through the Transport (ranch has already read the byte stream for you).
Similar to ranch, Twisted also creates the Protocol and passes in the Transport when a new connection arrives. Twisted reads the byte stream for us; we only need to process the data in the dataReceived(self, data) interface. At this point Twisted can be regarded as truly asynchronous in network IO. It handles the network IO and its possible exceptions for us, and it separates network IO from data processing by abstracting them into Transports and Protocols, which improves the clarity and robustness of the program.
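Because the byte stream arrives in arbitrary fragments, each connection's Protocol has to keep its own parsing state. The sketch below imitates the shape of the interfaces named above (makeConnection, dataReceived, transport.write) without importing Twisted; FakeTransport and LineProtocol are hypothetical classes made up for this illustration.

```python
class FakeTransport:
    """Stand-in for a transport: records what the protocol writes."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


class LineProtocol:
    """Turns a byte stream into lines and echoes each line in upper case."""

    def __init__(self):
        self.transport = None
        self.buffer = b""   # per-connection parsing state
        self.lines = []

    def makeConnection(self, transport):
        self.transport = transport

    def dataReceived(self, data):
        # A fragment may hold part of a line, one line, or several lines
        self.buffer += data
        while b"\n" in self.buffer:
            line, self.buffer = self.buffer.split(b"\n", 1)
            self.lines.append(line)
            self.transport.write(line.upper() + b"\n")


proto = LineProtocol()
transport = FakeTransport()
proto.makeConnection(transport)
for fragment in (b"hel", b"lo\nwor", b"ld\n"):
    proto.dataReceived(fragment)
print(proto.lines)        # [b'hello', b'world']
print(transport.written)  # [b'HELLO\n', b'WORLD\n']
```

The protocol never touches a socket: it only sees fragments coming in and a transport to write to, which is the separation of network IO from data processing described above.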
    # Example 2: using Twisted's abstractions
    from twisted.internet import reactor
    from twisted.internet.protocol import Protocol, ClientFactory


    class MyProtocol(Protocol):

        poem = b''
        task_num = 0

        # Interface: called when the Protocol is initialized, with its Transport passed in.
        # Twisted also sets the Protocol's factory attribute to the factory instance
        # that built it, so the Protocol can interact with MyProtocolFactory via self.factory
        def makeConnection(self, trans):
            print('make connection: get transport:', trans)
            print('my factory is:', self.factory)
            # Let the base class store the transport and call connectionMade()
            Protocol.makeConnection(self, trans)

        # Interface: data has arrived
        def dataReceived(self, data):
            self.poem += data
            msg = 'Task %d: got %d bytes of poetry from %s'
            print(msg % (self.task_num, len(data), self.transport.getPeer()))

        # Interface: the connection was lost
        def connectionLost(self, reason):
            # Handle the disconnection here, for example hand the poem to the factory
            self.factory.poems[self.task_num] = self.poem


    class MyProtocolFactory(ClientFactory):

        # Interface: the protocol class attribute names the Protocol to create
        protocol = MyProtocol  # tell base class what proto to build

        def __init__(self, address):
            self.address = address
            self.poems = {}  # task num -> poem

        # Interface: called back to create the Protocol
        def buildProtocol(self, address):
            proto = ClientFactory.buildProtocol(self, address)
            # Do some initialization of proto here....
            return proto

        # Interface: called back when connecting to the server fails
        def clientConnectionFailed(self, connector, reason):
            print('Failed to connect to:', connector.getDestination())


    def main(address):
        factory = MyProtocolFactory(address)
        host, port = address
        # Pass in the ProtocolFactory when connecting to the server
        reactor.connectTCP(host, port, factory)
        reactor.run()
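4. twisted Deferred
The Twisted Deferred object solves the following problem: sometimes we need to embed our own callback in the ProtocolFactory so that when some event happens in the Protocols (for example, all Protocols have finished), a function we specify (such as TaskFinished) is called back. If we implement the callback ourselves, we have to deal with several questions:
How do we distinguish a successful result from an error result? (When making asynchronous calls, pay special attention to the importance of error results.)
What if both the successful result and the error result need to run a common function (such as closing the connection)?
How do we guarantee that the callback is called only once?
The Deferred object is designed for exactly this. It provides two callback chains, one for the successful result and one for the error result. When a result or an error arrives, it calls the functions in the corresponding chain in order, and it guarantees that the Deferred is fired only once.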
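    from twisted.internet.defer import Deferred

    # your_ok_callback, your_err_callback, your_common_callback, res and err
    # are placeholders for your own functions and values
    d = Deferred()
    # Add a success callback and an error callback
    d.addCallbacks(your_ok_callback, your_err_callback)
    # Add a common callback that runs in both cases
    d.addBoth(your_common_callback)

    # Success: calls your_ok_callback(res), then your_common_callback
    # with whatever your_ok_callback returned
    d.callback(res)

    # Failure (use instead of d.callback): calls your_err_callback(failure),
    # then your_common_callback with whatever your_err_callback returned
    # d.errback(err)

    # Note: a given Deferred can only be fired once; firing it a second time
    # raises an error (AlreadyCalledError)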
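To make the two chains and the fire-once rule concrete, here is a deliberately simplified model written without Twisted. MiniDeferred and AlreadyFired are invented names, and the real Deferred does considerably more (for example, it wraps errors in Failure objects and runs callbacks that are added after it has fired), so treat this as a sketch of the idea only.

```python
class AlreadyFired(Exception):
    pass


def passthru(value):
    return value


class MiniDeferred:
    def __init__(self):
        self.pairs = []      # list of (callback, errback) pairs
        self.fired = False

    def addCallbacks(self, callback, errback=passthru):
        self.pairs.append((callback, errback))
        return self

    def addBoth(self, func):
        return self.addCallbacks(func, func)

    def callback(self, result):
        self._fire(result, is_error=False)

    def errback(self, error):
        self._fire(error, is_error=True)

    def _fire(self, value, is_error):
        if self.fired:
            raise AlreadyFired("a deferred can only be fired once")
        self.fired = True
        for callback, errback in self.pairs:
            try:
                # Each function receives the previous function's return value
                value = (errback if is_error else callback)(value)
                is_error = isinstance(value, Exception)
            except Exception as exc:
                value, is_error = exc, True   # switch to the error chain


calls = []


def on_ok(result):
    calls.append(("ok", result))
    return result + 1


def on_err(error):
    calls.append(("err", str(error)))
    return "recovered"   # a normal return value resumes the success chain


def on_both(value):
    calls.append(("both", value))
    return value


d = MiniDeferred().addCallbacks(on_ok, on_err).addBoth(on_both)
d.callback(1)
print(calls)  # [('ok', 1), ('both', 2)]
```

Note that the common callback receives 2, the return value of on_ok, not the original result 1: values flow down the chain from one function to the next.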
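Twisted's Deferred is one way of expressing asynchrony. You can think of it this way: the difference between it and a thread is that it is event-based.
With Deferred, you can manage and control the execution of tasks. It keeps the program from blocking and stalling while it waits for some task to complete, which improves overall efficiency.
Deferred helps you write asynchronous code, but it does not automatically make code asynchronous or non-blocking! To turn a synchronous function into an asynchronous one, the function must return a Deferred and the callbacks must be registered correctly.
5. Comprehensive example
Run the example below yourselves. Everything above was scattered fragments, so compare them against this complete example and walk through it. Twisted really is a little troublesome to understand; once you know that it is event-based, take your time and it will come.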
    # coding: utf-8
    # xiaorui.cc
    import functools
    import time

    from twisted.internet import reactor, defer
    from twisted.internet.threads import deferToThread


    def deferred(func):
        "Decorator: run func in the reactor's thread pool and return a Deferred."
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return deferToThread(func, *args, **kwargs)
        return wrapper


    def todoprint_(result):
        print(result)
        return result


    def running():
        "Prints a dot on stdout while the reactor is running."
        print('.')
        reactor.callLater(.1, running)


    @deferred
    def sleep(sec):
        "A blocking function converted into a non-blocking one."
        print('start sleep %s' % sec)
        time.sleep(sec)
        print('\nend sleep %s' % sec)
        return "ok"


    def test(n, m):
        print("fun test() is start")
        keys = ['a%s' % i for i in range(m)]
        vals = list(range(m))
        d = None
        for i in range(n):
            d = dict(zip(keys, vals))
        print("fun test() is end")
        return d


    if __name__ == "__main__":
        # one: a blocking sleep runs in a worker thread
        d1 = sleep(2).addBoth(todoprint_)
        reactor.callLater(.1, running)
        print("go go !!!")

        # two: a CPU-heavy function also runs in a worker thread
        aa = time.time()
        de = deferToThread(test, 100000, 100)
        de.addBoth(todoprint_)
        print(time.time() - aa)
        print("I do other things here first")
        print(de)

        # The reactor cannot be restarted, so both parts share one run();
        # stop it once both tasks have finished
        defer.DeferredList([d1, de]).addBoth(lambda _: reactor.stop())
        reactor.run()
        print("go go end")