Spark Action算子详解

1、reduce():通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。

    # 案例1
    >>> rdd = sc.parallelize([i for i in range(5)])
    >>> rdd.reduce(lambda x, y: x + y)
    10

    # 案例2
    >>> rdd = sc.parallelize([('a', 'A'), ('b', 'B'), ('c', 'C')])
    >>> rdd.reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    ('abc', 'ABC')

2、collect():在驱动程序中,以数组的形式返回数据集的所有元素。

这个就不用代码了吧。

3、count():返回RDD中元素的个数。

    >>> rdd = sc.parallelize([i for i in range(10)])
    >>> rdd.count()
    10

4、first():返回RDD中的第一个元素。

    >>> rdd = sc.parallelize([i for i in range(10)])
    >>> rdd.first()
    0

5、take(num):返回一个由RDD的前n个元素组成的数组。

    >>> rdd = sc.parallelize([i for i in range(10)])
    >>> rdd.take(3)
    [0, 1, 2]

6、takeOrdered(num, key=None):返回该RDD排序后的前n个元素组成的数组。

    >>> rdd = sc.parallelize([5, 3, 2, 9, 7, 6])
    >>> rdd.takeOrdered(10)
    [2, 3, 5, 6, 7, 9]
    >>> rdd.takeOrdered(10, lambda x: -x)
    [9, 7, 6, 5, 3, 2]

7、aggregate(zeroValue, seqOp, combOp):将每个分区里面的元素通过seqOp和zeroValue(初始值)进行聚合,然后用combOp函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

    # 案例1
    >>> rdd = sc.parallelize([i for i in range(5)], 2)
    >>> rdd.aggregate(1, lambda x, y: x + y, lambda x, y: x + y)
    13

    # 案例2
    >>> rdd = sc.parallelize(['a', 'b', 'c'], 2)
    >>> rdd.getNumPartitions()
    2
    >>> rdd.aggregate('init', lambda x, y: x + ' |seqOp| ' + y, lambda x, y: x + ' #combOp# ' + y)
    'init #combOp# init |seqOp| a #combOp# init |seqOp| b |seqOp| c'

根据函数定义可知aggregate执行分为两步:(1)在分区内,seqOp函数对zeroValue和RDD原始值进行操作;(2)combOp函数对每个分区的结果进行操作。

在第一个案例中,RDD的元素为0、1、2、3、4,其中0、1在第一个分区,2、3、4在第二个分区。zeroValue的值为1,首先是seqOp对每个分区内部进行操作,第一个分区结果为2(1 + 0  + 1),第二个分区结果为10(1 + 2 + 3 + 4),然后是combOp对每个分区的结果进行操作得到13(1 + 2 + 10)

在第二个案例中,RDD中的元素为a、b、c,其中a在第一个分区,b、c在第二个分区。zeroValue的值为init,首先是seqOp对每个分区内部进行操作,第一个分区结果为init |seqOp| a,第二个分区结果为init |seqOp| b |seqOp| c,然后combOp对每个分区的结果进行操作(将初始值、第一个分区的结果、第二个分区的结果连接起来),init #combOp# init |seqOp| a #combOp# init |seqOp| b |seqOp| c

如果想知道哪个元素在哪个分区,可以参考Spark Transformation算子详解中的mapPartitionsWithIndex()算子的用法。

8、fold(zeroValue, op):aggregate的简化操作,seqOp和combOp一样。

    >>> rdd = sc.parallelize([i for i in range(5)], 2)
    >>> rdd.fold(1, lambda x, y: x + y)
    13

9、countByKey():返回一个(K, Int)的map,表示每一个Key对应的元素个数。

    >>> rdd = sc.parallelize(['a', 'a', 'a', 'b', 'b', 'c'])
    >>> rdd.countByKey()
    defaultdict(<class 'int'>, {'a': 3, 'b': 2, 'c': 1})

    >>> rdd = sc.parallelize([('a', 10), ('a', 20), ('b', 10)])
    >>> rdd.countByKey()
    defaultdict(<class 'int'>, {'a': 2, 'b': 1})

类似的还有countByValue(),不过countByValue()的功能countByKey()也能实现。

10、foreach(func):在数据集的每一个元素上,运行函数func进行更新。

    >>> rdd = sc.parallelize([i for i in range(5)])
    >>> rdd.foreach(lambda x: print(x))
    3
    4
    2
    0
    1

点个赞呗:程序员虾说 » Spark Action算子详解

赞 (0) 打赏

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

请作者喝杯咖啡~

支付宝扫一扫打赏

微信扫一扫打赏