Rumah > hujung hadapan web > tutorial js > Artikel untuk bercakap tentang kelompok dalam Node.js

Artikel untuk bercakap tentang kelompok dalam Node.js

青灯夜游
Lepaskan: 2023-01-22 05:30:01
ke hadapan
2984 orang telah melayarinya

Artikel untuk bercakap tentang kelompok dalam Node.js

Dalam kerja harian, penggunaan Node.js agak cetek. Mari kita pelajari sesuatu yang lebih maju semasa kita masih muda.

Nicholas Zhang San berkata, "Ini adalah cara yang lebih baik untuk belajar dengan soalan", jadi mari kita mencubanya.

Apabila saya mula-mula menggunakan kluster, saya sentiasa ingin tahu tentang cara ia boleh membenarkan berbilang proses anak mendengar port yang sama tanpa konflik, seperti kod berikut:

const cluster = require('cluster')
const net = require('net')
const cpus = require('os').cpus()

if (cluster.isPrimary) {
  for (let i = 0; i < cpus.length; i++) {
    cluster.fork()
  }
} else {
  net
    .createServer(function (socket) {
      socket.on(&#39;data&#39;, function (data) {
        socket.write(`Reply from ${process.pid}: ` + data.toString())
      })
      socket.on(&#39;end&#39;, function () {
        console.log(&#39;Close&#39;)
      })
      socket.write(&#39;Hello!\n&#39;)
    })
    .listen(9999)
}
Salin selepas log masuk

Kod ini melepasi Proses induk fork mempunyai berbilang proses anak, dan proses anak ini semuanya mendengar port 9999 dan boleh menyediakan perkhidmatan seperti biasa. Jom kaji. [Cadangan tutorial berkaitan: tutorial video nodejs, Pengajaran pengaturcaraan]

Sediakan persekitaran penyahpepijatan

Ketahui Node.js Rasmi Cara terbaik untuk menyediakan perpustakaan sudah tentu dengan menyahpepijatnya, jadi mari kita sediakan persekitaran terlebih dahulu. Nota: Sistem pengendalian artikel ini ialah macOS Big Sur 11.6.6 Sila sediakan persekitaran yang sepadan untuk sistem lain.

Kompilasi Node.js

  • Muat turun kod sumber Node.js

git clone https://github.com/nodejs/node.git
Salin selepas log masuk

Kemudian tambahkan titik putus di dua tempat berikut untuk memudahkan penyahpepijatan kemudian:

// lib/internal/cluster/primary.js
function queryServer(worker, message) {
  debugger;
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect) return;

  ...
}
Salin selepas log masuk
// lib/internal/cluster/child.js
send(message, (reply, handle) => {
  debugger
  if (typeof obj._setServerData === &#39;function&#39;) obj._setServerData(reply.data)

  if (handle) {
    // Shared listen socket
    shared(reply, {handle, indexesKey, index}, cb)
  } else {
    // Round-robin.
    rr(reply, {indexesKey, index}, cb)
  }
})
Salin selepas log masuk
  • Masukkan direktori dan laksanakan

./configure --debug
make -j4
Salin selepas log masuk

akan dijana selepas ituout/Debug/node

Sediakan persekitaran IDE

Gunakan vscode untuk nyahpepijat, konfigurasikannyalaunch.json Itu sahaja (IDE lain adalah serupa, sila selesaikan sendiri):

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Debug C++",
      "type": "cppdbg",
      "program": "/Users/youxingzhi/ayou/node/out/Debug/node",
      "request": "launch",
      "args": ["/Users/youxingzhi/ayou/node/index.js"],
      "stopAtEntry": false,
      "cwd": "${workspaceFolder}",
      "environment": [],
      "externalConsole": false,
      "MIMode": "lldb"
    },
    {
      "name": "Debug Node",
      "type": "node",
      "runtimeExecutable": "/Users/youxingzhi/ayou/node/out/Debug/node",
      "request": "launch",
      "args": ["--expose-internals", "--nolazy"],
      "skipFiles": [],
      "program": "${workspaceFolder}/index.js"
    }
  ]
}
Salin selepas log masuk

