Maison > interface Web > js tutoriel > le corps du texte

Découvrez le modèle multi-processus dans Node.js dans un article

青灯夜游
Libérer: 2021-10-20 10:00:26
avant
1638 Les gens l'ont consulté

Cet article vous présentera le multi-processus dans Node.js et en apprendra davantage sur le module Cluster et le modèle multi-processus Egg.js. J'espère qu'il vous sera utile !

Découvrez le modèle multi-processus dans Node.js dans un article

Comme nous le savons tous, JS est exécuté dans un seul thread, et toute asynchronisation est complétée par la boucle d'événements. Si un service Web n'a qu'un seul thread, comment utiliser pleinement les ressources inactives de la machine. ou un conteneur ? Dans le même temps, lorsque le code plante et qu'aucune exception n'est détectée, le thread se ferme. Alors, comment le service Web basé sur Node.js garantit-il la robustesse de l'ensemble de l'application ?

Module Cluster

Node.js fournit le module Cluster pour résoudre les problèmes ci-dessus. Grâce à ce module, les développeurs peuvent créer un cluster en créant un mode sous-processus pour utiliser pleinement les ressources de la machine ou du conteneur. en même temps, ce module permet plusieurs écoutes du processus enfant sur le même port. [Apprentissage recommandé : "Tutoriel Nodejs"]

Exemple

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on(&#39;exit&#39;, function(worker, code, signal) {
    console.log(&#39;worker &#39; + worker.process.pid + &#39; died&#39;);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
  }).listen(8000);
}
Copier après la connexion

Le processus de création d'un processus enfant via l'analyse de code

