首頁 > 後端開發 > Python教學 > 解析python select epoll poll的詳細說明

解析python select epoll poll的詳細說明

高洛峰
發布: 2017-03-15 13:20:01
原創
2282 人瀏覽過

select、poll、epoll三者的區別 

select 

select最早於1983年出現在4.2BSD中,它透過一個select()系統呼叫來監視多個文件描述符的數組(在linux中一切事物皆文件,塊設備,socket連接等。),當select()返回後,該數組中就緒的文件描述符便會被核心修改標誌位元(變成ready),使得進程可以獲得這些檔案描述符從而進行後續的讀寫操作(select會不斷監視網路介面的某個目錄下有多少檔案描述符變成ready狀態【在網路介面中,過來一個連接就會建立一個'檔案'】,變成ready狀態後,select就可以操作這個檔案描述子了)。

【socketserver是透過多執行緒來處理多個請求,每個連接過來分配一個執行緒來處理,但是select是單一進程的,一個行程執行程式碼肯定就是串列的,但是現在就要透過一個行程來實現並發的效果,一個行程下只有一個主線程,也就說說用一個執行緒來實現並發的效果。為什麼要用一個行程實現多並發而不採用多執行緒實作多並發呢?

==========答:因為一個行程實現多並發比多執行緒是實現多並發的效率還要高,因為啟動多執行緒會有很多的開銷,而且CPU要不斷的檢查每個執行緒的狀態,確定哪個執行緒是否可以執行。這個對系統來說也是有壓力的,用單一進程的話就可以避免這種開銷和給系統帶來的壓力,

那麼單一進程是如何實現多並發的呢? ? ?