Yang pertama adalah untuk menyahpepijat kod C++ (perlu memasang pemalam C/C++), dan yang kedua ialah untuk menyahpepijat kod JS. Seterusnya, anda boleh mula menyahpepijat Buat masa ini, kami boleh menggunakan konfigurasi untuk menyahpepijat kod JS.

Penyahpepijatan sumber kluster

Sediakan kod penyahpepijatan (untuk tujuan penyahpepijatan sahaja, sudah cukup untuk memulakan sub-proses di sini):

debugger
const cluster = require(&#39;cluster&#39;)
const net = require(&#39;net&#39;)

if (cluster.isPrimary) {
  debugger
  cluster.fork()
} else {
  const server = net.createServer(function (socket) {
    socket.on(&#39;data&#39;, function (data) {
      socket.write(`Reply from ${process.pid}: ` + data.toString())
    })
    socket.on(&#39;end&#39;, function () {
      console.log(&#39;Close&#39;)
    })
    socket.write(&#39;Hello!\n&#39;)
  })
  debugger
  server.listen(9999)
}
Salin selepas log masuk

sangat Jelas sekali, program kami boleh dianalisis dalam dua bahagian: proses ibu bapa dan proses anak.

Perkara pertama yang perlu dimasukkan ialah proses induk:

Apabila melaksanakan require(&#39;cluster&#39;), ia akan memasuki fail lib/cluster.js:

const childOrPrimary = &#39;NODE_UNIQUE_ID&#39; in process.env ? &#39;child&#39; : &#39;primary&#39;
module.exports = require(`internal/cluster/${childOrPrimary}`)
Salin selepas log masuk

akan berdasarkan Adakah pada masa ini terdapat process.env pada NODE_UNIQUE_ID untuk memperkenalkan modul yang berbeza pada masa ini, jadi modul internal/cluster/primary.js akan diperkenalkan:

...
const cluster = new EventEmitter();
...
module.exports = cluster

const handles = new SafeMap()
cluster.isWorker = false
cluster.isMaster = true // Deprecated alias. Must be same as isPrimary.
cluster.isPrimary = true
cluster.Worker = Worker
cluster.workers = {}
cluster.settings = {}
cluster.SCHED_NONE = SCHED_NONE // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR // Primary distributes connections.
...
cluster.schedulingPolicy = schedulingPolicy

cluster.setupPrimary = function (options) {
...
}

// Deprecated alias must be same as setupPrimary
cluster.setupMaster = cluster.setupPrimary

function setupSettingsNT(settings) {
...
}

function createWorkerProcess(id, env) {
  ...
}

function removeWorker(worker) {
 ...
}

function removeHandlesForWorker(worker) {
 ...
}

cluster.fork = function (env) {
  ...
}
Salin selepas log masuk

Modul ini ialah? terutamanya pada objek cluster Sesetengah sifat dan kaedah dipasang dan dieksport. Kami akan melihat semula ini kemudian dan meneruskan penyahpepijatan. Apabila menyahpepijat, anda akan memasuki cawangan if (cluster.isPrimary) Kod ini sangat mudah, cuma fork mencipta sub-proses baharu:

// lib/internal/cluster/primary.js
cluster.fork = function (env) {
  cluster.setupPrimary()
  const id = ++ids
  const workerProcess = createWorkerProcess(id, env)
  const worker = new Worker({
    id: id,
    process: workerProcess,
  })

  ...

  worker.process.on(&#39;internalMessage&#39;, internal(worker, onmessage))
  process.nextTick(emitForkNT, worker)
  cluster.workers[worker.id] = worker
  return worker
}
Salin selepas log masuk

cluster.setupPrimary(): agak mudah, mulakan beberapa parameter dan sebagainya. pada.

createWorkerProcess(id, env):

// lib/internal/cluster/primary.js
function createWorkerProcess(id, env) {
  const workerEnv = {...process.env, ...env, NODE_UNIQUE_ID: `${id}`}
  const execArgv = [...cluster.settings.execArgv]

  ...

  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid,
  })
}
Salin selepas log masuk

Seperti yang anda lihat, kaedah ini terutamanya memulakan sub-proses melalui fork untuk melaksanakan index.js kami dan apabila memulakan sub-proses Pembolehubah persekitaran NODE_UNIQUE_ID ditetapkan, supaya apabila index.js disertakan dalam require(&#39;cluster&#39;), modul internal/cluster/child.js diperkenalkan.

worker.process.on(&#39;internalMessage&#39;, internal(worker, onmessage)): Dengar mesej yang dihantar oleh proses kanak-kanak dan proseskannya.

Kemudian kita masukkan logik sub-proses:

Seperti yang dinyatakan sebelum ini, modul internal/cluster/child.js diperkenalkan pada masa ini Mari kita langkau dahulu dan teruskan . , apabila server.listen(9999) dilaksanakan, kaedah pada Server sebenarnya dipanggil:

// lib/net.js
Server.prototype.listen = function (...args) {
  ...
      listenInCluster(
        this,
        null,
        options.port | 0,
        4,
        backlog,
        undefined,
        options.exclusive
      );
}
Salin selepas log masuk

Seperti yang anda lihat, listenInCluster akhirnya dipanggil:

// lib/net.js
function listenInCluster(
  server,
  address,
  port,
  addressType,
  backlog,
  fd,
  exclusive,
  flags,
  options
) {
  exclusive = !!exclusive

  if (cluster === undefined) cluster = require(&#39;cluster&#39;)

  if (cluster.isPrimary || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags)
    return
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
    backlog,
    ...options,
  }
  // Get the primary&#39;s server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnPrimaryHandle)

  function listenOnPrimaryHandle(err, handle) {
    err = checkBindError(err, port, handle)

    if (err) {
      const ex = exceptionWithHostPort(err, &#39;bind&#39;, address, port)
      return server.emit(&#39;error&#39;, ex)
    }

    // Reuse primary&#39;s server handle
    server._handle = handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags)
  }
}
Salin selepas log masuk

Kerana ia adalah Ia dilaksanakan dalam proses anak, jadi cluster._getServer(server, serverQuery, listenOnPrimaryHandle) akan dipanggil pada akhirnya:

// lib/internal/cluster/child.js
// 这里的 cb 就是上面的 listenOnPrimaryHandle
cluster._getServer = function (obj, options, cb) {
  ...
  send(message, (reply, handle) => {
    debugger
    if (typeof obj._setServerData === &#39;function&#39;) obj._setServerData(reply.data)

    if (handle) {
      // Shared listen socket
      shared(reply, {handle, indexesKey, index}, cb)
    } else {
      // Round-robin.
      rr(reply, {indexesKey, index}, cb)
    }
  })

  ...
}
Salin selepas log masuk

Fungsi ini akhirnya akan menghantar mesej queryServer ke proses induk memprosesnya, ia akan memanggil fungsi panggil balik Dalam fungsi panggil balik Akan memanggil cb iaitu listenOnPrimaryHandle. Nampaknya logik listen dilakukan dalam proses induk.

Seterusnya masukkan proses induk:

Selepas proses induk menerima mesej daripada queryServer, ia akhirnya akan memanggil kaedah queryServer:

// lib/internal/cluster/primary.js
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect) return

  const key =
    `${message.address}:${message.port}:${message.addressType}:` +
    `${message.fd}:${message.index}`
  let handle = handles.get(key)

  if (handle === undefined) {
    let address = message.address

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (
      message.port < 0 &&
      typeof address === &#39;string&#39; &&
      process.platform !== &#39;win32&#39;
    ) {
      address = path.relative(process.cwd(), address)

      if (message.address.length < address.length) address = message.address
    }

    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it&#39;s connectionless. There is nothing to send to
    // the workers except raw datagrams and that&#39;s pointless.
    if (
      schedulingPolicy !== SCHED_RR ||
      message.addressType === &#39;udp4&#39; ||
      message.addressType === &#39;udp6&#39;
    ) {
      handle = new SharedHandle(key, address, message)
    } else {
      handle = new RoundRobinHandle(key, address, message)
    }

    handles.set(key, handle)
  }

  ...
}
Salin selepas log masuk

Seperti yang anda lihat, ini terutamanya mengenai pemprosesan handle handle di sini merujuk kepada strategi penjadualan, yang dibahagikan kepada SharedHandle dan RoundRobinHandle, yang sepadan dengan dua strategi. preemptive dan pengundian masing-masing (artikel Terdapat contoh perbandingan antara kedua-duanya dalam bahagian tambahan akhir).

Node.js 中默认是 RoundRobinHandle 策略,可通过环境变量 NODE_CLUSTER_SCHED_POLICY 来修改,取值可以为 noneSharedHandle) 或 rrRoundRobinHandle)。

<span style="font-size: 18px;">SharedHandle</span>

首先,我们来看一下 SharedHandle,由于我们这里是 TCP 协议,所以最后会通过 net._createServerHandle 创建一个 TCP 对象挂载在 handle 属性上(注意这里又有一个 handle,别搞混了):

// lib/internal/cluster/shared_handle.js
function SharedHandle(key, address, {port, addressType, fd, flags}) {
  this.key = key
  this.workers = new SafeMap()
  this.handle = null
  this.errno = 0

  let rval
  if (addressType === &#39;udp4&#39; || addressType === &#39;udp6&#39;)
    rval = dgram._createSocketHandle(address, port, addressType, fd, flags)
  else rval = net._createServerHandle(address, port, addressType, fd, flags)

  if (typeof rval === &#39;number&#39;) this.errno = rval
  else this.handle = rval
}
Salin selepas log masuk

createServerHandle 中除了创建 TCP 对象外,还绑定了端口和地址:

// lib/net.js
function createServerHandle(address, port, addressType, fd, flags) {
  ...
  } else {
    handle = new TCP(TCPConstants.SERVER);
    isTCP = true;
  }

  if (address || port || isTCP) {
      ...
      err = handle.bind6(address, port, flags);
    } else {
      err = handle.bind(address, port);
    }
  }

  ...
  return handle;
}
Salin selepas log masuk

然后,queryServer 中继续执行,会调用 add 方法,最终会将 handle 也就是 TCP 对象传递给子进程:

// lib/internal/cluster/primary.js
function queryServer(worker, message) {
  ...
  if (!handle.data) handle.data = message.data

  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    const {data} = handles.get(key)

    if (errno) handles.delete(key) // Gives other workers a chance to retry.

    send(
      worker,
      {
        errno,
        key,
        ack: message.seq,
        data,
        ...reply,
      },
      handle // TCP 对象
    )
  })
  ...
}
Salin selepas log masuk

