Spark 源码分析(二): Driver 注册及启动

栏目: 编程工具 · 发布时间: 4年前

内容简介:上一篇文章已经已经执行到 Client 向 masterEndpoint 发送了 RequestSubmitDriver 信息,下面就看看 master 怎么注册 driver 信息,并且怎么让 worker 去启动 driver 的。这个 Master 就是前面 Client 发送的对象,是一个 ThreadSafeRpcEndpoint。内部的这个方法内部会根据发送过来的消息做模式匹配,我们找到 Client 发送过来的 RequestSubmitDriver 这个消息对应代码,如下:

上一篇文章已经已经执行到 Client 向 masterEndpoint 发送了 RequestSubmitDriver 信息,下面就看看 master 怎么注册 driver 信息,并且怎么让 worker 去启动 driver 的。

一,org.apache.spark.deploy.master.Master

这个 Master 就是前面 Client 发送的对象,是一个 ThreadSafeRpcEndpoint。内部的 receiveAndReply 这个方法在监听外部发来信息。下面就来看这个方法。

1,receiveAndReply 方法

这个方法内部会根据发送过来的消息做模式匹配,我们找到 Client 发送过来的 RequestSubmitDriver 这个消息对应代码,如下:

// 匹配到 Client 发送过来的消息
case RequestSubmitDriver(description) =>
  		// 判断当前 master 的状态是否为 alive
      if (state != RecoveryState.ALIVE) {
        // 如果不是 alive 则回复 driver 提交失败
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        // 根据 client 发过来的 driver 信息创建 driver,然后持久化 driver
        // 然后将 driver 加入到等待队列中去
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        // 将 driver 加入到 HashSet 中去
        drivers.add(driver)
          
        // 开始调度
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".

        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))
      }
复制代码

这段代码,做了这么一些操作:判断当前 master 的状态是否为 alive ,如果不是则回复消息说:提交失败,如果是则根据传递过来的 driver 信息创建 driver 对象(通过 createDriver 方法创建)并将其持久化,加入到等待队列中去,然后开始执行调度算法 schduler。

这里涉及到连个方法,分别可以看一下,一个是 createDriver 方法,一个是 schduler 方法。

2,createDriver 方法

// 创建 driver 对象
private def createDriver(desc: DriverDescription): DriverInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    // 通过系统当前时间生成一个 driverId
    // 然后将系统当前时间,driverId,DriverDescription,日期 这些信息封装成一个 DriverInfo
    new DriverInfo(now, newDriverId(date), desc, date)
  }
复制代码

这个方法主要是通过当前时间生成一个 driverId,然后将当前时间,DriverDescription 等参数封装成一个 DriverInfo 对象。

3,schduler 方法

该方法在 master 中会被多次调用,每当 driver 的等待队列中数据发生变动或者集群资源发生变化都会掉用这个方法。这个方法主要是为当前 driver 的等待队列分配资源的。

private def schedule(): Unit = {
  	// 首先判断当前 master 的状态是否为 alive 的,如果不是 alive 则不往下执行
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Random.shuffle 这个方法主要是随机分散各个元素,具体代码可以点进去看
  	// 这里主要是将集群中 state 为 alive 的 worker 帅选出来,然后随机打乱
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    
    // 当前 alive 的 worker 数量
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
      
    // 将等待分配资源的 driver 队列中的所有 driver 进行遍历
    // 然后为每个 driver 遍历一遍所有的 alive worker,当碰到 worker 的可用内存和比当前队列中
    // 等待的 driver 所需要的内存要大并且 worker 的 core 数量也满足 driver 的需求时
    // 就会调用 launcherDriver 方法去将 driver 发送对应的 worker 上去执行
    for (driver <- waitingDrivers.toList) { 
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        // 找到符合条件的 worker
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          // 将该 driver 从等待队列中移除
          waitingDrivers -= driver
          // 标记当前 driver 为 launched
          launched = true
        }
        
        // 移到下一个 driver 上
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
  	
  	// 调用 startExecutorsOnWorkers 方法
    startExecutorsOnWorkers()
  }
