spark RDD五大特性并在源码中的体现

RDD的五大特性

Posted by Woods on September 22, 2018

RDD: Resilient Distributed Dataset

弹性分布式数据集

RDD五大特性及源码中体现:

1. A list of partitions (可分区的)

是由多个分区集合合成的列表

1
protected def get Partitons:Array[Partition]

HadoopRdd实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    try {
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) //getSplits 获得Hadoop文件的分片就是partition
      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits 
      }
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
      case e: InvalidInputException if ignoreMissingFiles =>
        logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
            s" partitions returned from this path.", e)
        Array.empty[Partition]
    }
  }

getSplits方法,获得Hadoop文件的分片就是partition

2. A function for computing each split/partition (函数作用所有分区)

函数是作用在所有分片/分区的上的。

1
def compute(split: Partiton, context: TaskContext): Iterator[T]

HadoopRDD实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
    val iter = new NextIterator[(K, V)] {

      private val split = theSplit.asInstanceOf[HadoopPartition]
      logInfo("Input split: " + split.inputSplit)
      private val jobConf = getJobConf()

      private val inputMetrics = context.taskMetrics().inputMetrics
      private val existingBytesRead = inputMetrics.bytesRead

      // Sets InputFileBlockHolder for the file block's information
      split.inputSplit.value match {
        case fs: FileSplit =>
          InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
        case _ =>
          InputFileBlockHolder.unset()
      }
      ...//省略部分实现
    }
    ...//省略部分实现
}

3. A list of dependencies on other RDDs (血缘关系)

RDD之间具有依赖关系,血缘关系。如有一个窄依赖RDD里的分区数据丢失,会自动从上一个RDD里分区重新计算一个分区的数据。

1
protected def getDependencies: Seq[Dependency[_]] = deps

ShuffledRDD实现:

1
2
3
4
5
6
7
8
9
10
11
 override def getDependencies: Seq[Dependency[_]] = {
    val serializer = userSpecifiedSerializer.getOrElse {
      val serializerManager = SparkEnv.get.serializerManager
      if (mapSideCombine) {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
      } else {
        serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
      }
    }
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }

4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) (可以KV存储)

可选的,一个分区的数据是按照KV方式存储的(例如按照key的哈希分区)

1
@transient val partitioner: Option[Partitioner] = None

HadoopRDD实现:

1
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) (数据本地性)

可选的, 每一个分片计算时都要遵循数据本地性原则。(计算和存储最好在同一个节点)

1
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

HadoopRDD实现:

1
2
3
4
5
6
7
8
9
override def getPreferredLocations(split: Partition): Seq[String] = {
    val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
    val locs = hsplit match {
      case lsplit: InputSplitWithLocationInfo =>
        HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
      case _ => None
    }
    locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
  }