之后进入子进程:

子进程收到父进程对于 queryServer 的回复后,会调用 shared

// lib/internal/cluster/child.js
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function (obj, options, cb) {
  ...

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === &#39;function&#39;) obj._setServerData(reply.data)

    if (handle) {
      // Shared listen socket
      shared(reply, {handle, indexesKey, index}, cb)
    } else {
      // Round-robin.
      rr(reply, {indexesKey, index}, cb) // cb 是 listenOnPrimaryHandle
    }
  })
  ...
}
Salin selepas log masuk
Salin selepas log masuk

shared 中最后会调用 cb 也就是 listenOnPrimaryHandle

// lib/net.js
function listenOnPrimaryHandle(err, handle) {
  err = checkBindError(err, port, handle)

  if (err) {
    const ex = exceptionWithHostPort(err, &#39;bind&#39;, address, port)
    return server.emit(&#39;error&#39;, ex)
  }
  // Reuse primary&#39;s server handle 这里的 server 是 index.js 中 net.createServer 返回的那个对象
  server._handle = handle
  // _listen2 sets up the listened handle, it is still named like this
  // to avoid breaking code that wraps this method
  server._listen2(address, port, addressType, backlog, fd, flags)
}
Salin selepas log masuk

这里会把 handle 赋值给 server._handle,这里的 serverindex.jsnet.createServer 返回的那个对象,并调用 server._listen2,也就是 setupListenHandle

// lib/net.js
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug(&#39;setupListenHandle&#39;, address, port, addressType, backlog, fd)
  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don&#39;t need to do this.
  if (this._handle) {
    debug(&#39;setupListenHandle: have a handle already&#39;)
  } else {
    ...
  }

  this[async_id_symbol] = getNewAsyncId(this._handle)
  this._handle.onconnection = onconnection
  this._handle[owner_symbol] = this

  // Use a backlog of 512 entries. We pass 511 to the listen() call because
  // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
  // which will thus give us a backlog of 512 entries.
  const err = this._handle.listen(backlog || 511)

  if (err) {
    const ex = uvExceptionWithHostPort(err, &#39;listen&#39;, address, port)
    this._handle.close()
    this._handle = null
    defaultTriggerAsyncIdScope(
      this[async_id_symbol],
      process.nextTick,
      emitErrorNT,
      this,
      ex
    )
    return
  }
}
Salin selepas log masuk

首先会执行 this._handle.onconnection = onconnection,由于客户端请求过来时会调用 this._handle(也就是 TCP 对象)上的 onconnection 方法,也就是会执行lib/net.js 中的 onconnection 方法建立连接,之后就可以通信了。为了控制篇幅,该方法就不继续往下了。

然后调用 listen 监听,注意这里参数 backlog 跟之前不同,不是表示端口,而是表示在拒绝连接之前,操作系统可以挂起的最大连接数量,也就是连接请求的排队数量。我们平时遇到的 listen EADDRINUSE: address already in use 错误就是因为这行代码返回了非 0 的错误。

如果还有其他子进程,也会同样走一遍上述的步骤,不同之处是在主进程中 queryServer 时,由于已经有 handle 了,不需要再重新创建了:

function queryServer(worker, message) {
  debugger;
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect) return;

  const key =
    `${message.address}:${message.port}:${message.addressType}:` +
    `${message.fd}:${message.index}`;
  let handle = handles.get(key);
  ...
}
Salin selepas log masuk

以上内容整理成流程图如下:

Artikel untuk bercakap tentang kelompok dalam Node.js

所谓的 SharedHandle,其实是在多个子进程中共享 TCP 对象的句柄,当客户端请求过来时,多个进程会去竞争该请求的处理权,会导致任务分配不均的问题,这也是为什么需要 RoundRobinHandle 的原因。接下来继续看看这种调度方式。

RoundRobinHandle

// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(
  key,
  address,
  {port, fd, flags, backlog, readableAll, writableAll}
) {
  ...
  this.server = net.createServer(assert.fail)

  ...
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
      backlog,
    })
  }
  ...
  this.server.once(&#39;listening&#39;, () => {
    this.handle = this.server._handle
    this.handle.onconnection = (err, handle) => {
      this.distribute(err, handle)
    }
    this.server._handle = null
    this.server = null
  })
}
Salin selepas log masuk