Commencez d'abord par const cluster = require('cluster')</ code> À partir de , cette ligne de code importe le module Cluster de Node, mais à l'intérieur de Node, les fichiers introduits par le processus Master et le processus Worker sont différents Pour plus de détails, voir le code suivant : <code>const cluster = require(&#39;cluster&#39;) 说起,这行代码导入了 Node 的 Cluster 模块,而在 Node 内部,Master 进程与 Worker 进程引入的文件却不一样,详情见如下代码:

&#39;use strict&#39;;

const childOrPrimary = &#39;NODE_UNIQUE_ID&#39; in process.env ? &#39;child&#39; : &#39;master&#39;;
module.exports = require(`internal/cluster/${childOrPrimary}`);
Copier après la connexion

不同的文件意味着两种进程在执行中的表现也不一样,例如:

// internal/cluster/master.js
cluster.isWorker = false;
cluster.isMaster = true;

// internal/cluster/child.js
cluster.isWorker = true;
cluster.isMaster = false;
Copier après la connexion

这也是为什么 Cluster 模块到处的变量能区分不同类型进程的原因,接下来让我们分别从主、子进程两个方向去了解具体的过程

主进程

在上述代码里,Master 进程并没有做太多事情,只是根据 CPU 数量去 fork 子进程,那么我们深入到源代码里大致来看一下,相关描述均在代码的注释内

// lib/internal/cluster/master.js

// 初始化cluster
const cluster = new EventEmitter();
// 创建监听地址与server对应的map
const handles = new SafeMap();
// 初始化
cluster.isWorker = false;
cluster.isMaster = true;
cluster.workers = {};
cluster.settings = {};
cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.

// 自增的子进程id
let ids = 0;

// 向cluster添加fork方法
cluster.fork = function(env) {
  // 初始化cluster.settings
  cluster.setupMaster();
  // 为当前fork的子进程生成当前cluster内的唯一id
  const id = ++ids;
  // 创建子进程
  const workerProcess = createWorkerProcess(id, env);
  // 创建对应的worker实例
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  
  // 省略一些worker的事件监听....

  // 监听内部消息事件,并交由onmessage处理
  worker.process.on(&#39;internalMessage&#39;, internal(worker, onmessage));
  // cluster发出fork事件
  process.nextTick(emitForkNT, worker);
  // 将worker实例放在cluster.workers中维护
  cluster.workers[worker.id] = worker;
  // 返回worker
  return worker;
};

// 创建子进程函数
function createWorkerProcess(id, env) {
  // 将主进程的env、调用cluster.fork时传入的env以及NODE_UNIQUE_ID env构建成一个env对象
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // 执行参数
  const execArgv = [...cluster.settings.execArgv];

  // 省略debug模式相关逻辑...

  // 调用child_process模块的fork函数创建子进程并返回,至此子进程实例创建完成
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

// 内部消息事件处理函数
function onmessage(message, handle) {
  const worker = this;

  if (message.act === &#39;online&#39;)
    online(worker);
  // 当子进程向主进程发出queryServer消息后,执行queryServer函数,创建server
  else if (message.act === &#39;queryServer&#39;)
    queryServer(worker, message);
  else if (message.act === &#39;listening&#39;)
    listening(worker, message);
  else if (message.act === &#39;exitedAfterDisconnect&#39;)
    exitedAfterDisconnect(worker, message);
  else if (message.act === &#39;close&#39;)
    close(worker, message);
}

// 获取server
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  // 创建当前子进程监听地址信息的key
  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  // 在handles map中查询是否有已经创建好的该监听地址的server
  let handle = handles.get(key);

  // 没有对应的server则进行创建
  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === &#39;string&#39; &&
        process.platform !== &#39;win32&#39;) {

      address = path.relative(process.cwd(), address);

      if (message.address.length < address.length)
        address = message.address;
    }

    // 主、子进程处理连接的方式,默认为轮询
    let constructor = RoundRobinHandle;
    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it&#39;s connectionless. There is nothing to send to
    // the workers except raw datagrams and that&#39;s pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === &#39;udp4&#39; ||
        message.addressType === &#39;udp6&#39;) {
      constructor = SharedHandle;
    }

    // 将监听地址信息传入构造函数创建监听实例
    handle = new constructor(key, address, message);
    // 缓存监听实例
    handles.set(key, handle);
  }

  // 向server添加自定义信息,用于server发出listening事件后透传到worker
  if (!handle.data)
    handle.data = message.data;

  // 添加server发出listening事件后的回调函数通知子进程
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}
Copier après la connexion
// lib/internal/cluster/round_robin_handle.js

// 构造函数,参数为server对应的key,ip地址(对于http(s)来说),监听相关信息
function RoundRobinHandle(key, address, { port, fd, flags }) {
  // 初始化handle
  this.key = key;
  this.all = new SafeMap();
  this.free = new SafeMap();
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail);

  // 监听文件描述符,不讨论
  if (fd >= 0)
    this.server.listen({ fd });
  // 监听ip:port
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  // 监听UNIX socket,不讨论
  } else
    this.server.listen(address);  // UNIX socket path.

  // 注册server发出listening事件的回调函数
  this.server.once(&#39;listening&#39;, () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

// 添加worker,server发出listening事件后调用master.js中传入的回调函数
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  if (this.server === null)
    return done();

  // Still busy binding.
  this.server.once(&#39;listening&#39;, done);
  this.server.once(&#39;error&#39;, (err) => {
    send(err.errno, null);
  });
};

// 删除worker,轮询时不再分配给该worker
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  for (const handle of this.handles) {
    handle.close();
  }
  this.handles = [];

  this.handle.close();
  this.handle = null;
  return true;
};

// 轮询调度函数
RoundRobinHandle.prototype.distribute = function(err, handle) {
  ArrayPrototypePush(this.handles, handle);
  const [ workerEntry ] = this.free; // this.free is a SafeMap

  if (ArrayIsArray(workerEntry)) {
    const { 0: workerId, 1: worker } = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};

// 将handle交给worker
RoundRobinHandle.prototype.handoff = function(worker) {
  if (!this.all.has(worker.id)) {
    return;  // Worker is closing (or has closed) the server.
  }

  const handle = ArrayPrototypeShift(this.handles);

  if (handle === undefined) {
    this.free.set(worker.id, worker);  // Add to ready queue again.
    return;
  }

  // 向该worker发出newconn事件
  const message = { act: &#39;newconn&#39;, key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};
Copier après la connexion

子进程

在每个子进程中,我们都创建了一个 HTTP Server,然后执行 listen 函数监听 8000 端口,而 HTTP Server 实例是由 Net Server 原型链继承得到的,listen 函数即为 Net Server 原型上的 listen 函数,具体如下:

// lib/_http_server.js

function Server(options, requestListener) {
  ....
}

ObjectSetPrototypeOf(Server.prototype, net.Server.prototype);
ObjectSetPrototypeOf(Server, net.Server);
Copier après la connexion
// lib/net.js

Server.prototype.listen = function(...args) {

  // 由于篇幅原因,省略一些参数nomolize和其他监听的处理
  
  // 经过这段逻辑中,会调用listenInCluster函数去真正的监听端口
  if (typeof options.port === &#39;number&#39; || typeof options.port === &#39;string&#39;) {
    validatePort(options.port, &#39;options.port&#39;);
    backlog = options.backlog || backlogFromArgs;
    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }
  
  // 省略...
};

// 集群监听函数
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;

  if (cluster === undefined) cluster = require(&#39;cluster&#39;);

  // 判断是否是master,单进程中cluster.isMaster默认为true,然后进行监听并返回
  if (cluster.isMaster || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  // 在子进程中,会将监听地址信息传入cluster实例中的_getServer函数从而获取一个faux handle
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Get the master&#39;s server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  // 获取net server回调函数,拿到faux handle之后,调用_listen2函数,即setupListenHandle函数
  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, &#39;bind&#39;, address, port);
      return server.emit(&#39;error&#39;, ex);
    }

    // Reuse master&#39;s server handle
    server._handle = handle;
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

// 启用监听handle
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug(&#39;setupListenHandle&#39;, address, port, addressType, backlog, fd);

  // 如同英文注释所说的那样,如果没有监听句柄,则创建,有监听句柄则跳过
  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don&#39;t need to do this.
  if (this._handle) {
    debug(&#39;setupListenHandle: have a handle already&#39;);
  } else {
    debug(&#39;setupListenHandle: create a handle&#39;);

    let rval = null;
    
    // 篇幅原因,创建监听句柄的代码...
    
    this._handle = rval;
  }
  
  // 在this上设置的faux handle上设置onconnection函数用于监听连接进入
  this._handle.onconnection = onconnection;
}
Copier après la connexion

同时,在开始解析的时候我们说过,在引入 Cluster 模块的时候,会根据当前进程的env中是否包含NODE_UNIQUE_ID去判断是否为子进程,若为子进程,则执行 child.js 文件

Tips:IPC 通信中发送的message.cmd的值如果以NODE为前缀,它将响应一个内部事件internalMessage

// lib/internal/cluster/child.js

// 初始化
const cluster = new EventEmitter();
// 存储生成的 faux handle
const handles = new SafeMap();
// 存储监听地址与监听地址index的对应关系
const indexes = new SafeMap();
cluster.isWorker = true;
cluster.isMaster = false;
cluster.worker = null;
cluster.Worker = Worker;

// 子进程启动时会执行该函数,进行初始化,同时在执行完毕后,会删除 env 中的 NODE_UNIQUE_ID 环境变量
// 详细代码见 lib/internal/bootstrap/pre_excution.js 中的 initializeClusterIPC 函数
cluster._setupWorker = function() {
  // 初始化worker实例
  const worker = new Worker({
    id: +process.env.NODE_UNIQUE_ID | 0,
    process: process,
    state: &#39;online&#39;
  });

  cluster.worker = worker;

  // 处理断开连接事件
  process.once(&#39;disconnect&#39;, () => {
    worker.emit(&#39;disconnect&#39;);

    if (!worker.exitedAfterDisconnect) {
      // Unexpected disconnect, master exited, or some such nastiness, so
      // worker exits immediately.
      process.exit(0);
    }
  });

  // IPC 内部通信事件监听
  process.on(&#39;internalMessage&#39;, internal(worker, onmessage));
  send({ act: &#39;online&#39; });

  function onmessage(message, handle) {
    // 如果为新连接,则执行 onconnection 函数将得到的句柄传入子进程中启动的HTTP Server
    if (message.act === &#39;newconn&#39;)
      onconnection(message, handle);
    else if (message.act === &#39;disconnect&#39;)
      ReflectApply(_disconnect, worker, [true]);
  }
};

// 添加获取server函数,会在net server监听端口时被执行
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === &#39;string&#39; &&
      process.platform !== &#39;win32&#39;)
    address = path.resolve(address);

  // 生成地址信息的的key
  const indexesKey = ArrayPrototypeJoin(
    [
      address,
      options.port,
      options.addressType,
      options.fd,
    ], &#39;:&#39;);

  // 检查是否缓存了indexedKey,如果没有,则表明是新的监听地址,在 master.js 中会生成新的net server
  let index = indexes.get(indexesKey);

  if (index === undefined)
    index = 0;
  else
    index++;

  // 设置 indexesKey 与 index的对应关系
  indexes.set(indexesKey, index);

  // 传递地址信息及index
  const message = {
    act: &#39;queryServer&#39;,
    index,
    data: null,
    ...options
  };

  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  // 向主进程发送queryServer消息
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === &#39;function&#39;)
      obj._setServerData(reply.data);

    // 根据相应负载均衡handle添加worker时的处理,执行相应的负载均衡代码,并执行 cb 函数
    // 轮询是没有传递handle的,对应代码在 RoundRobinHandle.prototype.add 内
    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

  obj.once(&#39;listening&#39;, () => {
    cluster.worker.state = &#39;listening&#39;;
    const address = obj.address();
    message.act = &#39;listening&#39;;
    message.port = (address && address.port) || options.port;
    send(message);
  });
};

// 创建 faux handle,并保存其对应关系
// Round-robin. Master distributes handles across workers.
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  let key = message.key;

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the master that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the master process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the master.
    if (key === undefined)
      return;

    send({ act: &#39;close&#39;, key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  function getsockname(out) {
    if (key)
      ObjectAssign(out, message.sockname);

    return 0;
  }

  // 创建Faux handle
  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it&#39;s backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  // 保存faux handle
  handles.set(key, handle);
  // 执行 net 模块调用 cluster._getServer 函数传进来的回调函数
  cb(0, handle);
}

// 处理请求
// Round-robin connection.
function onconnection(message, handle) {
  // 获取faux handle的key
  const key = message.key;
  // 获取faux hadle
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  // 调用在 net 模块中 setupListenHandle 函数里为该 faux handle 设置的连接处理函数处理请求
  if (accepted)
    server.onconnection(0, handle);
}
Copier après la connexion

至此,所有的内容都联系起来了。

为什么多个子进程可以监听同一个端口

在之前的代码分析中我们可以知道,Cluster 集群会在 Master 进程中创建 Net Server,在 Worker 进程运行创建 HTTP Server 的时候,会将监听地址的信息传入 cluster._getServer 函数创建一个 faux handle 并设置到子进程的 Net Server 上,在 Worker 进程初始化的时候会注册 IPC 通信回调函数,在回调函数内 ,调用在子进程中 Net Server 模块初始化后的 {faux handle}.onconnection 函数,并将传过来的连接的 handle 传入完成请求响应。

如何保证集群工作的健壮性

我们可以在 Master 进程中监听 Worker 进程的 errordisconntectexit 事件,在这些事件中去做对应的处理,例如清理退出的进程并重新 fork,或者使用已经封装好的 npm 包,例如 cfork

Egg.js 多进程模型

在 Egg.js 的多进程模型中,多了另外一个进程类型,即 Agent 进程,该进程主要用于处理多进程不好处理的一些事情还有减少长链接的数量,具体关系如下:

+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |      agent ready    |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |     worker ready    |                    |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |      Egg ready      |                    |
     +----------------------------------------->|
Copier après la connexion

egg-cluster 包内,使用了 cfork 包去保证 Worker 进程挂掉后自动重启

问题记录

在我们的一个 Egg 应用内,日志系统并没有使用 Egg 原生的日志,使用了一个内部基于 log4jsrrreee

Des fichiers différents signifient les performances. des deux processus en exécution Ils sont également différents, par exemple : 🎜rrreee🎜C'est pourquoi les variables partout dans le module Cluster peuvent distinguer différents types de processus. Ensuite, comprenons respectivement le processus spécifique des processus principal et enfant🎜.

🎜Processus principal🎜

🎜Dans le code ci-dessus, le processus maître ne fait pas grand-chose, il divise simplement le processus enfant en fonction du nombre de processeurs. dans le code source et jetez un œil approximatif. Les descriptions pertinentes se trouvent dans les commentaires du code🎜rrreeerrreee

🎜Sous-processus🎜

🎜Dans chaque sous-processus, nous créons un serveur HTTP. puis exécutez La fonction d'écoute écoute le port 8000, et l'instance du serveur HTTP est héritée de la chaîne de prototypes Net Server. La fonction écouter est la fonction écoutercode> sur le prototype Net Server. Les détails sont les suivants : 🎜rrreeerrreee🎜En même temps, lorsque nous avons commencé l'analyse, nous avons dit que lors de l'introduction du module Cluster, il sera jugé s'il s'agit d'un processus enfant en fonction du fait qu'il s'agisse d'un processus enfant. l'environnement du processus actuel contient NODE_UNIQUE_ID. S'il s'agit d'un processus enfant, exécutez le fichier <code>child.js🎜🎜🎜Conseils : Si la valeur du message.cmd envoyé dans la communication IPC est préfixée par NODE, il répondra à un événement interne internalMessage🎜🎜rrreee🎜À ce stade, tout le contenu est connecté . 🎜

🎜Pourquoi plusieurs processus enfants peuvent écouter le même port🎜🎜🎜Dans l'analyse de code précédente, nous pouvons savoir que le cluster Cluster créera un Net Server dans le processus Maître et un serveur Net dans le processus Worker Lors de l'exécution de la création d'un serveur HTTP, les informations sur l'adresse d'écoute seront transmises à la fonction cluster._getServer pour créer un faux handle et le définir au serveur Net du processus enfant. Dans le processus Worker. Lors de l'initialisation, la fonction de rappel de communication IPC sera enregistrée dans la fonction de rappel, la fonction {faux handle}.onconnection après l'initialisation du Net. Le module serveur dans le processus enfant est appelé et le handle de connexion transmis est transmis. Entrez la réponse à la demande d'achèvement. 🎜

🎜Comment assurer la robustesse du travail du cluster🎜🎜🎜Nous pouvons surveiller l'erreur et la déconnexion du processus Worker dans le processus maître >, les événements exit, effectuez le traitement correspondant dans ces événements, comme le nettoyage du processus quitté et le re-fork, ou l'utilisation de packages npm encapsulés, tels que cfork🎜

🎜Modèle multi-processus Egg.js🎜🎜🎜Dans le modèle multi-processus d'Egg.js, il existe un autre type de processus, à savoir le processus Agent. Ce processus est principalement utilisé pour gérer certaines choses difficiles à gérer avec plusieurs processus et pour réduire le nombre de liens longs. La relation spécifique est la suivante : 🎜rrreee🎜Dans le egg-clustercode>, <code>cfork pour garantir que le processus Worker redémarre automatiquement après son blocage🎜<h3 data-id="heading-8">🎜Enregistrement du problème🎜🎜🎜🎜Dans l'un de nos Applications Egg, le système de journalisation n'utilise pas les journaux natifs d'Egg, en utilisant une bibliothèque de journaux interne basée sur le package <code>log4js Lors de son utilisation, le Logger requis est étendu à l'objet Application, de sorte que chaque processus Worker. Logger en créera un nouveau lors de l'initialisation, c'est-à-dire qu'il y aura des problèmes avec plusieurs processus écrivant des journaux, mais il n'y a pas de problème d'erreur avec plusieurs processus écrivant des journaux🎜

Dans le processus de traçage du code source, nous avons constaté que bien que log4js fournisse le mode Cluster, le mode Cluster de log4js n'est pas activé dans le package de couche supérieure, donc l'appender de chaque Logger Ils utilisent tous flag a pour ouvrir un flux d'écriture, mais je n'ai pas eu de réponse jusqu'à présentlog4js 虽然提供了 Cluster 模式,但是在上层封装中并没有开启 log4js 的 Cluster 模式,所以每个 Logger 的 appender 都使用 flag a 打开一个写入流,到这里并没有得到答案

后来在 CNode 中找到了答案,在 unix 下使用 flag a 打开的可写流对应的 libuv 文件池实现是 UV_FS_O_APPEND,即 O_APPEND,而 O_APPEND

Plus tard CNode

. L'implémentation du pool de fichiers libuv correspondant au flux inscriptible ouvert à l'aide de flag a sous Unix est ​​UV_FS_O_APPEND, c'est-à-dire O_APPEND, et O_APPEND lui-même est défini comme une opération atomique dans le manuel man. Le noyau garantit l'écriture simultanée de. ce flux inscriptible est sûr et ne nécessite pas de verrous supplémentaires au niveau de la couche application (sauf que l'écriture simultanée sur des systèmes de fichiers de type NFS entraînera la perte ou l'endommagement des informations sur les fichiers), les systèmes de fichiers montés en réseau de type NFS reposent principalement sur la simulation. l'API sous-jacente pour réaliser des opérations de type local, qui ne peuvent évidemment pas être effectuées dans des conditions de concurrence. Restaurez parfaitement ce type d'API d'opération atomique, donc si vos journaux sont écrits sur quelque chose comme le montage sur disque cloud OSS localement, vous ne pouvez pas le faire. écrivez dans plusieurs processus, vous devez la verrouiller manuellement au niveau de la couche d'application

Conclusion

Le tranchant d'une épée vient de l'affûtage, et le parfum des fleurs de prunier vient du froid glacial, et il se précipite vers le canard~ Pour plus de connaissances sur la programmation, veuillez visiter :

Introduction à la programmation🎜 ! ! 🎜

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:juejin.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal
À propos de nous Clause de non-responsabilité Sitemap
Site Web PHP chinois:Formation PHP en ligne sur le bien-être public,Aidez les apprenants PHP à grandir rapidement!