当前位置:   PYTHON > 正文

spark的distinct()函数是否仅对每个分区中的不同元组进行洗牌

python,apache-spark,pyspark,scala,spark,DevBox,在线流程图,编程,编程问答,程序员,开发者工具,开发工具,json解析,二维码生成,unix时间戳,在线开发工具,前端开发工具,开发人员工具,站长工具

据我所知,distinct()散列分区RDD以识别唯一键.但它是否优化了每个分区只移动不同的元组?

想象一下具有以下分区的RDD

    [1,2,2,1,4,2,2]

    [1,3,3,5,4,5,5,5]

在这个RDD的一个独特的地方,所有重复的密钥(分区1中的2s和分区2中的5s)是否会被混洗到它们的目标分区,或者只有每个分区的不同密钥被洗牌到目标?

如果所有键都被洗牌,那么带有set()操作的aggregate()将减少shuffle.

def set_update(u, v):
    u.add(v)
    return u
rdd.aggregate(set(), set_update, lambda u1,u2: u1|u2)

zero323.. 6

unique通过实现 reduceByKey(element, None)对.因此,它每个分区只会刷新唯一值.如果重复数量很少,那么仍然是相当昂贵的操作.

有些情况下使用set可能很有用.特别是如果你打电话distinct,PairwseRDD你可能更喜欢aggregateByKey/ combineByKey同时通过键同时实现重复数据删除和分区.特别考虑以下代码:

rdd1 = sc.parallelize([("foo", 1), ("foo", 1), ("bar", 1)])
rdd2 = sc.parallelize([("foo", "x"), ("bar", "y")])
rdd1.distinct().join(rdd2)

它必须洗牌rdd1两次 - 一次换distinct一次换一次join.相反,你可以使用combineByKey:

def flatten(kvs):
    (key, (left, right)) = kvs
    for v in left:
        yield (key, (v, right))

aggregated = (rdd1
    .aggregateByKey(set(), set_update, lambda u1, u2: u1 | u2))

rdd2_partitioned = rdd2.partitionBy(aggregated.getNumPartitions())

(aggregated.join(rdd2_partitioned)
    .flatMap(flatten))

注意:

joinScala中的逻辑与Python中的稍微不同(PySpark使用union后跟groupByKey,参见Spark RDD groupByKey +加入vs连接性能的Python和Scala DAG),因此我们必须RDD在调用join之前手动分区第二个.



1> zero323..:

unique通过实现 reduceByKey(element, None)对.因此,它每个分区只会刷新唯一值.如果重复数量很少,那么仍然是相当昂贵的操作.

有些情况下使用set可能很有用.特别是如果你打电话distinct,PairwseRDD你可能更喜欢aggregateByKey/ combineByKey同时通过键同时实现重复数据删除和分区.特别考虑以下代码:

rdd1 = sc.parallelize([("foo", 1), ("foo", 1), ("bar", 1)])
rdd2 = sc.parallelize([("foo", "x"), ("bar", "y")])
rdd1.distinct().join(rdd2)

它必须洗牌rdd1两次 - 一次换distinct一次换一次join.相反,你可以使用combineByKey:

def flatten(kvs):
    (key, (left, right)) = kvs
    for v in left:
        yield (key, (v, right))

aggregated = (rdd1
    .aggregateByKey(set(), set_update, lambda u1, u2: u1 | u2))

rdd2_partitioned = rdd2.partitionBy(aggregated.getNumPartitions())

(aggregated.join(rdd2_partitioned)
    .flatMap(flatten))

注意:

joinScala中的逻辑与Python中的稍微不同(PySpark使用union后跟groupByKey,参见Spark RDD groupByKey +加入vs连接性能的Python和Scala DAG),因此我们必须RDD在调用join之前手动分区第二个.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/blog/PYTHON/detail/3363
推荐阅读
相关标签
  

闽ICP备14008679号