ホームページ > ウェブフロントエンド > jsチュートリアル > Node.js のクラスターについて説明する記事

Node.js のクラスターについて説明する記事

青灯夜游
リリース: 2023-01-22 05:30:01
転載
2941 人が閲覧しました

Node.js のクラスターについて説明する記事

日常業務では、Node.js の使用は比較的表面的です。若いうちにもう少し高度なことを学びましょう。クラスターから始めましょう。

ニコラス・チャン・サンは「質問をしながら勉強するのが良い方法だ」と言っているので、ぜひ試してみましょう。

私が初めてクラスターを使用したとき、次のコードのように、複数の子プロセスが競合せずに同じポートをリッスンできるようにする方法について常に興味がありました。

const cluster = require('cluster')
const net = require('net')
const cpus = require('os').cpus()

if (cluster.isPrimary) {
  for (let i = 0; i < cpus.length; i++) {
    cluster.fork()
  }
} else {
  net
    .createServer(function (socket) {
      socket.on(&#39;data&#39;, function (data) {
        socket.write(`Reply from ${process.pid}: ` + data.toString())
      })
      socket.on(&#39;end&#39;, function () {
        console.log(&#39;Close&#39;)
      })
      socket.write(&#39;Hello!\n&#39;)
    })
    .listen(9999)
}
ログイン後にコピー

このコードはパススルーします。親プロセスfork 複数の子プロセスが出現し、これらの子プロセスはすべてポート 9999 をリッスンし、正常にサービスを提供できます。勉強しましょう。 [関連チュートリアルの推奨事項: nodejs ビデオ チュートリアル プログラミング教育 ]

デバッグ環境の準備

Node.js 公式を学習するライブラリを提供する最善の方法はもちろんデバッグすることなので、最初に環境を準備しましょう。 ※この記事のOSはmacOS Big Sur 11.6.6ですが、それ以外の場合は対応する環境をご用意ください。

Node.js のコンパイル

  • Node.js ソース コードのダウンロード

  • git clone https://github.com/nodejs/node.git
    ログイン後にコピー
次に、後のデバッグを容易にするために、次の 2 つの場所にブレークポイントを追加します:

