進程間怎麼進行通訊?以下這篇文章跟大家介紹一下Node進程間通訊的原理,希望對大家有幫助!
在Linux 系統中,一切都看成文件,當進程開啟現有文件時,會傳回一個文件描述符。 文件描述符是作業系統為了管理已經被進程開啟的檔案所建立的索引,用來指向被開啟的檔案。 當我們的進程啟動之後,作業系統會為每一個進程分配一個 PCB 控制塊,PCB 中會有一個檔案描述符表,存放當前進程所有的檔案描述符,也就是當前進程開啟的所有檔案。
? 進程中的檔案描述子是如何和系統檔案對應起來的? 在核心中,系統會維護另外兩種表
檔案描述子就是陣列的下標,從0開始往上遞增,0/1/2 預設是我們的輸入/輸出/錯誤流的檔案描述符 在 PCB 中維護的文件描述表中,可以根據文件描述符找到對應了文件指針,找到對應的開啟文件表 開啟檔案表中維護了:檔案偏移量(讀寫檔案的時候會更新);對於檔案的狀態標識;指向 i-node 表的指針 想要真正的操作文件,還得靠i-node 表,能夠取得到真實文件的相關資訊
他們之間的關係
圖解
總結
每次讀寫進程的時候,都是從檔案描述子下手,找到對應的開啟檔案表項,再找到對應的i-node 表
?如何實作檔案描述符重定向? 因為在文件描述表中,能夠找到對應的文件指針,如果我們改變了文件指針,是不是後續的兩個表內容就改變 例如:文件描述符1所指向的顯示器,那麼將文件描述符1指向log.txt 文件,那麼文件描述符1 也就和log.txt 對應起來了
cat hello.txt時,預設會將結果輸出到顯示器上,使用> 來重定向。
cat hello.txt 1 > log.txt以輸出的方式開啟檔案log.txt,並綁定到檔案描述符1上
int main(int argc, char const *argv[]) { int fd = open("log.txt"); int copyFd = dup(fd); //将fd阅读文件置于文件末尾,计算偏移量。 cout <p></p><p><img src="https://img.php.cn/upload/image/781/781/846/166237504695671%E4%B8%80%E6%96%87%E8%81%8A%E8%81%8ANode%E4%B8%AD%E7%9A%84%E9%80%B2%E7%A8%8B%E9%96%93%E9%80%9A%E4%BF%A1" title="166237504695671一文聊聊Node中的進程間通信" alt="一文聊聊Node中的進程間通信">調用dup(3) 的時候,會打開新的最小描述符,也就是4,這個4指向了3所指向的文件,操作任一fd 都是修改的一個文件</p><p></p>dup2<h5 data-id="heading-6"> <strong></strong>dup2 函數,把指定的newfd 也指向oldfd 所指向的文件。執行完dup2之後,newfd 和 oldfd 同時指向同一個文件,共享文件偏移量和文件狀態</h5><pre class="brush:php;toolbar:false">int main(int argc, char const *argv[]) { int fd = open("log.txt"); int copyFd = dup(fd); //将fd阅读文件置于文件末尾,计算偏移量。 cout <p></p><p></p><h2 data-id="heading-7"><strong>Node中通信原理</strong></h2><p>Node 中的 IPC 通道具体实现是由 <a href="https://link.juejin.cn?target=https%3A%2F%2Fluohaha.github.io%2FChinese-uvbook%2Fsource%2Fintroduction.html" target="_blank" rel="nofollow noopener noreferrer" title="https://luohaha.github.io/Chinese-uvbook/source/introduction.html" ref="nofollow noopener noreferrer">libuv</a> 提供的。根据系统的不同实现方式不同,window 下采用命名管道实现,*nix 下采用 Domain Socket 实现。在应用层只体现为 message 事件和 send 方法。【相关教程推荐:<a href="https://www.php.cn/course/list/24.html" target="_blank">nodejs视频教程</a>】</p><p><img src="https://img.php.cn/upload/image/215/623/242/166237505660155%E4%B8%80%E6%96%87%E8%81%8A%E8%81%8ANode%E4%B8%AD%E7%9A%84%E9%80%B2%E7%A8%8B%E9%96%93%E9%80%9A%E4%BF%A1" title="166237505660155一文聊聊Node中的進程間通信" alt="一文聊聊Node中的進程間通信"></p><p>父进程在实际创建子进程之前,会创建 IPC 通道并监听它,等到创建出真实的子进程后,通过环境变量(NODE_CHANNEL_FD)告诉子进程该 IPC 通道的文件描述符。</p><p>子进程在启动的过程中,会根据该文件描述符去连接 IPC 通道,从而完成父子进程的连接。</p><p>建立连接之后可以自由的通信了,IPC 通道是使用命名管道或者 Domain Socket 创建的,属于双向通信。并且它是在系统内核中完成的进程通信</p><p><img src="https://img.php.cn/upload/image/895/159/347/166237506175944%E4%B8%80%E6%96%87%E8%81%8A%E8%81%8ANode%E4%B8%AD%E7%9A%84%E9%80%B2%E7%A8%8B%E9%96%93%E9%80%9A%E4%BF%A1" title="166237506175944一文聊聊Node中的進程間通信" alt="一文聊聊Node中的進程間通信"></p><p>⚠️ 只有在启动的子进程是 Node 进程时,子进程才会根据环境变量去连接对应的 IPC 通道,对于其他类型的子进程则无法实现进程间通信,除非其他进程也按着该约定去连接这个 IPC 通道。</p><h3 data-id="heading-8"><strong>unix domain socket</strong></h3><h4 data-id="heading-9"><strong>是什么</strong></h4><p>我们知道经典的通信方式是有 Socket,我们平时熟知的 Socket 是基于网络协议的,用于两个不同主机上的两个进程通信,通信需要指定 IP/Host 等。 但如果我们同一台主机上的两个进程想要通信,如果使用 Socket 需要指定 IP/Host,经过网络协议等,会显得过于繁琐。所以 Unix Domain Socket 诞生了。</p><p>UDS 的优势:</p>
int main(int argc, char *argv[]) { int server_fd ,ret, client_fd; struct sockaddr_un serv, client; socklen_t len = sizeof(client); char buf[1024] = {0}; int recvlen; // 创建 socket server_fd = socket(AF_LOCAL, SOCK_STREAM, 0); // 初始化 server 信息 serv.sun_family = AF_LOCAL; strcpy(serv.sun_path, "server.sock"); // 绑定 ret = bind(server_fd, (struct sockaddr *)&serv, sizeof(serv)); //设置监听,设置能够同时和服务端连接的客户端数量 ret = listen(server_fd, 36); //等待客户端连接 client_fd = accept(server_fd, (struct sockaddr *)&client, &len); printf("=====client bind file:%s\n", client.sun_path); while (1) { recvlen = recv(client_fd, buf, sizeof(buf), 0); if (recvlen == -1) { perror("recv error"); return -1; } else if (recvlen == 0) { printf("client disconnet...\n"); close(client_fd); break; } else { printf("recv buf %s\n", buf); send(client_fd, buf, recvlen, 0); } } close(client_fd); close(server_fd); return 0; }
int main(int argc, char *argv[]) { int client_fd ,ret; struct sockaddr_un serv, client; socklen_t len = sizeof(client); char buf[1024] = {0}; int recvlen; //创建socket client_fd = socket(AF_LOCAL, SOCK_STREAM, 0); //给客户端绑定一个套接字文件 client.sun_family = AF_LOCAL; strcpy(client.sun_path, "client.sock"); ret = bind(client_fd, (struct sockaddr *)&client, sizeof(client)); //初始化server信息 serv.sun_family = AF_LOCAL; strcpy(serv.sun_path, "server.sock"); //连接 connect(client_fd, (struct sockaddr *)&serv, sizeof(serv)); while (1) { fgets(buf, sizeof(buf), stdin); send(client_fd, buf, strlen(buf)+1, 0); recv(client_fd, buf, sizeof(buf), 0); printf("recv buf %s\n", buf); } close(client_fd); return 0; }
命名管道是可以在同一台计算机的不同进程之间,或者跨越一个网络的不同计算机的不同进程之间的可靠的单向或者双向的数据通信。 创建命名管道的进程被称为管道服务端(Pipe Server),连接到这个管道的进程称为管道客户端(Pipe Client)。
命名管道的命名规范:\server\pipe[\path]\name
void ServerTest() { HANDLE serverNamePipe; char pipeName[MAX_PATH] = {0}; char szReadBuf[MAX_BUFFER] = {0}; char szWriteBuf[MAX_BUFFER] = {0}; DWORD dwNumRead = 0; DWORD dwNumWrite = 0; strcpy(pipeName, "\\\\.\\pipe\\shuangxuPipeTest"); // 创建管道实例 serverNamePipe = CreateNamedPipeA(pipeName, PIPE_ACCESS_DUPLEX|FILE_FLAG_WRITE_THROUGH, PIPE_TYPE_BYTE|PIPE_READMODE_BYTE|PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, 0, 0, 0, NULL); WriteLog("创建管道成功..."); // 等待客户端连接 BOOL bRt= ConnectNamedPipe(serverNamePipe, NULL ); WriteLog( "收到客户端的连接成功..."); // 接收数据 memset( szReadBuf, 0, MAX_BUFFER ); bRt = ReadFile(serverNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL ); // 业务逻辑处理 (只为测试用返回原来的数据) WriteLog( "收到客户数据:[%s]", szReadBuf); // 发送数据 if( !WriteFile(serverNamePipe, szWriteBuf, dwNumRead, &dwNumWrite, NULL ) ) { WriteLog("向客户写入数据失败:[%#x]", GetLastError()); return ; } WriteLog("写入数据成功..."); }
void ClientTest() { char pipeName[MAX_PATH] = {0}; HANDLE clientNamePipe; DWORD dwRet; char szReadBuf[MAX_BUFFER] = {0}; char szWriteBuf[MAX_BUFFER] = {0}; DWORD dwNumRead = 0; DWORD dwNumWrite = 0; strcpy(pipeName, "\\\\.\\pipe\\shuangxuPipeTest"); // 检测管道是否可用 if(!WaitNamedPipeA(pipeName, 10000)){ WriteLog("管道[%s]无法打开", pipeName); return ; } // 连接管道 clientNamePipe = CreateFileA(pipeName, GENERIC_READ|GENERIC_WRITE, 0, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); WriteLog("管道连接成功..."); scanf( "%s", szWritebuf ); // 发送数据 if( !WriteFile(clientNamePipe, szWriteBuf, strlen(szWriteBuf), &dwNumWrite, NULL)){ WriteLog("发送数据失败,GetLastError=[%#x]", GetLastError()); return ; } printf("发送数据成功:%s\n", szWritebuf ); // 接收数据 if( !ReadFile(clientNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL)){ WriteLog("接收数据失败,GetLastError=[%#x]", GetLastError() ); return ; } WriteLog( "接收到服务器返回:%s", szReadBuf ); // 关闭管道 CloseHandle(clientNamePipe); }
对于创建子进程、创建管道、重定向管道均是在 c++ 层实现的
int main(int argc,char *argv[]){ pid_t pid = fork(); if (pid <h4 data-id="heading-23"><strong>创建管道</strong></h4><p>使用 socketpair 创建管道,其创建出来的管道是全双工的,返回的文件描述符中的任何一个都可读和可写</p><pre class="brush:php;toolbar:false">int main () { int fd[2]; int r = socketpair(AF_UNIX, SOCK_STREAM, 0, fd); if (fork()){ /* 父进程 */ int val = 0; close(fd[1]); while (1){ sleep(1); ++val; printf("发送数据: %d\n", val); write(fd[0], &val, sizeof(val)); read(fd[0], &val, sizeof(val)); printf("接收数据: %d\n", val); } } else { /*子进程*/ int val; close(fd[0]); while(1){ read(fd[1], &val, sizeof(val)); ++val; write(fd[1], &val, sizeof(val)); } } }
当我们使用 socketpair 创建了管道之后,父进程关闭了 fd[1],子进程关闭了 fd[0]。子进程可以通过 fd[1] 读写数据;同理主进程通过 fd[0]读写数据完成通信。
对应代码:https://github.com/nodejs/node/blob/main/deps/uv/src/unix/process.c#L344
fork 函数开启一个子进程的流程
初始化参数中的 options.stdio,并且调用 spawn 函数
function spawn(file, args, options) { const child = new ChildProcess(); child.spawn(options); }
创建 ChildProcess 实例,创建子进程也是调用 C++ 层 this._handle.spawn 方法
function ChildProcess() { // C++层定义 this._handle = new Process(); }
通过 child.spawn 调用到 ChildProcess.prototype.spawn 方法中。其中 getValidStdio 方法会根据 options.stdio 创建和 C++ 交互的 Pipe 对象,并获得对应的文件描述符,将文件描述符写入到环境变量 NODE_CHANNEL_FD 中,调用 C++ 层创建子进程,在调用 setupChannel 方法
ChildProcess.prototype.spawn = function(options) { // 预处理进程间通信的数据结构 stdio = getValidStdio(stdio, false); const ipc = stdio.ipc; const ipcFd = stdio.ipcFd; //将文件描述符写入环境变量中 if (ipc !== undefined) { ArrayPrototypePush(options.envPairs, `NODE_CHANNEL_FD=${ipcFd}`); } // 创建进程 const err = this._handle.spawn(options); // 添加send方法和监听IPC中数据 if (ipc !== undefined) setupChannel(this, ipc, serialization); }
子进程启动时,会根据环境变量中是否存在 NODE_CHANNEL_FD 判断是否调用 _forkChild 方法,创建一个 Pipe 对象, 同时调用 open 方法打开对应的文件描述符,在调用setupChannel
function _forkChild(fd, serializationMode) { const p = new Pipe(PipeConstants.IPC); p.open(fd); p.unref(); const control = setupChannel(process, p, serializationMode); }
setupChannel 主要是完成了处理接收的消息、发送消息、处理文件描述符传递等
function setipChannel(){ channel.onread = function(arrayBuffer){ //... } target.on('internalMessage', function(message, handle){ //... }) target.send = function(message, handle, options, callback){ //... } target._send = function(message, handle, options, callback){ //... } function handleMessage(message, handle, internal){ //... } }
进程间消息传递
父进程通过 child.send 发送消息 和 server/socket 句柄对象
普通消息直接 JSON.stringify 序列化;对于句柄对象来说,需要先包装成为内部对象
message = { cmd: 'NODE_HANDLE', type: null, msg: message };
通过 handleConversion.[message.type].send 的方法取出句柄对象对应的 C++ 层面的 TCP 对象,在采用JSON.stringify 序列化
const handleConversion = { 'net.Server': { simultaneousAccepts: true, send(message, server, options) { return server._handle; }, got(message, handle, emit) { const server = new net.Server(); server.listen(handle, () => { emit(server); }); } } //.... }
最后将序列化后的内部对象和 TCP 对象写入到 IPC 通道中
子进程在接收到消息之后,使用 JSON.parse 反序列化消息,如果为内部对象触发 internalMessage 事件
检查是否带有 TCP 对象,通过 handleConversion.[message.type].got 得到和父进程一样的句柄对象
最后发触发 message 事件传递处理好的消息和句柄对象,子进程通过 process.on 接收
更多node相关知识,请访问:nodejs 教程!
以上是一文聊聊Node中的進程間通信的詳細內容。更多資訊請關注PHP中文網其他相關文章!