1. 概述
Spark-1.6以后RPC默认使用Netty替代Akka,在Netty上加了一层封装,为实现对Spark的定制开发,所以了解Spark中RPC的原理还是有必要的
Akka是一个异步的消息框架,所谓的异步,简言之就是消息发送方发送出消息,不用阻塞等待结果,接收方处理完返回结果即可。Akka支持百万级的消息传递,特别适合复杂的大规模分布式系统。Akka基于Actor模型,提供用于创建可扩展,弹性,快速响应的应用程序的平台。Actor封装了状态和行为的对象,不同的Actor之间可通过消息交换实现通信,每个Actor都有自己的消息收件箱。Akka可以简化并发场景下的开发,其异步,高性能的事件驱动模型,轻量级的事件处理可大大方便用于开发复杂的分布式系统。早期Spark大量采用Akka作为RPC。Netty也是一个知名的高性能,异步消息框架,Spark早期便使用它解决大文件传输问题,用来克服Akka的短板。根据社区的说法,因为很多Spark用户饱受Akka复杂依赖关系的困扰,所以后来干脆就直接用Netty代替了Akka。
2. Spark 1.6+ 中的RPC
2.1 RpcEndpoint
用来通讯的个体(Master,Worker,Diver),一个RpcEndpoint
的生命周期包括:onStart()->recevie->onStop(),当然也还有很多其他方法,后面会有介绍
2.2 RpcEndpointRef
RpcEndpoint
的一个引用,当我们需要向一个具体的RpcEndpoint发送消息时,一般我们需要获取到该RpcEndpoint的引用,然后通过该应用发送消息。
2.3 RpcAddress
RpcEndpointRef
的地址,Host + Port。
2.4 RpcEnv
RpcEnv为RpcEndpoint提供处理消息的环境。RpcEnv负责RpcEndpoint整个生命周期的管理,包括:注册endpoint,endpoint之间消息的路由,以及停止endpoint。
3. RPC网络通信的抽象图
核心的RpcEnv是一个特质(trait),它主要提供了停止,注册,获取endpoint等方法的定义,而NettyRpcEnv提供了该特质的一个具体实现。
通过工厂RpcEnvFactory来产生一个RpcEnv,而NettyRpcEnvFactory用来生成NettyRpcEnv的一个对象。
当我们调用RpcEnv中的setupEndpoint来注册一个endpoint到rpcEnv的时候,在NettyRpcEnv内部,会将该endpoint的名称与其本省的映射关系,rpcEndpoint与rpcEndpointRef之间映射关系保存在dispatcher对应的成员变量中。
3. Master具体实现
3.1 Master类的定义
1
2
3
4
5
6
7
8
private[deploy] class Master(
override val rpcEnv: RpcEnv,
address: RpcAddress,
webUiPort: Int,
val securityMgr: SecurityManager,
val conf: SparkConf)
extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
......
可以看到Master继承ThreadSafeRpcEndpoint
,而ThreadSafeRpcEndpoint
继承RpcEndpoint
,跟上面的抽象图差不多
3.2 Master启动
1
2
3
4
5
6
7
8
9
10
11
12
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
会通过NettyRpcEnvFactory
创建一个RpcEnv,rcpEnv在创建一个Endpoint,也就是Master了
3.3 RpcEndpoint特质
master的启动会创建一个RpcEnv并将自己注册到其中。继续看RpcEndpoint特质的定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private[spark] trait RpcEndpoint {
//当前RpcEndpoint注册到的RpcEnv主子,可以类比为Akka中的actorSystem
val rpcEnv: RpcEnv
//直接用来发送消息的RpcEndpointRef,可以类比为Akka中的actorRef
final def self: RpcEndpointRef = {
require(rpcEnv != null, "rpcEnv has not been initialized")
rpcEnv.endpointRef(this)
}
//处理来自RpcEndpointRef.send或者RpcCallContext.reply的消息
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException(self + " does not implement 'receive'")
}
//处理来自RpcEndpointRef.ask的消息,会有相应的回复
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}
//篇幅限制,其余onError,onConnected,onDisconnected,onNetworkError,
//onStart,onStop,stop方法此处省略
}
3.4 RpcEnv抽象类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private[spark] abstract class RpcEnv(conf: SparkConf) {
private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
//返回endpointRef
private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
//返回RpcEnv监听的地址
def address: RpcAddress
//注册一个RpcEndpoint到RpcEnv并返回RpcEndpointRef
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
//通过uri异步地查询RpcEndpointRef
def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
//通过uri查询RpcEndpointRef,这种方式会产生阻塞
def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
}
//通过address和endpointName查询RpcEndpointRef,这种方式会产生阻塞
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
}
//关掉endpoint
def stop(endpoint: RpcEndpointRef): Unit
//关掉RpcEnv
def shutdown(): Unit
//等待结束
def awaitTermination(): Unit
//没有RpcEnv的话RpcEndpointRef是无法被反序列化的,这里是反序列化逻辑
def deserialize[T](deserializationAction: () => T): T
//返回文件server实例
def fileServer: RpcEnvFileServer
//开一个针对给定URI的channel用来下载文件
def openChannel(uri: String): ReadableByteChannel
}
另外RpcEnv有一个伴生对象,实现了create方法:
1
2
3
4
5
6
7
8
9
10
11
12
private[spark] object RpcEnv {
def create(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean = false): RpcEnv = {
val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
new NettyRpcEnvFactory().create(config)
}
}
这就是在master启动方法中的create具体实现,可以看到调用了Netty工厂方法NettyRpcEnvFactory,该方法是对Netty的具体封装。
3.5 master中消息处理
上文可以看到,在RpcEndpoint中最核心的便是receive和receiveAndReply方法,定义了消息处理的核心逻辑,master中也有相应的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
override def receive: PartialFunction[Any, Unit] = {
case ElectedLeader =>
case CompleteRecovery =>
case RevokedLeadership =>
case RegisterApplication(description, driver) =>
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
case DriverStateChanged(driverId, state, exception) =>
case Heartbeat(workerId, worker) =>
case MasterChangeAcknowledged(appId) =>
case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
case WorkerLatestState(workerId, executors, driverIds) =>
case UnregisterApplication(applicationId) =>
case CheckForWorkerTimeOut =>
}
这里定义了master一系列的消息处理逻辑,而receiveAndReply中,
1
2
3
4
5
6
7
8
9
10
11
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
case RequestSubmitDriver(description) =>
case RequestKillDriver(driverId) =>
case RequestDriverStatus(driverId) =>
case RequestMasterState =>
case BoundPortsRequest =>
case RequestExecutors(appId, requestedTotal) =>
case KillExecutors(appId, executorIds) =>
}
定义了对需要回复的消息组的处理逻辑。
在看看Worker的实现
3.6 Worker
1
2
3
4
5
6
7
8
9
10
11
12
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
memory: Int,
masterRpcAddresses: Array[RpcAddress],
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
extends ThreadSafeRpcEndpoint with Logging {
与Master一样
3.7 worker启动方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {
// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
}
和Master一样,但是Worker会在onStart()向Master进行注册
3.8 worker注册到master RpcEnv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
workerId, host, port, self, cores, memory, workerWebUiUrl))
.onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
Utils.tryLogNonFatalError {
handleRegisterResponse(msg)
}
case Failure(e) =>
logError(s"Cannot register with master: ${masterEndpoint.address}", e)
System.exit(1)
}(ThreadUtils.sameThread)
}
masterEndpoint.ask是核心,发送了一个RegisterWorker消息到masterEndpoint并期待对方的RegisterWorkerResponse,对response做出相应的处理。这样worker就成功和master建立了连接,它们之间可以互相发送消息进行通信。
3.9worker到master的通信
worker和master之间是一个主从关系,worker注册到master之后,master就可以通过消息传递实现对worker的管理,在worker中有一个方法:
1
2
3
4
5
6
7
8
private def sendToMaster(message: Any): Unit = {
master match {
case Some(masterRef) => masterRef.send(message)
case None =>
logWarning(
s"Dropping $message because the connection to master has not yet been established")
}
}
一目了然,就是干的发送消息到master的活儿,在worker中很多地方都用到这个方法,比如handleExecutorStateChanged(executorStateChanged:ExecutorStateChanged)方法中,sendToMaster(executorStateChanged)就向masterRef发送了executorStateChanged消息,前文中master中的recevie方法中,就有一个对ExecutorStateChanged消息的处理逻辑。
3.10master到worker的通信
同样的,master要对worker实现管理也是通过发送消息实现的,比如launchExecutor(worker: WorkerInfo, exec: ExecutorDesc)方法中:
1
2
3
4
5
6
7
8
9
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
//向worker发送LaunchExecutor消息
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
master向worker发送了LaunchExecutor消息告诉worker应该启动executor了,而worker中的receive方法中对LaunchExecutor消息进行处理并完成master交代给自己的任务。