首页 spark RPC原理
文章
取消

spark RPC原理

1. 概述

Spark-1.6以后RPC默认使用Netty替代Akka,在Netty上加了一层封装,为实现对Spark的定制开发,所以了解Spark中RPC的原理还是有必要的

Akka是一个异步的消息框架,所谓的异步,简言之就是消息发送方发送出消息,不用阻塞等待结果,接收方处理完返回结果即可。Akka支持百万级的消息传递,特别适合复杂的大规模分布式系统。Akka基于Actor模型,提供用于创建可扩展,弹性,快速响应的应用程序的平台。Actor封装了状态和行为的对象,不同的Actor之间可通过消息交换实现通信,每个Actor都有自己的消息收件箱。Akka可以简化并发场景下的开发,其异步,高性能的事件驱动模型,轻量级的事件处理可大大方便用于开发复杂的分布式系统。早期Spark大量采用Akka作为RPC。Netty也是一个知名的高性能,异步消息框架,Spark早期便使用它解决大文件传输问题,用来克服Akka的短板。根据社区的说法,因为很多Spark用户饱受Akka复杂依赖关系的困扰,所以后来干脆就直接用Netty代替了Akka。

2. Spark 1.6+ 中的RPC

2.1 RpcEndpoint

用来通讯的个体(Master,Worker,Diver),一个RpcEndpoint的生命周期包括:onStart()->recevie->onStop(),当然也还有很多其他方法,后面会有介绍

2.2 RpcEndpointRef

RpcEndpoint的一个引用,当我们需要向一个具体的RpcEndpoint发送消息时,一般我们需要获取到该RpcEndpoint的引用,然后通过该应用发送消息。

2.3 RpcAddress

RpcEndpointRef的地址,Host + Port。

2.4 RpcEnv

RpcEnv为RpcEndpoint提供处理消息的环境。RpcEnv负责RpcEndpoint整个生命周期的管理,包括:注册endpoint,endpoint之间消息的路由,以及停止endpoint。

3. RPC网络通信的抽象图

  1. 核心的RpcEnv是一个特质(trait),它主要提供了停止,注册,获取endpoint等方法的定义,而NettyRpcEnv提供了该特质的一个具体实现。

  2. 通过工厂RpcEnvFactory来产生一个RpcEnv,而NettyRpcEnvFactory用来生成NettyRpcEnv的一个对象。

  3. 当我们调用RpcEnv中的setupEndpoint来注册一个endpoint到rpcEnv的时候,在NettyRpcEnv内部,会将该endpoint的名称与其本省的映射关系,rpcEndpoint与rpcEndpointRef之间映射关系保存在dispatcher对应的成员变量中。

3. Master具体实现

3.1 Master类的定义

1
2
3
4
5
6
7
8
private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
  ......

可以看到Master继承ThreadSafeRpcEndpoint,而ThreadSafeRpcEndpoint继承RpcEndpoint,跟上面的抽象图差不多

3.2 Master启动

1
2
3
4
5
6
7
8
9
10
11
12
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }

会通过NettyRpcEnvFactory创建一个RpcEnv,rcpEnv在创建一个Endpoint,也就是Master了

3.3 RpcEndpoint特质

master的启动会创建一个RpcEnv并将自己注册到其中。继续看RpcEndpoint特质的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private[spark] trait RpcEndpoint {
  //当前RpcEndpoint注册到的RpcEnv主子,可以类比为Akka中的actorSystem
  val rpcEnv: RpcEnv
  //直接用来发送消息的RpcEndpointRef,可以类比为Akka中的actorRef
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }
  //处理来自RpcEndpointRef.send或者RpcCallContext.reply的消息
  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }
  //处理来自RpcEndpointRef.ask的消息,会有相应的回复
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }
  //篇幅限制,其余onError,onConnected,onDisconnected,onNetworkError,
  //onStart,onStop,stop方法此处省略
}