如上所示,RoundRobinHandle 会调用 net.createServer() 创建一个 server,然后调用 listen 方法,最终会来到 setupListenHandle

// lib/net.js
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug(&#39;setupListenHandle&#39;, address, port, addressType, backlog, fd)
  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don&#39;t need to do this.
  if (this._handle) {
    debug(&#39;setupListenHandle: have a handle already&#39;)
  } else {
    debug(&#39;setupListenHandle: create a handle&#39;)

    let rval = null

    // Try to bind to the unspecified IPv6 address, see if IPv6 is available
    if (!address && typeof fd !== &#39;number&#39;) {
      rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags)

      if (typeof rval === &#39;number&#39;) {
        rval = null
        address = DEFAULT_IPV4_ADDR
        addressType = 4
      } else {
        address = DEFAULT_IPV6_ADDR
        addressType = 6
      }
    }

    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd, flags)

    if (typeof rval === &#39;number&#39;) {
      const error = uvExceptionWithHostPort(rval, &#39;listen&#39;, address, port)
      process.nextTick(emitErrorNT, this, error)
      return
    }
    this._handle = rval
  }

  this[async_id_symbol] = getNewAsyncId(this._handle)
  this._handle.onconnection = onconnection
  this._handle[owner_symbol] = this

  ...
}
Salin selepas log masuk

且由于此时 this._handle 为空,会调用 createServerHandle() 生成一个 TCP 对象作为 _handle。之后就跟 SharedHandle 一样了,最后也会回到子进程:

// lib/internal/cluster/child.js
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function (obj, options, cb) {
  ...

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === &#39;function&#39;) obj._setServerData(reply.data)

    if (handle) {
      // Shared listen socket
      shared(reply, {handle, indexesKey, index}, cb)
    } else {
      // Round-robin.
      rr(reply, {indexesKey, index}, cb) // cb 是 listenOnPrimaryHandle
    }
  })
  ...
}
Salin selepas log masuk
Salin selepas log masuk

不过由于 RoundRobinHandle 不会传递 handle 给子进程,所以此时会执行 rr

function rr(message, {indexesKey, index}, cb) {
  ...
  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it&#39;s backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = {close, listen, ref: noop, unref: noop}

  if (message.sockname) {
    handle.getsockname = getsockname // TCP handles only.
  }

  assert(handles.has(key) === false)
  handles.set(key, handle)
  debugger
  cb(0, handle)
}
Salin selepas log masuk

可以看到,这里构造了一个假的 handle,然后执行 cb 也就是 listenOnPrimaryHandle。最终跟 SharedHandle 一样会调用 setupListenHandle 执行 this._handle.onconnection = onconnection

RoundRobinHandle 逻辑到此就结束了,好像缺了点什么的样子。回顾下,我们给每个子进程中的 server 上都挂载了一个假的 handle,但它跟绑定了端口的 TCP 对象没有任何关系,如果客户端请求过来了,是不会执行它上面的 onconnection 方法的。之所以要这样写,估计是为了保持跟之前 SharedHandle 代码逻辑的统一。

此时,我们需要回到 RoundRobinHandle,有这样一段代码:

// lib/internal/cluster/round_robin_handle.js
this.server.once(&#39;listening&#39;, () => {
  this.handle = this.server._handle
  this.handle.onconnection = (err, handle) => {
    this.distribute(err, handle)
  }
  this.server._handle = null
  this.server = null
})
Salin selepas log masuk

