首頁 後端開發 Python教學 剖析Python的Twisted框架的核心特性

剖析Python的Twisted框架的核心特性

Feb 03, 2017 pm 04:17 PM

一. reactor
twisted的核心是reactor,而提到reactor不可避免的是同步/異步,阻塞/非阻塞,在Dave的第一章概念性介紹中,對同步/異步的界限有點模糊,關於同步/異步,阻塞/非阻塞可參見知乎討論。而關於proactor(主動器)和reactor(反應器),這裡有一篇推薦部落格有比較詳細的介紹。
就reactor模式的網路IO而言,應該是同步IO而不是非同步IO。而Dave第一章中提到的非同步,核心在於:明確地放棄對任務的控制權而不是被操作系統隨機地停止,程式設計師必須將任務組織成序列來交替的小步完成。因此,若其中一個任務用到另一個任務的輸出,則依賴的任務(即接收輸出的任務)需要被設計成要接收系列位元或分片而不是一下全部接收。
顯式主動地放棄任務的控制權有點類似協程的思考方式,reactor可看作協程的調度器。 reactor是一個事件循環,我們可以向reactor註冊自己感興趣的事件(如套接字可讀/可寫)和處理器(如執行讀寫操作),reactor會在事件發生時回調我們的處理器,處理器執行完成之後,相當於協程掛起(yield),回到reactor的事件循環中,等待下一個事件來臨並回呼。 reactor本身有一個同步事件多路分解器(Synchronous Event Demultiplexer),可用select/epoll等機制實現,當然twisted reactor的事件觸發不一定是基於IO,也可以由定時器等其它機制觸發。
twisted的reactor無需我們主動註冊事件和回調函數,而是透過多態(繼承特定類,並實現所關心的事件接口,然後傳給twisted reactor)來實現。關於twisted的reactor,有幾個需要注意的地方:
twisted.internet.reactor是單例模式,每個程式只能有一個reactor;
盡量在reactor回調函數盡快完成操作,不要執行阻塞任務,reactor本質是單線程,使用者回呼程式碼與twisted程式碼運行在同一個上下文,某個回呼函數中阻塞,會導致reactor整個事件循環阻塞;
reactor會一直運行,除非透過reactor.stop()顯示停止它,但一般呼叫reactor.stop(),也意味著應用程式結束;

二. twisted簡單使用
twisted的本質是reactor,我們可以使用twisted的底層API(避開twisted便利的高層抽象)來使用reactor:

# 示例一 twisted底层API的使用
from twisted.internet import reacto
from twisted.internet import main
from twisted.internet.interfaces import IReadDescriptor
import socket
 
class MySocket(IReadDescriptor):
  def __init__(self, address):
    # 连接服务器
    self.address = address
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock.connect(address)
    self.sock.setblocking(0)
 
    # tell the Twisted reactor to monitor this socket for reading
    reactor.addReader(self)
  
 # 接口: 告诉reactor 监听的套接字描述符
  def fileno(self):
    try:
      return self.sock.fileno()
    except socket.error:
      return -1
       
 # 接口: 在连接断开时的回调
  def connectionLost(self, reason):
    self.sock.close()
 
    reactor.removeReader(self)
  
 # 当应用程序需要终止时 调用:
    # reactor.stop()
 
 # 接口: 当套接字描述符有数据可读时
  def doRead(self):
    bytes = ''
 
 # 尽可能多的读取数据
    while True:
      try:
        bytesread = self.sock.recv(1024)
        if not bytesread:
          break
        else:
          bytes += bytesread
      except socket.error, e:
        if e.args[0] == errno.EWOULDBLOCK:
          break
        return main.CONNECTION_LOST
 
    if not bytes: 
      return main.CONNECTION_DONE
    else:
      # 在这里解析协议并处理数据
      print bytes
登入後複製

範例一可以很清晰的看到twisted的reactor本質:新增監聽描述符,監聽可讀/可寫事件,當事件來臨時回調函數,回調完成之後繼續監聽事件。
需要注意:
套接字為非阻塞,如果為阻塞則失去了reactor的意義
我們透過繼承IReadDescriptor來提供reactor所需要的介面
透過reactor.addReader將套接字類別加入reactor的監聽物件中
main.CONNECTION_LOST是twisted預先定義的值,透過這些值它我們可以一定程度控制下一步回呼(類似於模擬一個事件)
但是上面的MySocket類別不夠好,主要有以下缺點:
需要我們自己去讀取數據,而不是框架幫我們讀好,並處理異常
網絡IO和數據處理混為一塊,沒有剝離開來

