
Spark Source Code Analysis: spark-shell

spark-shell

function main() {
# Check whether we are running under Cygwin, then launch org.apache.spark.repl.Main via spark-submit
  if $cygwin; then
    stty -icanon min 1 -echo > /dev/null 2>&1
    export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
    "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
    stty icanon echo > /dev/null 2>&1
  else
    export SPARK_SUBMIT_OPTS
    "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
  fi
}

org.apache.spark.repl.Main

def main(args: Array[String]) {
    // Instantiate SparkILoop and hand control to its process method
    _interp = new SparkILoop
    _interp.process(args)
}

SparkILoop.process

private def process(settings: Settings): Boolean = savingContextLoader {
    ......
    // The code omitted above mostly validates settings and initializes the
    // interpreter (e.g. which mode the shell runs in); the two key steps
    // registered here are:
    addThunk(printWelcome())
    addThunk(initializeSpark())
    ......
    // The code that follows is not essential to the startup flow
}
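
addThunk does not execute printWelcome and initializeSpark right away; it queues them so they run once the interpreter has finished initializing. A minimal sketch of that deferred-initialization pattern (plain Scala, not the actual SparkILoop internals) could look like this:

import scala.collection.mutable.ListBuffer

// Hypothetical sketch: closures are queued first and only executed
// after the interpreter has been initialized.
class ThunkQueue {
  private val pending = ListBuffer.empty[() => Unit]

  def addThunk(body: => Unit): Unit = pending += (() => body)

  // Called once initialization completes.
  def runThunks(): Unit = {
    pending.foreach(thunk => thunk())
    pending.clear()
  }
}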

printWelcome

// Print the Spark version banner, i.e. the welcome screen shown every time spark-shell starts
def printWelcome() {
    echo("""Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version %s
      /_/
""".format(SPARK_VERSION))
    import Properties._
    val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
      versionString, javaVmName, javaVersion)
    echo(welcomeMsg)
    echo("Type in expressions to have them evaluated.")
    echo("Type :help for more information.")
 }

initializeSpark

def initializeSpark() {
    intp.beQuietDuring {
      // Create the SparkContext (the Spark runtime environment), followed below by the Spark SQL context
      command("""
         @transient val sc = {
           val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
           println("Spark context available as sc.")
           _sc
         }
        """)
      command("""
         @transient val sqlContext = {
           val _sqlContext = org.apache.spark.repl.Main.interp.createSQLContext()
           println("SQL context available as sqlContext.")
           _sqlContext
         }
        """)
      command("import org.apache.spark.SparkContext._")
      command("import sqlContext.implicits._")
      command("import sqlContext.sql")
      command("import org.apache.spark.sql.functions._")
    }
  }
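
Because these commands run before the prompt is shown, sc, sqlContext and the imports above are already in scope once the shell becomes interactive. For example, in a Spark 1.x shell (a usage sketch; exact output varies by version):

// sc and sqlContext are pre-defined by initializeSpark
val rdd = sc.parallelize(1 to 100)
println(rdd.sum())   // 5050.0

// import sqlContext.implicits._ has already been executed,
// so toDF is available on local collections
val df = Seq((1, "spark"), (2, "shell")).toDF("id", "name")
df.show()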

createSparkContext

// Initialize the SparkContext; createSQLContext is analogous and not shown here
def createSparkContext(): SparkContext = {
    val execUri = System.getenv("SPARK_EXECUTOR_URI")
    val jars = SparkILoop.getAddedJars
    val conf = new SparkConf()
      .setMaster(getMaster())
      .setJars(jars)
      .set("spark.repl.class.uri", intp.classServerUri)
      .setIfMissing("spark.app.name", "Spark shell")
    if (execUri != null) {
      conf.set("spark.executor.uri", execUri)
    }
    sparkContext = new SparkContext(conf)
    logInfo("Created spark context..")
    sparkContext
  }
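
In this 1.x code path, spark.repl.class.uri points executors at the REPL's class server, from which they fetch the classes the shell compiles on the fly. Outside the shell, a standalone application assembles its context in much the same way; a minimal sketch (the master URL here is a placeholder, not the value getMaster() would resolve):

import org.apache.spark.{SparkConf, SparkContext}

object ShellLikeApp {
  def main(args: Array[String]): Unit = {
    // Roughly what createSparkContext assembles, minus the REPL-specific settings
    val conf = new SparkConf()
      .setMaster("local[*]")   // placeholder; the shell resolves this via getMaster()
      .setIfMissing("spark.app.name", "Spark shell")
    val sc = new SparkContext(conf)
    println(s"Spark context available as sc (master = ${sc.master})")
    sc.stop()
  }
}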
This post is licensed under CC BY 4.0 by the author.