首页 spark源码分析 - submit
文章
取消

spark源码分析 - submit

下图大致描述了整个过程

spark-submit

1
2
3
4
5
6
7
8
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# 调用bin目录中的spark-class 参数为org.apache.spark.deploy.SparkSubmit
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

会先经历spark-class org.apache.spark.deploy.SparkSubmit处理,里面包括一些基本环境配置,然后运行

再看看 org.apache.spark.deploy.SparkSubmit的main函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    // 在这里进行操作的匹配,在这里我们肯定是进入submit()
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

submit()

1
2
3
4
5
private def submit(args: SparkSubmitArguments): Unit = {
    //先是初始化环境,包括建立合适的环境变量,系统配置,应用参数
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
    .....
}

prepareSubmitEnvironment(args)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
   * Prepare the environment for submitting an application.
   * This returns a 4-tuple:
   *   (1) the arguments for the child process,
   *   (2) a list of classpath entries for the child,
   *   (3) a map of system properties, and
   *   (4) the main class for the child
   * Exposed for testing.
   * 这些都是spark-submit 里面的一些配置,例如启动模式、环境变量。这个方法就是根据这些启动模式来做	 * 出对应的处理,由于一般情况下会在yarn进行任务运行,所以这次运行的模式--master yarn --deploy-	 * mode cluster 
   */
  private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
      : (Seq[String], Seq[String], Map[String, String], String) = {
          ......
         //前面都是一些判断,直接进入正题
     if (isYarnCluster) {
       //当为yarn 的cluster模式时  会调用org.apache.spark.deploy.yarn.Client类
      childMainClass = "org.apache.spark.deploy.yarn.Client"
      //是否使用Python
      if (args.isPython) {
        childArgs += ("--primary-py-file", args.primaryResource)
        if (args.pyFiles != null) {
          childArgs += ("--py-files", args.pyFiles)
        }
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
         //或者使用R
      } else if (args.isR) {
        val mainFile = new Path(args.primaryResource).getName
        childArgs += ("--primary-r-file", mainFile)
        childArgs += ("--class", "org.apache.spark.deploy.RRunner")
      } else {
          //最后是默认情况,也就是我们这次任务执行的模式
        if (args.primaryResource != SPARK_INTERNAL) {
          childArgs += ("--jar", args.primaryResource)
        }
        childArgs += ("--class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }
      //最后通过筛选,返回这些参数
  	 (childArgs, childClasspath, sysProps, childMainClass)
 }

再回到submit(),会执行runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose),也就是传入我们刚刚返回的参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sysProps: Map[String, String],
      childMainClass: String,
      verbose: Boolean): Unit = {
    //前面会根据传入的参数进行环境配置,参数导入以及日志的打印
    ......
    try {
      // 加载我们传入的类,也就是 org.apache.spark.deploy.yarn.Client
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        e.printStackTrace(printStream)
        .......//捕捉异常信息,这里就忽略了
    }
    //获取传入类的main函数
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
	......
    try {
      // 调用main函数
      mainMethod.invoke(null, childArgs.toArray)
    }
}

然后来看看org.apache.spark.deploy.yarn.Client.main()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  def main(argStrings: Array[String]) {
    if (!sys.props.contains("SPARK_SUBMIT")) {
      logWarning("WARNING: This client is deprecated and will be removed in a " +
        "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
    }
    // Set an env variable indicating we are running in YARN mode.
    // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConf = new SparkConf

    val args = new ClientArguments(argStrings, sparkConf)
    // to maintain backwards-compatibility
    if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
      sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
    }
    //在一顿初始化与判断后,初始化自己然后调用run()方法
    new Client(args, sparkConf).run()
  }

run()

1
2
3
4
5
6
7
8
9
10
def run(): Unit = {
    //提交应用
    this.appId = submitApplication()
    if (!launcherBackend.isConnected() && fireAndForget) {
      ......
    } else {
      val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
      ......
    }
  }

submitApplication(),来看看提交应用里面做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
  def submitApplication(): ApplicationId = {
    var appId: ApplicationId = null
    try {
      //launcherBackend在前面已经进行了初始化,launcherBackend是一个底层使用Socket用来传递信息的抽象类
      launcherBackend.connect()
      // Setup the credentials before doing anything else,
      // so we have don't have issues at any point.
      setupCredentials()
      yarnClient.init(yarnConf)
      yarnClient.start()

      logInfo("Requesting a new application from cluster with %d NodeManagers"
        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

      // Get a new application from our RM
      // 通过yarn api 创建一个application
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId()
      reportLauncherState(SparkAppHandle.State.SUBMITTED)
      launcherBackend.setAppId(appId.toString())

      // Verify whether the cluster has enough resources for our AM
      // 检测集群是否有足够的资源可以调用
      verifyClusterResources(newAppResponse)
	  // 初始化环境用于启动ApplicationManager
      // Set up the appropriate contexts to launch our AM
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)
	  //最后提交应用,并返回appId
      // Finally, submit and monitor the application
      logInfo(s"Submitting application ${appId.getId} to ResourceManager")
      yarnClient.submitApplication(appContext)
      appId
    } catch {
      case e: Throwable =>
        if (appId != null) {
          cleanupStagingDir(appId)
        }
        throw e
    }
  }

提交完任务后在回到 run() ,通过代码可以看出,他会继续执行monitorApplication(appId),也就是监视任务的进行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
   * Report the state of an application until it has exited, either successfully or
   * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
   * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,
   * or KILLED).
   *
   * @param appId ID of the application to monitor.
   * @param returnOnRunning Whether to also return the application state when it is RUNNING.
   * @param logApplicationReport Whether to log details of the application report every iteration.
   * @return A pair of the yarn application state and the final application state.
   */
  def monitorApplication(
      appId: ApplicationId,
      returnOnRunning: Boolean = false,
      logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
    var lastState: YarnApplicationState = null
    while (true) {
      Thread.sleep(interval)
      val report: ApplicationReport =
        try {
          getApplicationReport(appId)
        } catch {
          case e: ApplicationNotFoundException =>
            logError(s"Application $appId not found.")
            return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
          case NonFatal(e) =>
            logError(s"Failed to contact YARN for application $appId.", e)
            return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED)
        }
      val state = report.getYarnApplicationState

      if (logApplicationReport) {
        logInfo(s"Application report for $appId (state: $state)")

        // If DEBUG is enabled, log report details every iteration
        // Otherwise, log them every time the application changes state
        if (log.isDebugEnabled) {
          logDebug(formatReportDetails(report))
        } else if (lastState != state) {
          logInfo(formatReportDetails(report))
        }
      }

      if (lastState != state) {
        state match {
          case YarnApplicationState.RUNNING =>
            reportLauncherState(SparkAppHandle.State.RUNNING)
          case YarnApplicationState.FINISHED =>
            reportLauncherState(SparkAppHandle.State.FINISHED)
          case YarnApplicationState.FAILED =>
            reportLauncherState(SparkAppHandle.State.FAILED)
          case YarnApplicationState.KILLED =>
            reportLauncherState(SparkAppHandle.State.KILLED)
          case _ =>
        }
      }

      if (state == YarnApplicationState.FINISHED ||
        state == YarnApplicationState.FAILED ||
        state == YarnApplicationState.KILLED) {
        cleanupStagingDir(appId)
        return (state, report.getFinalApplicationStatus)
      }

      if (returnOnRunning && state == YarnApplicationState.RUNNING) {
        return (state, report.getFinalApplicationStatus)
      }

      lastState = state
    }
本文由作者按照 CC BY 4.0 进行授权