// lib/internal/cluster/primary.js
function queryServer(worker, message) {
  debugger;
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect) return;

  ...
}
ログイン後にコピー
// lib/internal/cluster/child.js
send(message, (reply, handle) => {
  debugger
  if (typeof obj._setServerData === &#39;function&#39;) obj._setServerData(reply.data)

  if (handle) {
    // Shared listen socket
    shared(reply, {handle, indexesKey, index}, cb)
  } else {
    // Round-robin.
    rr(reply, {indexesKey, index}, cb)
  }
})
ログイン後にコピー

  • ディレクトリに入り、

  • ./configure --debug
    make -j4
    ログイン後にコピー
を実行します。 ##out/Debug/node

IDE 環境の準備vscode を使用してデバッグと構成を行う

launch.json

で十分です (他の IDE も同様です。自分で解決してください): <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:js;toolbar:false;">{ &quot;version&quot;: &quot;0.2.0&quot;, &quot;configurations&quot;: [ { &quot;name&quot;: &quot;Debug C++&quot;, &quot;type&quot;: &quot;cppdbg&quot;, &quot;program&quot;: &quot;/Users/youxingzhi/ayou/node/out/Debug/node&quot;, &quot;request&quot;: &quot;launch&quot;, &quot;args&quot;: [&quot;/Users/youxingzhi/ayou/node/index.js&quot;], &quot;stopAtEntry&quot;: false, &quot;cwd&quot;: &quot;${workspaceFolder}&quot;, &quot;environment&quot;: [], &quot;externalConsole&quot;: false, &quot;MIMode&quot;: &quot;lldb&quot; }, { &quot;name&quot;: &quot;Debug Node&quot;, &quot;type&quot;: &quot;node&quot;, &quot;runtimeExecutable&quot;: &quot;/Users/youxingzhi/ayou/node/out/Debug/node&quot;, &quot;request&quot;: &quot;launch&quot;, &quot;args&quot;: [&quot;--expose-internals&quot;, &quot;--nolazy&quot;], &quot;skipFiles&quot;: [], &quot;program&quot;: &quot;${workspaceFolder}/index.js&quot; } ] }</pre><div class="contentsignin">ログイン後にコピー</div></div> 最初のものは C コードのデバッグに使用され (C/C プラグインをインストールする必要があります)、2 番目のものは C コードをデバッグするために使用されます (C/C プラグインをインストールする必要があります)。 JSコードのデバッグに使用されます。次に、デバッグを開始します。当面は、JS コードをデバッグするための構成を使用できます。

クラスター ソース コードのデバッグ

デバッグ コードを準備します (デバッグ目的のみ。ここで子プロセスを開始するだけで十分です):

debugger
const cluster = require(&#39;cluster&#39;)
const net = require(&#39;net&#39;)

if (cluster.isPrimary) {
  debugger
  cluster.fork()
} else {
  const server = net.createServer(function (socket) {
    socket.on(&#39;data&#39;, function (data) {
      socket.write(`Reply from ${process.pid}: ` + data.toString())
    })
    socket.on(&#39;end&#39;, function () {
      console.log(&#39;Close&#39;)
    })
    socket.write(&#39;Hello!\n&#39;)
  })
  debugger
  server.listen(9999)
}
ログイン後にコピー

明らかに、プログラムは親プロセスと子プロセスの 2 つの部分で分析できます。

親プロセスが最初に入力されます:

require('cluster')

を実行すると、lib/cluster.js が次のようになります。入力された このファイル: <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:js;toolbar:false;">const childOrPrimary = &amp;#39;NODE_UNIQUE_ID&amp;#39; in process.env ? &amp;#39;child&amp;#39; : &amp;#39;primary&amp;#39; module.exports = require(`internal/cluster/${childOrPrimary}`)</pre><div class="contentsignin">ログイン後にコピー</div></div> は、現在の

process.env

NODE_UNIQUE_ID があるかどうかに基づいて、さまざまなモジュールを導入します。 internal/cluster/primary.js このモジュール: <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:js;toolbar:false;">... const cluster = new EventEmitter(); ... module.exports = cluster const handles = new SafeMap() cluster.isWorker = false cluster.isMaster = true // Deprecated alias. Must be same as isPrimary. cluster.isPrimary = true cluster.Worker = Worker cluster.workers = {} cluster.settings = {} cluster.SCHED_NONE = SCHED_NONE // Leave it to the operating system. cluster.SCHED_RR = SCHED_RR // Primary distributes connections. ... cluster.schedulingPolicy = schedulingPolicy cluster.setupPrimary = function (options) { ... } // Deprecated alias must be same as setupPrimary cluster.setupMaster = cluster.setupPrimary function setupSettingsNT(settings) { ... } function createWorkerProcess(id, env) { ... } function removeWorker(worker) { ... } function removeHandlesForWorker(worker) { ... } cluster.fork = function (env) { ... }</pre><div class="contentsignin">ログイン後にコピー</div></div>このモジュールは主に

cluster

オブジェクトにいくつかのプロパティとメソッドをマウントします振り返ってみると、私たちはデバッグを続けています。デバッグするとき、if (cluster.isPrimary) ブランチに入ります。コードは非常に単純です。fork で新しい子プロセスを作成するだけです: <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:js;toolbar:false;">// lib/internal/cluster/primary.js cluster.fork = function (env) { cluster.setupPrimary() const id = ++ids const workerProcess = createWorkerProcess(id, env) const worker = new Worker({ id: id, process: workerProcess, }) ... worker.process.on(&amp;#39;internalMessage&amp;#39;, internal(worker, onmessage)) process.nextTick(emitForkNT, worker) cluster.workers[worker.id] = worker return worker }</pre><div class="contentsignin">ログイン後にコピー</div></div>

cluster.setupPrimary()

: 比較的単純で、いくつかのパラメータなどを初期化します。

createWorkerProcess(id, env)

:<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:js;toolbar:false;">// lib/internal/cluster/primary.js function createWorkerProcess(id, env) { const workerEnv = {...process.env, ...env, NODE_UNIQUE_ID: `${id}`} const execArgv = [...cluster.settings.execArgv] ... return fork(cluster.settings.exec, cluster.settings.args, { cwd: cluster.settings.cwd, env: workerEnv, serialization: cluster.settings.serialization, silent: cluster.settings.silent, windowsHide: cluster.settings.windowsHide, execArgv: execArgv, stdio: cluster.settings.stdio, gid: cluster.settings.gid, uid: cluster.settings.uid, }) }</pre><div class="contentsignin">ログイン後にコピー</div></div>ご覧のとおり、このメソッドは主に子プロセスを開始し、

fork

index.js であり、子プロセスの開始時に環境変数 NODE_UNIQUE_ID が設定されるため、index.jsrequire('cluster')internal/cluster/child.js モジュールが導入されます。

worker.process.on('internalMessage', external(worker, onmessage))

: 子プロセスによって渡されたメッセージをリッスンして処理します。

次に、子プロセスのロジックを入力します。

前述したように、この時点で導入されるのは

internal/cluster/child.js# です。 ## モジュールでは、最初にスキップして下に進みます。

server.listen(9999) を実行すると、Server 上のメソッドが実際に呼び出されます:

// lib/net.js
Server.prototype.listen = function (...args) {
  ...
      listenInCluster(
        this,
        null,
        options.port | 0,
        4,
        backlog,
        undefined,
        options.exclusive
      );
}
ログイン後にコピー
最後に、listenInCluster

が呼び出されます:

// lib/net.js
function listenInCluster(
  server,
  address,
  port,
  addressType,
  backlog,
  fd,
  exclusive,
  flags,
  options
) {
  exclusive = !!exclusive

  if (cluster === undefined) cluster = require(&#39;cluster&#39;)

  if (cluster.isPrimary || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags)
    return
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
    backlog,
    ...options,
  }
  // Get the primary&#39;s server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnPrimaryHandle)

  function listenOnPrimaryHandle(err, handle) {
    err = checkBindError(err, port, handle)

    if (err) {
      const ex = exceptionWithHostPort(err, &#39;bind&#39;, address, port)
      return server.emit(&#39;error&#39;, ex)
    }

    // Reuse primary&#39;s server handle
    server._handle = handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags)
  }
}
ログイン後にコピー
これは子プロセスで実行されるため、cluster._getServer(server, serverQuery, listenOnPrimaryHandle)

がend:

// lib/internal/cluster/child.js
// 这里的 cb 就是上面的 listenOnPrimaryHandle
cluster._getServer = function (obj, options, cb) {
  ...
  send(message, (reply, handle) => {
    debugger
    if (typeof obj._setServerData === &#39;function&#39;) obj._setServerData(reply.data)

    if (handle) {
      // Shared listen socket
      shared(reply, {handle, indexesKey, index}, cb)
    } else {
      // Round-robin.
      rr(reply, {indexesKey, index}, cb)
    }
  })

  ...
}
ログイン後にコピー
この関数は最終的に queryServer

メッセージを親プロセスに送信します。親プロセスがメッセージを処理した後、コールバック関数が呼び出され、コールバック関数は

cb、これは listenOnPrimaryHandle です。 listenのロジックは親プロセスで実行されているようです。 次に親プロセスに入ります。

親プロセスは queryServer

からメッセージを受信した後、最終的に

queryServer## を呼び出します。 # this Method: <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:js;toolbar:false;">// lib/internal/cluster/primary.js function queryServer(worker, message) { // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return const key = `${message.address}:${message.port}:${message.addressType}:` + `${message.fd}:${message.index}` let handle = handles.get(key) if (handle === undefined) { let address = message.address // Find shortest path for unix sockets because of the ~100 byte limit if ( message.port &lt; 0 &amp;&amp; typeof address === &amp;#39;string&amp;#39; &amp;&amp; process.platform !== &amp;#39;win32&amp;#39; ) { address = path.relative(process.cwd(), address) if (message.address.length &lt; address.length) address = message.address } // UDP is exempt from round-robin connection balancing for what should // be obvious reasons: it&amp;#39;s connectionless. There is nothing to send to // the workers except raw datagrams and that&amp;#39;s pointless. if ( schedulingPolicy !== SCHED_RR || message.addressType === &amp;#39;udp4&amp;#39; || message.addressType === &amp;#39;udp6&amp;#39; ) { handle = new SharedHandle(key, address, message) } else { handle = new RoundRobinHandle(key, address, message) } handles.set(key, handle) } ... }</pre><div class="contentsignin">ログイン後にコピー</div></div>ご覧のとおり、これは主に handle の処理です。ここでの

handle

はスケジューリング戦略を指し、 SharedHandleRoundRobinHandle は、それぞれプリエンプションとポーリングの 2 つの戦略に対応します (記事の最後の補足セクションに 2 つの比較の例があります)。 <p>Node.js 中默认是 <code>RoundRobinHandle 策略,可通过环境变量 NODE_CLUSTER_SCHED_POLICY 来修改,取值可以为 noneSharedHandle) 或 rrRoundRobinHandle)。

<span style="font-size: 18px;">SharedHandle</span>

首先,我们来看一下 SharedHandle,由于我们这里是 TCP 协议,所以最后会通过 net._createServerHandle 创建一个 TCP 对象挂载在 handle 属性上(注意这里又有一个 handle,别搞混了):

// lib/internal/cluster/shared_handle.js
function SharedHandle(key, address, {port, addressType, fd, flags}) {
  this.key = key
  this.workers = new SafeMap()
  this.handle = null
  this.errno = 0

  let rval
  if (addressType === &#39;udp4&#39; || addressType === &#39;udp6&#39;)
    rval = dgram._createSocketHandle(address, port, addressType, fd, flags)
  else rval = net._createServerHandle(address, port, addressType, fd, flags)

  if (typeof rval === &#39;number&#39;) this.errno = rval
  else this.handle = rval
}
ログイン後にコピー

createServerHandle 中除了创建 TCP 对象外,还绑定了端口和地址:

// lib/net.js
function createServerHandle(address, port, addressType, fd, flags) {
  ...
  } else {
    handle = new TCP(TCPConstants.SERVER);
    isTCP = true;
  }

  if (address || port || isTCP) {
      ...
      err = handle.bind6(address, port, flags);
    } else {
      err = handle.bind(address, port);
    }
  }

  ...
  return handle;
}
ログイン後にコピー

然后,queryServer 中继续执行,会调用 add 方法,最终会将 handle 也就是 TCP 对象传递给子进程:

// lib/internal/cluster/primary.js
function queryServer(worker, message) {
  ...
  if (!handle.data) handle.data = message.data

  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    const {data} = handles.get(key)

    if (errno) handles.delete(key) // Gives other workers a chance to retry.

    send(
      worker,
      {
        errno,
        key,
        ack: message.seq,
        data,
        ...reply,
      },
      handle // TCP 对象
    )
  })
  ...
}
ログイン後にコピー

之后进入子进程:

子进程收到父进程对于 queryServer 的回复后,会调用 shared

// lib/internal/cluster/child.js
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function (obj, options, cb) {
  ...

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === &#39;function&#39;) obj._setServerData(reply.data)

    if (handle) {
      // Shared listen socket
      shared(reply, {handle, indexesKey, index}, cb)
    } else {
      // Round-robin.
      rr(reply, {indexesKey, index}, cb) // cb 是 listenOnPrimaryHandle
    }
  })
  ...
}
ログイン後にコピー
ログイン後にコピー

