Spark Transformation算子详解

Spark RDD算子分为Transformation和Action两类,其中Transformation算子又分为单Value型、双Value型、Key-Value型。

Transformation算子是延迟计算的,它只是计录一系列的计算过程(血族关系),并不会立即执行计算任务。只有当执行Action算子的时候才会触发计算任务,提价作业。

 

1、单Value算子

(1)map(func):将传入的函数作用在每一个RDD的每一个元素上,然后返回一个新的RDD。

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) 
        >>> map_rdd = rdd.map(lambda x: x * 2) 
        >>> map_rdd.collect() 
        [2, 4, 6, 8, 10]

(2)mapPartitions(func):作用类似于map算子,但是map算子是将func函数应用于RDD的每个元素上,而mapPartitions算子是将func函数应用于RDD的每个分片上。如果内存空间比较大的话可以使用mapPartitions算子来提高效率。

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> def func(iterator): 
        ...     for i in iterator:
        ...         yield i * 2
        >>> mapp_rdd = rdd.mapPartitions(lambda x:func(x))
        >>> mapp_rdd.collect()
        [2, 4, 6, 8, 10]

(3)mapPartitionsWithIndex(func):作用类似于mapPartitions算子,但是mapPartitionsWithIndex的func函数的第一个参数是分片索引,第二个才是分片。

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> def func(index, iterator): 
        ...     for i in iterator:
        ...         yield (index, i * 2)
        >>> mappindex_rdd = rdd.mapPartitionsWithIndex(lambda x, y: func(x, y))
        >>> mappindex_rdd.collect()
        [(0, 2), (1, 4), (2, 6), (3, 8), (3, 10)]

(4)flatMap(func):作用类似于map算子,只不过是在map之后再对元素进行扁平化处理(将多维序列转换成一维序列)。

        >>> rdd = sc.parallelize([[1,2,3], [4,5,6], [7,8,9]])
        >>> rdd.map(lambda x:x).collect()
        [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
        >>> rdd.flatMap(lambda x:x).collect()
        [1, 2, 3, 4, 5, 6, 7, 8, 9]

(5)glom():将每一个分区形成一个数组。

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
        >>> rdd.glom().collect()
        [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

创建了带有三个分区的RDD对象,然后使用glom算子之后再使用college算子。将三个分区中的元素分别放到三个数组中。

(6)groupBy(func):按照传入函数的返回值进行分组,将返回值相同的元素放入一个迭代器中。

        >>> rdd = sc.parallelize([i for i in range(10)])
        >>> gropyby_rdd = rdd.groupBy(lambda x: x % 3)
        >>> [(x, sum(y)) for (x, y) in groupby_rdd.collect()]
        [(0, 18), (1, 12), (2, 15)]

创建了一个包含0到9正整数的数组,然后按照数组中每个元素对3取余的结果分组,总共可以分为三个组。得到分组的RDD之后对每个分组里面的内容进行求和。

(7)filter(func):按照func作用于元素上的返回值进行过滤。保留返回值为True的元素。

        >>> rdd = sc.parallelize([i for i in range(10)])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [0, 2, 4, 6, 8]

(8)sample(withReplacement, fraction, seed):以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,True为有放回的抽样,False为无放回的抽样,seed用于指定随机数生成器种子。

        >>> rdd = sc.parallelize([i for i in range(10)])
        >>> rdd.sample(True, 0.4, 2).collect() # 放回
        [0, 1, 1, 2, 3, 7, 9]
        >>> rdd.sample(False, 0.4, 2).collect() # 不放回
        [5, 8]

(9)distinct([numTasks]):返回去重之后的RDD。默认情况下会有8个并行任务来执行,但是也可以自己通过numTasks参数来指定。

        >>> rdd = sc.parallelize([1, 1, 1, 2, 2, 2, 3, 3, 3])
        >>> rdd.distinct(2).collect()
        [2, 1, 3]

(10)coalesce(numPartitions, shuffle=False):缩减分区数。用于大数据集过滤后,提高小数据集的执行效率。

        >>> rdd = sc.parallelize([i for i in range(20)], 5)
        >>> rdd.getNumPartitions()
        5
        >>> col_rdd = rdd.coalesce(2)
        >>> col_rdd.getNumPartitions()
        2

(11)repartition(numPartitions):根据分区数,重新通过网络Shuffle所有数据。

        >>> rdd = sc.parallelize([i for i in range(20)], 5)
        >>> rdd.getNumPartitions()
        5
        >>> rep_rdd = rdd.repartition(2)
        >>> rep_rdd.getNumPartitions()
        2

coalesce重新分区,可以选择是否进行shuffle过程。repartition是调用的coalesce,默认是进行shuffle的。

(12)sortBy(func,[ascending], [numTasks]):使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。

        >>> rdd = sc.parallelize([2, 3, 5, 4, 1])
        >>> rdd.sortBy(lambda x: x, True).collect() # 按照自身大小升序排列
        [1, 2, 3, 4, 5]
        >>> rdd.sortBy(lambda x: x^2, False).collect() # 按照平方数倒序排列
        [5, 4, 1, 3, 2]

(13)pipe(command, env=None, checkCode=False):Shell命令作用于RDD上。

        >>> rdd = sc.parallelize(['1', '2', '', '3'])
        >>> rdd.pipe('cat').collect()
        ['1', '2', '', '3']

 

2、双Value算子

(1)union(other):返回源RDD和参数RDD的并集。

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd.union(rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]

(2)subtract(other, numPartitions=None):计算差集。

        >>> rdd1 = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
        >>> rdd2 = sc.parallelize([("a", 3), ("c", None)])
        >>> rdd1.subtract(rdd2).collect()
        [('b', 5), ('b', 4), ('a', 1)]

(3)intersection(other):求交集。

        >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
        >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
        >>> rdd1.intersection(rdd2).collect()
        [1, 2, 3]

(4)cartesian(other):求笛卡尔积。

        >>> rdd = sc.parallelize([1, 2])
        >>> sorted(rdd.cartesian(rdd).collect())
        [(1, 1), (1, 2), (2, 1), (2, 2)]

(5)zip(other):将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

        >>> x = sc.parallelize(range(0,5))
        >>> y = sc.parallelize(range(1000, 1005))
        >>> x.zip(y).collect()
        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

 

3、Key-Value型算子

(1)partitionBy(numPartitions, partitionFunc=<function portable_hash>):对RDD进行分区操作,如果原有的RDD和现有的RDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生Shuffle过程。

        >>> rdd = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))          
        >>> rdd.partitionBy(2).glom().collect()
        [[(2, 2), (4, 4), (2, 2), (4, 4)], [(1, 1), (3, 3), (1, 1)]]