========答:很巧妙的使用了生產者和消費者的模式(非同步),生產者和消費者可以實現非阻塞,一個socketserver透過select接收多個連接過來(之前的socket一個進程只能接收一個連接,當接收新的連接的時候產生阻塞,因為這個socket進程要先和客戶端進行通信,二者是彼此互相等待的【客戶端發一條消息,服務端收到,客戶端等著返回....服務端等著接收.........】一直在阻塞著,這個時候如果再來一個連接,要等之前的那個連接斷了,這個才可以連進來。 -----------也就是說用基本的socket實現多進程是阻塞的。執行緒數量過多的時候,對於cpu來說開銷和壓力是比較大的。為了避免這種情況,就出現了異步=============客戶端發起一個連接,會在服務端註冊一個文件句柄,服務端會不斷輪詢這些文件句柄的列表,主進程和客戶端建立連接而沒有啟動線程,這個時候主進程和客戶端進行交互,其他的客戶端是無法連接主進程的,為了實現主進程既能和已連接的客戶端收發訊息,又能和新的客戶端建立連接,就把輪詢變的非常快(死循環)去刷客戶端連接進來的文件句柄的列表,只要客戶端發訊息了,服務端讀取了訊息之後,有另一個列表去接收給客戶端回傳的訊息,也不斷的去刷這個列表,刷出來後返回給客戶端,這樣和客戶端的這次通信就完成了,但是跟客戶端的連接還沒有斷,但是就進入了下一次的輪詢。 】

  

select 優點

select目前幾乎在所有的平台上支持,良好跨平台性。

 

select 缺點

每次呼叫select,都需要把fd集合從使用者態拷貝到核心態,這個開銷在fd很多的時候會很大

單一進程能夠監視的fd數量存在最大限制,在linux上預設為1024(可以透過修改巨集定義或重新編譯內核的方式來提升這個限制)

並且由於select的fd是放在數組中,而且每次都要線性遍歷整個數組,當fd很多的時候,開銷也很大

#python  select 

#

呼叫select的函數為readable,writable,exceptional = select.select(rlist, wlist, xlist[, timeout]),前三個參數都分別是三個列表,數組中的物件皆為waitable object:皆是整數的檔案描述符(file descriptor)或一個擁有返回檔案描述符方法fileno()的物件;

rlist: 等待讀取就緒的list

# wlist: 等待寫就緒的list

errlist: 等待「例外」的list

 

select方法用來監視檔案描述符,如果文件描述符發生變化,則取得該描述符。

1、這三個list可以是一個空的list,但是接收3個空的list是依賴系統的(在Linux上是可以接受的,但是在window上是不可以的)。

2、當rlist 序列中的描述子發生可讀時(accetp和read),則取得變更的描述子並加入readable 序列中

3、當wlist 序列中含有描述子時,則將該序列中所有的描述子加入writable 序列中

4、當errlist序列中的句柄發生錯誤時,則將該發生錯誤的句柄加入exceptional 序列中

5、當逾時時間未設置,則select會一直阻塞,直到監聽的描述符發生變化

   當逾時時間= 1 時,那麼如果監聽的句柄均無任何變化,則select會阻塞 1 秒,之後返回三個空列表,如果監聽的描述符(fd)有變化,則直接執行。

6、在list中可以接受Ptython的的file物件(例如sys.stdin,或是會被open()和os.open()回傳的object),socket object將會回傳socket.socket( )。也可以自訂類,只要有一個合適的fileno()的方法(需要真實傳回一個檔案描述符,而不是一個隨機的整數)。

 

select 範例:

Python的select()方法直接呼叫作業系統的IO接口,它監控sockets,open files, and pipes(所有帶fileno()方法的文件句柄)何時變成readable 和writeable, 或者通信錯誤,select()使得同時監控多個連接變的簡單,並且這比寫一個長循環來等待和監控多客戶端連接要高效,因為select直接透過作業系統提供的C的網路介面進行操作,而不是透過Python的解釋器

#coding:UTF8

import select
import socket
import sys
import Queue

#创建一个TCP/IP 进程
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(0)

#连接地址和端口
server_address = ('localhost',10000)
print >>sys.stderr,'starting up on %s prot %s' % server_address
server.bind(server_address)

#最大允许链接数
server.listen(5)

inputs = [ server ]
outputs = []

message_queues = {}

while inputs:
    print >>sys.stderr,'\nwaiting for the next event'
    readable,writable,exceptional = select.select(inputs,outputs,inputs)

    # Handle inputs
    for s in readable:
     
        if s is server:
            # A "readable" server socket is ready to accept a connection
            connection, client_address = s.accept()
            print >>sys.stderr, 'new connection from', client_address
            #connection.setblocking(0)
            inputs.append(connection)
     
            # Give the connection a queue for data we want to send
            message_queues[connection] = Queue.Queue()
        
        else:
            data = s.recv(1024)
            if data:
                # A readable client socket has data
                print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername())
                message_queues[s].put(data)  #这个s相当于connection
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)

            else:
                # Interpret empty result as closed connection
                print >>sys.stderr, 'closing', client_address, 'after reading no data'
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
                inputs.remove(s)    #inputs中也删除掉
                s.close()           #把这个连接关闭掉
                 
                # Remove message queue
                del message_queues[s]
    
    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except Queue.Empty:
            # No messages waiting so stop checking for writability.
            print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty'
            outputs.remove(s)
        else:
            print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
            s.send(next_msg.upper())
    # Handle "exceptional conditions"
    for s in exceptional:
        print >>sys.stderr, 'handling exceptional condition for', s.getpeername()
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
     
        # Remove message queue
        del message_queues[s]     

server
登入後複製

程式碼解析:

 

select()方法接收並監控3個通訊列表, 第一個是所有的輸入的data,就是指外部發過來的數據,第2個是監控和接收所有要發出去的data(outgoing data),第3個監控錯誤訊息,接下來我們需要建立2個清單來包含輸入和輸出訊息來傳給select().

# Sockets from which we expect to readinputs = [ server ]# Sockets to which we expect to writeoutputs = [ ]

 

所有客戶端的進來的連接和資料將會被server的主循環程式放在上面的list中處理,我們現在的server端需要等待連接可寫(writable)之後才能過來,然後接收資料並返回(因此不是在接收到資料之後就立刻返回),因為每個連接要把輸入或輸出的資料先快取到queue裡,然後再由select取出來再發出去。

# Outgoing message queues (socket:Queue)message_queues = {}

The main portion of the server program loops, call# ing select() to block and wait for network activity.

#下方是此程式的主循環,呼叫select()時會阻塞並等待直到新的連線和資料進來

while inputs :    # Wait for at least one of the sockets to be ready for processing    print >>sys.stderr, '\nwaiting for the next event'    readable, writable, exceptional = select.select(

# 當你把inputs,outputs,exceptional(這裡跟inputs共用)傳給select()後,它返回3個新的list,我們上面將他們分別賦值為readable,writable,exceptional, 所有在readable list中的socket連接代表有資料可接收(recv),所有在writable list中的存放著你可以對其進行發送(send)操作的socket連接,當連接通信出現error時會把error寫到exceptional列表中。

 

Readable list 中的socket 可以有3种可能状态,第一种是如果这个socket是main "server" socket,它负责监听客户端的连接,如果这个main server socket出现在readable里,那代表这是server端已经ready来接收一个新的连接进来了,为了让这个main server能同时处理多个连接,在下面的代码里,我们把这个main server的socket设置为非阻塞模式。

 

第二种情况是这个socket是已经建立了的连接,它把数据发了过来,这个时候你就可以通过recv()来接收它发过来的数据,然后把接收到的数据放到queue里,这样你就可以把接收到的数据再传回给客户端了。

 

第三种情况就是这个客户端已经断开了,所以你再通过recv()接收到的数据就为空了,所以这个时候你就可以把这个跟客户端的连接关闭了。

 

对于writable list中的socket,也有几种状态,如果这个客户端连接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,否则就把这个连接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个连接,那就会认为这个连接还处于非活动状态

 

最后,如果在跟某个socket连接通信过程中出了错误,就把这个连接对象在inputs\outputs\message_queue中都删除,再把连接关闭掉

#coding:UTF8

import socket
import sys
 
messages = [ 'This is the message. ',
             'It will be sent ',
             'in parts.',
             ]
server_address = ('localhost', 10003)
 
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]
 
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
    s.connect(server_address)

for message in messages:
 
    # Send messages on both sockets
    for s in socks:
        print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
        s.send(message)
 
    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
        if not data:
            print >>sys.stderr, 'closing socket', s.getsockname()

client
登入後複製

客户端程序展示了如何通过select()对socket进行管理并与多个连接同时进行交互,通过循环通过每个socket连接给server发送和接收数据。

server:
starting up on localhost prot 10000

waiting for the next event
new connection from ('127.0.0.1', 54812)

waiting for the next event
new connection from ('127.0.0.1', 54813)
received "This is the message. " from ('127.0.0.1', 54812)

waiting for the next event
received "This is the message. " from ('127.0.0.1', 54813)
sending "This is the message. " to ('127.0.0.1', 54812)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
sending "This is the message. " to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
received "It will be sent " from ('127.0.0.1', 54812)
received "It will be sent " from ('127.0.0.1', 54813)

waiting for the next event
sending "It will be sent " to ('127.0.0.1', 54812)
sending "It will be sent " to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
received "in parts." from ('127.0.0.1', 54812)
received "in parts." from ('127.0.0.1', 54813)

waiting for the next event
sending "in parts." to ('127.0.0.1', 54812)
sending "in parts." to ('127.0.0.1', 54813)

waiting for the next event
output queue for ('127.0.0.1', 54812) is empty
output queue for ('127.0.0.1', 54813) is empty

waiting for the next event
closing ('127.0.0.1', 54813) after reading no data
closing ('127.0.0.1', 54813) after reading no data

waiting for the next event





client:
connecting to localhost port 10000
('127.0.0.1', 54812): sending "This is the message. "
('127.0.0.1', 54813): sending "This is the message. "
('127.0.0.1', 54812): received "THIS IS THE MESSAGE. "
('127.0.0.1', 54813): received "THIS IS THE MESSAGE. "
('127.0.0.1', 54812): sending "It will be sent "
('127.0.0.1', 54813): sending "It will be sent "
('127.0.0.1', 54812): received "IT WILL BE SENT "
('127.0.0.1', 54813): received "IT WILL BE SENT "
('127.0.0.1', 54812): sending "in parts."
('127.0.0.1', 54813): sending "in parts."
('127.0.0.1', 54812): received "IN PARTS."
('127.0.0.1', 54813): received "IN PARTS."

运行结果
登入後複製

poll 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll() 的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

 

在Python中调用poll

select.poll(),返回一个poll的对象,支持注册和注销文件描述符。

poll.register(fd[, eventmask])注册一个文件描述符,注册后,可以通过poll()方法来检查是否有对应的I/O事件发生。fd可以是i 个整数,或者有返回整数的fileno()方法对象。如果File对象实现了fileno(),也可以当作参数使用。

eventmask是一个你想去检查的事件类型,它可以是常量POLLIN, POLLPRI和 POLLOUT的组合。如果缺省,默认会去检查所有的3种事件类型。

事件常量 意义

POLLIN 有数据读取

POLLPRT 有数据紧急读取

POLLOUT 准备输出:输出不会阻塞

POLLERR 某些错误情况出现

POLLHUP 挂起

POLLNVAL 无效请求:描述无法打开

poll.modify(fd, eventmask) 修改一个已经存在的fd,和poll.register(fd, eventmask)有相同的作用。如果去尝试修改一个未经注册的fd,会引起一个errno为ENOENT的IOError。

poll.unregister(fd)从poll对象中注销一个fd。尝试去注销一个未经注册的fd,会引起KeyError。

poll.poll([timeout])去检测已经注册了的文件描述符。会返回一个可能为空的list,list中包含着(fd, event)这样的二元组。 fd是文件描述符, event是文件描述符对应的事件。如果返回的是一个空的list,则说明超时了且没有文件描述符有事件发生。timeout的单位是milliseconds,如果设置了timeout,系统将会等待对应的时间。如果timeout缺省或者是None,这个方法将会阻塞直到对应的poll对象有一个事件发生。

#coding: utf-8 

import select, socket

response = b"hello world"

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('localhost', 10000))
serversocket.listen(1)
serversocket.setblocking(0)

#
poll = select.poll()
poll.register(serversocket.fileno(), select.POLLIN)

connections = {}
while True:
    for fd, event in poll.poll():
        if event == select.POLLIN:
            if fd == serversocket.fileno():
                con, addr = serversocket.accept()
                poll.register(con.fileno(), select.POLLIN)
                connections[con.fileno()] = con
            else:
                con = connections[fd]
                data = con.recv(1024)
                if data:
                    poll.modify(con.fileno(), select.POLLOUT)
        elif event == select.POLLOUT:
            con = connections[fd]
            con.send(response)
            poll.unregister(con.fileno())
            con.close()
登入後複製

epoll 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表 就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了 这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描 述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调 机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

 

在Python中调用epoll

select.epoll([sizehint=-1])返回一个epoll对象。

eventmask

事件常量 意义

EPOLLIN 读就绪

EPOLLOUT 写就绪

EPOLLPRI 有数据紧急读取

EPOLLERR assoc. fd有错误情况发生

EPOLLHUP assoc. fd发生挂起

EPOLLRT 设置边缘触发(ET)(默认的是水平触发)

EPOLLONESHOT 设置为 one-short 行为,一个事件(event)被拉出后,对应的fd在内部被禁用

EPOLLRDNORM 和 EPOLLIN 相等

EPOLLRDBAND 优先读取的数据带(data band)

EPOLLWRNORM 和 EPOLLOUT 相等

EPOLLWRBAND 优先写的数据带(data band)

EPOLLMSG 忽视

epoll.close()关闭epoll对象的文件描述符。

epoll.fileno返回control fd的文件描述符number。

epoll.fromfd(fd)用给予的fd来创建一个epoll对象。

epoll.register(fd[, eventmask])在epoll对象中注册一个文件描述符。(如果文件描述符已经存在,将会引起一个IOError)

epoll.modify(fd, eventmask)修改一个已经注册的文件描述符。

epoll.unregister(fd)注销一个文件描述符。

epoll.poll(timeout=-1[, maxevnets=-1])等待事件,timeout(float)的单位是秒(second)。

#coding:Utf8
import socket, select

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response  = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('localhost', 10000))
serversocket.listen(1)
serversocket.setblocking(0)

epoll = select.epoll()
epoll.register(serversocket.fileno(), select.EPOLLIN)

try:
   connections = {}; requests = {}; responses = {}
   while True:
      events = epoll.poll(1)
      for fileno, event in events:
         if fileno == serversocket.fileno():
            connection, address = serversocket.accept()
            connection.setblocking(0)
            epoll.register(connection.fileno(), select.EPOLLIN)
            connections[connection.fileno()] = connection
            requests[connection.fileno()] = b''
            responses[connection.fileno()] = response
         elif event & select.EPOLLIN:
            requests[fileno] += connections[fileno].recv(1024)
            if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
               epoll.modify(fileno, select.EPOLLOUT)
               print('-'*40 + '\n' + requests[fileno].decode()[:-2])
         elif event & select.EPOLLOUT:
            byteswritten = connections[fileno].send(responses[fileno])
            responses[fileno] = responses[fileno][byteswritten:]
            if len(responses[fileno]) == 0:
               epoll.modify(fileno, 0)
               connections[fileno].shutdown(socket.SHUT_RDWR)
         elif event & select.EPOLLHUP:
            epoll.unregister(fileno)
            connections[fileno].close()
            del connections[fileno]
finally:
   epoll.unregister(serversocket.fileno())
   epoll.close()
   serversocket.close()
登入後複製


以上是解析python select epoll poll的詳細說明的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板