复制代码

这个 schduler 方法会遍历等待分配资源的 driver 队列,为每个 driver 遍历一遍 alive 的 worker,找到资源满足的 worker,然后调用 launchDriver 方法,将该 driver 在这个 worker 上启动,移除掉等待队列中当前 driver,然后调用 startExecutorsOnWorkers 启动 executor。

这里又有两个方法,一个是 launchDriver 方法,一个是 startExecutorsOnWorkers 方法去启动 executor,startExecutorsOnWorkers 这个方法放到下面文章里讲,这篇文章主要讲 driver 注册和启动。

4,launchDriver 方法

这个方法主要是更新一些信息(worker 中的资源变更,worker 中启动的 driver 信息记录;driver 中添加上 worker 的信息),然后将向对应的 worker 发送 LaunchDriver 的消息。

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    // 这里是将 workerInfo 中添加上启动 driver 的信息,内部也会减去 driver 使用掉的资源
    worker.addDriver(driver)
    // 将 driver 启动的 worker 信息记录到 driver 中
    driver.worker = Some(worker)
    // 给 worker 发送 LaunchDriver 的信息
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    // 标记当前 driver 状态为 running 状态
    driver.state = DriverState.RUNNING
  }
复制代码

通过把启动的 driver 信息记录到对应的 worker 信息中,再将对应的 worker 信息记录到 driver 里,然后给 worker 发送消息让 worker 启动 driver,标记当前的 driver 状态为 running。

这里会给 worker 发送 LaunchDriver 的消息,下面去看下 worker 中是怎么处理这个消息的。

二,org.apache.spark.deploy.worker.Worker

private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging
复制代码

从继承关系上可以看出 worker 也是 RpcEndPoint,所以直接找到它的 receive 方法,然后根据模式匹配找到 LaunchDriver 这个匹配下看操作逻辑即可。

case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
  		// 将 driver 信息封装到一个 runner 内
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
  		// 然后将这个 runner 保存到一个 HashMap 中
      drivers(driverId) = driver
      // 启动这个 runner
      driver.start()
			// 更新当前 worker 的资源信息
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
复制代码

这里会将 driver 的信息封装到一个 DriverRunner 里面,然后再降这个 runner 保存到内存的一个 HashMap 中,然后开启这个 ruuner,更新当前 worker 的资源信息。

到这里我们需要去看 DriverRunner 里是怎么操作的。

三,org.apache.spark.deploy.worker.DriverRunner

DriverRunner 是在 standalone cluster 部署模式下用来执行 driver 操作的,包括当 driver 挂掉之后的自动重启。

1,start 方法

前面调用的是 runner 的 start 方法,所以我们直接看这个 start 方法:

private[worker] def start() = {
  	// 开一个线程
    new Thread("DriverRunner for " + driverId) {
      override def run() {
        var shutdownHook: AnyRef = null
        try {
          shutdownHook = ShutdownHookManager.addShutdownHook { () =>
            logInfo(s"Worker shutting down, killing driver $driverId")
            kill()
          }

          // 准备 driver 的 jar 包并且执行 driver,并返回一个 exitCode
          val exitCode = prepareAndRunDriver()

          // 根据 exitCode 设置 finalState,一共有三种,分别为:FINISHED,KILLED,FAILED
          finalState = if (exitCode == 0) {
            Some(DriverState.FINISHED)
          } else if (killed) {
            Some(DriverState.KILLED)
          } else {
            Some(DriverState.FAILED)
          }
        } catch {
          case e: Exception =>
            kill()
            finalState = Some(DriverState.ERROR)
            finalException = Some(e)
        } finally {
          if (shutdownHook != null) {
            ShutdownHookManager.removeShutdownHook(shutdownHook)
          }
        }

        // 然后将 driverId 和 driver 执行结果 finalState 以及一些异常信息发送给 worker
        worker.send(DriverStateChanged(driverId, finalState.get, finalException))
      }
    }.start()
  }
复制代码

这里主要是调用了一个 prepareAndRunDriver 这个方法,返回了一个结果码,然后把结果码转换为 finalState ,然后发送给 worker。

所以我们直接去找 prepareAndRunDriver 这个方法。

2,prepareAndRunDriver 方法

private[worker] def prepareAndRunDriver(): Int = {
  	
    // 创建 driver 的工作目录
    val driverDir = createWorkingDirectory()
    // 下载 driver 的 jar 包到工作目录下
    val localJarFilename = downloadUserJar(driverDir)

    def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}" => localJarFilename
      case other => other
    }

    // 创建 ProcessBuilder
    val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
      driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

    runDriver(builder, driverDir, driverDesc.supervise)
  }
复制代码

这个方法主要做了这些事:创建 driver 的工作目录,将 driver 的 jar 包下载到工作目录下,然后创建 ProcessBuilder,传入 driver 的执行命令,然后调用 runDriver 方法。

下面我们看下 runDriver 方法。

3,runDriver 方法

private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
    builder.directory(baseDir)
    // 初始化操作
    def initialize(process: Process): Unit = {
      // 创建 stout 文件
      val stdout = new File(baseDir, "stdout")
      // 将 process 的 InputStream 流重定向为 stout 文件
      CommandUtils.redirectStream(process.getInputStream, stdout)
			
      // 创建 stderr 文件
      val stderr = new File(baseDir, "stderr")
      // 将 builder 命令格式化处理
      val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
      val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
      Files.append(header, stderr, StandardCharsets.UTF_8)
      // 将 process 的 ErrStream 重定向到 stderr 文件
      CommandUtils.redirectStream(process.getErrorStream, stderr)
    }
  	// 调用 runCommandWithRetry
    runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
  }
复制代码

该方法主要是定义了一个 initialize 方法,里面会将传入的 process 的输入流和 err 流重定向到自定义的两个文件中去,然后调用 runCommandWithRetry 这个方法。

看下 runCommandWithRetry 这个方法。

4,runCommandWithRetry 方法

private[worker] def runCommandWithRetry(
      command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
  	// 退出码  
  	var exitCode = -1
    // 提交重试的灯带时间
    var waitSeconds = 1
    // A run of this many seconds resets the exponential back-off.
    val successfulRunDuration = 5
    var keepTrying = !killed

    while (keepTrying) {
      logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

      synchronized {
        // 如果被 kill 则返回 exitcode
        if (killed) { return exitCode }
        // 执行 command 命令,启动 driver 进程
        process = Some(command.start())
        // 调用上面定义好的 initialize 方法,将一些流的输出文件做重定向
        initialize(process.get)
      }

      val processStart = clock.getTimeMillis()
      exitCode = process.get.waitFor()

      // check if attempting another run
      keepTrying = supervise && exitCode != 0 && !killed
      if (keepTrying) {
        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
          waitSeconds = 1
        }
        logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
        sleeper.sleep(waitSeconds)
        waitSeconds = waitSeconds * 2 // exponential back-off
      }
    }

    exitCode
  }
复制代码

这里是真正运行 driver 进程的地方,开启 driver 进程后会使用上面 runDriver 中定义好的 initialize 方法去将 driver 进程中的一些流的输出文件做重定向操作,并返回 exitcode。

至此,driver 就已经在 master 上注册好了,并且 master 也分配合适的 worker 启动了该 driver 进程。

我们在 DriverRunner start 方法的最后会调用 worker.send(DriverStateChanged(driverId, finalState.get, finalException)) 这个方法,给 worker 发送 driver 状态变化的消息。

四,org.apache.spark.deploy.worker.Worker

这里我们看下 worker 是怎么处理的。

在 woker 的 receive 方法的模式匹配中是这么操作的:

case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
      handleDriverStateChanged(driverStateChanged)
复制代码

会去调用 handleDriverStateChanged 这个方法。

1,handleDriverStateChanged 方法

我们再看下 handleDriverStateChanged 这个方法:

private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
    val driverId = driverStateChanged.driverId
    val exception = driverStateChanged.exception
    val state = driverStateChanged.state
    // 根据 state 做匹配打印日志
    state match {
      case DriverState.ERROR =>
        logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
      case DriverState.FAILED =>
        logWarning(s"Driver $driverId exited with failure")
      case DriverState.FINISHED =>
        logInfo(s"Driver $driverId exited successfully")
      case DriverState.KILLED =>
        logInfo(s"Driver $driverId was killed by user")
      case _ =>
        logDebug(s"Driver $driverId changed state to $state")
    }
  	// 向 master 发送 driverStateChanged 消息
    sendToMaster(driverStateChanged)
    // 将该 driver 从 drivers 移除到 finishedDrivers 中去
    val driver = drivers.remove(driverId).get
    finishedDrivers(driverId) = driver
    trimFinishedDriversIfNecessary()
    // 更新 worker 节点的资源情况
    memoryUsed -= driver.driverDesc.mem
    coresUsed -= driver.driverDesc.cores
  }
复制代码

主要是做了这些事:根据发送过来的 state 做模式匹配,打印对应的 log。然后把这个 driverStateChanged 消息转发给 master,最后再更新下当前 worker 的一些存储数据。

最后在看下 master 收到这个 driverStateChanged 消息是怎么处理的。

五,org.apache.spark.deploy.master.Master

在其 recieve 方法中可以找到匹配到 driverStageChanged 消息后的操作:

case DriverStateChanged(driverId, state, exception) =>
      state match {
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          // 调用 removeDriver 方法
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }
复制代码

在这里是调用了 removeDriver 方法,我们下面就看下这个方法。

1,removeDriver 方法

private def removeDriver(
      driverId: String,
      finalState: DriverState,
      exception: Option[Exception]) {
  	// 根据 driver id 进行模式匹配
    drivers.find(d => d.id == driverId) match {
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        // 从 drivers 集合中移除当前 driver
        drivers -= driver
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
      	// 将 driver 添加到 completedDrivers 中去
        completedDrivers += driver
        // 从持久化引擎中移除
        persistenceEngine.removeDriver(driver)
        // 更新 driver 的状态和 exception 并从 driver 的 worker 中移除掉当前 driver
        driver.state = finalState
        driver.exception = exception
        driver.worker.foreach(w => w.removeDriver(driver))
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }
复制代码

这个方法主要是将 master 中资源做恢复操作,会根据当前退出的 driver 做模式匹配,找到这个 driver,然后将其从 drivers 的集合中移除,添加到 completedDrivers 中去,然后从持久化引擎中移除掉,更新 driver 的状态,并从 driver 持有的 worker 中移除掉结束的这个 driver。然后再调用 schedule 方法,让释放资源重新调度。

至此,driver 的注册,启动,以及退出后资源回收,都结束了。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Flash ActionScript 3.0从入门到精通

Flash ActionScript 3.0从入门到精通

章精设、胡登涛 / 清华大学出版社 / 2008-10-1 / 69.00元

Flash ActionScript 3.0的出现,不仅从形式上改变了ActionScript,而且从本质上改变了ActionScript,使ActionScript 3.0成为了真正的面向对象编程语言。 本书从最简单的编程知识出发,带领读者走进编程的大门,是一本不可多得的ActionScript 3.0入门书。本书在注重基础的同时,从更高的层次来介绍ActionScript 3.0的面向对......一起来看看 《Flash ActionScript 3.0从入门到精通》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具