首页 Spark内部原理
文章
取消

Spark内部原理

1.Shuffle

1.1 什么是Shuffle

Spark是分布式计算系统,数据块在不同节点执行,但是一些操作,例如join,需要将不同节点上相同的Key对应的Value聚集到一起,Shuffle便应运而生。

Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,这期间涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等,所以说Shuffle是整个应用程序运行过程中非常昂贵的一个阶段,理解Spark Shuffle原理有助于优化Spark应用程序。

1.2 Spark Shuffle的基本原理与特性

与MapReduce 中的Shuffle类似

  • 在DAG阶段以shuffle为界,划分Stage,上游为 map task,下游为reduce task
  • 每个map task将计算结果数据分成多份,每一份对应到下游stage的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write
  • 每个reduce task通过网络拉取上游stage中所有map task的指定分区结果数据,该过程叫做shuffle read

在map阶段,除了map的业务逻辑外,还有shuffle write的过程,这个过程涉及到序列化、磁盘IO等耗时操作;在reduce阶段,除了reduce的业务逻辑外,还有前面shuffle read过程,这个过程涉及到网络IO、反序列化等耗时操作,所以整个shuffle过程是极其昂贵的

1.4 Spark Shuffle实现演进

1.4.1 Hash Shuffle v1

和上述流程类似,

假如一个executor上运行 M 个map task,下游reduce 有 N 个分区,则executor 会生成M*N个临时文件,生成文件时需要申请文件描述符,当partition很多时,并行化运行task时可能会耗尽文件描述符、消耗大量内存。因此后来Hash Shuffle进一步变成了如下版本。

1.4.2 Sort Shuffle

  • 在map阶段,会先按照partition id、每个partition内部按照key对中间结果进行排序。所有的partition数据写在一个文件里,并且通过一个索引文件记录每个partition的大小和偏移量。这样并行运行时每个core只要2个文件,一个executor上最多2m个文件。。
  • 在reduce阶段,reduce task拉取数据做combine时不再使用HashMap而是ExternalAppendOnlyMap。如果内存不足会写次磁盘。但是排序会导致性能损失。

1.4.3 Unsafe Shuffle

从spark 1.5.0开始,spark开始了钨丝计划(Tungsten),目的是优化内存和CPU的使用,进一步提升spark的性能。为此,引入Unsafe Shuffle,它的做法是将数据记录用二进制的方式存储,直接在序列化的二进制数据上sort而不是在java 对象上,这样一方面可以减少memory的使用和GC的开销,另一方面避免shuffle过程中频繁的序列化以及反序列化。在排序过程中,它提供cache-efficient sorter,使用一个8 bytes的指针,把排序转化成了一个指针数组的排序,极大的优化了排序性能。

但是使用Unsafe Shuffle有几个限制,shuffle阶段不能有aggregate操作,分区数不能超过一定大小( 2^{24} -1,这是可编码的最大parition id),所以像reduceByKey这类有aggregate操作的算子是不能使用Unsafe Shuffle,它会退化采用Sort Shuffle。

从spark-1.6.0开始,把Sort Shuffle和Unsafe Shuffle全部统一到Sort Shuffle中,如果检测到满足Unsafe Shuffle条件会自动采用Unsafe Shuffle,否则采用Sort Shuffle。从spark-2.0.0开始,spark把Hash Shuffle移除,可以说目前spark-2.0中只有一种Shuffle,即为Sort Shuffle。

2. 宽依赖&&窄依赖

2.1 RDD Lineages

RDD也是一个DAG的任务集合,一个DAG代表了一个RDD的计算过程。每个DAG都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系,这个关系在Spark中叫做Lineages。

2.2 宽依赖&&窄依赖

  • 窄依赖:父分区对应一个子分区。对于窄依赖,只需通过重新计算丢失的那一块数据来恢复,容错成本较小。
  • 宽依赖:分区对应多个子分区 。对于宽依赖,会对父分区进行重新计算,造成冗余计算。

B ->G 中的join是窄依赖,因为之前的groupby已经将B中的数据通过shuffle进行了分区

所以join操作已有窄依赖已有宽依赖

如何判断是宽依赖还是窄依赖

每个RDD对象都有一个dependencies,通过获取这个属性,可以判断他是宽依赖还是窄依赖

宽依赖:

  • ShuffleDependency

窄依赖:

  • OneToOneDependency
  • PruneDependency
  • RangeDependency

也可以通过 toDebugString 属性,查看整个RDD Lineages

2.3 RDD容错

当出现数据丢失时,会通过RDD之间的血缘关系(Lineages)进行重新计算,但是如果错误发生在一个复杂的宽依赖的时候,重新计算任然会消耗掉很多资源。

2.4 缓存

如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。

如图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。

当然缓存也有缓存到内存或者是硬盘上,默认情况下是缓存到内存

更多的缓存方式请点这里

3. 共享变量

在Spark执行时,每个task之前无法进行数据交换的,但是有时却需要统计一些公共的值,譬如计数之类的,该怎么告呢?

这时候就要用到Spark 中的共享变量了。Spark中一共有两个共享变量:Broadcast Variables、Accumulators

  • Broadcast Variables

    广播变量是一个只读变量,存放后,在集群中任何节点都可以访问

  • Accumulators

    累加器,功能和名字差不多,可以在并行的情况下高效的进行累加

参考

本文由作者按照 CC BY 4.0 进行授权