Spark RDD概述以及创建方式

一、概述

1、什么是RDD?

RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象。在代码中是一个抽象类,它代表一个不可变可分区、里面的元素可并行计算的集合。

RDD用于支持在并行计算时能够高效地利用中间结果,支持更简单的编程模型,同时也具有像MapReduce等并行计算框架的高容错性、能够高效地进行调度及可扩展性。RDD的容错通过记录RDD转换操作的lineage关系来进行,lineage记录了RDD的依赖关系,当出现错误的时候,直接通过lineage进行恢复。

注意:RDD是Spark 2.0之前的概念,在2.0之后的版本被Dataset代替了,Dataset是在RDD的基础上做了优化,是具有强类型的(但是在Python API当中不是强类型的,Python中所有的Dataset是Dataset[Row],为了和Pandas和概念一致,可以将其称为DataFrame)。但是在目前的Spark中,RDD是兼容的,并且RDD的应用依然是第一位的。

2、RDD的五大属性

(1)一组分区( A list of partitions)
一个RDD有多个分区,后期Spark任务计算是以分区为单位,一个分区就对应上一个Task线程。 通过val rdd1=sc.textFile(文件) ,如果这个文件大小的block个数小于等于2,它产生的rdd的分区数就是2。如果这个文件大小的block个数大于2,它产生的rdd的分区数跟文件的block相同。

(2)一个计算每个分区的函数(A function for computing each split)
由一个函数计算每一个分片 比如: rdd2=rdd1.map(x => (x, 1)) ,这里指的就是每个单词计为1的函数。

(3)RDD之间的一系列依赖关系(A list of dependencies on other RDDs)
一个RDD会依赖于其它多个RDD,这里就涉及到RDD与RDD之间的依赖关系,后期Spark任务的容错机制就是根据这个特性而来。 比如: rdd2=rdd1.map(x => (x, 1)) rdd2的结果是通过rdd1调用了map方法生成,那么rdd2就依赖于rdd1的结果。

(4)可选,对于Key-Value类型的RDD才会有分区函数(Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned))
分区函数的作用:决定了原始RDD的数据会流入到下面RDD的哪些分区中。

(5)可选,存储存取每个Partition的优先位置(Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file))
这里涉及到数据的本地性和数据位置最优Spark后期在进行任务调度的时候,会优先考虑存有数据的worker节点来进行任务的计算。大大减少数据的网络传输,提升性能。

3、其它说明

(1)RDD是只读的,如果改变RDD中的数据,只能在现有的RDD上进行转换得到新的RDD。而通过一个RDD得到另一个RDD是通过算子实现的(算子就是一系列函数),RDD的算子包括两类:Transformation算子和Action算子。前者是用来将RDD进行转化,创建RDD血缘关系的(或者说是用来计算的),后来是用来触发RDD计算的。

因为RDD在经过一系列Transformation算子转换之后,它并不是马上进行计算的,而是记录了RDD之间的血缘关系,或者说RDD之间的依赖关系。从RDD A进行什么样的计算会得到RDD B,从RDD B进行怎么样的计算会得到RDD C,记录的是这样的关系。真正的计算是在执行了Actions算子之后才会触发的,此时才会得到计算结果。

(2)RDD之间是存在某种依赖的,或者说是存在血缘关系的。依赖可以分为两种:窄依赖和宽依赖。窄依赖的意思就是当前RDD与下一个RDD的分区之间是一一对应的,是一对一的关系;宽依赖就是当前RDD与下一个RDD之间的分区是多对多的关系。

(3)如果多次使用一个RDD,可以将该RDD缓存起来,后续需要使用的时候直接从缓存读取,可以加速计算。

(4)虽然RDD可以记录血缘关系,但是当血缘关系过长的时候,万一某个RDD的数据丢失了,如果根据血缘关系从头重建的话,消耗是比较大的。所以可以通过Checkpoint将RDD持久化保存,这样就可以切断之前的血缘关系,需要的时候直接从Checkpoint出开始。

二、创建RDD

Spark中RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其它RDD转换得到。

1、从集合中创建。从集合中创建有两个函数:一个是parallelize(),另一个是makeRDD()。但是在pyspark中没有makeRDD()。

scala> val rdd = sc.parallelize(Array(1, 2 , 3, 4, 5))

scala> val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5))

在调用parallelize()方法时,可以通过传入第二个参数来指定分区数。Spark官方的建议是,为集群中的每个CPU创建2-4个partition。Spark默认会根据集群的情况来设置partition的数量。但是也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。比如,parallelize(array, 2)。

2、由外部存储系统的数据创建。包括本地文件系统以及Hadoop支持的HDFS、HBase等。

scala> val rdd = sc.textFile(“/home/hadoop/data.txt”)

3、从其它RDD转换。

scala> val rdd2 = rdd1.map( _ * 2)

更多关于转换的算子会专门列出来。

点个赞呗:程序员虾说 » Spark RDD概述以及创建方式

赞 (0) 打赏

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

请作者喝杯咖啡~

支付宝扫一扫打赏

微信扫一扫打赏