
Learn about the multi-process model in Node.js in one article

青灯夜游
Release: 2021-10-20 10:00:26

This article introduces multi-process support in Node.js, covering the Cluster module and the Egg.js multi-process model. I hope you find it helpful!

As we all know, JavaScript runs on a single thread, and all asynchronous work is driven by the event loop. If a web service has only one thread, how can it make full use of the idle resources of a machine or container? Moreover, when the code crashes with an uncaught exception, the thread exits. So how does a Node.js-based web service ensure the robustness of the whole application?

Cluster module

Node.js provides the Cluster module to solve these problems. With it, developers can build a cluster by creating child processes, making full use of the machine's or container's resources, and the module allows multiple child processes to listen on the same port.

Example

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}

Tracing how child processes are created in the source code

Let's start with const cluster = require('cluster'). This line imports Node's Cluster module, but inside Node the file that is loaded differs between the Master process and the Worker process. See the following code for details:

'use strict';

const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrPrimary}`);

Different files mean that the two processes behave differently during execution, for example:

// internal/cluster/master.js
cluster.isWorker = false;
cluster.isMaster = true;

// internal/cluster/child.js
cluster.isWorker = true;
cluster.isMaster = false;

This is why the flags exposed by the Cluster module can distinguish between the different types of process. Next, let's look at the specific flow from the perspective of the main process and the child process in turn.
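The selection rule can be sketched in isolation. This is a simplified illustration, not Node's actual code: pickClusterRole and buildWorkerEnv are hypothetical helper names, and the environment objects are made up for the example. The only detail taken from the source is that the presence of NODE_UNIQUE_ID in the environment decides which file is loaded.

```javascript
// Simplified sketch: decide which cluster implementation a process would load.
// `pickClusterRole` is a hypothetical helper, not part of Node.js.
function pickClusterRole(env) {
  return 'NODE_UNIQUE_ID' in env ? 'child' : 'master';
}

// Hypothetical helper: merge the parent env, extra env, and a worker id,
// so that the forked process sees NODE_UNIQUE_ID.
function buildWorkerEnv(parentEnv, extraEnv, id) {
  return { ...parentEnv, ...extraEnv, NODE_UNIQUE_ID: `${id}` };
}

const parentEnv = { PATH: '/usr/bin' };           // made-up environment
const workerEnv = buildWorkerEnv(parentEnv, { ROLE: 'web' }, 1);

console.log(pickClusterRole(parentEnv)); // master
console.log(pickClusterRole(workerEnv)); // child
```

Because the decision depends only on an environment variable, the same entry script can run as either role without any change to its own code.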

Main process

In the example above, the Master process does not do much: it simply forks child processes according to the number of CPUs. Let's go into the source code for a rough look; the relevant explanations are in the code comments.

// lib/internal/cluster/master.js

// Initialize cluster
const cluster = new EventEmitter();
// Map from listening address key to the corresponding server handle
const handles = new SafeMap();
// Initialize flags and settings
cluster.isWorker = false;
cluster.isMaster = true;
cluster.workers = {};
cluster.settings = {};
cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.

// Auto-incrementing worker id
let ids = 0;

// Add the fork method to cluster
cluster.fork = function(env) {
  // Initialize cluster.settings
  cluster.setupMaster();
  // Generate an id for this worker that is unique within the cluster
  const id = ++ids;
  // Create the child process
  const workerProcess = createWorkerProcess(id, env);
  // Create the corresponding Worker instance
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  
  // Some worker event listeners omitted...

  // Listen for internal messages and hand them to onmessage
  worker.process.on('internalMessage', internal(worker, onmessage));
  // Emit the cluster 'fork' event
  process.nextTick(emitForkNT, worker);
  // Keep the worker instance in cluster.workers
  cluster.workers[worker.id] = worker;
  // Return the worker
  return worker;
};

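// Function that creates the child process
function createWorkerProcess(id, env) {
  // Merge the master's env, the env passed to cluster.fork, and NODE_UNIQUE_ID into one env object
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // Execution arguments
  const execArgv = [...cluster.settings.execArgv];

  // Debug-mode logic omitted...

  // Call fork from the child_process module to create and return the child process; the child process instance now exists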
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

// Handler for internal message events
function onmessage(message, handle) {
  const worker = this;

  if (message.act === 'online')
    online(worker);
  // When a child sends a queryServer message to the master, run queryServer to create the server
  else if (message.act === 'queryServer')
    queryServer(worker, message);
  else if (message.act === 'listening')
    listening(worker, message);
  else if (message.act === 'exitedAfterDisconnect')
    exitedAfterDisconnect(worker, message);
  else if (message.act === 'close')
    close(worker, message);
}

// Get the server
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  // Build the key for the address this child wants to listen on
  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  // Look up the handles map for a server already created for this address
  let handle = handles.get(key);

  // Create one if there is no matching server
  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === 'string' &&
        process.platform !== 'win32') {

      address = path.relative(process.cwd(), address);

      if (message.address.length < address.length)
        address = message.address;
    }

    // How connections are handled between master and workers; round-robin by default
    let constructor = RoundRobinHandle;
    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it's connectionless. There is nothing to send to
    // the workers except raw datagrams and that's pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === 'udp4' ||
        message.addressType === 'udp6') {
      constructor = SharedHandle;
    }

    // Pass the address info to the constructor to create the listening instance
    handle = new constructor(key, address, message);
    // Cache the listening instance
    handles.set(key, handle);
  }

  // Attach custom data to the server handle; it is passed through to the worker after the server emits 'listening'
  if (!handle.data)
    handle.data = message.data;

  // Register a callback that notifies the worker once the server has emitted 'listening'
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}
// lib/internal/cluster/round_robin_handle.js

// Constructor. Arguments: the server's key, the IP address (for http(s)), and listen-related info
function RoundRobinHandle(key, address, { port, fd, flags }) {
  // Initialize the handle
  this.key = key;
  this.all = new SafeMap();
  this.free = new SafeMap();
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail);

  // Listen on a file descriptor (not discussed here)
  if (fd >= 0)
    this.server.listen({ fd });
  // Listen on ip:port
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  // Listen on a UNIX socket (not discussed here)
  } else
    this.server.listen(address);  // UNIX socket path.

  // Register the callback for the server's 'listening' event
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

// Add a worker; once the server emits 'listening', call the callback passed in from master.js
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  if (this.server === null)
    return done();

  // Still busy binding.
  this.server.once('listening', done);
  this.server.once('error', (err) => {
    send(err.errno, null);
  });
};

// Remove a worker; round-robin no longer assigns connections to it
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  for (const handle of this.handles) {
    handle.close();
  }
  this.handles = [];

  this.handle.close();
  this.handle = null;
  return true;
};

// Round-robin scheduling function
RoundRobinHandle.prototype.distribute = function(err, handle) {
  ArrayPrototypePush(this.handles, handle);
  const [ workerEntry ] = this.free; // this.free is a SafeMap

  if (ArrayIsArray(workerEntry)) {
    const { 0: workerId, 1: worker } = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};

// Hand a connection handle to the worker
RoundRobinHandle.prototype.handoff = function(worker) {
  if (!this.all.has(worker.id)) {
    return;  // Worker is closing (or has closed) the server.
  }

  const handle = ArrayPrototypeShift(this.handles);

  if (handle === undefined) {
    this.free.set(worker.id, worker);  // Add to ready queue again.
    return;
  }

  // Send a newconn message to the worker
  const message = { act: 'newconn', key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};
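The interplay between distribute and handoff is easier to follow in a stripped-down model. The sketch below is an assumption-laden simplification: RoundRobinModel, addWorker, and the log array are hypothetical names, connections are plain strings, and the IPC round trip that the real code waits for is replaced by a direct recursive call.

```javascript
// Simplified model of RoundRobinHandle: `free` holds idle workers,
// `pending` holds connections that no worker has picked up yet.
class RoundRobinModel {
  constructor() {
    this.free = new Map();   // workerId -> worker, in insertion order
    this.pending = [];       // queued connection handles
    this.log = [];           // [workerId, connection] pairs, for inspection
  }

  // A worker announces it is ready (the real add() ends by calling handoff()).
  addWorker(worker) {
    this.handoff(worker);
  }

  // A new connection arrives: queue it and wake the first idle worker.
  distribute(connection) {
    this.pending.push(connection);
    const [entry] = this.free;          // first idle worker, if any
    if (entry !== undefined) {
      const [workerId, worker] = entry;
      this.free.delete(workerId);
      this.handoff(worker);
    }
  }

  // Give the oldest pending connection to the worker, or mark it idle.
  handoff(worker) {
    const connection = this.pending.shift();
    if (connection === undefined) {
      this.free.set(worker.id, worker); // back to the end of the idle queue
      return;
    }
    this.log.push([worker.id, connection]);
    // The real code waits for the worker's IPC reply before this call.
    this.handoff(worker);
  }
}

const model = new RoundRobinModel();
model.addWorker({ id: 1 });
model.addWorker({ id: 2 });
['c1', 'c2', 'c3', 'c4'].forEach((c) => model.distribute(c));
console.log(model.log); // [[1,'c1'],[2,'c2'],[1,'c3'],[2,'c4']]
```

The rotation falls out of the Map's insertion order: a worker that finishes a handoff is re-inserted at the end of free, so the next connection goes to whichever worker has been idle longest.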

Child process

In each child process, we create an HTTP Server and call its listen function to listen on port 8000. The HTTP Server inherits from Net Server through the prototype chain, so the listen function is the one on the Net Server prototype, as follows:

// lib/_http_server.js

function Server(options, requestListener) {
  ....
}

ObjectSetPrototypeOf(Server.prototype, net.Server.prototype);
ObjectSetPrototypeOf(Server, net.Server);
// lib/net.js

Server.prototype.listen = function(...args) {

  // For brevity, argument normalization and other listen variants are omitted
  
  // This branch ends up calling listenInCluster to actually listen on the port
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    validatePort(options.port, 'options.port');
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }
  
  // Omitted...
};

// Cluster-aware listen function
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;

  if (cluster === undefined) cluster = require('cluster');

  // Check whether this is the master. In a single-process app cluster.isMaster is true by default, so listen directly and return
  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  // In a child process, pass the listen address info to cluster._getServer to obtain a faux handle
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the master's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  // Callback for the net server: once the faux handle arrives, call _listen2, i.e. setupListenHandle
  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    // Reuse master's server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

// Set up the listen handle
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd);

  // As the English comment below says: create a listen handle if there is none, otherwise skip
  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    debug('setupListenHandle: create a handle');

    let rval = null;
    
    // Handle-creation code omitted for brevity...
    
    this._handle = rval;
  }
  
  // Set onconnection on the handle stored on this (the faux handle in a worker) to receive incoming connections
  this._handle.onconnection = onconnection;
}

As mentioned at the start of the analysis, when the Cluster module is imported, Node decides whether the current process is a child process by checking whether its env contains NODE_UNIQUE_ID. If it is a child process, the child.js file is loaded.

Tip: in IPC communication, if the cmd value of a sent message is prefixed with NODE_, the receiving side emits the internal event internalMessage instead of message.
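The routing rule in the tip can be modelled without spawning a process. This is only a sketch: routeMessage and fakeProcess are hypothetical names, the check is a simplified version of what Node does internally, and the NODE_CLUSTER cmd value is what the cluster module is understood to use for its own messages.

```javascript
const { EventEmitter } = require('events');

const INTERNAL_PREFIX = 'NODE_';

// Hypothetical helper modelling how an IPC channel picks the event name.
function routeMessage(target, message) {
  const isInternal =
    message !== null &&
    typeof message === 'object' &&
    typeof message.cmd === 'string' &&
    message.cmd.startsWith(INTERNAL_PREFIX);
  target.emit(isInternal ? 'internalMessage' : 'message', message);
}

// Stand-in for `process` in a worker; a plain EventEmitter is enough here.
const fakeProcess = new EventEmitter();
const seen = [];
fakeProcess.on('message', (m) => seen.push(['message', m.cmd]));
fakeProcess.on('internalMessage', (m) => seen.push(['internalMessage', m.cmd]));

routeMessage(fakeProcess, { cmd: 'NODE_CLUSTER', act: 'online' });
routeMessage(fakeProcess, { cmd: 'app:ping' });
console.log(seen);
```

Keeping internal traffic on a separate event means application code that listens for message never sees the cluster's own bookkeeping messages.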

// lib/internal/cluster/child.js

// Initialize
const cluster = new EventEmitter();
// Stores the generated faux handles
const handles = new SafeMap();
// Stores the mapping from listen address to that address's index
const indexes = new SafeMap();
cluster.isWorker = true;
cluster.isMaster = false;
cluster.worker = null;
cluster.Worker = Worker;

// Runs when the child process starts, to initialize it; after it finishes, NODE_UNIQUE_ID is removed from env
// See initializeClusterIPC in lib/internal/bootstrap/pre_execution.js for details
cluster._setupWorker = function() {
  // Initialize the worker instance
  const worker = new Worker({
    id: +process.env.NODE_UNIQUE_ID | 0,
    process: process,
    state: 'online'
  });

  cluster.worker = worker;

  // Handle the disconnect event
  process.once('disconnect', () => {
    worker.emit('disconnect');

    if (!worker.exitedAfterDisconnect) {
      // Unexpected disconnect, master exited, or some such nastiness, so
      // worker exits immediately.
      process.exit(0);
    }
  });

  // Listen for internal IPC messages
  process.on('internalMessage', internal(worker, onmessage));
  send({ act: 'online' });

  function onmessage(message, handle) {
    // For a new connection, run onconnection to pass the received handle to the HTTP Server started in the child process
    if (message.act === 'newconn')
      onconnection(message, handle);
    else if (message.act === 'disconnect')
      ReflectApply(_disconnect, worker, [true]);
  }
};

// Add the function that gets the server; it runs when a net server starts listening on a port
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === 'string' &&
      process.platform !== 'win32')
    address = path.resolve(address);

  // Build the key for this address info
  const indexesKey = ArrayPrototypeJoin(
    [
      address,
      options.port,
      options.addressType,
      options.fd,
    ], ':');

  // Check whether indexesKey is cached; if not, this is a new listen address and master.js will create a new net server
  let index = indexes.get(indexesKey);

  if (index === undefined)
    index = 0;
  else
    index++;

  // Store the mapping from indexesKey to index
  indexes.set(indexesKey, index);

  // Pass along the address info and index
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  // Send the queryServer message to the master
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    // Depending on how the master-side handle added this worker, run the matching load-balancing code, then call cb
    // Round-robin passes no handle; see RoundRobinHandle.prototype.add
    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

  obj.once('listening', () => {
    cluster.worker.state = 'listening';
    const address = obj.address();
    message.act = 'listening';
    message.port = (address && address.port) || options.port;
    send(message);
  });
};

// Create the faux handle and store its mapping
// Round-robin. Master distributes handles across workers.
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  let key = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;

    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      ObjectAssign(out, message.sockname);

    return 0;
  }

  // Create the faux handle
  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  // Store the faux handle
  handles.set(key, handle);
  // Call the callback that the net module passed to cluster._getServer
  cb(0, handle);
}

// Handle an incoming connection
// Round-robin connection.
function onconnection(message, handle) {
  // Get the key of the faux handle
  const key = message.key;
  // Get the faux handle
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  // Call the connection handler that setupListenHandle in the net module set on this faux handle
  if (accepted)
    server.onconnection(0, handle);
}

At this point, all the pieces fit together.

Why multiple child processes can listen to the same port

From the preceding code analysis, we know that the cluster creates a Net Server in the Master process. When a Worker process creates its HTTP Server, the listening address information is passed to cluster._getServer, which creates a faux handle and sets it on the child process's Net Server. During Worker initialization, an IPC message callback is also registered; when the Master hands over a connection, that callback calls the {faux handle}.onconnection function that the Net Server module set up in the child process, passing in the connection handle so the request can be answered.
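The whole exchange can be condensed into an in-memory model. Everything here is a simplifying assumption: two EventEmitters stand in for the IPC channel, listenInWorker is a hypothetical helper, the key is reduced to address:port, and a string stands in for a real socket handle. Only the message names queryServer and newconn come from the source above.

```javascript
const { EventEmitter } = require('events');

// In-memory stand-in for the IPC channel between one master and one worker.
const toMaster = new EventEmitter();
const toWorker = new EventEmitter();

// --- Master side: one real listener per address key ---
const masterHandles = new Map();
toMaster.on('queryServer', ({ address, port }) => {
  const key = `${address}:${port}`;
  if (!masterHandles.has(key)) masterHandles.set(key, { key, listening: true });
  toWorker.emit('queryServerReply', { key });
});

// --- Worker side: a faux handle instead of a real socket ---
const workerHandles = new Map();
const served = [];

// Hypothetical helper: "listen" by asking the master, never by binding.
function listenInWorker(address, port, onconnection) {
  toWorker.once('queryServerReply', ({ key }) => {
    const fauxHandle = { onconnection, close() {}, listen() { return 0; } };
    workerHandles.set(key, fauxHandle);
  });
  toMaster.emit('queryServer', { address, port });
}

// Connections accepted by the master arrive as newconn messages.
toWorker.on('newconn', ({ key, connection }) => {
  const handle = workerHandles.get(key);
  if (handle !== undefined) handle.onconnection(0, connection);
});

// The worker "listens" on port 8000 without binding anything itself.
listenInWorker('0.0.0.0', 8000, (err, connection) => served.push(connection));

// The master accepts a connection and hands it over.
toWorker.emit('newconn', { key: '0.0.0.0:8000', connection: 'socket#1' });
console.log(served, masterHandles.size);
```

Only the master ever holds a bound listener, which is why several workers can all call listen(8000) without an address-in-use error.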

How to ensure the robustness of cluster work

In the Master process we can listen for the error, disconnect, and exit events of Worker processes and handle them accordingly, for example by cleaning up the exited process and forking a new one, or by using a packaged npm module such as cfork.
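A minimal re-fork strategy might look like the sketch below. It is not how cfork is implemented: superviseWorkers, maxRestarts, and the fake cluster object are hypothetical, introduced so the example runs without spawning processes. The exit event, fork(), and worker.exitedAfterDisconnect are the parts that exist on the real cluster module.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: keep `count` workers alive on any cluster-like object
// that exposes fork() and emits 'exit' (the real cluster module does both).
function superviseWorkers(clusterLike, count, maxRestarts = 10) {
  let restarts = 0;
  for (let i = 0; i < count; i++) clusterLike.fork();

  clusterLike.on('exit', (worker, code, signal) => {
    // exitedAfterDisconnect is true for intentional shutdowns; skip those.
    if (worker.exitedAfterDisconnect) return;
    if (restarts >= maxRestarts) return; // avoid an endless crash loop
    restarts++;
    clusterLike.fork();
  });

  return { getRestarts: () => restarts };
}

// Usage with a fake cluster so the sketch runs without spawning processes.
const fakeCluster = new EventEmitter();
fakeCluster.forked = 0;
fakeCluster.fork = () => { fakeCluster.forked++; };

const supervisor = superviseWorkers(fakeCluster, 2);
fakeCluster.emit('exit', { exitedAfterDisconnect: false }, 1, null); // crash
fakeCluster.emit('exit', { exitedAfterDisconnect: true }, 0, null);  // planned
console.log(fakeCluster.forked, supervisor.getRestarts()); // 3 1
```

In a real master process the same function could be called with require('cluster') as the first argument. Note that since Node.js 16, cluster.isPrimary is the preferred name for cluster.isMaster.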

Egg.js multi-process model

The multi-process model of Egg.js adds another process type, the Agent process. It mainly handles work that is awkward to do in multiple processes and reduces the number of long-lived connections. The relationship is as follows:

+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |      agent ready    |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |     worker ready    |                    |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |      Egg ready      |                    |
     +----------------------------------------->|

The egg-cluster package uses the cfork package to ensure that a Worker process is restarted automatically after it dies.

Problem Record

In one of our Egg applications, the logging system does not use Egg's native logger but an internal logging library based on the log4js package. The required Logger is attached to the Application object, so every Worker process creates its own Logger during initialization. In other words, multiple processes write to the same log, which should be a problem, yet we never saw any errors from multi-process log writes.

While tracing the source code, we found that although log4js provides a Cluster mode, the upper-layer package does not enable it, so each Logger's appenders open a write stream with flag a. At this point we still had no answer.

I later found the answer on CNode. On Unix, the libuv flag corresponding to a writable stream opened with flag a is UV_FS_O_APPEND, that is, O_APPEND, and the man page defines O_APPEND as an atomic operation: the kernel ensures that concurrent writes through such a stream are safe, with no extra locking needed at the application layer. The exception is NFS-like file systems, where concurrent appends can lose or corrupt file data. NFS-like network-mounted file systems mainly emulate the underlying API to provide near-local operations, and such atomic operations obviously cannot be reproduced perfectly under race conditions. So if your logs are written to something like an OSS cloud disk mounted locally, you cannot rely on this; with multi-process writes you must lock manually at the application layer.
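The effect of flag a can be seen even inside a single process by opening the same file twice. This is a small sketch under stated assumptions: the temp directory, the file name app.log, and the two descriptors fdA and fdB are made up for the demonstration, and it runs on a local file system only; it does not prove anything about concurrent processes or network mounts.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Temporary log file on the local file system (made-up name).
const logFile = path.join(fs.mkdtempSync(path.join(os.tmpdir(), 'append-')), 'app.log');

// Two independent descriptors, as two Logger instances would hold.
const fdA = fs.openSync(logFile, 'a');
const fdB = fs.openSync(logFile, 'a');

// Alternate writes: each one lands at the current end of the file,
// so neither descriptor overwrites what the other has written.
for (let i = 0; i < 3; i++) {
  fs.writeSync(fdA, `A${i}\n`);
  fs.writeSync(fdB, `B${i}\n`);
}
fs.closeSync(fdA);
fs.closeSync(fdB);

const lines = fs.readFileSync(logFile, 'utf8').trim().split('\n');
console.log(lines);
```

Had both descriptors been opened with flag w, each would track its own offset starting at zero and the writes would overwrite one another, which is the failure mode the append flag avoids.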

Conclusion

The edge of a sword comes from sharpening, and the fragrance of plum blossoms comes from the bitter cold~

source:juejin.cn