1. 리액터
트위스티드의 핵심은 리액터인데, 리액터는 필연적으로 동기/비동기, 블로킹/비블로킹이 가능합니다. 첫 번째 장에서 데이브의 개념 소개에는 동기/비동기의 경계가 약간 있습니다. 퍼지, 동기화/비동기, 차단/비차단에 대해서는 Zhihu 토론을 참조하세요. 프로액터(activator)와 리액터(reactor)에 대해서는 좀 더 자세한 소개가 있는 추천 블로그를 추천드립니다.
리액터 모드의 네트워크 IO는 비동기식 IO가 아닌 동기식 IO여야 합니다. Dave의 첫 번째 장에서 언급된 비동기성의 핵심은 운영 체제에 의해 무작위로 중지되는 대신 작업 제어를 명시적으로 포기하는 것입니다. 프로그래머는 작은 단계를 번갈아 완료하기 위해 작업을 순서대로 구성해야 합니다. 따라서 작업 중 하나가 다른 작업의 출력을 사용하는 경우 종속 작업(즉, 출력을 수신하는 작업)은 한꺼번에 모두 수신하기보다는 일련의 비트 또는 조각을 수신하도록 설계해야 합니다.
명시적이고 적극적으로 작업 제어를 포기하는 것은 코루틴에 대한 사고 방식과 다소 유사합니다. 리액터는 코루틴의 스케줄러로 간주될 수 있습니다. 리액터는 이벤트 루프입니다. 관심 있는 이벤트(읽기/쓰기 가능한 소켓 등)와 프로세서(읽기 및 쓰기 작업 수행)를 리액터에 등록할 수 있습니다. 프로세서 실행이 완료된 후 이는 코루틴 정지(수율)와 동일하며 리액터의 이벤트 루프로 돌아가서 다음 이벤트가 올 때까지 기다렸다가 다시 호출합니다. 리액터 자체에는 선택/에폴링 및 기타 메커니즘으로 구현될 수 있는 동기식 이벤트 디멀티플렉서가 있습니다. 물론 트위스트 리액터의 이벤트 트리거링은 반드시 IO를 기반으로 하는 것은 아니지만 타이머와 같은 다른 메커니즘에 의해 트리거될 수도 있습니다.
Twisted Reactor는 이벤트와 콜백 함수를 적극적으로 등록할 것을 요구하지 않지만 다형성(특정 클래스를 상속하고 해당 이벤트 인터페이스를 구현한 다음 이를 Twisted Reactor에 전달)을 통해 구현됩니다. Twisted Reactor와 관련하여 몇 가지 참고할 사항이 있습니다.
Twisted.internet.reactor는 싱글톤 모드이며 각 프로그램에는 하나의 리액터만 있을 수 있습니다.
리액터 콜백 함수에서 최대한 빨리 작업을 완료하도록 노력하세요. , 차단을 수행하지 않습니다. 작업과 리액터는 기본적으로 단일 스레드입니다. 특정 콜백 함수를 차단하면 리액터의 전체 이벤트 루프가 차단됩니다. Reactor.stop()을 사용하지 않는 한 항상 실행합니다. 명시적으로 중지하지만 일반적으로 Reactor.stop()을 호출하면 애플리케이션이 종료됩니다.
twisted의 본질은 다음과 같습니다. 리액터를 사용하려면 (twisted의 편리한 고수준 추상화를 피하기 위해) Twisted의 기본 API를 사용하여 리액터를 사용할 수 있습니다.
# 示例一 twisted底层API的使用 from twisted.internet import reacto from twisted.internet import main from twisted.internet.interfaces import IReadDescriptor import socket class MySocket(IReadDescriptor): def __init__(self, address): # 连接服务器 self.address = address self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(address) self.sock.setblocking(0) # tell the Twisted reactor to monitor this socket for reading reactor.addReader(self) # 接口: 告诉reactor 监听的套接字描述符 def fileno(self): try: return self.sock.fileno() except socket.error: return -1 # 接口: 在连接断开时的回调 def connectionLost(self, reason): self.sock.close() reactor.removeReader(self) # 当应用程序需要终止时 调用: # reactor.stop() # 接口: 当套接字描述符有数据可读时 def doRead(self): bytes = '' # 尽可能多的读取数据 while True: try: bytesread = self.sock.recv(1024) if not bytesread: break else: bytes += bytesread except socket.error, e: if e.args[0] == errno.EWOULDBLOCK: break return main.CONNECTION_LOST if not bytes: return main.CONNECTION_DONE else: # 在这里解析协议并处理数据 print bytes
참고:
소켓은 비차단입니다. 차단하면 리액터의 의미가 사라집니다
IReadDescriptor를 상속하여 리액터에 필요한 인터페이스를 제공합니다
소켓을 Reactor.addReader에 추가합니다. 클래스는 리액터의 청취 객체에 추가됩니다.
main.CONNECTION_LOST는 미리 정의된 Twisted 값을 통해 다음 콜백을 어느 정도 제어할 수 있습니다(이벤트 시뮬레이션과 유사).
그러나 위의 MySocket 클래스는 주요 단점은 다음과 같습니다.
프레임워크가 데이터를 읽고 예외를 처리하는 대신 데이터를 직접 읽어야 합니다.
네트워크 IO와 데이터 처리가 함께 혼합되어 분리되지 않습니다.
twisted는 네트워크 연결을 위해 다음과 같은 세 가지 개념을 확립했습니다.
전송: 네트워크 연결 계층, 네트워크 연결 및 읽기/쓰기만 담당합니다. 바이트 데이터
프로토콜: 프로토콜 레이어, 서비스 비즈니스 관련 네트워크 프로토콜, 바이트 스트림을 애플리케이션에 필요한 데이터로 변환
프로토콜 팩토리: 프로토콜 생성을 담당하는 프로토콜 팩토리, 각 네트워크 연결에는 하나의 프로토콜 개체가 있습니다. 프로토콜 구문 분석 상태를 저장해야 합니다.
이러한 Twisted 개념은 Erlang의 Ranch 네트워크 프레임워크와 매우 유사합니다. 또한 Ranch 프레임워크는 새로운 네트워크 연결이 있을 때 자동으로 Ranch를 추상화합니다. Ranch_protocol 동작을 구현하는 모듈은 Ranch_protocol 동작을 구현하는 모듈입니다. 연결에 해당하는 Transport를 수신하여 바이트 스트림 데이터를 처리할 수 있습니다. 프로토콜에서는 당사의 프로토콜에 따라 데이터를 구문 분석하고 처리합니다. 동시에 전송을 통해 데이터를 전송할 수 있습니다(ranch는 이미 바이트 스트림 데이터를 읽었습니다).
ranch와 유사하게 Twisted는 프로토콜을 생성하고 새로운 연결이 도착할 때 전송을 전달합니다. Twisted는 바이트 스트림 데이터를 읽는 데 도움이 되며 dataReceived(self, data) 인터페이스에서 바이트 스트림만 처리하면 됩니다. . 그냥 데이터. 이때 Twisted는 네트워크 IO와 가능한 예외를 처리하는 데 도움이 되며, 네트워크 IO와 데이터 처리를 분리하여 전송 및 프로토콜로 추상화하여 프로그램의 효율성을 향상시킵니다. 그리고 견고성.
# 示例二 twisted抽象的使用 from twisted.internet import reactor from twisted.internet.protocol import Protocol, ClientFactory class MyProtocol(Protocol): # 接口: Protocols初始化时调用,并传入Transports # 另外 twisted会自动将Protocols的factory对象成员设为ProtocolsFactory实例的引用 # 如此就可以通过factory来与MyProtocolFactory交互 def makeConnection(self,trans): print 'make connection: get transport: ', trans print 'my factory is: ', self.factory # 接口: 有数据到达 def dataReceived(self, data): self.poem += data msg = 'Task %d: got %d bytes of poetry from %s' print msg % (self.task_num, len(data), self.transport.getPeer()) # 接口: 连接断开 def connectionLost(self, reason): # 连接断开的处理 class MyProtocolFactory(ClientFactory): # 接口: 通过protocol类成员指出需要创建的Protocols protocol = PoetryProtocol # tell base class what proto to build def __init__(self, address): self.poetry_count = poetry_count self.poems = {} # task num -> poem # 接口: 在创建Protocols的回调 def buildProtocol(self, address): proto = ClientFactory.buildProtocol(self, address) # 在这里对proto做一些初始化.... return proto # 接口: 连接Server失败时的回调 def clientConnectionFailed(self, connector, reason): print 'Failed to connect to:', connector.getDestination() def main(address): factory = MyClientFactory(address) host, port = address # 连接服务端时传入ProtocolsFactory reactor.connectTCP(host, port, factory) reactor.run()
四. twisted Deferred
twisted Deferred对象用于解决这样的问题:有时候我们需要在ProtocolsFactory中嵌入自己的回调,以便Protocols中发生某个事件(如所有Protocols都处理完成)时,回调我们指定的函数(如TaskFinished)。如果我们自己来实现回调,需要处理几个问题:
如何区分回调的正确返回和错误返回?(我们在使用异步调用时,要尤其注意错误返回的重要性)
如果我们的正确返回和错误返回都需要执行一个公共函数(如关闭连接)呢?
如果保证该回调只被调用一次?
Deferred对象便用于解决这种问题,它提供两个回调链,分别对应于正确返回和错误返回,在正确返回或错误返回时,它会依次调用对应链中的函数,并且保证回调的唯一性。
d = Deferred() # 添加正确回调和错误回调 d.addCallbacks(your_ok_callback, your_err_callback) # 添加公共回调函数 d.addBoth(your_common_callback) # 正确返回 将依次调用 your_ok_callback(Res) -> common_callback(Res) d.callback(Res) # 错误返回 将依次调用 your_err_callback(Err) -> common_callback(Err) d.errback(Err) # 注意,对同一个Defered对象,只能返回一次,尝试多次返回将会报错
twisted的defer是异步的一种变现方式,可以这么理解,他和thread的区别是,他是基于时间event的。
有了deferred,即可对任务的执行进行管理控制。防止程序的运行,由于等待某项任务的完成而陷入阻塞停滞,提高整体运行的效率。
Deferred能帮助你编写异步代码,但并不是为自动生成异步或无阻塞的代码!要想将一个同步函数编程异步函数,必须在函数中返回Deferred并正确注册回调。
五.综合示例
下面的例子,你们自己跑跑,我上面说的都是一些个零散的例子,大家对照下面完整的,走一遍。 twisted理解其实却是有点麻烦,大家只要知道他是基于事件的后,慢慢理解就行了。
#coding:utf-8 #xiaorui.cc from twisted.internet import reactor, defer from twisted.internet.threads import deferToThread import os,sys from twisted.python import threadable; threadable.init(1) deferred =deferToThread.__get__ import time def todoprint_(result): print result def running(): "Prints a few dots on stdout while the reactor is running." # sys.stdout.write("."); sys.stdout.flush() print '.' reactor.callLater(.1, running) @deferred def sleep(sec): "A blocking function magically converted in a non-blocking one." print 'start sleep %s'%sec time.sleep(sec) print '\nend sleep %s'%sec return "ok" def test(n,m): print "fun test() is start" m=m vals = [] keys = [] for i in xrange(m): vals.append(i) keys.append('a%s'%i) d = None for i in xrange(n): d = dict(zip(keys, vals)) print "fun test() is end" return d if __name__== "__main__": #one sleep(10).addBoth(todoprint_) reactor.callLater(.1, running) reactor.callLater(3, reactor.stop) print "go go !!!" reactor.run() #two aa=time.time() de = defer.Deferred() de.addCallback(test) reactor.callInThread(de.callback,10000000,100 ) print time.time()-aa print "我这里先做别的事情" print de print "go go end"
更多剖析Python的Twisted框架的核心特性相关文章请关注PHP中文网!