listen 执行完后,会触发 listening 事件的回调,这里重写了 handle 上面的 onconnection

所以,当客户端请求过来时,会调用 distribute 在多个子进程中轮询分发,这里又有一个 handle,这里的 handle 姑且理解为 clientHandle,即客户端连接的 handle,别搞混了。总之,最后会将这个 clientHandle 发送给子进程:

// lib/internal/cluster/round_robin_handle.js
RoundRobinHandle.prototype.handoff = function (worker) {
  ...

  const message = { act: &#39;newconn&#39;, key: this.key };
  // 这里的 handle 是 clientHandle
  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted) handle.close();
    else this.distribute(0, handle); // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};
Salin selepas log masuk

而子进程在 require(&#39;cluster&#39;) 时,已经监听了该事件:

// lib/internal/cluster/child.js
process.on(&#39;internalMessage&#39;, internal(worker, onmessage))
send({act: &#39;online&#39;})

function onmessage(message, handle) {
  if (message.act === &#39;newconn&#39;) onconnection(message, handle)
  else if (message.act === &#39;disconnect&#39;)
    ReflectApply(_disconnect, worker, [true])
}
Salin selepas log masuk

最终也同样会走到 net.js 中的 function onconnection(err, clientHandle) 方法。这个方法第二个参数名就叫 clientHandle,这也是为什么前面的 handle 我想叫这个名字的原因。

还是用图来总结下:

Artikel untuk bercakap tentang kelompok dalam Node.js

SharedHandle 不同的是,该调度策略中 onconnection 最开始是在主进程中触发的,然后通过轮询算法挑选一个子进程,将 clientHandle 传递给它。

为什么端口不冲突

cluster 模块的调试就到此告一段落了,接下来我们来回答一下一开始的问题,为什么多个进程监听同一个端口没有报错?

网上有些文章说是因为设置了 SO_REUSEADDR,但其实跟这个没关系。通过上面的分析知道,不管什么调度策略,最终都只会在主进程中对 TCP 对象 bind 一次。

我们可以修改一下源代码来测试一下:

// deps/uv/src/unix/tcp.c 下面的 SO_REUSEADDR 改成 SO_DEBUG
if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
Salin selepas log masuk

编译后执行发现,我们仍然可以正常使用 cluster 模块。

那这个 SO_REUSEADDR 到底影响的是啥呢?我们继续来研究一下。

SO_REUSEADDR

首先,我们我们知道,下面的代码是会报错的:

const net = require(&#39;net&#39;)
const server1 = net.createServer()
const server2 = net.createServer()
server1.listen(9999)
server2.listen(9999)
Salin selepas log masuk

但是,如果我稍微修改一下,就不会报错了:

const net = require(&#39;net&#39;)
const server1 = net.createServer()
const server2 = net.createServer()
server1.listen(9999, &#39;127.0.0.1&#39;)
server2.listen(9999, &#39;10.53.48.67&#39;)
Salin selepas log masuk

原因在于 listen 时,如果不指定 address,则相当于绑定了所有地址,当两个 server 都这样做时,请求到来就不知道要给谁处理了。

我们可以类比成找对象,port 是对外貌的要求,address 是对城市的要求。现在甲乙都想要一个 port1米7以上 不限城市的对象,那如果有一个 1米7以上 来自 深圳 的对象,就不知道介绍给谁了。而如果两者都指定了城市就好办多了。

那如果一个指定了 address,一个没有呢?就像下面这样:

const net = require(&#39;net&#39;)
const server1 = net.createServer()
const server2 = net.createServer()
server1.listen(9999, &#39;127.0.0.1&#39;)
server2.listen(9999)
Salin selepas log masuk

结果是:设置了 SO_REUSEADDR 可以正常运行,而修改成 SO_DEBUG 的会报错。

还是上面的例子,甲对城市没有限制,乙需要是来自 深圳 的,那当一个对象来自 深圳,我们可以选择优先介绍给乙,非 深圳 的就选择介绍给甲,这个就是 SO_REUSEADDR 的作用。

补充

<span style="font-size: 18px;">SharedHandle</span><span style="font-size: 18px;">RoundRobinHandle</span> 两种模式的对比

先准备下测试代码:

// cluster.js
const cluster = require(&#39;cluster&#39;)
const net = require(&#39;net&#39;)

if (cluster.isMaster) {
  for (let i = 0; i < 4; i++) {
    cluster.fork()
  }
} else {
  const server = net.createServer()
  server.on(&#39;connection&#39;, (socket) => {
    console.log(`PID: ${process.pid}!`)
  })
  server.listen(9997)
}
Salin selepas log masuk
// client.js
const net = require(&#39;net&#39;)
for (let i = 0; i < 20; i++) {
  net.connect({port: 9997})
}
Salin selepas log masuk

RoundRobin先执行 node cluster.js,然后执行 node client.js,会看到如下输出,可以看到没有任何一个进程的 PID 是紧挨着的。至于为什么没有一直按照一样的顺序,后面再研究一下。

PID: 42904!
PID: 42906!
PID: 42905!
PID: 42904!
PID: 42907!
PID: 42905!
PID: 42906!
PID: 42907!
PID: 42904!
PID: 42905!
PID: 42906!
PID: 42907!
PID: 42904!
PID: 42905!
PID: 42906!
PID: 42907!
PID: 42904!
PID: 42905!
PID: 42906!
PID: 42904!
Salin selepas log masuk

Shared

先执行 NODE_CLUSTER_SCHED_POLICY=none node cluster.js,则 Node.js 会使用 SharedHandle,然后执行 node client.js,会看到如下输出,可以看到同一个 PID 连续输出了多次,所以这种策略会导致进程任务分配不均的现象。就像公司里有些人忙到 996,有些人天天摸鱼,这显然不是老板愿意看到的现象,所以不推荐使用。

PID: 42561!
PID: 42562!
PID: 42561!
PID: 42562!
PID: 42564!
PID: 42561!
PID: 42562!
PID: 42563!
PID: 42561!
PID: 42562!
PID: 42563!
PID: 42564!
PID: 42564!
PID: 42564!
PID: 42564!
PID: 42564!
PID: 42563!
PID: 42563!
PID: 42564!
PID: 42563!
Salin selepas log masuk

更多node相关知识,请访问:nodejs 教程

Atas ialah kandungan terperinci Artikel untuk bercakap tentang kelompok dalam Node.js. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:juejin.cn
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan