Alles in Linux ist eine Datei, egal ob es sich um die Zeichendateien handelt, die wir auf der Festplatte speichern, um ausführbare Dateien oder um unsere mit dem Computer verbundenen E/A-Geräte, sie werden alle von VFS in Dateien abstrahiert Das Gerät ist standardmäßig die Tastatur. Wenn wir das Standardeingabegerät bedienen, bedienen wir tatsächlich eine Datei mit dem Dateideskriptor 0, die standardmäßig geöffnet wird. Die gesamte Software muss über das Betriebssystem betrieben werden, und das Betriebssystem benötigt dazu einen entsprechenden Treiber Betreiben Sie die gesamte Hardware. Die entsprechende Konfiguration und Verwendung dieser Hardware wird im Programm konfiguriert. Linux-E/A ist in vier Typen unterteilt: blockierende E/A, nicht blockierende E/A, E/A-Multiplexing und signalgesteuerte E/A. Für E/A-Gerätetreiber bieten sie im Allgemeinen zwei Konfigurationen: blockierend und nicht blockierend. Der Treiber für eines unserer häufigsten E/A-Geräte, die Tastatur (Standardeingabegerät), blockiert standardmäßig.
Multiplexing soll es dem Prozess ermöglichen, die gewünschten Daten aus mehreren blockierenden E/A zu erhalten und mit der Ausführung der nächsten Aufgabe fortzufahren. Die Hauptidee besteht darin, mehrere Dateideskriptoren gleichzeitig zu überwachen. Wenn der Einstellungsstatus eines Dateideskriptors ausgelöst wird, wird der Prozess weiterhin ausgeführt. Wenn kein Einstellungsstatus eines Dateideskriptors ausgelöst wird, wechselt der Prozess in den Ruhezustand Eine der Hauptanwendungen des Multiplexings ist die Implementierung eines „E/A-Multiplexing-Parallelitätsservers“. Im Vergleich zur Multithread-Parallelität oder Multiprozess-Parallelität weist dieser Servertyp einen geringeren Systemaufwand auf und eignet sich besser für Webserver.
Typische standardmäßige blockierende E/A umfassen Standardeingabegeräte, Socket-Geräte, Pipe-Geräte usw. Wenn wir gets(), scanf(), read() und andere Vorgänge verwenden, um diese E/As anzufordern und keine Daten in die E/A fließen, wird sie Dadurch wird der Prozess in den Ruhezustand versetzt.
Angenommen, ein Prozess möchte Daten lesen und über eine der drei Pipes anzeigen. Der Pseudocode lautet wie folgt:
read(pipe_0,buf,sizeof(buf)); //sleepprint buf; read(pipe_1,buf,sizeof(buf)); print buf;read(pipe_2,buf,sizeof(buf)); print buf;
Wenn wir den folgenden Code verwenden, um die Blockierungseigenschaften der Pipe zurückzusetzen, kann der Prozess natürlich die angeforderten Daten nicht abrufen und die Ausführung nicht fortsetzen, wenn keine Daten in die drei Pipes fließen. Aus diesem Grund verwenden wir Blocking I/O. Das Ergebnis wird sehr schlecht sein und der Wechsel zum Polling wird viel CPU-Zeit in Anspruch nehmen.
int fl = fcntl(pipe_fd, F_GETFL); fcntl(pipe_fd, F_SETFL, fl | O_NONBLOCK);
fd_set //创建fd_set对象,将来从中增减需要监视的 fdFD_ZERO() //清空fd_set对象FD_SET() //将一个fd加入 fd_set对象中 select() //监视 fd_set对象中的文件描述符 pselect() //先设定信号屏蔽,再监视 FD_ISSET() //测试fd是否属于 fd_set对象FD_CLR() //从 fd_set对象中删除fd
上一条导致了这种机制的低效,如果需要监视的文件描述符是0和100那么每一次都会遍历101次
select()每次返回都会修改fd_set,如果要循环select(),需要先对初始的fd_set进行备
例子_I/O多路复用并发服务器
关于server本身的编程模型,参见tcp/ip协议服务器模型和udp/ip协议服务器模型这里仅是使用select实现伪并行的部分模型
#define BUFSIZE 100#define MAXNFD 1024 int main(){ /***********服务器的listenfd已经准本好了**************/ fd_set readfds; fd_set writefds; FD_ZERO(&readfds); FD_ZERO(&writefds); FD_SET(listenfd, &readfds); fd_set temprfds = readfds; fd_set tempwfds = writefds; int maxfd = listenfd; int nready; char buf[MAXNFD][BUFSIZE] = {0}; while(1){ temprfds = readfds; tempwfds = writefds; nready = select(maxfd+1, &temprfds, &tempwfds, NULL, NULL) if(FD_ISSET(listenfd, &temprfds)){ //如果监听到的是listenfd就进行accept int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len); //将新accept的scokfd加入监听集合,并保持maxfd为最大fd FD_SET(sockfd, &readfds); maxfd = maxfd>sockfd?maxfd:sockfd; //如果意见检查了nready个fd,就没有必要再等了,直接下一个循环 if(--nready==0) continue; } int fd = 0; //遍历文件描述符表,处理接收到的消息 for(;fd<=maxfd; fd++){ if(fd == listenfd) continue; if(FD_ISSET(fd, &temprfds)){ int ret = read(fd, buf[fd], sizeof buf[0]); if(0 == ret){ //客户端链接已经断开 close(fd); FD_CLR(fd, &readfds); if(maxfd==fd) --maxfd; continue; } //将fd加入监听可写的集合 FD_SET(fd, &writefds); } //找到了接收消息的socket的fd,接下来将其加入到监视写的fd_set中 //将在下一次while()循环开始监视 if(FD_ISSET(fd, &tempwfds)){ int ret = write(fd, buf[fd], sizeof buf[0]); printf("ret %d: %d\n", fd, ret); FD_CLR(fd, &writefds); } } } close(listenfd); }
poll机制
poll是System V提出的一种基于select的改良机制,其针对select的诸多明显的缺陷进行了重新设计,包括只遍历被触发个数个文件描述符,不需要备份fd_set等等
模型
struct pollfd fds //创建一个pollfd类型的数组fds[0]. fd //向fds[0]中放入需要监视的fdfds[0]. events //向fds[0]中放入需要监视的fd的触发事件 POLLIN //I/O有输入 POLLPRI //有紧急数据需要读取 POLLOUT //I/O可写 POLLRDHUP //流式套接字连接断开或套接字处于半关闭状态 POLLERR //错误条件(仅针对输出) POLLHUP //挂起(仅针对输出) POLLNVAL //无效的请求:fd没有被打开(仅针对输出)
例子_I/O多路复用并发服务器
/* ... */int main(){ /* ... */ struct pollfd myfds[MAXNFD] = {0}; myfds[0].fd = listenfd; myfds[0].events = POLLIN; int maxnum = 1; int nready; //准备二维数组buf,每个fd使用buf的一行,数据干扰 char buf[MAXNFD][BUFSIZE] = {0}; while(1){ //poll直接返回event被触发的fd的个数 nready = poll(myfds, maxnum, -1) int i = 0; for(;i<maxnum; i++){ //poll通过将相应的二进制位置一来表示已经设置 //如果下面的条件成立,表示revent[i]里的POLLIN位已经是1了 if(myfds[i].revents & POLLIN){ if(myfds[i].fd == listenfd){ int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len); //将新accept的scokfd加入监听集合 myfds[maxnum].fd = sockfd; myfds[maxnum].events = POLLIN; maxnum++; //如果意见检查了nready个fd,就直接下一个循环 if(--nready==0) continue; } else{ int ret = read(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]); if(0 == ret){ //如果连接断开了 close(myfds[i].fd); //初始化将文件描述符表所有的文件描述符标记为-1 //close的文件描述符也标记为-1 //打开新的描述符时从表中搜索第一个-1 //open()就是这样实现始终使用最小的fd //这里为了演示并没有使用这种机制 myfds[i].fd = -1; continue; } myfds[i].events = POLLOUT; } } else if(myfds[i].revents & POLLOUT){ int ret = write(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]); myfds[i].events = POLLIN; } } } close(listenfd); }
epoll
epoll在poll基础上实现的更为健壮的接口,也是现在主流的web服务器使用的多路复用技术,epoll一大特色就是支持EPOLLET(边沿触发)和EPOLLLT (水平触发),前者表示如果读取之后缓冲区还有数据,那么只要读取结束,剩余的数据也会丢弃,而后者表示里面的数据不会丢弃,下次读的时候还在,默认是EPOLLLT
模型
epoll_create() //创建epoll对象struct epoll_event //准备事件结构体和事件结构体数组 event.events event.data.fd ... epoll_ctl() //配置epoll对象epoll_wait() //监控epoll对象中的fd及其相应的event
例子_I/O多路复用并发服务器
/* ... */int main(){ /* ... */ /* 创建epoll对象 */ int epoll_fd = epoll_create(1024); //准备一个事件结构体 struct epoll_event event = {0}; event.events = EPOLLIN; event.data.fd = listenfd; //data是一个共用体,除了fd还可以返回其他数据 //ctl是监控listenfd是否有event被触发 //如果发生了就把event通过wait带出。 //所以,如果event里不标明fd,我们将来获取就不知道哪个fd epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenfd, &event); struct epoll_event revents[MAXNFD] = {0}; int nready; char buf[MAXNFD][BUFSIZE] = {0}; while(1){ //wait返回等待的event发生的数目 //并把相应的event放到event类型的数组中 nready = epoll_wait(epoll_fd, revents, MAXNFD, -1) int i = 0; for(;i<nready; i++){ //wait通过在events中设置相应的位来表示相应事件的发生 //如果输入可用,那么下面的这个结果应该为真 if(revents[i].events & EPOLLIN){ //如果是listenfd有数据输入 if(revents[i].data.fd == listenfd){ int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len); struct epoll_event event = {0}; event.events = EPOLLIN; event.data.fd = sockfd; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event); } else{ int ret = read(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]); if(0 == ret){ close(revents[i].data.fd); epoll_ctl(epoll_fd, EPOLL_CTL_DEL, revents[i].data.fd, &revents[i]); } revents[i].events = EPOLLOUT; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]); } } else if(revents[i].events & EPOLLOUT){ int ret = write(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]); revents[i].events = EPOLLIN; epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]); } } } close(listenfd); }