3.4 RpcEnv抽象类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private[spark] abstract class RpcEnv(conf: SparkConf) {
  private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
  //返回endpointRef
  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
  //返回RpcEnv监听的地址
  def address: RpcAddress
  //注册一个RpcEndpoint到RpcEnv并返回RpcEndpointRef
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
  //通过uri异步地查询RpcEndpointRef
  def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
  //通过uri查询RpcEndpointRef,这种方式会产生阻塞
  def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
    defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
  }
  //通过address和endpointName查询RpcEndpointRef,这种方式会产生阻塞
  def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
    setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
  }
  //关掉endpoint
  def stop(endpoint: RpcEndpointRef): Unit
  //关掉RpcEnv
  def shutdown(): Unit
  //等待结束
  def awaitTermination(): Unit
  //没有RpcEnv的话RpcEndpointRef是无法被反序列化的,这里是反序列化逻辑
  def deserialize[T](deserializationAction: () => T): T
  //返回文件server实例
  def fileServer: RpcEnvFileServer
  //开一个针对给定URI的channel用来下载文件
  def openChannel(uri: String): ReadableByteChannel
}

另外RpcEnv有一个伴生对象,实现了create方法:

1
2
3
4
5
6
7
8
9
10
11
12
private[spark] object RpcEnv {
  def create(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean = false): RpcEnv = {
    val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
    new NettyRpcEnvFactory().create(config)
  }
}

这就是在master启动方法中的create具体实现,可以看到调用了Netty工厂方法NettyRpcEnvFactory,该方法是对Netty的具体封装。

3.5 master中消息处理

上文可以看到,在RpcEndpoint中最核心的便是receive和receiveAndReply方法,定义了消息处理的核心逻辑,master中也有相应的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
override def receive: PartialFunction[Any, Unit] = {
    case ElectedLeader =>      
    case CompleteRecovery => 
    case RevokedLeadership =>
    case RegisterApplication(description, driver) =>
    case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    case DriverStateChanged(driverId, state, exception) =>
    case Heartbeat(workerId, worker) =>
    case MasterChangeAcknowledged(appId) =>
    case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
    case WorkerLatestState(workerId, executors, driverIds) =>
    case UnregisterApplication(applicationId) =>
    case CheckForWorkerTimeOut =>
}

这里定义了master一系列的消息处理逻辑,而receiveAndReply中,

1
2
3
4
5
6
7
8
9
10
11
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RegisterWorker(
        id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    case RequestSubmitDriver(description) =>
    case RequestKillDriver(driverId) =>
    case RequestDriverStatus(driverId) =>
    case RequestMasterState =>
    case BoundPortsRequest =>
    case RequestExecutors(appId, requestedTotal) =>
    case KillExecutors(appId, executorIds) =>
}

定义了对需要回复的消息组的处理逻辑。

在看看Worker的实现

3.6 Worker

1
2
3
4
5
6
7
8
9
10
11
12
private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging {

与Master一样

3.7 worker启动方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    workerNumber: Option[Int] = None,
    conf: SparkConf = new SparkConf): RpcEnv = {
  // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
  val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
  val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
  rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
    masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
  rpcEnv
}

和Master一样,但是Worker会在onStart()向Master进行注册

3.8 worker注册到master RpcEnv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}

masterEndpoint.ask是核心,发送了一个RegisterWorker消息到masterEndpoint并期待对方的RegisterWorkerResponse,对response做出相应的处理。这样worker就成功和master建立了连接,它们之间可以互相发送消息进行通信。

3.9worker到master的通信

worker和master之间是一个主从关系,worker注册到master之后,master就可以通过消息传递实现对worker的管理,在worker中有一个方法:

1
2
3
4
5
6
7
8
private def sendToMaster(message: Any): Unit = {
  master match {
    case Some(masterRef) => masterRef.send(message)
    case None =>
      logWarning(
        s"Dropping $message because the connection to master has not yet been established")
  }
}

一目了然,就是干的发送消息到master的活儿,在worker中很多地方都用到这个方法,比如handleExecutorStateChanged(executorStateChanged:ExecutorStateChanged)方法中,sendToMaster(executorStateChanged)就向masterRef发送了executorStateChanged消息,前文中master中的recevie方法中,就有一个对ExecutorStateChanged消息的处理逻辑。

3.10master到worker的通信

同样的,master要对worker实现管理也是通过发送消息实现的,比如launchExecutor(worker: WorkerInfo, exec: ExecutorDesc)方法中:

1
2
3
4
5
6
7
8
9
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  //向worker发送LaunchExecutor消息
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

master向worker发送了LaunchExecutor消息告诉worker应该启动executor了,而worker中的receive方法中对LaunchExecutor消息进行处理并完成master交代给自己的任务。

本文由作者按照 CC BY 4.0 进行授权