Cet article vous présentera le multi-processus dans Node.js et en apprendra davantage sur le module Cluster et le modèle multi-processus Egg.js. J'espère qu'il vous sera utile !
Comme nous le savons tous, JS est exécuté dans un seul thread, et toute asynchronisation est complétée par la boucle d'événements. Si un service Web n'a qu'un seul thread, comment utiliser pleinement les ressources inactives de la machine. ou un conteneur ? Dans le même temps, lorsque le code plante et qu'aucune exception n'est détectée, le thread se ferme. Alors, comment le service Web basé sur Node.js garantit-il la robustesse de l'ensemble de l'application ?
Node.js fournit le module Cluster pour résoudre les problèmes ci-dessus. Grâce à ce module, les développeurs peuvent créer un cluster en créant un mode sous-processus pour utiliser pleinement les ressources de la machine ou du conteneur. en même temps, ce module permet plusieurs écoutes du processus enfant sur le même port. [Apprentissage recommandé : "Tutoriel Nodejs"]
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { // Fork workers. for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } else { // Workers can share any TCP connection // In this case it is an HTTP server http.createServer(function(req, res) { res.writeHead(200); res.end("hello world\n"); }).listen(8000); }
Commencez d'abord par const cluster = require('cluster')</ code> À partir de , cette ligne de code importe le module Cluster de Node, mais à l'intérieur de Node, les fichiers introduits par le processus Master et le processus Worker sont différents Pour plus de détails, voir le code suivant : <code>const cluster = require('cluster')
说起,这行代码导入了 Node 的 Cluster 模块,而在 Node 内部,Master 进程与 Worker 进程引入的文件却不一样,详情见如下代码:
'use strict'; const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master'; module.exports = require(`internal/cluster/${childOrPrimary}`);
不同的文件意味着两种进程在执行中的表现也不一样,例如:
// internal/cluster/master.js cluster.isWorker = false; cluster.isMaster = true; // internal/cluster/child.js cluster.isWorker = true; cluster.isMaster = false;
这也是为什么 Cluster 模块到处的变量能区分不同类型进程的原因,接下来让我们分别从主、子进程两个方向去了解具体的过程
在上述代码里,Master 进程并没有做太多事情,只是根据 CPU 数量去 fork 子进程,那么我们深入到源代码里大致来看一下,相关描述均在代码的注释内
// lib/internal/cluster/master.js // 初始化cluster const cluster = new EventEmitter(); // 创建监听地址与server对应的map const handles = new SafeMap(); // 初始化 cluster.isWorker = false; cluster.isMaster = true; cluster.workers = {}; cluster.settings = {}; cluster.SCHED_NONE = SCHED_NONE; // Leave it to the operating system. cluster.SCHED_RR = SCHED_RR; // Master distributes connections. // 自增的子进程id let ids = 0; // 向cluster添加fork方法 cluster.fork = function(env) { // 初始化cluster.settings cluster.setupMaster(); // 为当前fork的子进程生成当前cluster内的唯一id const id = ++ids; // 创建子进程 const workerProcess = createWorkerProcess(id, env); // 创建对应的worker实例 const worker = new Worker({ id: id, process: workerProcess }); // 省略一些worker的事件监听.... // 监听内部消息事件,并交由onmessage处理 worker.process.on('internalMessage', internal(worker, onmessage)); // cluster发出fork事件 process.nextTick(emitForkNT, worker); // 将worker实例放在cluster.workers中维护 cluster.workers[worker.id] = worker; // 返回worker return worker; }; // 创建子进程函数 function createWorkerProcess(id, env) { // 将主进程的env、调用cluster.fork时传入的env以及NODE_UNIQUE_ID env构建成一个env对象 const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` }; // 执行参数 const execArgv = [...cluster.settings.execArgv]; // 省略debug模式相关逻辑... // 调用child_process模块的fork函数创建子进程并返回,至此子进程实例创建完成 return fork(cluster.settings.exec, cluster.settings.args, { cwd: cluster.settings.cwd, env: workerEnv, serialization: cluster.settings.serialization, silent: cluster.settings.silent, windowsHide: cluster.settings.windowsHide, execArgv: execArgv, stdio: cluster.settings.stdio, gid: cluster.settings.gid, uid: cluster.settings.uid }); } // 内部消息事件处理函数 function onmessage(message, handle) { const worker = this; if (message.act === 'online') online(worker); // 当子进程向主进程发出queryServer消息后,执行queryServer函数,创建server else if (message.act === 'queryServer') queryServer(worker, message); else if (message.act === 'listening') listening(worker, message); else if (message.act === 'exitedAfterDisconnect') exitedAfterDisconnect(worker, message); else if (message.act === 'close') close(worker, message); } // 获取server function queryServer(worker, message) { // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return; // 创建当前子进程监听地址信息的key const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}`; // 在handles map中查询是否有已经创建好的该监听地址的server let handle = handles.get(key); // 没有对应的server则进行创建 if (handle === undefined) { let address = message.address; // Find shortest path for unix sockets because of the ~100 byte limit if (message.port < 0 && typeof address === 'string' && process.platform !== 'win32') { address = path.relative(process.cwd(), address); if (message.address.length < address.length) address = message.address; } // 主、子进程处理连接的方式,默认为轮询 let constructor = RoundRobinHandle; // UDP is exempt from round-robin connection balancing for what should // be obvious reasons: it's connectionless. There is nothing to send to // the workers except raw datagrams and that's pointless. if (schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6') { constructor = SharedHandle; } // 将监听地址信息传入构造函数创建监听实例 handle = new constructor(key, address, message); // 缓存监听实例 handles.set(key, handle); } // 向server添加自定义信息,用于server发出listening事件后透传到worker if (!handle.data) handle.data = message.data; // 添加server发出listening事件后的回调函数通知子进程 handle.add(worker, (errno, reply, handle) => { const { data } = handles.get(key); if (errno) handles.delete(key); // Gives other workers a chance to retry. send(worker, { errno, key, ack: message.seq, data, ...reply }, handle); }); }
// lib/internal/cluster/round_robin_handle.js // 构造函数,参数为server对应的key,ip地址(对于http(s)来说),监听相关信息 function RoundRobinHandle(key, address, { port, fd, flags }) { // 初始化handle this.key = key; this.all = new SafeMap(); this.free = new SafeMap(); this.handles = []; this.handle = null; this.server = net.createServer(assert.fail); // 监听文件描述符,不讨论 if (fd >= 0) this.server.listen({ fd }); // 监听ip:port else if (port >= 0) { this.server.listen({ port, host: address, // Currently, net module only supports `ipv6Only` option in `flags`. ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY), }); // 监听UNIX socket,不讨论 } else this.server.listen(address); // UNIX socket path. // 注册server发出listening事件的回调函数 this.server.once('listening', () => { this.handle = this.server._handle; this.handle.onconnection = (err, handle) => this.distribute(err, handle); this.server._handle = null; this.server = null; }); } // 添加worker,server发出listening事件后调用master.js中传入的回调函数 RoundRobinHandle.prototype.add = function(worker, send) { assert(this.all.has(worker.id) === false); this.all.set(worker.id, worker); const done = () => { if (this.handle.getsockname) { const out = {}; this.handle.getsockname(out); // TODO(bnoordhuis) Check err. send(null, { sockname: out }, null); } else { send(null, null, null); // UNIX socket. } this.handoff(worker); // In case there are connections pending. }; if (this.server === null) return done(); // Still busy binding. this.server.once('listening', done); this.server.once('error', (err) => { send(err.errno, null); }); }; // 删除worker,轮询时不再分配给该worker RoundRobinHandle.prototype.remove = function(worker) { const existed = this.all.delete(worker.id); if (!existed) return false; this.free.delete(worker.id); if (this.all.size !== 0) return false; for (const handle of this.handles) { handle.close(); } this.handles = []; this.handle.close(); this.handle = null; return true; }; // 轮询调度函数 RoundRobinHandle.prototype.distribute = function(err, handle) { ArrayPrototypePush(this.handles, handle); const [ workerEntry ] = this.free; // this.free is a SafeMap if (ArrayIsArray(workerEntry)) { const { 0: workerId, 1: worker } = workerEntry; this.free.delete(workerId); this.handoff(worker); } }; // 将handle交给worker RoundRobinHandle.prototype.handoff = function(worker) { if (!this.all.has(worker.id)) { return; // Worker is closing (or has closed) the server. } const handle = ArrayPrototypeShift(this.handles); if (handle === undefined) { this.free.set(worker.id, worker); // Add to ready queue again. return; } // 向该worker发出newconn事件 const message = { act: 'newconn', key: this.key }; sendHelper(worker.process, message, handle, (reply) => { if (reply.accepted) handle.close(); else this.distribute(0, handle); // Worker is shutting down. Send to another. this.handoff(worker); }); };
在每个子进程中,我们都创建了一个 HTTP Server,然后执行 listen
函数监听 8000 端口,而 HTTP Server 实例是由 Net Server 原型链继承得到的,listen
函数即为 Net Server 原型上的 listen
函数,具体如下:
// lib/_http_server.js function Server(options, requestListener) { .... } ObjectSetPrototypeOf(Server.prototype, net.Server.prototype); ObjectSetPrototypeOf(Server, net.Server);
// lib/net.js Server.prototype.listen = function(...args) { // 由于篇幅原因,省略一些参数nomolize和其他监听的处理 // 经过这段逻辑中,会调用listenInCluster函数去真正的监听端口 if (typeof options.port === 'number' || typeof options.port === 'string') { validatePort(options.port, 'options.port'); backlog = options.backlog || backlogFromArgs; // start TCP server listening on host:port if (options.host) { lookupAndListen(this, options.port | 0, options.host, backlog, options.exclusive, flags); } else { // Undefined host, listens on unspecified address // Default addressType 4 will be used to search for master server listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive); } return this; } // 省略... }; // 集群监听函数 function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) { exclusive = !!exclusive; if (cluster === undefined) cluster = require('cluster'); // 判断是否是master,单进程中cluster.isMaster默认为true,然后进行监听并返回 if (cluster.isMaster || exclusive) { // Will create a new handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags); return; } // 在子进程中,会将监听地址信息传入cluster实例中的_getServer函数从而获取一个faux handle const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags, }; // Get the master's server handle, and listen on it cluster._getServer(server, serverQuery, listenOnMasterHandle); // 获取net server回调函数,拿到faux handle之后,调用_listen2函数,即setupListenHandle函数 function listenOnMasterHandle(err, handle) { err = checkBindError(err, port, handle); if (err) { const ex = exceptionWithHostPort(err, 'bind', address, port); return server.emit('error', ex); } // Reuse master's server handle server._handle = handle; // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd, flags); } } // 启用监听handle function setupListenHandle(address, port, addressType, backlog, fd, flags) { debug('setupListenHandle', address, port, addressType, backlog, fd); // 如同英文注释所说的那样,如果没有监听句柄,则创建,有监听句柄则跳过 // If there is not yet a handle, we need to create one and bind. // In the case of a server sent via IPC, we don't need to do this. if (this._handle) { debug('setupListenHandle: have a handle already'); } else { debug('setupListenHandle: create a handle'); let rval = null; // 篇幅原因,创建监听句柄的代码... this._handle = rval; } // 在this上设置的faux handle上设置onconnection函数用于监听连接进入 this._handle.onconnection = onconnection; }
同时,在开始解析的时候我们说过,在引入 Cluster 模块的时候,会根据当前进程的env中是否包含NODE_UNIQUE_ID去判断是否为子进程,若为子进程,则执行 child.js
文件
Tips:IPC 通信中发送的message.cmd的值如果以NODE为前缀,它将响应一个内部事件internalMessage
// lib/internal/cluster/child.js // 初始化 const cluster = new EventEmitter(); // 存储生成的 faux handle const handles = new SafeMap(); // 存储监听地址与监听地址index的对应关系 const indexes = new SafeMap(); cluster.isWorker = true; cluster.isMaster = false; cluster.worker = null; cluster.Worker = Worker; // 子进程启动时会执行该函数,进行初始化,同时在执行完毕后,会删除 env 中的 NODE_UNIQUE_ID 环境变量 // 详细代码见 lib/internal/bootstrap/pre_excution.js 中的 initializeClusterIPC 函数 cluster._setupWorker = function() { // 初始化worker实例 const worker = new Worker({ id: +process.env.NODE_UNIQUE_ID | 0, process: process, state: 'online' }); cluster.worker = worker; // 处理断开连接事件 process.once('disconnect', () => { worker.emit('disconnect'); if (!worker.exitedAfterDisconnect) { // Unexpected disconnect, master exited, or some such nastiness, so // worker exits immediately. process.exit(0); } }); // IPC 内部通信事件监听 process.on('internalMessage', internal(worker, onmessage)); send({ act: 'online' }); function onmessage(message, handle) { // 如果为新连接,则执行 onconnection 函数将得到的句柄传入子进程中启动的HTTP Server if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') ReflectApply(_disconnect, worker, [true]); } }; // 添加获取server函数,会在net server监听端口时被执行 // `obj` is a net#Server or a dgram#Socket object. cluster._getServer = function(obj, options, cb) { let address = options.address; // Resolve unix socket paths to absolute paths if (options.port < 0 && typeof address === 'string' && process.platform !== 'win32') address = path.resolve(address); // 生成地址信息的的key const indexesKey = ArrayPrototypeJoin( [ address, options.port, options.addressType, options.fd, ], ':'); // 检查是否缓存了indexedKey,如果没有,则表明是新的监听地址,在 master.js 中会生成新的net server let index = indexes.get(indexesKey); if (index === undefined) index = 0; else index++; // 设置 indexesKey 与 index的对应关系 indexes.set(indexesKey, index); // 传递地址信息及index const message = { act: 'queryServer', index, data: null, ...options }; message.address = address; // Set custom data on handle (i.e. tls tickets key) if (obj._getServerData) message.data = obj._getServerData(); // 向主进程发送queryServer消息 send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); // 根据相应负载均衡handle添加worker时的处理,执行相应的负载均衡代码,并执行 cb 函数 // 轮询是没有传递handle的,对应代码在 RoundRobinHandle.prototype.add 内 if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. }); obj.once('listening', () => { cluster.worker.state = 'listening'; const address = obj.address(); message.act = 'listening'; message.port = (address && address.port) || options.port; send(message); }); }; // 创建 faux handle,并保存其对应关系 // Round-robin. Master distributes handles across workers. function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); let key = message.key; function listen(backlog) { // TODO(bnoordhuis) Send a message to the master that tells it to // update the backlog size. The actual backlog should probably be // the largest requested size by any worker. return 0; } function close() { // lib/net.js treats server._handle.close() as effectively synchronous. // That means there is a time window between the call to close() and // the ack by the master process in which we can still receive handles. // onconnection() below handles that by sending those handles back to // the master. if (key === undefined) return; send({ act: 'close', key }); handles.delete(key); indexes.delete(indexesKey); key = undefined; } function getsockname(out) { if (key) ObjectAssign(out, message.sockname); return 0; } // 创建Faux handle // Faux handle. Mimics a TCPWrap with just enough fidelity to get away // with it. Fools net.Server into thinking that it's backed by a real // handle. Use a noop function for ref() and unref() because the control // channel is going to keep the worker alive anyway. const handle = { close, listen, ref: noop, unref: noop }; if (message.sockname) { handle.getsockname = getsockname; // TCP handles only. } assert(handles.has(key) === false); // 保存faux handle handles.set(key, handle); // 执行 net 模块调用 cluster._getServer 函数传进来的回调函数 cb(0, handle); } // 处理请求 // Round-robin connection. function onconnection(message, handle) { // 获取faux handle的key const key = message.key; // 获取faux hadle const server = handles.get(key); const accepted = server !== undefined; send({ ack: message.seq, accepted }); // 调用在 net 模块中 setupListenHandle 函数里为该 faux handle 设置的连接处理函数处理请求 if (accepted) server.onconnection(0, handle); }
至此,所有的内容都联系起来了。
在之前的代码分析中我们可以知道,Cluster 集群会在 Master 进程中创建 Net Server,在 Worker 进程运行创建 HTTP Server 的时候,会将监听地址的信息传入 cluster._getServer
函数创建一个 faux handle
并设置到子进程的 Net Server 上,在 Worker 进程初始化的时候会注册 IPC 通信回调函数,在回调函数内 ,调用在子进程中 Net Server 模块初始化后的 {faux handle}.onconnection
函数,并将传过来的连接的 handle 传入完成请求响应。
我们可以在 Master 进程中监听 Worker 进程的 error
、disconntect
、exit
事件,在这些事件中去做对应的处理,例如清理退出的进程并重新 fork
,或者使用已经封装好的 npm 包,例如 cfork
在 Egg.js 的多进程模型中,多了另外一个进程类型,即 Agent 进程,该进程主要用于处理多进程不好处理的一些事情还有减少长链接的数量,具体关系如下:
+---------+ +---------+ +---------+ | Master | | Agent | | Worker | +---------+ +----+----+ +----+----+ | fork agent | | +-------------------->| | | agent ready | | |<--------------------+ | | | fork worker | +----------------------------------------->| | worker ready | | |<-----------------------------------------+ | Egg ready | | +-------------------->| | | Egg ready | | +----------------------------------------->|
在 egg-cluster
包内,使用了 cfork
包去保证 Worker 进程挂掉后自动重启
在我们的一个 Egg 应用内,日志系统并没有使用 Egg 原生的日志,使用了一个内部基于
Des fichiers différents signifient les performances. des deux processus en exécution Ils sont également différents, par exemple : 🎜rrreee🎜C'est pourquoi les variables partout dans le module Cluster peuvent distinguer différents types de processus. Ensuite, comprenons respectivement le processus spécifique des processus principal et enfant🎜.log4js
rrreee🎜Processus principal🎜
🎜Dans le code ci-dessus, le processus maître ne fait pas grand-chose, il divise simplement le processus enfant en fonction du nombre de processeurs. dans le code source et jetez un œil approximatif. Les descriptions pertinentes se trouvent dans les commentaires du code🎜rrreeerrreee🎜Sous-processus🎜
🎜Dans chaque sous-processus, nous créons un serveur HTTP. puis exécutezLa fonction d'écoute
écoute le port 8000, et l'instance du serveur HTTP est héritée de la chaîne de prototypes Net Server. La fonctionécouter
est la fonctionécoutercode> sur le prototype Net Server. Les détails sont les suivants : 🎜rrreeerrreee🎜En même temps, lorsque nous avons commencé l'analyse, nous avons dit que lors de l'introduction du module Cluster, il sera jugé s'il s'agit d'un processus enfant en fonction du fait qu'il s'agisse d'un processus enfant. l'environnement du processus actuel contient NODE_UNIQUE_ID. S'il s'agit d'un processus enfant, exécutez le fichier <code>child.js
🎜🎜🎜Conseils : Si la valeur du message.cmd envoyé dans la communication IPC est préfixée par NODE, il répondra à un événement interne internalMessage🎜🎜rrreee🎜À ce stade, tout le contenu est connecté . 🎜🎜Pourquoi plusieurs processus enfants peuvent écouter le même port🎜🎜🎜Dans l'analyse de code précédente, nous pouvons savoir que le cluster Cluster créera un Net Server dans le processus Maître et un serveur Net dans le processus Worker Lors de l'exécution de la création d'un serveur HTTP, les informations sur l'adresse d'écoute seront transmises à la fonction
cluster._getServer
pour créer unfaux handle
et le définir au serveur Net du processus enfant. Dans le processus Worker. Lors de l'initialisation, la fonction de rappel de communication IPC sera enregistrée dans la fonction de rappel, la fonction{faux handle}.onconnection
après l'initialisation du Net. Le module serveur dans le processus enfant est appelé et le handle de connexion transmis est transmis. Entrez la réponse à la demande d'achèvement. 🎜🎜Comment assurer la robustesse du travail du cluster🎜🎜🎜Nous pouvons surveiller l'
erreur
et ladéconnexion
du processus Worker dans le processus maître >, les événementsexit
, effectuez le traitement correspondant dans ces événements, comme le nettoyage du processus quitté et le re-fork
, ou l'utilisation de packages npm encapsulés, tels quecfork
🎜🎜Modèle multi-processus Egg.js🎜🎜🎜Dans le modèle multi-processus d'Egg.js, il existe un autre type de processus, à savoir le processus Agent. Ce processus est principalement utilisé pour gérer certaines choses difficiles à gérer avec plusieurs processus et pour réduire le nombre de liens longs. La relation spécifique est la suivante : 🎜rrreee🎜Dans le
egg-clustercode>, <code>cfork pour garantir que le processus Worker redémarre automatiquement après son blocage🎜<h3 data-id="heading-8">🎜Enregistrement du problème🎜🎜🎜🎜Dans l'un de nos Applications Egg, le système de journalisation n'utilise pas les journaux natifs d'Egg, en utilisant une bibliothèque de journaux interne basée sur le package <code>log4js
Lors de son utilisation, le Logger requis est étendu à l'objet Application, de sorte que chaque processus Worker. Logger en créera un nouveau lors de l'initialisation, c'est-à-dire qu'il y aura des problèmes avec plusieurs processus écrivant des journaux, mais il n'y a pas de problème d'erreur avec plusieurs processus écrivant des journaux🎜
Dans le processus de traçage du code source, nous avons constaté que bien que log4js
fournisse le mode Cluster, le mode Cluster de log4js
n'est pas activé dans le package de couche supérieure, donc l'appender de chaque Logger Ils utilisent tous flag a
pour ouvrir un flux d'écriture, mais je n'ai pas eu de réponse jusqu'à présentlog4js
虽然提供了 Cluster 模式,但是在上层封装中并没有开启 log4js
的 Cluster 模式,所以每个 Logger 的 appender 都使用 flag a
打开一个写入流,到这里并没有得到答案
后来在 CNode 中找到了答案,在 unix 下使用 flag a
打开的可写流对应的 libuv 文件池实现是 UV_FS_O_APPEND
,即 O_APPEND
,而 O_APPEND
flag a
sous Unix est UV_FS_O_APPEND
, c'est-à-dire O_APPEND
, et O_APPEND
lui-même est défini comme une opération atomique dans le manuel man. Le noyau garantit l'écriture simultanée de. ce flux inscriptible est sûr et ne nécessite pas de verrous supplémentaires au niveau de la couche application (sauf que l'écriture simultanée sur des systèmes de fichiers de type NFS entraînera la perte ou l'endommagement des informations sur les fichiers), les systèmes de fichiers montés en réseau de type NFS reposent principalement sur la simulation. l'API sous-jacente pour réaliser des opérations de type local, qui ne peuvent évidemment pas être effectuées dans des conditions de concurrence. Restaurez parfaitement ce type d'API d'opération atomique, donc si vos journaux sont écrits sur quelque chose comme le montage sur disque cloud OSS localement, vous ne pouvez pas le faire. écrivez dans plusieurs processus, vous devez la verrouiller manuellement au niveau de la couche d'application
Le tranchant d'une épée vient de l'affûtage, et le parfum des fleurs de prunier vient du froid glacial, et il se précipite vers le canard~ Pour plus de connaissances sur la programmation, veuillez visiter :
Introduction à la programmation🎜 ! ! 🎜Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!