NodeJS Cluster模块源码学习

栏目: Node.js · 发布时间: 6年前

内容简介:一段常见的示例代码引出如下问题:

一段常见的示例代码

const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
  // 根据cpu核心数出fork相同数量的子进程
} else {
  // 用http模块创建server监听某一个端口
}
复制代码

引出如下问题:

  1. cluster 模块如何区分子进程和主进程?

  2. 代码中没有在主进程中创建服务器,那么如何主进程如何承担代理服务器的职责?

  3. 多个子进程共同侦听同一个端口为什么不会造成端口 reuse error

1. cluster 模块如何区分主进程/子进程

cluster.js - 源码

const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);
复制代码

结论: 判断环境变量中是否含有NODE_UNIQUE_ID, 有则为子进程,没有则为主进程

1.1 isMaster & isWorker

这样的话, 在对应的文件中 isMasterisWorker 的值就明确啦

// child.js
module.exports = cluster;

cluster.isWorker = true;
cluster.isMaster = false;

// master.js
module.exports = cluster;

cluster.isWorker = false;
cluster.isMaster = true;
复制代码

那么接下来的问题是: NODE_UNIQUE_ID从哪里来?

1.2 NODE_UNIQUE_ID 从哪里来的?

internal/cluster/master.js 文件中搜索 NODE_UNIQUE_ID ----> 上层为 createWorkerProcess 函数 ----> 上层为 cluster.fork 函数

master.js 源码中相关部分

const { fork } = require('child_process');

cluster.workers = {}

var ids = 0;

cluster.fork = function(env) {
  const id = ++ ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });
  cluster.workers[worker.id] = worker;
  return worker
}

function createWorkerProcess(id, env) {
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  
  return fork(args, {
    env: workerEnv
  })
}
复制代码

结论: 变量NODE_UNIQUE_ID是在主进程fork子进程时传递进去的参数,因此采用cluster.fork创建的子进程是一定包含NODE_UNIQUE_ID的,而直接使用child_process.fork的子进程是没有NODE_UNIQUE_ID的

并且, NODE_UNIQUE_ID 将作为主进程中存储活跃的工作进程对象的键值

2. 主进程中是否存在TCP服务器, 如果有, 什么时候创建的?

继续描述一下这个问题的由来:

const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
  // 根据cpu核心数出fork相同数量的子进程
} else {
  // 用http模块创建server监听某一个端口
}
复制代码

并没有在 cluster.isMaster 条件语句中创建服务器 , 也没有提供服务器相关的路径,接口。而主进程又需要承担代理服务器的 职责,那么主进程中是否存在 TCP 服务器?

我们来猜猜看可能的步骤

  • 子进程会执行 http.createServer

  • http 模块会调用 net 模块, 因为 http.Server 继承 net.Server

  • 同时侦听端口, 创建 net.Server 实例, 创建的实例调用 listen(port) , 等待链接

这时如果主进程要创建服务器就需要把 创建服务器相关信息给主进程 , 继续猜测

  • 假设主进程已经拿到了服务器相关的信息, 主进程自己来创建

  • 后面的 fork 子进程就不用自己创建了,而是从主进程中 get 到相关数据

既然要在主进程需要得到完整的创建服务器相关信息, 那么很可能在 net 模块 listen 相关方法中进行处理

2.1 在源码中找答案

github net 模块源码

Server.prototype.listen 找找看,什么时候把服务器相关信息传递给主进程了?

Server.prototype.listen = function(...args) {
  // 无视其他的判断逻辑, 直达它的内心!
  if (成功) {
    listenInCluster()
    return this
  } else {
    // 无视
  }
}
复制代码

总的来说就是: Server.prototype.listen 函数中,在成功进入条件语句后所有的情况都执行了 listenInCluster 函数后返回

接下来看 listenInCluster 函数

function listenInCluster(server, 创建服务器需要的数据) {
  if (cluster === undefined) cluster = require('cluster')
  // 判断是否是主进程
  if (cluster.isMaster) {
    server._listen2(创建服务器需要的数据)
    return
  }
  // 创建服务器需要的数据
  const serverQuery = {
      address: address,
      port: port,
      addressType: addressType,
      fd: fd,
      flags,
    };
  // 只剩下子进程
  cluster._getServer(server, 创建服务器需要的数据, listenOnMasterHandle);
 
  function listenOnMasterHandle(err, handle) {
    server._handle = handle
    server._listen2(创建服务器需要的数据)
  }
}
复制代码

按照前面的推断: 子进程会给主进程发送创建server需要的数据, 主进程去创建

所以接下来去看 cluster 模块的 child._getServer 函数

cluster._getServer = function(obj, options, cb) {
  // 组装发送的数据
  const message = {
    act: 'queryServer',
    ...options,
  }
  // 发送数据
  send(message, (reply, handle) => {
  
  })
}
复制代码

那么接下来主进程就应该对 queryServer 作出想要的处理

具体可以看 cluster/master.js

const RoundRobinHandle = require('internal/cluster/round_robin_handle');

