한 기사에서 Node.js의 다중 프로세스 모델에 대해 알아보세요.

青灯夜游
풀어 주다: 2021-10-20 10:00:26
앞으로
1714명이 탐색했습니다.

이 글에서는 Node.js의 다중 프로세스를 소개하고 Cluster 모듈과 Egg.js 다중 프로세스 모델에 대해 알아보는 것이 도움이 되기를 바랍니다.

한 기사에서 Node.js의 다중 프로세스 모델에 대해 알아보세요.

우리 모두 알고 있듯이 JS는 단일 스레드에서 실행되고 모든 비동기화는 이벤트 루프에 의해 완료됩니다. 웹 서비스에 스레드가 하나만 있는 경우 머신의 유휴 리소스를 최대한 활용하는 방법은 무엇입니까? 아니면 컨테이너? 동시에 코드가 충돌하고 예외가 발견되지 않으면 스레드가 종료됩니다. 그렇다면 Node.js 기반 웹 서비스는 어떻게 전체 애플리케이션의 견고성을 보장합니까?

클러스터 모듈

Node.js는 위의 문제를 해결하기 위해 클러스터 모듈을 제공합니다. 이 모듈을 통해 개발자는 머신이나 컨테이너의 리소스를 최대한 활용할 수 있는 하위 프로세스 모드를 생성하여 클러스터를 만들 수 있습니다. 동시에 이 모듈은 여러 하위 프로세스가 동일한 포트에서 수신 대기하는 것을 허용합니다. [추천 학습: "nodejs 튜토리얼"]

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on(&#39;exit&#39;, function(worker, code, signal) {
    console.log(&#39;worker &#39; + worker.process.pid + &#39; died&#39;);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}
로그인 후 복사

코드 파싱을 통해 하위 프로세스를 생성하는 과정

먼저 const Cluster = require('cluster')</로 시작하세요. code> 에서 시작하는 이 코드 줄은 Node의 Cluster 모듈을 가져오지만 Node 내부에서는 Master 프로세스와 Worker 프로세스에서 도입한 파일이 다릅니다. 자세한 내용은 다음 코드를 참조하세요. <code>const cluster = require(&#39;cluster&#39;) 说起,这行代码导入了 Node 的 Cluster 模块,而在 Node 内部,Master 进程与 Worker 进程引入的文件却不一样,详情见如下代码:

&#39;use strict&#39;;

const childOrPrimary = &#39;NODE_UNIQUE_ID&#39; in process.env ? &#39;child&#39; : &#39;master&#39;;
module.exports = require(`internal/cluster/${childOrPrimary}`);
로그인 후 복사

不同的文件意味着两种进程在执行中的表现也不一样,例如:

// internal/cluster/master.js
cluster.isWorker = false;
cluster.isMaster = true;

// internal/cluster/child.js
cluster.isWorker = true;
cluster.isMaster = false;
로그인 후 복사

这也是为什么 Cluster 模块到处的变量能区分不同类型进程的原因,接下来让我们分别从主、子进程两个方向去了解具体的过程

主进程

在上述代码里,Master 进程并没有做太多事情,只是根据 CPU 数量去 fork 子进程,那么我们深入到源代码里大致来看一下,相关描述均在代码的注释内

// lib/internal/cluster/master.js

// 初始化cluster
const cluster = new EventEmitter();
// 创建监听地址与server对应的map
const handles = new SafeMap();
// 初始化
cluster.isWorker = false;
cluster.isMaster = true;
cluster.workers = {};
cluster.settings = {};
cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.

// 自增的子进程id
let ids = 0;

// 向cluster添加fork方法
cluster.fork = function(env) {
  // 初始化cluster.settings
  cluster.setupMaster();
  // 为当前fork的子进程生成当前cluster内的唯一id
  const id = ++ids;
  // 创建子进程
  const workerProcess = createWorkerProcess(id, env);
  // 创建对应的worker实例
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  
  // 省略一些worker的事件监听....

  // 监听内部消息事件,并交由onmessage处理
  worker.process.on(&#39;internalMessage&#39;, internal(worker, onmessage));
  // cluster发出fork事件
  process.nextTick(emitForkNT, worker);
  // 将worker实例放在cluster.workers中维护
  cluster.workers[worker.id] = worker;
  // 返回worker
  return worker;
};

// 创建子进程函数
function createWorkerProcess(id, env) {
  // 将主进程的env、调用cluster.fork时传入的env以及NODE_UNIQUE_ID env构建成一个env对象
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // 执行参数
  const execArgv = [...cluster.settings.execArgv];

  // 省略debug模式相关逻辑...

  // 调用child_process模块的fork函数创建子进程并返回,至此子进程实例创建完成
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

// 内部消息事件处理函数
function onmessage(message, handle) {
  const worker = this;

  if (message.act === &#39;online&#39;)
    online(worker);
  // 当子进程向主进程发出queryServer消息后,执行queryServer函数,创建server
  else if (message.act === &#39;queryServer&#39;)
    queryServer(worker, message);
  else if (message.act === &#39;listening&#39;)
    listening(worker, message);
  else if (message.act === &#39;exitedAfterDisconnect&#39;)
    exitedAfterDisconnect(worker, message);
  else if (message.act === &#39;close&#39;)
    close(worker, message);
}

// 获取server
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  // 创建当前子进程监听地址信息的key
  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  // 在handles map中查询是否有已经创建好的该监听地址的server
  let handle = handles.get(key);

  // 没有对应的server则进行创建
  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === &#39;string&#39; &&
        process.platform !== &#39;win32&#39;) {

      address = path.relative(process.cwd(), address);

      if (message.address.length < address.length)
        address = message.address;
    }

    // 主、子进程处理连接的方式,默认为轮询
    let constructor = RoundRobinHandle;
    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it&#39;s connectionless. There is nothing to send to
    // the workers except raw datagrams and that&#39;s pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === &#39;udp4&#39; ||
        message.addressType === &#39;udp6&#39;) {
      constructor = SharedHandle;
    }

    // 将监听地址信息传入构造函数创建监听实例
    handle = new constructor(key, address, message);
    // 缓存监听实例
    handles.set(key, handle);
  }

  // 向server添加自定义信息,用于server发出listening事件后透传到worker
  if (!handle.data)
    handle.data = message.data;

  // 添加server发出listening事件后的回调函数通知子进程
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}
로그인 후 복사
// lib/internal/cluster/round_robin_handle.js

// 构造函数,参数为server对应的key,ip地址(对于http(s)来说),监听相关信息
function RoundRobinHandle(key, address, { port, fd, flags }) {
  // 初始化handle
  this.key = key;
  this.all = new SafeMap();
  this.free = new SafeMap();
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail);

  // 监听文件描述符,不讨论
  if (fd >= 0)
    this.server.listen({ fd });
  // 监听ip:port
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  // 监听UNIX socket,不讨论
  } else
    this.server.listen(address);  // UNIX socket path.

  // 注册server发出listening事件的回调函数
  this.server.once(&#39;listening&#39;, () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

// 添加worker,server发出listening事件后调用master.js中传入的回调函数
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  if (this.server === null)
    return done();

  // Still busy binding.
  this.server.once(&#39;listening&#39;, done);
  this.server.once(&#39;error&#39;, (err) => {
    send(err.errno, null);
  });
};

// 删除worker,轮询时不再分配给该worker
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  for (const handle of this.handles) {
    handle.close();
  }
  this.handles = [];

  this.handle.close();
  this.handle = null;
  return true;
};

// 轮询调度函数
RoundRobinHandle.prototype.distribute = function(err, handle) {
  ArrayPrototypePush(this.handles, handle);
  const [ workerEntry ] = this.free; // this.free is a SafeMap

  if (ArrayIsArray(workerEntry)) {
    const { 0: workerId, 1: worker } = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};

// 将handle交给worker
RoundRobinHandle.prototype.handoff = function(worker) {
  if (!this.all.has(worker.id)) {
    return;  // Worker is closing (or has closed) the server.
  }

  const handle = ArrayPrototypeShift(this.handles);

  if (handle === undefined) {
    this.free.set(worker.id, worker);  // Add to ready queue again.
    return;
  }

  // 向该worker发出newconn事件
  const message = { act: &#39;newconn&#39;, key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};
로그인 후 복사

子进程

在每个子进程中,我们都创建了一个 HTTP Server,然后执行 listen 函数监听 8000 端口,而 HTTP Server 实例是由 Net Server 原型链继承得到的,listen 函数即为 Net Server 原型上的 listen 函数,具体如下:

// lib/_http_server.js

function Server(options, requestListener) {
  ....
}

ObjectSetPrototypeOf(Server.prototype, net.Server.prototype);
ObjectSetPrototypeOf(Server, net.Server);
로그인 후 복사
// lib/net.js

Server.prototype.listen = function(...args) {

  // 由于篇幅原因,省略一些参数nomolize和其他监听的处理
  
  // 经过这段逻辑中,会调用listenInCluster函数去真正的监听端口
  if (typeof options.port === &#39;number&#39; || typeof options.port === &#39;string&#39;) {
    validatePort(options.port, &#39;options.port&#39;);
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }
  
  // 省略...
};

// 集群监听函数
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;

  if (cluster === undefined) cluster = require(&#39;cluster&#39;);

  // 判断是否是master,单进程中cluster.isMaster默认为true,然后进行监听并返回
  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  // 在子进程中,会将监听地址信息传入cluster实例中的_getServer函数从而获取一个faux handle
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the master&#39;s server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  // 获取net server回调函数,拿到faux handle之后,调用_listen2函数,即setupListenHandle函数
  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, &#39;bind&#39;, address, port);
      return server.emit(&#39;error&#39;, ex);
    }

    // Reuse master&#39;s server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

// 启用监听handle
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug(&#39;setupListenHandle&#39;, address, port, addressType, backlog, fd);

  // 如同英文注释所说的那样,如果没有监听句柄,则创建,有监听句柄则跳过
  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don&#39;t need to do this.
  if (this._handle) {
    debug(&#39;setupListenHandle: have a handle already&#39;);
  } else {
    debug(&#39;setupListenHandle: create a handle&#39;);

    let rval = null;
    
    // 篇幅原因,创建监听句柄的代码...
    
    this._handle = rval;
  }
  
  // 在this上设置的faux handle上设置onconnection函数用于监听连接进入
  this._handle.onconnection = onconnection;
}
로그인 후 복사

同时,在开始解析的时候我们说过,在引入 Cluster 模块的时候,会根据当前进程的env中是否包含NODE_UNIQUE_ID去判断是否为子进程,若为子进程,则执行 child.js 文件

Tips:IPC 通信中发送的message.cmd的值如果以NODE为前缀,它将响应一个内部事件internalMessage

// lib/internal/cluster/child.js

// 初始化
const cluster = new EventEmitter();
// 存储生成的 faux handle
const handles = new SafeMap();
// 存储监听地址与监听地址index的对应关系
const indexes = new SafeMap();
cluster.isWorker = true;
cluster.isMaster = false;
cluster.worker = null;
cluster.Worker = Worker;

// 子进程启动时会执行该函数,进行初始化,同时在执行完毕后,会删除 env 中的 NODE_UNIQUE_ID 环境变量
// 详细代码见 lib/internal/bootstrap/pre_excution.js 中的 initializeClusterIPC 函数
cluster._setupWorker = function() {
  // 初始化worker实例
  const worker = new Worker({
    id: +process.env.NODE_UNIQUE_ID | 0,
    process: process,
    state: &#39;online&#39;
  });

  cluster.worker = worker;

  // 处理断开连接事件
  process.once(&#39;disconnect&#39;, () => {
    worker.emit(&#39;disconnect&#39;);

    if (!worker.exitedAfterDisconnect) {
      // Unexpected disconnect, master exited, or some such nastiness, so
      // worker exits immediately.
      process.exit(0);
    }
  });

  // IPC 内部通信事件监听
  process.on(&#39;internalMessage&#39;, internal(worker, onmessage));
  send({ act: &#39;online&#39; });

  function onmessage(message, handle) {
    // 如果为新连接,则执行 onconnection 函数将得到的句柄传入子进程中启动的HTTP Server
    if (message.act === &#39;newconn&#39;)
      onconnection(message, handle);
    else if (message.act === &#39;disconnect&#39;)
      ReflectApply(_disconnect, worker, [true]);
  }
};

// 添加获取server函数,会在net server监听端口时被执行
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === &#39;string&#39; &&
      process.platform !== &#39;win32&#39;)
    address = path.resolve(address);

  // 生成地址信息的的key
  const indexesKey = ArrayPrototypeJoin(
    [
      address,
      options.port,
      options.addressType,
      options.fd,
    ], &#39;:&#39;);

  // 检查是否缓存了indexedKey,如果没有,则表明是新的监听地址,在 master.js 中会生成新的net server
  let index = indexes.get(indexesKey);

  if (index === undefined)
    index = 0;
  else
    index++;

  // 设置 indexesKey 与 index的对应关系
  indexes.set(indexesKey, index);

  // 传递地址信息及index
  const message = {
    act: &#39;queryServer&#39;,
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  // 向主进程发送queryServer消息
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === &#39;function&#39;)
      obj._setServerData(reply.data);

    // 根据相应负载均衡handle添加worker时的处理,执行相应的负载均衡代码,并执行 cb 函数
    // 轮询是没有传递handle的,对应代码在 RoundRobinHandle.prototype.add 内
    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

  obj.once(&#39;listening&#39;, () => {
    cluster.worker.state = &#39;listening&#39;;
    const address = obj.address();
    message.act = &#39;listening&#39;;
    message.port = (address && address.port) || options.port;
    send(message);
  });
};

// 创建 faux handle,并保存其对应关系
// Round-robin. Master distributes handles across workers.
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  let key = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;

    send({ act: &#39;close&#39;, key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      ObjectAssign(out, message.sockname);

    return 0;
  }

  // 创建Faux handle
  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it&#39;s backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  // 保存faux handle
  handles.set(key, handle);
  // 执行 net 模块调用 cluster._getServer 函数传进来的回调函数
  cb(0, handle);
}

// 处理请求
// Round-robin connection.
function onconnection(message, handle) {
  // 获取faux handle的key
  const key = message.key;
  // 获取faux hadle
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  // 调用在 net 模块中 setupListenHandle 函数里为该 faux handle 设置的连接处理函数处理请求
  if (accepted)
    server.onconnection(0, handle);
}
로그인 후 복사

至此,所有的内容都联系起来了。

为什么多个子进程可以监听同一个端口

在之前的代码分析中我们可以知道,Cluster 集群会在 Master 进程中创建 Net Server,在 Worker 进程运行创建 HTTP Server 的时候,会将监听地址的信息传入 cluster._getServer 函数创建一个 faux handle 并设置到子进程的 Net Server 上,在 Worker 进程初始化的时候会注册 IPC 通信回调函数,在回调函数内 ,调用在子进程中 Net Server 模块初始化后的 {faux handle}.onconnection 函数,并将传过来的连接的 handle 传入完成请求响应。

如何保证集群工作的健壮性

我们可以在 Master 进程中监听 Worker 进程的 errordisconntectexit 事件,在这些事件中去做对应的处理,例如清理退出的进程并重新 fork,或者使用已经封装好的 npm 包,例如 cfork

Egg.js 多进程模型

在 Egg.js 的多进程模型中,多了另外一个进程类型,即 Agent 进程,该进程主要用于处理多进程不好处理的一些事情还有减少长链接的数量,具体关系如下:

+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |      agent ready    |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |     worker ready    |                    |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |      Egg ready      |                    |
     +----------------------------------------->|
로그인 후 복사

egg-cluster 包内,使用了 cfork 包去保证 Worker 进程挂掉后自动重启

问题记录

在我们的一个 Egg 应用内,日志系统并没有使用 Egg 原生的日志,使用了一个内部基于 log4jsrrreee

파일이 다르다는 것은 성능을 의미합니다. 실행 중인 두 프로세스도 다릅니다. 예: 🎜rrreee🎜이것이 클러스터 모듈의 모든 변수가 서로 다른 유형의 프로세스를 구별할 수 있는 이유입니다. 다음으로 기본 프로세스와 하위 프로세스의 특정 프로세스를 각각 이해하겠습니다🎜

🎜기본 프로세스🎜

🎜위 코드에서 마스터 프로세스는 많은 작업을 수행하지 않으며 단지 CPU 수에 따라 하위 프로세스를 포크합니다. 소스 코드를 살펴보고 관련 설명은 코드 주석에 있습니다.🎜rrreeerrreee

🎜Subprocess🎜

🎜각 하위 프로세스에서 HTTP 서버를 생성합니다. 그런 다음 를 실행합니다. Listen 함수는 포트 8000을 수신하고 HTTP Server 인스턴스는 Net Server 프로토타입 체인에서 상속됩니다. listen 함수는 listen입니다. code> 함수의 세부 내용은 다음과 같습니다. 🎜rrreeerrreee🎜동시에 파싱을 시작할 때 Cluster 모듈을 도입할 때 자식 프로세스인지 여부를 기준으로 판단한다고 했습니다. 현재 프로세스의 환경에는 NODE_UNIQUE_ID가 포함되어 있습니다. 하위 프로세스인 경우 <code>child.js 파일을 실행하세요🎜🎜🎜팁: IPC 통신으로 전송된 message.cmd 값 앞에 NODE가 붙는 경우 내부 이벤트에 응답합니다.ternalMessage🎜🎜rrreee🎜이 시점에서 모든 콘텐츠는 연결됩니다. 🎜

🎜여러 하위 프로세스가 동일한 포트를 수신할 수 있는 이유🎜🎜🎜이전 코드 분석에서 Cluster 클러스터가 Master 프로세스에 Net Server를 생성하고 Worker 프로세스의 Net Server HTTP 서버를 생성하기 위해 실행할 때 수신 주소 정보가 cluster._getServer 함수에 전달되어 faux 핸들을 생성하고 설정합니다. Worker 프로세스에서 초기화 중에 IPC 통신 콜백 함수가 등록됩니다. 콜백 함수에서는 Net 초기화 후 {faux handler}.onconnection 함수가 등록됩니다. 자식 프로세스의 서버 모듈이 호출되고, 전달된 연결 핸들이 완료 요청 응답을 전달받습니다. 🎜

🎜클러스터 작업의 견고성을 보장하는 방법🎜🎜🎜작업자 프로세스의 오류연결 끊김을 모니터링할 수 있습니다. 마스터 프로세스 >, exit 이벤트에서 종료된 프로세스를 정리하고 다시 fork하거나 cfork

🎜

🎜Egg.js 다중 프로세스 모델🎜🎜🎜Egg.js의 다중 프로세스 모델에는 또 다른 프로세스 유형인 Agent가 있습니다. 이 프로세스는 주로 여러 프로세스로 처리하기 어려운 일부 작업을 처리하고 긴 링크 수를 줄이기 위해 사용됩니다. 구체적인 관계는 다음과 같습니다. 🎜rrreee🎜 egg-cluster에서 패키지, cfork 패키지를 사용하여 작업자 프로세스가 중단된 후 자동으로 다시 시작되도록 합니다🎜<h3 data-id="heading-8">🎜문제 기록🎜🎜🎜🎜Egg 애플리케이션 중 하나에서 , 로그 시스템은 <code>log4js 패키지를 기반으로 하는 내부 로그 라이브러리를 사용하여 Egg의 기본 로그를 사용하지 않습니다. 이를 사용하면 필요한 Logger가 Application 객체로 확장되어 각 Worker 프로세스가 생성됩니다. 즉, 여러 프로세스가 로그를 작성하는 데 문제가 있지만 여러 프로세스가 로그를 작성하는 데는 오류 문제가 없습니다🎜

소스 코드를 추적하는 과정에서 log4js가 클러스터 모드를 제공하지만 상위 패키지에서는 log4js의 클러스터 모드가 활성화되어 있지 않은 것을 발견했습니다. 각 로거의 추가자 모두 flag a를 사용하여 쓰기 스트림을 열었지만 지금까지 답변을 얻지 못했습니다log4js 虽然提供了 Cluster 模式,但是在上层封装中并没有开启 log4js 的 Cluster 模式,所以每个 Logger 的 appender 都使用 flag a 打开一个写入流,到这里并没有得到答案

后来在 CNode 中找到了答案,在 unix 下使用 flag a 打开的可写流对应的 libuv 文件池实现是 UV_FS_O_APPEND,即 O_APPEND,而 O_APPEND

나중에 CNode

에서 찾을 수 있습니다. unix에서 flag a를 사용하여 열린 쓰기 가능 스트림에 해당하는 libuv 파일 풀 구현은 ​​UV_FS_O_APPEND, 즉 O_APPENDO_APPEND 자체는 man 매뉴얼에서 원자적 연산으로 정의되어 있습니다. 커널은 다음의 동시 쓰기를 보장합니다. 이 쓰기 가능한 스트림은 안전하며 애플리케이션 계층에서 추가 잠금이 필요하지 않습니다(NFS와 같은 파일 시스템에 동시 쓰기로 인해 파일 정보가 손실되거나 손상된다는 점을 제외하고), NFS와 같은 네트워크 마운트 파일 시스템은 주로 시뮬레이션에 의존합니다. 경쟁 조건에서 수행할 수 없는 로컬과 유사한 작업을 달성하기 위한 기본 API 이러한 유형의 원자 작업 API를 완벽하게 복원하므로 로그가 로컬에 마운트된 OSS 클라우드 디스크와 같은 것에 기록되면 이를 수행할 수 없습니다. 여러 공정으로 쓰려면 반드시 어플리케이션 레이어에서 수동으로 잠가야 합니다

결론

칼의 날카로운 날은 갈음에서 나오고, 매화향은 매서운 추위에서 나오며, 오리에게 달려갑니다~ 더 많은 프로그래밍 관련 지식을 보려면

프로그래밍 소개🎜를 방문하세요! ! 🎜

위 내용은 한 기사에서 Node.js의 다중 프로세스 모델에 대해 알아보세요.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:juejin.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
최신 이슈
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