shared 中最后会调用 cb 也就是 listenOnPrimaryHandle

// lib/net.js
function listenOnPrimaryHandle(err, handle) {
  err = checkBindError(err, port, handle)

  if (err) {
    const ex = exceptionWithHostPort(err, &#39;bind&#39;, address, port)
    return server.emit(&#39;error&#39;, ex)
  }
  // Reuse primary&#39;s server handle 这里的 server 是 index.js 中 net.createServer 返回的那个对象
  server._handle = handle
  // _listen2 sets up the listened handle, it is still named like this
  // to avoid breaking code that wraps this method
  server._listen2(address, port, addressType, backlog, fd, flags)
}
ログイン後にコピー

这里会把 handle 赋值给 server._handle,这里的 serverindex.jsnet.createServer 返回的那个对象,并调用 server._listen2,也就是 setupListenHandle

// lib/net.js
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug(&#39;setupListenHandle&#39;, address, port, addressType, backlog, fd)
  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don&#39;t need to do this.
  if (this._handle) {
    debug(&#39;setupListenHandle: have a handle already&#39;)
  } else {
    ...
  }

  this[async_id_symbol] = getNewAsyncId(this._handle)
  this._handle.onconnection = onconnection
  this._handle[owner_symbol] = this

  // Use a backlog of 512 entries. We pass 511 to the listen() call because
  // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
  // which will thus give us a backlog of 512 entries.
  const err = this._handle.listen(backlog || 511)

  if (err) {
    const ex = uvExceptionWithHostPort(err, &#39;listen&#39;, address, port)
    this._handle.close()
    this._handle = null
    defaultTriggerAsyncIdScope(
      this[async_id_symbol],
      process.nextTick,
      emitErrorNT,
      this,
      ex
    )
    return
  }
}
ログイン後にコピー

首先会执行 this._handle.onconnection = onconnection,由于客户端请求过来时会调用 this._handle(也就是 TCP 对象)上的 onconnection 方法,也就是会执行lib/net.js 中的 onconnection 方法建立连接,之后就可以通信了。为了控制篇幅,该方法就不继续往下了。

然后调用 listen 监听,注意这里参数 backlog 跟之前不同,不是表示端口,而是表示在拒绝连接之前,操作系统可以挂起的最大连接数量,也就是连接请求的排队数量。我们平时遇到的 listen EADDRINUSE: address already in use 错误就是因为这行代码返回了非 0 的错误。

如果还有其他子进程,也会同样走一遍上述的步骤,不同之处是在主进程中 queryServer 时,由于已经有 handle 了,不需要再重新创建了:

function queryServer(worker, message) {
  debugger;
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect) return;

  const key =
    `${message.address}:${message.port}:${message.addressType}:` +
    `${message.fd}:${message.index}`;
  let handle = handles.get(key);
  ...
}
ログイン後にコピー

以上内容整理成流程图如下:

Node.js のクラスターについて説明する記事

所谓的 SharedHandle,其实是在多个子进程中共享 TCP 对象的句柄,当客户端请求过来时,多个进程会去竞争该请求的处理权,会导致任务分配不均的问题,这也是为什么需要 RoundRobinHandle 的原因。接下来继续看看这种调度方式。

RoundRobinHandle

// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(
  key,
  address,
  {port, fd, flags, backlog, readableAll, writableAll}
) {
  ...
  this.server = net.createServer(assert.fail)

  ...
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
      backlog,
    })
  }
  ...
  this.server.once(&#39;listening&#39;, () => {
    this.handle = this.server._handle
    this.handle.onconnection = (err, handle) => {
      this.distribute(err, handle)
    }
    this.server._handle = null
    this.server = null
  })
}
ログイン後にコピー

如上所示,RoundRobinHandle 会调用 net.createServer() 创建一个 server,然后调用 listen 方法,最终会来到 setupListenHandle

// lib/net.js
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug(&#39;setupListenHandle&#39;, address, port, addressType, backlog, fd)
  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don&#39;t need to do this.
  if (this._handle) {
    debug(&#39;setupListenHandle: have a handle already&#39;)
  } else {
    debug(&#39;setupListenHandle: create a handle&#39;)

    let rval = null

    // Try to bind to the unspecified IPv6 address, see if IPv6 is available
    if (!address && typeof fd !== &#39;number&#39;) {
      rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags)

      if (typeof rval === &#39;number&#39;) {
        rval = null
        address = DEFAULT_IPV4_ADDR
        addressType = 4
      } else {
        address = DEFAULT_IPV6_ADDR
        addressType = 6
      }
    }

    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd, flags)

    if (typeof rval === &#39;number&#39;) {
      const error = uvExceptionWithHostPort(rval, &#39;listen&#39;, address, port)
      process.nextTick(emitErrorNT, this, error)
      return
    }
    this._handle = rval
  }

  this[async_id_symbol] = getNewAsyncId(this._handle)
  this._handle.onconnection = onconnection
  this._handle[owner_symbol] = this

  ...
}
ログイン後にコピー

且由于此时 this._handle 为空,会调用 createServerHandle() 生成一个 TCP 对象作为 _handle。之后就跟 SharedHandle 一样了,最后也会回到子进程:

// lib/internal/cluster/child.js
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function (obj, options, cb) {
  ...

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === &#39;function&#39;) obj._setServerData(reply.data)

    if (handle) {
      // Shared listen socket
      shared(reply, {handle, indexesKey, index}, cb)
    } else {
      // Round-robin.
      rr(reply, {indexesKey, index}, cb) // cb 是 listenOnPrimaryHandle
    }
  })
  ...
}
ログイン後にコピー
ログイン後にコピー

不过由于 RoundRobinHandle 不会传递 handle 给子进程,所以此时会执行 rr

function rr(message, {indexesKey, index}, cb) {
  ...
  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it&#39;s backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = {close, listen, ref: noop, unref: noop}

  if (message.sockname) {
    handle.getsockname = getsockname // TCP handles only.
  }

  assert(handles.has(key) === false)
  handles.set(key, handle)
  debugger
  cb(0, handle)
}
ログイン後にコピー

可以看到,这里构造了一个假的 handle,然后执行 cb 也就是 listenOnPrimaryHandle。最终跟 SharedHandle 一样会调用 setupListenHandle 执行 this._handle.onconnection = onconnection

RoundRobinHandle 逻辑到此就结束了,好像缺了点什么的样子。回顾下,我们给每个子进程中的 server 上都挂载了一个假的 handle,但它跟绑定了端口的 TCP 对象没有任何关系,如果客户端请求过来了,是不会执行它上面的 onconnection 方法的。之所以要这样写,估计是为了保持跟之前 SharedHandle 代码逻辑的统一。

此时,我们需要回到 RoundRobinHandle,有这样一段代码:

// lib/internal/cluster/round_robin_handle.js
this.server.once(&#39;listening&#39;, () => {
  this.handle = this.server._handle
  this.handle.onconnection = (err, handle) => {
    this.distribute(err, handle)
  }
  this.server._handle = null
  this.server = null
})
ログイン後にコピー

listen 执行完后,会触发 listening 事件的回调,这里重写了 handle 上面的 onconnection

所以,当客户端请求过来时,会调用 distribute 在多个子进程中轮询分发,这里又有一个 handle,这里的 handle 姑且理解为 clientHandle,即客户端连接的 handle,别搞混了。总之,最后会将这个 clientHandle 发送给子进程:

// lib/internal/cluster/round_robin_handle.js
RoundRobinHandle.prototype.handoff = function (worker) {
  ...

  const message = { act: &#39;newconn&#39;, key: this.key };
  // 这里的 handle 是 clientHandle
  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted) handle.close();
    else this.distribute(0, handle); // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};
ログイン後にコピー

而子进程在 require(&#39;cluster&#39;) 时,已经监听了该事件:

// lib/internal/cluster/child.js
process.on(&#39;internalMessage&#39;, internal(worker, onmessage))
send({act: &#39;online&#39;})

function onmessage(message, handle) {
  if (message.act === &#39;newconn&#39;) onconnection(message, handle)
  else if (message.act === &#39;disconnect&#39;)
    ReflectApply(_disconnect, worker, [true])
}
ログイン後にコピー

最终也同样会走到 net.js 中的 function onconnection(err, clientHandle) 方法。这个方法第二个参数名就叫 clientHandle,这也是为什么前面的 handle 我想叫这个名字的原因。

还是用图来总结下:

Node.js のクラスターについて説明する記事

SharedHandle 不同的是,该调度策略中 onconnection 最开始是在主进程中触发的,然后通过轮询算法挑选一个子进程,将 clientHandle 传递给它。

为什么端口不冲突

cluster 模块的调试就到此告一段落了,接下来我们来回答一下一开始的问题,为什么多个进程监听同一个端口没有报错?

网上有些文章说是因为设置了 SO_REUSEADDR,但其实跟这个没关系。通过上面的分析知道,不管什么调度策略,最终都只会在主进程中对 TCP 对象 bind 一次。

我们可以修改一下源代码来测试一下:

// deps/uv/src/unix/tcp.c 下面的 SO_REUSEADDR 改成 SO_DEBUG
if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
ログイン後にコピー

编译后执行发现,我们仍然可以正常使用 cluster 模块。

那这个 SO_REUSEADDR 到底影响的是啥呢?我们继续来研究一下。

SO_REUSEADDR

首先,我们我们知道,下面的代码是会报错的:

const net = require(&#39;net&#39;)
const server1 = net.createServer()
const server2 = net.createServer()
server1.listen(9999)
server2.listen(9999)
ログイン後にコピー

但是,如果我稍微修改一下,就不会报错了:

const net = require(&#39;net&#39;)
const server1 = net.createServer()
const server2 = net.createServer()
server1.listen(9999, &#39;127.0.0.1&#39;)
server2.listen(9999, &#39;10.53.48.67&#39;)
ログイン後にコピー

原因在于 listen 时,如果不指定 address,则相当于绑定了所有地址,当两个 server 都这样做时,请求到来就不知道要给谁处理了。

我们可以类比成找对象,port 是对外貌的要求,address 是对城市的要求。现在甲乙都想要一个 port1米7以上 不限城市的对象,那如果有一个 1米7以上 来自 深圳 的对象,就不知道介绍给谁了。而如果两者都指定了城市就好办多了。

那如果一个指定了 address,一个没有呢?就像下面这样:

const net = require(&#39;net&#39;)
const server1 = net.createServer()
const server2 = net.createServer()
server1.listen(9999, &#39;127.0.0.1&#39;)
server2.listen(9999)
ログイン後にコピー

结果是:设置了 SO_REUSEADDR 可以正常运行,而修改成 SO_DEBUG 的会报错。

还是上面的例子,甲对城市没有限制,乙需要是来自 深圳 的,那当一个对象来自 深圳,我们可以选择优先介绍给乙,非 深圳 的就选择介绍给甲,这个就是 SO_REUSEADDR 的作用。

补充

<span style="font-size: 18px;">SharedHandle</span><span style="font-size: 18px;">RoundRobinHandle</span> 两种模式的对比

先准备下测试代码:

// cluster.js
const cluster = require(&#39;cluster&#39;)
const net = require(&#39;net&#39;)

if (cluster.isMaster) {
  for (let i = 0; i < 4; i++) {
    cluster.fork()
  }
} else {
  const server = net.createServer()
  server.on(&#39;connection&#39;, (socket) => {
    console.log(`PID: ${process.pid}!`)
  })
  server.listen(9997)
}
ログイン後にコピー
// client.js
const net = require(&#39;net&#39;)
for (let i = 0; i < 20; i++) {
  net.connect({port: 9997})
}
ログイン後にコピー

RoundRobin先执行 node cluster.js,然后执行 node client.js,会看到如下输出,可以看到没有任何一个进程的 PID 是紧挨着的。至于为什么没有一直按照一样的顺序,后面再研究一下。

PID: 42904!
PID: 42906!
PID: 42905!
PID: 42904!
PID: 42907!
PID: 42905!
PID: 42906!
PID: 42907!
PID: 42904!
PID: 42905!
PID: 42906!
PID: 42907!
PID: 42904!
PID: 42905!
PID: 42906!
PID: 42907!
PID: 42904!
PID: 42905!
PID: 42906!
PID: 42904!
ログイン後にコピー

Shared

先执行 NODE_CLUSTER_SCHED_POLICY=none node cluster.js,则 Node.js 会使用 SharedHandle,然后执行 node client.js,会看到如下输出,可以看到同一个 PID 连续输出了多次,所以这种策略会导致进程任务分配不均的现象。就像公司里有些人忙到 996,有些人天天摸鱼,这显然不是老板愿意看到的现象,所以不推荐使用。

PID: 42561!
PID: 42562!
PID: 42561!
PID: 42562!
PID: 42564!
PID: 42561!
PID: 42562!
PID: 42563!
PID: 42561!
PID: 42562!
PID: 42563!
PID: 42564!
PID: 42564!
PID: 42564!
PID: 42564!
PID: 42564!
PID: 42563!
PID: 42563!
PID: 42564!
PID: 42563!
ログイン後にコピー

更多node相关知识,请访问:nodejs 教程

以上がNode.js のクラスターについて説明する記事の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:juejin.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート