spark源码解析-从提交任务到jar的加载运行(基于2.1.0版本)

栏目: 服务器 · 发布时间: 6年前

内容简介:spark源码解析-从提交任务到jar的加载运行(基于2.1.0版本)

本文分析的源码基于Spark2.1.0版本,如果有理解不当的地方欢迎批评指正。

在之前的一篇文章中我们分析了Spark-submit脚本,发现该脚本会调用spark-class脚本检查参数设置,以及提交任务。最后发现,提交任务的入口类是org.apache.spark.deploy.SparkSubmit 我们接下来深入这个类,看看从提交任务到执行用户jar包之间都发生了什么;

首先找到org.apache.spark.deploy.SparkSubmit类的main方法:

def main (args: Array[ String ]): Unit

= {

val appArgs = new

SparkSubmitArguments(args)

if (appArgs. verbose

) {

// scalastyle:off println

printStream

.println(appArgs)

// scalastyle:on println

}

appArgs.

action match

{

case SparkSubmitAction. SUBMIT

=> submit (appArgs)

case SparkSubmitAction. KILL

=> kill (appArgs)

case SparkSubmitAction. REQUEST_STATUS

=> requestStatus (appArgs)

}

}

main方法很简单,首先利用参数创建了一个SparkSubmitArguments 这个类是SparkSubmitArgumentsParser的子类,主要工作就是对Spark应用的参数进行解析,以及加载当前和Spark相关的环境变量。

在SparkSubmitArguments中有一个action成员,用于表示spark-submit的动作,一般来说使用spark-submit有三个目的,第一是提交应用(这也是最常用的),第二是可以通过spark-submit杀死某个任务,第三是获取某个正在执行的任务的状态。这个action是通过参数指定的,默认值为submit即提交一个任务。

我们可以跳到submit方法看看,该方法定义如下:

private def submit(args: SparkSubmitArguments): Unit

仅接受一个SparkSubmitArguments实例作为参数,这个方法执行两个步骤,首先是基于集群管理器和部署模式设置合适的classpath、系统属性和应用程序参数,以此为运行用户的main方法做环境准备。

然后,使用第一步准备好的环境来启动main方法,这是通过反射完成的,我们下面再看。

Submit方法最终执行了其内部定义的doRunMain,而doRunMain方法会调用runMain(line 169)

runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

在runMain方法中可以看到这三行代码:分别是line 695:

mainClass = Utils.classForName(childMainClass)

这行代码利用反射加载main方法所在类,Utils.classForName方法最终还是调用的Class.forName方法;

line 722:

valmainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

以及line 738:

mainMethod.invoke(null, childArgs.toArray)

上述两行分别获取main方法,然后执行main方法。这些动作都是在driver端完成的。

再理一下思路,spark提交jar的过程如下:

Spark-submit -> Spark-class -> org.apache.spark.deploy.SparkSubmit-> { main -> submit -> doRunMan -> runMain}

org.apache.spark.deploy.SparkSubmit主要负责准备运行环境以及通过反射获取app的main方法并执行。

为了方便理解,接下来我们对Spark利用反射加载运行用户应用程序的main方法做一个简易实现:

新建一个scala工程,在com.load包下新建一个Main类:

package com.load
/**
  * Created by hunan on 2017/5/19.
  */
object Main {
  def main(args: Array[String]): Unit = {
    println("Run main method success!")
  }
}

这个用来模拟我们自己开发的Spark应用,然后打jar包,生成ScalaTest.jar

再创建另一个工程,将刚才的jar作为依赖添加进来,并写如下类:

package com.example
/**
  * Created by hunan on 2017/5/19.
  */
object Test {
  def main(args: Array[String]): Unit = {
    var mainClass:Class[_]=null
    try {
      mainClass = Class.forName("com.load.Main", true, Thread.currentThread().getContextClassLoader)
    }catch{
      case e:Exception=>
        e.printStackTrace()
    }

    if(mainClass!=null){
      val mainMethod = mainClass.getMethod("main",new Array[String](0).getClass)
      mainMethod.invoke(null,Array[String]())
    }
  }
}

这个用来模拟Spark本身,运行即可发现输出如下:

spark源码解析-从提交任务到jar的加载运行(基于2.1.0版本)

这就是Spark提交jar的机制,我们也可以发现这里的main方法仅作为普通方法执行,只不过Spark会检查该mian方法是不是静态的,如果不是静态就抛出异常拒绝执行,如果修改722行的“main”字串也可以实现以任意方法名为Spark app执行入口。


以上所述就是小编给大家介绍的《spark源码解析-从提交任务到jar的加载运行(基于2.1.0版本)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Mobilizing Web Sites

Mobilizing Web Sites

Layon, Kristofer / 2011-12 / 266.00元

Everyone has been talking about the mobile web in recent years, and more of us are browsing the web on smartphones and similar devices than ever before. But most of what we are viewing has not yet bee......一起来看看 《Mobilizing Web Sites》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

SHA 加密
SHA 加密

SHA 加密工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具