Linux中一切皆文件,不論是我們儲存在磁碟上的字元文件,可執行檔案或是我們的存取電腦的I/O裝置等都被VFS抽象化了文件,例如標準輸入裝置預設是鍵盤,我們在操作標準輸入設備的時候,其實操作的是預設開啟的一個文件描述符是0的文件,而一切軟體操作硬體都需要通過OS,而OS操作一切硬體都需要相應的驅動程序,這個驅動程式裡配置了這個硬體的相應配置和使用方法。 Linux的I/O分為阻塞I/O,非阻塞I/O,I/O多工,訊號驅動I/O四種。對於I/O設備的驅動,一般都會提供關於阻塞和非阻塞兩種配置。我們最常見的I/O設備之一--鍵盤(標準輸入設備)的驅動程式預設是阻塞的。
多路復用就是為了讓進程能夠從多個阻塞I/O中獲得自己想要的資料並繼續執行接下來的任務。其主要的想法就是同時監視多個檔案描述符,如果有檔案描述符的設定狀態的被觸發,就繼續執行進程,如果沒有任何一個檔案描述符的設定狀態被觸發,則進程進入sleep
多路復用的一個主要用途就是實現"I/O多路復用並發伺服器",和多線程並發或者多進程並發相比,這種伺服器的系統開銷更低,更適合做web伺服器。
阻塞I/O
阻塞I/O,就是當進程試圖存取這個I/O裝置而這個裝置並沒有準備好的時候,裝置的驅動程式會透過核心讓這個試圖存取的進程進入sleep狀態。阻塞I/O的一個好處就是可以大大的節約CPU時間,因為一旦一個進程試圖存取一個沒有準備好的阻塞I/O,就會進入sleep狀態,而進入sleep狀態的進程是不在核心的進程調度鍊錶中,直到目標I/O準備好了將其喚醒並加入調度鍊錶,這樣就可以節省CPU時間。當然阻塞I/O也有其固有的缺點,如果進程試圖存取一個阻塞I/O,但是否存取成功並不對接下來的任務有決定性影響,那麼直接使其進入sleep狀態顯然會延誤其任務的完成。
典型的預設阻塞IO有標準輸入設備,socket設備,管道設備等,當我們使用gets(),scanf(),read()等操作請求這些IO時而IO並沒有資料流入,就會造成進程的sleep 。
假設一個程序希望透過三個管道中任一個讀取資料並顯示,偽代碼如下
read(pipe_0,buf,sizeof(buf)); //sleepprint buf; read(pipe_1,buf,sizeof(buf)); print buf;read(pipe_2,buf,sizeof(buf)); print buf;
由於管道是阻塞I/O,所以如果pipe_0沒有資料流入,進程就是在第一個read()處進入sleep狀態而即使pipe_1和pipe_2有資料流入也不會被讀取。
如果我們使用下述程式碼重新設定管道的阻塞屬性,顯然,如果三個管道都沒有資料流入,那麼進程就無法獲得請求的資料而繼續執行,倘若這些資料很重要(所以我們才要用阻塞I /O),那結果就會十分的糟糕,改為輪詢卻又大量的佔據CPU時間。
int fl = fcntl(pipe_fd, F_GETFL); fcntl(pipe_fd, F_SETFL, fl | O_NONBLOCK);
如何讓進程同時監視三個管道,其中一個有資料就繼續執行而不會sleep,如果全部沒有資料流入再sleep,就是多路復用技術需要解決的問題。
非阻塞I/O
非阻塞I/O就是當一個進程試圖存取一個I/O裝置的時候,無論是否從中獲取了請求的資料都會返回並繼續執行接下來的任務。 ,但非常適合請求是否成功對接下來的任務影響不大的I/O請求。但如果存取一個非阻塞I/O,但這個請求如果失敗對進程接下來的任務有致命影響,最粗暴的就是使用while(1){read()}輪詢。顯然,這種方式會佔用大量的CPU時間。
select機制
select是一種非常"古老"的同步I/O接口,但是提供了一種很好的I/O多路復用的
模型
fd_set //创建fd_set对象,将来从中增减需要监视的 fdFD_ZERO() //清空fd_set对象FD_SET() //将一个fd加入 fd_set对象中 select() //监视 fd_set对象中的文件描述符 pselect() //先设定信号屏蔽,再监视 FD_ISSET() //测试fd是否属于 fd_set对象FD_CLR() //从 fd_set对象中删除fd
Note:
模型🎜#define BUFSIZE 100#define MAXNFD 1024 int main(){ /***********服务器的listenfd已经准本好了**************/ fd_set readfds; fd_set writefds; FD_ZERO(&readfds); FD_ZERO(&writefds); FD_SET(listenfd, &readfds); fd_set temprfds = readfds; fd_set tempwfds = writefds; int maxfd = listenfd; int nready; char buf[MAXNFD][BUFSIZE] = {0}; while(1){ temprfds = readfds; tempwfds = writefds; nready = select(maxfd+1, &temprfds, &tempwfds, NULL, NULL) if(FD_ISSET(listenfd, &temprfds)){ //如果监听到的是listenfd就进行accept int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len); //将新accept的scokfd加入监听集合,并保持maxfd为最大fd FD_SET(sockfd, &readfds); maxfd = maxfd>sockfd?maxfd:sockfd; //如果意见检查了nready个fd,就没有必要再等了,直接下一个循环 if(--nready==0) continue; } int fd = 0; //遍历文件描述符表,处理接收到的消息 for(;fd<=maxfd; fd++){ if(fd == listenfd) continue; if(FD_ISSET(fd, &temprfds)){ int ret = read(fd, buf[fd], sizeof buf[0]); if(0 == ret){ //客户端链接已经断开 close(fd); FD_CLR(fd, &readfds); if(maxfd==fd) --maxfd; continue; } //将fd加入监听可写的集合 FD_SET(fd, &writefds); } //找到了接收消息的socket的fd,接下来将其加入到监视写的fd_set中 //将在下一次while()循环开始监视 if(FD_ISSET(fd, &tempwfds)){ int ret = write(fd, buf[fd], sizeof buf[0]); printf("ret %d: %d\n", fd, ret); FD_CLR(fd, &writefds); } } } close(listenfd); }
上一条导致了这种机制的低效,如果需要监视的文件描述符是0和100那么每一次都会遍历101次
select()每次返回都会修改fd_set,如果要循环select(),需要先对初始的fd_set进行备
例子_I/O多路复用并发服务器
关于server本身的编程模型,参见tcp/ip协议服务器模型和udp/ip协议服务器模型这里仅是使用select实现伪并行的部分模型
#define BUFSIZE 100#define MAXNFD 1024 int main(){ /***********服务器的listenfd已经准本好了**************/ fd_set readfds; fd_set writefds; FD_ZERO(&readfds); FD_ZERO(&writefds); FD_SET(listenfd, &readfds); fd_set temprfds = readfds; fd_set tempwfds = writefds; int maxfd = listenfd; int nready; char buf[MAXNFD][BUFSIZE] = {0}; while(1){ temprfds = readfds; tempwfds = writefds; nready = select(maxfd+1, &temprfds, &tempwfds, NULL, NULL) if(FD_ISSET(listenfd, &temprfds)){ //如果监听到的是listenfd就进行accept int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len); //将新accept的scokfd加入监听集合,并保持maxfd为最大fd FD_SET(sockfd, &readfds); maxfd = maxfd>sockfd?maxfd:sockfd; //如果意见检查了nready个fd,就没有必要再等了,直接下一个循环 if(--nready==0) continue; } int fd = 0; //遍历文件描述符表,处理接收到的消息 for(;fd<=maxfd; fd++){ if(fd == listenfd) continue; if(FD_ISSET(fd, &temprfds)){ int ret = read(fd, buf[fd], sizeof buf[0]); if(0 == ret){ //客户端链接已经断开 close(fd); FD_CLR(fd, &readfds); if(maxfd==fd) --maxfd; continue; } //将fd加入监听可写的集合 FD_SET(fd, &writefds); } //找到了接收消息的socket的fd,接下来将其加入到监视写的fd_set中 //将在下一次while()循环开始监视 if(FD_ISSET(fd, &tempwfds)){ int ret = write(fd, buf[fd], sizeof buf[0]); printf("ret %d: %d\n", fd, ret); FD_CLR(fd, &writefds); } } } close(listenfd); }
poll机制
poll是System V提出的一种基于select的改良机制,其针对select的诸多明显的缺陷进行了重新设计,包括只遍历被触发个数个文件描述符,不需要备份fd_set等等
模型
struct pollfd fds //创建一个pollfd类型的数组fds[0]. fd //向fds[0]中放入需要监视的fdfds[0]. events //向fds[0]中放入需要监视的fd的触发事件 POLLIN //I/O有输入 POLLPRI //有紧急数据需要读取 POLLOUT //I/O可写 POLLRDHUP //流式套接字连接断开或套接字处于半关闭状态 POLLERR //错误条件(仅针对输出) POLLHUP //挂起(仅针对输出) POLLNVAL //无效的请求:fd没有被打开(仅针对输出)
例子_I/O多路复用并发服务器
/* ... */int main(){ /* ... */ struct pollfd myfds[MAXNFD] = {0}; myfds[0].fd = listenfd; myfds[0].events = POLLIN; int maxnum = 1; int nready; //准备二维数组buf,每个fd使用buf的一行,数据干扰 char buf[MAXNFD][BUFSIZE] = {0}; while(1){ //poll直接返回event被触发的fd的个数 nready = poll(myfds, maxnum, -1) int i = 0; for(;i<maxnum; i++){ //poll通过将相应的二进制位置一来表示已经设置 //如果下面的条件成立,表示revent[i]里的POLLIN位已经是1了 if(myfds[i].revents & POLLIN){ if(myfds[i].fd == listenfd){ int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len); //将新accept的scokfd加入监听集合 myfds[maxnum].fd = sockfd; myfds[maxnum].events = POLLIN; maxnum++; //如果意见检查了nready个fd,就直接下一个循环 if(--nready==0) continue; } else{ int ret = read(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]); if(0 == ret){ //如果连接断开了 close(myfds[i].fd); //初始化将文件描述符表所有的文件描述符标记为-1 //close的文件描述符也标记为-1 //打开新的描述符时从表中搜索第一个-1 //open()就是这样实现始终使用最小的fd //这里为了演示并没有使用这种机制 myfds[i].fd = -1; continue; } myfds[i].events = POLLOUT; } } else if(myfds[i].revents & POLLOUT){ int ret = write(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]); myfds[i].events = POLLIN; } } } close(listenfd); }
epoll
epoll在poll基础上实现的更为健壮的接口,也是现在主流的web服务器使用的多路复用技术,epoll一大特色就是支持EPOLLET(边沿触发)和EPOLLLT (水平触发),前者表示如果读取之后缓冲区还有数据,那么只要读取结束,剩余的数据也会丢弃,而后者表示里面的数据不会丢弃,下次读的时候还在,默认是EPOLLLT
模型
epoll_create() //创建epoll对象struct epoll_event //准备事件结构体和事件结构体数组 event.events event.data.fd ... epoll_ctl() //配置epoll对象epoll_wait() //监控epoll对象中的fd及其相应的event
例子_I/O多路复用并发服务器
/* ... */int main(){ /* ... */ /* 创建epoll对象 */ int epoll_fd = epoll_create(1024); //准备一个事件结构体 struct epoll_event event = {0}; event.events = EPOLLIN; event.data.fd = listenfd; //data是一个共用体,除了fd还可以返回其他数据 //ctl是监控listenfd是否有event被触发 //如果发生了就把event通过wait带出。 //所以,如果event里不标明fd,我们将来获取就不知道哪个fd epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenfd, &event); struct epoll_event revents[MAXNFD] = {0}; int nready; char buf[MAXNFD][BUFSIZE] = {0}; while(1){ //wait返回等待的event发生的数目 //并把相应的event放到event类型的数组中 nready = epoll_wait(epoll_fd, revents, MAXNFD, -1) int i = 0; for(;i<nready; i++){ //wait通过在events中设置相应的位来表示相应事件的发生 //如果输入可用,那么下面的这个结果应该为真 if(revents[i].events & EPOLLIN){ //如果是listenfd有数据输入 if(revents[i].data.fd == listenfd){ int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len); struct epoll_event event = {0}; event.events = EPOLLIN; event.data.fd = sockfd; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event); } else{ int ret = read(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]); if(0 == ret){ close(revents[i].data.fd); epoll_ctl(epoll_fd, EPOLL_CTL_DEL, revents[i].data.fd, &revents[i]); } revents[i].events = EPOLLOUT; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]); } } else if(revents[i].events & EPOLLOUT){ int ret = write(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]); revents[i].events = EPOLLIN; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]); } } } close(listenfd); }