const handles = new Map()

function onmessage(message, handle) {
  if (message.act === 'queryServer') {
    queryServer(worker, message)
  }
}
queryServer(worker, message) {
  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  const constructor = RoundRobinHandle
  
  let handle = new constructor(创建服务器相关信息)
  
  handles.set(key, handle);
}
复制代码

终于要到终点了:

internal/cluster/round_robin_handle.js

function RoundRobinHandle(创建服务器相关信息) {
  this.server = net.createServer()
  this.server.listen(.....)
}
复制代码

2.2 主进程在 cluster 模式下如何创建服务器的结论

主进程 fork 子进程, 子进程中有显式创建服务器的操作,但实际上在 cluster 模式下, 子进程是把创建服务器所需要的数据发送给主进程, 主进程来隐式创建 TCP 服务器

流程图

NodeJS Cluster模块源码学习

3. 为什么多个子进程可以监听同一个端口?

这个问题可以转换为: 子进程中有没有也创建一个服务器,同时侦听某个端口呢?

其实,上面的源码分析中可以得出结论: 子进程中确实创建了 net.Server 对象,可是它没有像主进程那样在 libuv 层构建 socket 句柄,子进程的 net.Server 对象使用的是一个假句柄来'欺骗'使用者端口已侦听

3.1 首先要明确默认的调度策略: round-robin

这部分可以参考文章 Node.js V0.12 新特性之 Cluster 轮转法负载均衡

主要就是说: Node.js v0.12 引入了 round-robin方式 , 用轮转法来分配请求, 每个子进程的获取的时间的机会都是均等的(windows除外)

源码在 internal/cluster/master.js

var schedulingPolicy = {
  'none': SCHED_NONE,
  'rr': SCHED_RR
}[process.env.NODE_CLUSTER_SCHED_POLICY];

if (schedulingPolicy === undefined) {
  // FIXME Round-robin doesn't perform well on Windows right now due to the
  // way IOCP is wired up.
  schedulingPolicy = (process.platform === 'win32') ? SCHED_NONE : SCHED_RR;
}

cluster.schedulingPolicy = schedulingPolicy;
复制代码

3.2 证明子进程拿到的是假句柄

上面说明了:默认的调度策略是 round-robin , 那么子进程将创建服务器的数据发送给主进程, 当主进程发送创建服务器成功的消息后,子进程会执行回调函数

源码在 internal/cluster/child.js _getServer中

cluster._getServer = function(obj, options, cb) {
  const indexesKey = [address,
                      options.port,
                      options.addressType,
                      options.fd ].join(':');
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);
      
    // 这里可以反推出主进程返回的handle为null
    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });
}
复制代码

rr 函数, 注意这里的回调函数其实就是 net 模块中的 listenOnMasterHandle 方法

function rr(message, indexesKey, cb) {
  const key = message.key
  const handle = { close, listen, ref: noop, unref: noop };
  handles.set(key, handle)
  // 将假句柄传递给上层的net.Server
  cb(0, handle)
}
复制代码

所以结论是这样: 子进程压根没有创建底层的服务端 socket 做侦听,所以在子进程创建的 HTTP 服务器侦听的端口根本不会出现端口复用的情况

3.3 子进程没有创建底层socket, 如何接收请求和发送响应呢?

显而易见:主进程的服务器中会创建 RoundRobinHandle 决定分发请求给哪一个子进程,筛选出子进程后发送 newconn 消息给对应的子进程

4. 请求分发策略 RoundRobin

源码见 internal/cluster/round_robin_handle

module.exports = RoundRobinHandle

function RoundRobinHandle(创建服务器需要的参数) {
  // 存储空闲的子进程
  this.free = []
  // 存放待处理的用户请求
  this.handles = []
}
// 负责筛选出处理请求的子进程
RoundRobinHandle.prototype.distribute = function(err, handle) {
  this.handles.push(handle)
  const worker = this.free.shift()
  
  if (worker) {
    this.handoff(worker)
  }
}
// 获取请求,并通过IPC发送句柄handle和newconn消息,等待子进程返回
RoundRobinHandle.prototype.handoff = function(worker) {
  const handle = this.handles.shift()
  
  if (handle === undefined) {
    this.free.push(worker)
    return
  }
  
  const message = { act: 'newconn', key: this.key }
  
  sendHelper(worker.process, message, handle, reply => {
    if (reply.accepted)
      handle.close();
    // 某个子进程办事不力,给下一个子进程再试试  
    else
      this.distribute(0, handle)  
    this.handoff(worker)  
  })
}
复制代码

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Compilers

Compilers

Alfred V. Aho、Monica S. Lam、Ravi Sethi、Jeffrey D. Ullman / Addison Wesley / 2006-9-10 / USD 186.80

This book provides the foundation for understanding the theory and pracitce of compilers. Revised and updated, it reflects the current state of compilation. Every chapter has been completely revised ......一起来看看 《Compilers》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

html转js在线工具
html转js在线工具

html转js在线工具