三. twisted抽象
twisted在reactor的基礎上,建立了更高的抽象,對一個網路連接而言,twisted建立瞭如下三個概念:
Transports:網路連接層,僅負責網路連接和讀取/寫入位元組資料
Protocols: 協定層,服務業務相關的網路協議,將位元組流轉換成應用所需資料
Protocol Factories:協議工廠,負責創建Protocols,每個網路連線都有一個Protocols物件(因為要保存協定解析狀態)
twisted的這些概念和erlang中的ranch網路框架很像,ranch框架也抽象化了Transports和Protocols概念,當有新的網路連線時,ranch自動建立Transports和Protocols,其中Protocols由使用者在啟動ranch時傳入,是實作了ranch_protocol behaviour的模組,Protocols初始化時,會收到該連接對應的Transports,這樣我們就可以在Protocols中處理位元組流數據,按照我們的協定解析並處理數據。同時可透過Transports來傳送資料(ranch已經幫你讀取了位元組流資料了)。
和ranch類似,twisted也會在新連接到達時創建Protocols並且將Transport傳入,twisted會幫我們讀取字節流數據,我們只需在dataReceived(self, data)接口中處理字節流數據即可。此時的twisted在網路IO上可以算是真正的非同步了,它幫我們處理了網路IO和可能遇到的異常,並且將網路IO和資料處理剝離開來,抽象化為Transports和Protocols,提高了程式的清晰性和健壯性。

# 示例二 twisted抽象的使用
from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ClientFactory
class MyProtocol(Protocol):
  
 # 接口: Protocols初始化时调用,并传入Transports
 # 另外 twisted会自动将Protocols的factory对象成员设为ProtocolsFactory实例的引用
 #   如此就可以通过factory来与MyProtocolFactory交互
 def makeConnection(self,trans):
    print 'make connection: get transport: ', trans
    print 'my factory is: ', self.factory
     
 # 接口: 有数据到达
  def dataReceived(self, data):
    self.poem += data
    msg = 'Task %d: got %d bytes of poetry from %s'
    print msg % (self.task_num, len(data), self.transport.getPeer())
  
 # 接口: 连接断开
  def connectionLost(self, reason):
    # 连接断开的处理
 
 
class MyProtocolFactory(ClientFactory):
 
 # 接口: 通过protocol类成员指出需要创建的Protocols
  protocol = PoetryProtocol # tell base class what proto to build
 
  def __init__(self, address):
    self.poetry_count = poetry_count
    self.poems = {} # task num -> poem
     
 # 接口: 在创建Protocols的回调
  def buildProtocol(self, address):
    proto = ClientFactory.buildProtocol(self, address)
    # 在这里对proto做一些初始化....
    return proto
     
 # 接口: 连接Server失败时的回调
  def clientConnectionFailed(self, connector, reason):
    print 'Failed to connect to:', connector.getDestination()
     
def main(address):
 factory = MyClientFactory(address)
  host, port = address
  # 连接服务端时传入ProtocolsFactory
  reactor.connectTCP(host, port, factory) 
  reactor.run()
登入後複製

範例二要比範例一要簡單清晰很多,因為它無需處理網路IO,並且邏輯上更為清晰,實際上ClientFactory和Protocol提供了更多的介面用於實現更靈活強大的邏輯控制,具體的介面可參見twisted原始碼。

四. twisted Deferred
twisted Deferred对象用于解决这样的问题:有时候我们需要在ProtocolsFactory中嵌入自己的回调,以便Protocols中发生某个事件(如所有Protocols都处理完成)时,回调我们指定的函数(如TaskFinished)。如果我们自己来实现回调,需要处理几个问题:
如何区分回调的正确返回和错误返回?(我们在使用异步调用时,要尤其注意错误返回的重要性)
如果我们的正确返回和错误返回都需要执行一个公共函数(如关闭连接)呢?
如果保证该回调只被调用一次?
Deferred对象便用于解决这种问题,它提供两个回调链,分别对应于正确返回和错误返回,在正确返回或错误返回时,它会依次调用对应链中的函数,并且保证回调的唯一性。

d = Deferred()
# 添加正确回调和错误回调
d.addCallbacks(your_ok_callback, your_err_callback)
# 添加公共回调函数
d.addBoth(your_common_callback)
 
# 正确返回 将依次调用 your_ok_callback(Res) -> common_callback(Res)
d.callback(Res)
# 错误返回 将依次调用 your_err_callback(Err) -> common_callback(Err)
d.errback(Err)
 
# 注意,对同一个Defered对象,只能返回一次,尝试多次返回将会报错
登入後複製

twisted的defer是异步的一种变现方式,可以这么理解,他和thread的区别是,他是基于时间event的。
有了deferred,即可对任务的执行进行管理控制。防止程序的运行,由于等待某项任务的完成而陷入阻塞停滞,提高整体运行的效率。
Deferred能帮助你编写异步代码,但并不是为自动生成异步或无阻塞的代码!要想将一个同步函数编程异步函数,必须在函数中返回Deferred并正确注册回调。

五.综合示例

下面的例子,你们自己跑跑,我上面说的都是一些个零散的例子,大家对照下面完整的,走一遍。 twisted理解其实却是有点麻烦,大家只要知道他是基于事件的后,慢慢理解就行了。

#coding:utf-8
#xiaorui.cc
from twisted.internet import reactor, defer
from twisted.internet.threads import deferToThread
import os,sys
from twisted.python import threadable; threadable.init(1)
deferred =deferToThread.__get__
import time
def todoprint_(result):
  print result
def running():
  "Prints a few dots on stdout while the reactor is running."
#   sys.stdout.write("."); sys.stdout.flush()
  print '.'
  reactor.callLater(.1, running)
@deferred
def sleep(sec):
  "A blocking function magically converted in a non-blocking one."
  print 'start sleep %s'%sec
  time.sleep(sec)
  print '\nend sleep %s'%sec
  return "ok"
def test(n,m):
  print "fun test() is start"
  m=m
  vals = []
  keys = []
  for i in xrange(m):
    vals.append(i)
    keys.append('a%s'%i)
  d = None
  for i in xrange(n):
    d = dict(zip(keys, vals))
  print "fun test() is end"
  return d
if __name__== "__main__":
#one
  sleep(10).addBoth(todoprint_)
  reactor.callLater(.1, running)
  reactor.callLater(3, reactor.stop)
  print "go go !!!"
  reactor.run()
#two
  aa=time.time()
  de = defer.Deferred()
  de.addCallback(test)
  reactor.callInThread(de.callback,10000000,100 )
  print time.time()-aa
  print "我这里先做别的事情"
  print de
  print "go go end"
登入後複製

更多剖析Python的Twisted框架的核心特性相关文章请关注PHP中文网!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

如何解決Linux終端中查看Python版本時遇到的權限問題? 如何解決Linux終端中查看Python版本時遇到的權限問題? Apr 01, 2025 pm 05:09 PM

Linux終端中查看Python版本時遇到權限問題的解決方法當你在Linux終端中嘗試查看Python的版本時,輸入python...

如何在使用 Fiddler Everywhere 進行中間人讀取時避免被瀏覽器檢測到? 如何在使用 Fiddler Everywhere 進行中間人讀取時避免被瀏覽器檢測到? Apr 02, 2025 am 07:15 AM

使用FiddlerEverywhere進行中間人讀取時如何避免被檢測到當你使用FiddlerEverywhere...

在Python中如何高效地將一個DataFrame的整列複製到另一個結構不同的DataFrame中? 在Python中如何高效地將一個DataFrame的整列複製到另一個結構不同的DataFrame中? Apr 01, 2025 pm 11:15 PM

在使用Python的pandas庫時,如何在兩個結構不同的DataFrame之間進行整列複製是一個常見的問題。假設我們有兩個Dat...

Uvicorn是如何在沒有serve_forever()的情況下持續監聽HTTP請求的? Uvicorn是如何在沒有serve_forever()的情況下持續監聽HTTP請求的? Apr 01, 2025 pm 10:51 PM

Uvicorn是如何持續監聽HTTP請求的? Uvicorn是一個基於ASGI的輕量級Web服務器,其核心功能之一便是監聽HTTP請求並進�...

在Linux終端中使用python --version命令時如何解決權限問題? 在Linux終端中使用python --version命令時如何解決權限問題? Apr 02, 2025 am 06:36 AM

Linux終端中使用python...

如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎? 如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎? Apr 02, 2025 am 07:18 AM

如何在10小時內教計算機小白編程基礎?如果你只有10個小時來教計算機小白一些編程知識,你會選擇教些什麼�...

如何繞過Investing.com的反爬蟲機制獲取新聞數據? 如何繞過Investing.com的反爬蟲機制獲取新聞數據? Apr 02, 2025 am 07:03 AM

攻克Investing.com的反爬蟲策略許多人嘗試爬取Investing.com(https://cn.investing.com/news/latest-news)的新聞數據時,常常�...

See all articles