(2)groupByKey(numPartitions=None, partitionFunc=<function portable_hash>):按Key进行分组。

        >>> rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])
        >>> rdd.groupByKey().map(lambda x: (x[0],sum(x[1]))).collect()
        [('b', 7), ('a', 3)]

(3)reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>):在一个(K, V)的RDD上调用,返回一个(K, V)的RDD,使用指定的reduce函数,将相同Key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

        >>> rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])
        >>> def func(x, y):
        ...     return x + y
        >>> rdd.reduceByKey(func).collect()
        [('b', 7), ('a', 3)]

(4)aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=<function portable_hash>):在KV对的RDD中,,按Key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

参数描述:

    • zeroValue:给每一个分区中的每一个key一个初始值;
    • seqFunc:函数。用于在每一个分区中用初始值逐步迭代value;
    • combFunc:函数用于合并每个分区中的结果。
        >>> rdd = sc.parallelize([('a',3), ('a',2), ('c',4), ('b',3), ('c',6), ('c',8)], 2)
        >>> agg_rdd = rdd.aggregateByKey(0, lambda x, y: max(x, y), lambda x, y: x + y)
        >>> agg_rdd.collect()
        [('c', 12), ('b', 3), ('a', 3)]

以上代码定义了一个包含6个KV对的RDD,该RDD的分区数为2。其中一号分区中的内容是('a',3), ('a',2), ('c',4),二号分区中的内容是('b',3), ('c',6), ('c',8)。然后针对每个分区执行lambda x, y: max(x, y)计算每个分区中相同Key对应的最大值,一号分区得到('a', 3), ('c',4),二号分区得到('b',3), ('c',8),然后针对每个分区执行lambda x, y: x + y对两个分区中相同Key对应的Value进行求和,得到('a',3), ('b',3), ('c',12)

(5)foldByKey(zeroValue, func, numPartitions=None, partitionFunc=<function portable_hash>):aggregateByKey的简化版,seqFunc和combFunc相同。

        >>> rdd = sc.parallelize([('a',3), ('a',2), ('c',4), ('b',3), ('c',6), ('c',8)], 2)
        >>> fold_rdd = rdd.foldByKey(0, lambda x, y: max(x, y))
        >>> fold_rdd.collect()
        [('c', 8), ('b', 3), ('a', 3)]

(6)combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash>):将相同Key对应的Value合并为一个集合。

参数描述:

    • createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值;
    • mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并;
    • mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。
        >>> def my_list(a): 
        ...     return [a] 
        ... 
        >>> def my_append(a, b): 
        ...     a.append(b) 
        ...     return a 
        ... 
        >>> def my_extend(a, b): 
        ...     a.extend(b) 
        ...     return a 
        ... 
        >>> rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('a', 3), ('b', 2), ('b', 3)], 2)
        >>> combine_rdd = rdd.combineByKey(my_list, my_append, my_extend) 
        >>> combine_rdd.collect() 
        [('b', [1, 2, 3]), ('a', [1, 2, 3])]

(7)sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>):针对KV型的RDD排序。

        >>> rdd = sc.parallelize([('a', 1), ('b', 2), ('f', 3), ('d', 4), ('c', 5)])
        >>> rdd.sortByKey().collect()
        [('a', 1), ('b', 2), ('c', 5), ('d', 4), ('f', 3)]

(8)mapValues():针对于(K,V)形式的类型只对V进行操作。

        >>> rdd = sc.parallelize([('a', 1), ('b', 2), ('f', 3), ('d', 4), ('c', 5)])
        >>> rdd.mapValues(lambda v: v + 10).collect()
        [('a', 11), ('b', 12), ('f', 13), ('d', 14), ('c', 15)]

(9)join():在类型为(K, V)和(K, W)的RDD上调用,返回一个相同Key对应的所有元素对在一起的(K, (V , W))的RDD。

        >>> rdd1 = sc.parallelize([("a", 1), ("b", 4)])
        >>> rdd2 = sc.parallelize([("a", 2), ("a", 3)])
        >>> rdd1.join(rdd2).collect()
        [('a', (1, 2)), ('a', (1, 3))]

(10)cogroup():在类型为(K, V)和(K, W)的RDD上调用,返回一个(K, (Iterable<V>, Iterable<W>))类型的RDD。

        >>> rdd1 = sc.parallelize([("a", 1), ("b", 4)])
        >>> rdd2 = sc.parallelize([("a", 2)])
        >>> [(x, tuple(map(list, y))) for x, y in list(rdd1.cogroup(rdd2).collect())]
        [('a', ([1], [2])), ('b', ([4], []))]

 

点个赞呗:程序员虾说 » Spark Transformation算子详解

赞 (0) 打赏

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

请作者喝杯咖啡~

支付宝扫一扫打赏

微信扫一扫打赏