当前位置:   article > 正文

Spark:PySpark的RDD算子操作-基于JupyterNotebook_在jupytet notebook中怎么定义rdd

在jupytet notebook中怎么定义rdd

记录下,方便以后查验

文章较长,请按照目录享用

官网文档

http://spark.apache.org/docs/2.4.5/api/python/pyspark.html#pyspark.RDD

环境

  • jupyternotebook
  • spark2.4.8
  • python3.7
  • jdk8

jupyternotebook整合spark可参考:Spark:JupyterNotebook整合PySpark开发环境

算子操作示例

测试说明

在jupyternotebook中

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("appname").getOrCreate()
sc = spark.sparkContext

# =======================算子处理开始==========================
rdd = sc.parallelize(["hello world", "hello spark"])
rdd2 = rdd.flatMap(lambda line:line.split(" "))
rdd3 = rdd2.map(lambda word:(word, 1))
rdd5 = rdd3.reduceByKey(lambda a, b : a + b)
print(rdd5.collect())
# =======================算子处理结束==========================

sc.stop()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

下方每个算子测试将只展示算子处理部分代码,其中action表示行动算子;transform表示变换算子

first

# first action rdd.first()
# 取第一个
rdd = sc.parallelize([1,2,3])
print(rdd.first()) # 1
  • 1
  • 2
  • 3
  • 4

max、min

# max、min action rdd.max() rdd.min()
# 取最大 最小
rdd = sc.parallelize([1,2,3])
print(rdd.max()) # 3
print(rdd.min()) # 1
  • 1
  • 2
  • 3
  • 4
  • 5

sum

# sum action rdd.sum() 
# 求和
rdd = sc.parallelize([1,2,3])
print(rdd.sum()) # 6
  • 1
  • 2
  • 3
  • 4

take

# take action rdd.take(n) 
# 取前n个
rdd = sc.parallelize([1,2,3])
print(rdd.take(2)) # [1, 2]	
  • 1
  • 2
  • 3
  • 4

top

# top action rdd.top(n) 
# 取排序后前n个 先降序排列
rdd = sc.parallelize([1,2,3])
print(rdd.top(2)) # [3, 2]
  • 1
  • 2
  • 3
  • 4

count

# count action rdd.count() 
# 计数
rdd = sc.parallelize([1,2,3])
print(rdd.count()) # 3
  • 1
  • 2
  • 3
  • 4

collect

# collect action rdd.collect() 
# 把数据转化为数组收集到Driver,注意数据大小
rdd = sc.parallelize([1,2,3])
print(rdd.collect()) # [1,2,3]
  • 1
  • 2
  • 3
  • 4

collectAsMap

# collectAsMap action rdd.collectAsMap() 与collect类似,适用于键值对的rdd 
rdd = sc.parallelize([("a",2),("b", 3),("a", 2)])
m = rdd.collectAsMap()
print(type(m)) # <class 'dict'>
print(m["a"]) # 2
  • 1
  • 2
  • 3
  • 4
  • 5

countByKey

# countByKey action 统计每个K的数量 items()转换成字典 sorted 按字典排序
rdd = sc.parallelize([("a",2),("b", 3),("a", 2)])
print(sorted(rdd.countByKey().items())) # [('a', 2), ('b', 1)]
  • 1
  • 2
  • 3

glom

# glom transform 把分区中的 T 转换成Array[T],这样一个分区只有一个Array元素
rdd = sc.parallelize([1,2,3,4,5,6], 3)
print(rdd.glom().collect()) # [[1, 2], [3, 4], [5, 6]]
  • 1
  • 2
  • 3

coalesce

# coalesce(numPartitions,[isShuffle=False]) transform 重新划分分区数 glom是为了展示方便查看重新分区效果
# 如果减少分区建议coalesce(numPartitions, False)避免shuffle。
# repartition()就是coalesce(numPartitions, True)
rdd = sc.parallelize([1,2,3,4,5,6])
print(rdd.coalesce(2, True).glom().collect()) # [[1, 4, 5, 6], [2, 3]]
  • 1
  • 2
  • 3
  • 4
  • 5

combineByKey

# combineByKey transform 高级函数 6个参数 就是groupByKey和reduceByKey的底层API
# createCombiner:V => C, 接受一个函数参数,把当前值做些初始化后提交给第二个参数
# mergeValue: (C, V) => C,  接受一个函数参数,把当前值V合并到C上(相当于C是个累积值)各个分区各自合并
# mergeCombiners:(C, C) => C, 接受一个函数参数, 把所有分区的C合并
# partitioner: Partitioner, 分区函数
# mapSideCombine: Boolean = True, 是否map端合并
# serializer: Serializer = null 序列化
rdd = sc.parallelize([("a",1),("b",3),("a",2),("b",4)], 2)
def to_list(V):
    print("init combine")
    return [V]
def append(C, V):
    # 单分区combine
    C.append(V)
    return C
def extend(C1, C2):
    # 所有分区combine
    C1.extend(C2)
    return C1
ret_combineByKey = sorted(rdd.combineByKey(to_list, append, extend).collect())
print(ret_combineByKey) # [('a', [1, 2]), ('b', [3, 4])]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

distinct

# distinct transform 去重
rdd = sc.parallelize(["a", "a", "c"])
print(rdd.distinct().collect()) # ['c', 'a']
  • 1
  • 2
  • 3

filter

# filter transform 过滤
rdd = sc.parallelize([1,2,3,4,5,6])
print(rdd.filter(lambda x:x > 3).collect()) # [4, 5, 6]
  • 1
  • 2
  • 3

flatMap

# flatMap transform 按参数函数对每一个元素进行处理并将结果扁平化
rdd = sc.parallelize([1,2,3])
print(rdd.flatMap(lambda x:[x*2]).collect()) # [2, 4, 6]
  • 1
  • 2
  • 3

flatMapValues

# flatMapValues transform 把KV对中的V打散给到函数处理后返回值再与原来的K组成新的KV对
rdd = sc.parallelize([("a",["x", "y", "z"]), ("c", ["k", "w"])])
print(rdd.flatMapValues(lambda x:x).collect()) # [('a', 'x'), ('a', 'y'), ('a', 'z'), ('c', 'k'), ('c', 'w')]
  • 1
  • 2
  • 3

fold

# fold(value, func) transform 以value为初始化值给到func(a,b)中的a,之后每个元素遍历都经过func处理,a作为累积值,b为当前迭代元素
# 注意返回值不是RDD
rdd = sc.parallelize([1,2,3,4,5])
print(rdd.fold(0, lambda a,b:a + b)) # 15
  • 1
  • 2
  • 3
  • 4

foldByKey

# foldByKey(value, func) transform 相对于fold,此针对的是KV的RDD, 按K给到func处理 注意返回值不是RDD
rdd = sc.parallelize([("a", 1),("b", 2),("a", 3),("b", 4)])
print(rdd.foldByKey(0, lambda a,b:a + b).collect()) # [('b', 6), ('a', 4)]
  • 1
  • 2
  • 3

foreach

# foreach(func) transform  返回值是RDD 必须与原RDD结构一样
rdd = sc.parallelize([("a", 1),("b", 2),("a", 3),("b", 4)])
rdd.foreach(lambda a:print("foreach {0},{1}".format(a[0], a[1])))
  • 1
  • 2
  • 3

foreachPartition

# foreachPartition(func) transform  按分区的元素汇集了后调用func,所以func拿到的是一个list,需要迭代处理
rdd = sc.parallelize([1,2,3,4,5,6], 2)
def fp(iters):
    for x in iters:
        print(x)
rdd.foreachPartition(fp)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

map

# map(func, preservesPartitioning=False) 每个元素执行func并返回一个新的RDD
rdd = sc.parallelize(["a","b","c"])
print(rdd.map(lambda x:(x, 1)).collect()) # [('a', 1), ('b', 1), ('c', 1)]
  • 1
  • 2
  • 3

mapPartitions

# mapPartitions(func, preservesPartitioning=False) 每个分区元素执行func并返回一个新的RDD,func参数是分区中元素list
rdd = sc.parallelize([1,2,3,4,5],2)
def funcmap(items):
    yield sum(items)
print(rdd.mapPartitions(funcmap).collect()) # [3, 12]
  • 1
  • 2
  • 3
  • 4
  • 5

mapPartitionsWithIndex

# mapPartitionsWithIndex(func, preservesPartitioning=False) 每个分区元素执行func并返回一个新的RDD,同时跟踪原始分区索引
rdd = sc.parallelize([1,2,3,4,5,6],3)
def f(index, iter): 
    print(index) #分区索引 0,1,2
    for x in iter:     
        print(x) #1,2;3,4;5,6
    yield index
ret = rdd.mapPartitionsWithIndex(f).sum()
print(ret) # 3 其中3=0+1+2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

mapValues

# mapValues(func) transform 对KV应用func func的返回值会和K组成新的RDD返回
rdd = sc.parallelize([("a", ["hello", "spark", "!"]), ("b", ["cumt"])])
def f(x):
    print(x)
    return len(x)
print(rdd.mapValues(f).collect()) # [('a', 3), ('b', 1)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

groupBy

# groupBy transform 分组
print("groupBy " + "=============================================")
rdd = sc.parallelize([1,2,3,4,5,10])
def f(x):
    return x % 2
result = rdd.groupBy(f).collect()
ret = sorted([(x, sorted(y)) for (x, y) in result])
print(ret) # [(0, [2, 4, 10]), (1, [1, 3, 5])]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

groupByKey

# groupByKey transform rdd.groupByKey(numPartitions=None, partitionFunc=<function portable_hash>)
# 它将RDD中每个键的值分组为单个序列,用numPartitions分区对生成的RDD进行哈希分区,并返回一个新的RDD对象
# 若要分组后聚合,建议用reduceByKey或AggregateByKey 性能更好
print("groupByKey " + "=============================================")
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
ret = rdd.groupByKey() # 会根据a,b分组,把值放到序列中
def f(x):
    return len(x)
print(ret.mapValues(f).collect()) # [('b', 1), ('a', 2)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

keyBy

# keyBy transform rdd.keyBy(func)
# 它通过在RDD上应用函数func,其中将原有RDD中的元素作为Key,该Key通过func函数返回的值作为Value创建一个元组,并返回一个新的RDD对象
print("keyBy " + "=============================================")
rdd = sc.parallelize(range(0,3))
def f(x):
    return x * x
print(rdd.keyBy(f).collect()) # [(0, 0), (1, 1), (4, 2)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

keys

# keys transform rdd.keys()
# 它的作用是获取KV格式的RDD中的Key序列,并返回一个新的RDD对象
print("keys " + "=============================================")
rdd = sc.parallelize([("a",1),("b",2),("a",3)])
print(rdd.keys().collect()) # ['a', 'b', 'a']
  • 1
  • 2
  • 3
  • 4
  • 5

zip

# zip transform rdd.zip(otherRDD)
# 它的作用是将第一个RDD中的元素作为Key,第二个RDD对应的元素作为Value,组合成元素格式为元组的新RDD。
# 这两个参与运算的RDD元素个数应该相同
print("zip " + "=============================================")
rdd_zip1 = sc.parallelize(range(1,6))
rdd_zip2 = sc.parallelize(range(801, 806))
print(rdd_zip1.zip(rdd_zip2).collect()) # [(1, 801), (2, 802), (3, 803), (4, 804), (5, 805)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

zipWithIndex

# zipWithIndex transform rdd.zipWithIndex()
# 它的作用是将RDD中的元素作为Key,Key对应的元素索引作为Value,组合成元素格式为元组的新RDD
print("zipWithIndex " + "=============================================")
rdd = sc.parallelize(["a", "b", "c", "d"], 3)
print(rdd.zipWithIndex().collect()) # [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
  • 1
  • 2
  • 3
  • 4
  • 5

values

# values transform rdd.values()
# 它的作用是将RDD中的元素的value取出,组合成元素格式为元组的新RDD
print("values " + "=============================================")
rdd = sc.parallelize([("a",1),("b",2),("a",3)])
print(rdd.values().collect()) # [1, 2, 3]
  • 1
  • 2
  • 3
  • 4
  • 5

union

# union transform rdd.union(otherRDD)
# 它的作用是将第一个RDD中的元素与第二个RDD对应的元素进行合并,返回新RDD
print("union " + "=============================================")
rdd_union1 = sc.parallelize(range(1,5))
rdd_union2 = sc.parallelize(range(11,15))
print(rdd_union1.union(rdd_union2).collect()) # [1, 2, 3, 4, 11, 12, 13, 14]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

takeOrdered

# takeOrdered transform rdd.takeOrdered(num, key=None)
# 它的作用是从RDD中获取排序后的前num个元素构成的RDD
# 默认按照升序对元素进行排序,但也支持用可选函数进行指定
print("takeOrdered " + "=============================================")
rdd = sc.parallelize(range(2,100))
print(rdd.takeOrdered(5))
print(rdd.takeOrdered(5, key=lambda x: -x))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

takeSample

# takeSample transform rdd.takeSample(withReplacement, num, seed=None)
# 它的作用是从RDD中抽样出固定大小的子数据集合,返回新的RDD。
# 其中,第一个参数withReplacement是一个布尔值,代表元素是否可以多次抽样。
# 参数num代表期望样本的大小作为RDD大小的一部分。参数seed代表随机数生成器的种子
# takeSample操作应该应用在RDD数据不大的情况,由于它会将RDD整个加载到driver端的内存中
print("takeSample " + "=============================================")
rdd = sc.parallelize(range(2,10))
#True代表一个元素可以出现多次 False代表一个元素只能出现1次
print(rdd.takeSample(True, 5, 1)) # [2, 4, 9, 8, 2]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

subtract

# subtract transform rdd.subtract(otherRDD, numPartitions=None)
# 它的作用是从RDD中排除掉otherRDD中的元素,并返回一个新RDD
print("subtract " + "=============================================")
rdd_subtract1 = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
rdd_subtract2 = sc.parallelize([("a", 1), ("b", 5)])
print(rdd_subtract1.subtract(rdd_subtract2).collect()) # [('b', 4), ('a', 3)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

subtractByKey

# subtractByKey transform rdd.subtractByKey(otherRDD, numPartitions=None)
# 它的作用是从RDD中排除掉otherRDD中的元素,只要key一样就排除,并返回一个新RDD
print("subtractByKey " + "=============================================")
rdd_subtractByKey1 = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
rdd_subtractByKey2 = sc.parallelize([("a", 7), ("b", 0)])
print(rdd_subtractByKey1.subtractByKey(rdd_subtractByKey2).collect()) # []
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

stats

# stats action rdd.stats()
# 它的作用是返回一个StatCounter对象,该对象获取到RDD元素的计数、均值,方差、最大和最小值
print("stats " + "=============================================")
rdd = sc.parallelize(range(100))
print(rdd.stats()) # (count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99.0, min: 0.0)
  • 1
  • 2
  • 3
  • 4
  • 5

sortBy

# sortBy transform rdd.sortBy(keyfunc, ascending=True, numPartitions=None)
# 它的作用是根据函数keyfunc来对RDD对象元素排序,并返回一个新的RDD
print("sortBy " + "=============================================")
rdd = sc.parallelize([('a', 6), ('f', 2), ('c', 7), ('d', 4), ('e', 5)])
print(rdd.sortBy(lambda x: x[0]).collect()) # [('a', 6), ('c', 7), ('d', 4), ('e', 5), ('f', 2)]
print(rdd.sortBy(lambda x: x[1]).collect()) # [('f', 2), ('d', 4), ('e', 5), ('a', 6), ('c', 7)]
print(rdd.sortBy(lambda x: x[1], False).collect()) # [('c', 7), ('a', 6), ('e', 5), ('d', 4), ('f', 2)]
print(rdd.sortBy(lambda x: x[1], False, 2).collect()) # [('c', 7), ('a', 6), ('e', 5), ('d', 4), ('f', 2)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

sortByKey

# sortByKey transform rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>)
# 根据函数keyfunc来对RDD对象元素排序,并返回一个新的RDD,但参数keyfunc是一个可选的参数,不提供则按照RDD中元素的Key进行升序排序
print("sortByKey " + "=============================================")
rdd1 = sc.parallelize([('a', 6), ('f', 2), ('c', 7), ('d', 4), ('e', 5)])
print(rdd1.sortByKey(True, 1).collect()) # [('a', 6), ('c', 7), ('d', 4), ('e', 5), ('f', 2)]
def f(x):
    return x[1]
rdd2 = sc.parallelize([(['a','g'], 6), (['f','h'], 2), (['c','m'], 7), (['d','n'], 4), (['e','w'], 5)])
print(rdd2.sortByKey(False, 1, f).collect()) # [(['e', 'w'], 5), (['d', 'n'], 4), (['c', 'm'], 7), (['f', 'h'], 2), (['a', 'g'], 6)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

sample

# sample transform rdd.sample(withReplacement, fraction, seed=None)
# 抽样,withReplacement 为True或False是否放回;fraction 比例(不精确);seed 随机数生成器的种子
print("sample " + "=============================================")
rdd = sc.parallelize(range(100), 1)
print(rdd.sample(False, 0.05, 1).collect())
  • 1
  • 2
  • 3
  • 4
  • 5

repartition

# repartition transform rdd.repartition(numPartitions)
# 按数据进行重新分区,重新分配过程中,可以增加或减少此RDD中的并行度
# 如果要减少此RDD中的分区数,请使用coalesce操作,可避免执行shuffle操作,效率也更高
print("repartition " + "=============================================")
rdd = sc.parallelize([1,2,3,4,5,6,7], 3)
print(rdd.glom().collect()) # [[1, 2], [3, 4], [5, 6, 7]]
print(rdd.repartition(2).glom().collect()) # [[1, 2, 5, 6, 7], [3, 4]]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

reduce

# reduce transform rdd.reduce(func)
# 按照函数func的逻辑对RDD中的元素进行运算。空RDD会报错ValueError
print("reduce " + "=============================================")
rdd = sc.parallelize([1,2,3,4,5])
def f(x, y):
    return x + y
print(rdd.reduce(f)) # 15
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

reduceByKey

# reduceByKey transform rdd.reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)
# 按照函数func的逻辑对KV格式的RDD中的元素进行运算。空RDD会报错ValueError
print("reduceByKey " + "=============================================")
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2),("b", 3)])
def f(x, y):
    return x + y
print(rdd.reduceByKey(f).collect()) # [('b', 4), ('a', 3)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

randomSplit

# randomSplit transform rdd.randomSplit(weights, seed=None)
# 按照权重weights对RDD进行随机分割,并返回多个RDD构成的列表
print("randomSplit " + "=============================================")
rdd_randomSplit = sc.parallelize(range(100), 1)
rdd1, rdd2 = rdd_randomSplit.randomSplit([2, 3], 10)
print(len(rdd1.collect()), len(rdd2.collect())) # 40 60
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

lookup

# lookup transform rdd.lookup(key)
# 根据key值从RDD中查找到相关的元素,返回RDD中键值的值列表
print("lookup " + "=============================================")
a = range(100)
rdd_lookup1 = sc.parallelize(zip(a, a), 2)
print(rdd_lookup1.lookup(32)) # [32]
rdd_lookup2 = sc.parallelize([('a', 'b'), ('c', 'd')])
print(rdd_lookup2.lookup('a')) # ['b']
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

join

# join transform rdd.join(otherRDD, numPartitions=None)
# inner join.每对元素将以(k,(v1,v2))元组返回,其中(k,v1)在自身中,而(k,v2)在另一个otherRDD中
print("join " + "=============================================")
rdd_join_x = sc.parallelize([("a", 1), ("b", 4)])
rdd_join_y = sc.parallelize([("a", 2), ("a", 3)])
print(rdd_join_x.join(rdd_join_y).collect()) # [('a', (1, 2)), ('a', (1, 3))]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

intersection

# intersection transform rdd.intersection(otherRDD)
# 取交集,会去重
print("intersection " + "=============================================")
rdd_intersection_x = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd_intersection_y = sc.parallelize([1, 6, 2, 3, 7, 8])
print(rdd_intersection_x.intersection(rdd_intersection_y).collect()) # [1, 2, 3]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

fullOuterJoin

# fullOuterJoin transform rdd.fullOuterJoin(otherRDD, numPartitions=None)
# 全外连接 full join.如果另外一个otherRDD匹配不到,则生成的RDD元素格式为(k,(v,None))
print("fullOuterJoin " + "=============================================")
rdd_fullOuterJoin_x = sc.parallelize([("a", 1), ("b", 4)])
rdd_fullOuterJoin_y = sc.parallelize([("a", 2), ("c", 3)])
print(rdd_fullOuterJoin_x.fullOuterJoin(rdd_fullOuterJoin_y).collect()) # [('a', (1, 2)), ('b', (4, None)), ('c', (None, 3))]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

leftOuterJoin

# leftOuterJoin transform rdd.leftOuterJoin(otherRDD, numPartitions=None)
# 左外连接 left join.如果另外一个otherRDD匹配不到,则生成的RDD元素格式为(k,(v,None))
print("leftOuterJoin " + "=============================================")
rdd_leftOuterJoin_x = sc.parallelize([("a", 1), ("b", 4)])
rdd_leftOuterJoin_y = sc.parallelize([("a", 2), ("c", 3)])
print(rdd_leftOuterJoin_x.leftOuterJoin(rdd_leftOuterJoin_y).collect()) # [('a', (1, 2)), ('b', (4, None))]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

rightOuterJoin

# rightOuterJoin transform rdd.rightOuterJoin(otherRDD, numPartitions=None)
# 右外连接 right join.如果RDD匹配不到,则生成的RDD元素格式为(k,(v,None))
print("rightOuterJoin " + "=============================================")
rdd_rightOuterJoin_x = sc.parallelize([("a", 1), ("b", 4)])
rdd_rightOuterJoin_y = sc.parallelize([("a", 2), ("c", 3)])
print(rdd_rightOuterJoin_x.rightOuterJoin(rdd_rightOuterJoin_y).collect()) # [('a', (1, 2)), ('c', (None, 3))]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

aggregate

# aggregate action rdd.aggregate(zeroValue,seqOp,combOp)
# 用给定的seqOp函数和给定的零值zeroValue来聚合每个分区上的元素,然后再用combOp函数和给定的零值zeroValue汇总所有分区的结果
print("aggregate " + "=============================================")
rdd = sc.parallelize([1,3,5,7,9,11,13,15,17], 2)
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
print(rdd.aggregate((0, 0), seqOp, combOp)) # (81, 9)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

aggregateByKey

# aggregateByKey transform rdd.aggregateByKey(zeroValue,seqFunc,combFunc,numPartitions=None,partitionFunc=<function portable_hash>)
# 其中zeroValue代表每次按Key分组之后的每个组的初始值。seqFunc函数用来对每个分区内的数据按照key分别进行逻辑计算。combFunc对经过seqFunc处理过的数据按照key分别进行逻辑计算
print("aggregateByKey " + "=============================================")
rdd = sc.parallelize([("a",1),("b",2),("a",3),("b",4),("a",5),("b",6),("a",7),("b",8),("a",9),("b",10)], 2)
seqFunc = (lambda x, y: x + y)
combFunc = (lambda x, y: x + y)
print(rdd.aggregateByKey(0, seqFunc, combFunc).collect()) # [('b', 30), ('a', 25)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

cartesian

# cartesian transform rdd.cartesian(otherRDD)
# 笛卡尔积
print("cartesian " + "=============================================")
rdd_cartesian_x = sc.parallelize([1, 2])
rdd_cartesian_y = sc.parallelize([3, 7])
print(sorted(rdd_cartesian_x.cartesian(rdd_cartesian_y).collect())) # [(1, 3), (1, 7), (2, 3), (2, 7)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

cache

# cache action rdd.cache()
# 缓存处理,提高效率。默认存储级别(MEMORY_ONLY)
# 还一个rdd.persist(storageLevel)可以设置缓存级别的缓存操作
print("cache " + "=============================================")
rdd = sc.parallelize(range(1000))
rdd.cache()
print(rdd_cache.max()) # 999
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

saveAsTextFile

# saveAsTextFile action rdd.saveAsTextFile(path, compressionCodecClass=None)
# 保存RDD对象为一个文件,其中元素以字符串的形式体现.而compressionCodecClass参数用于压缩,默认为“org.apache.hadoop.io.compress.GzipCodec”
print("saveAsTextFile " + "=============================================")
rdd = sc.range(1,10,2) #[1, 3, 5, 7, 9]
# spark-rdd是目录,且不能存在此目录
rdd.saveAsTextFile("D://a/spark-rdd3")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

共享变量-广播变量

# 广播变量 SparkContext.broadcast(v)
print("broadcast " + "=============================================")
broadcast_v = {"ip":"192.168.1.1","key":"mmm"}
# 广播
brVar = sc.broadcast(broadcast_v) 
# 获取广播变量值
print(brVar.value) # {'ip': '192.168.1.1', 'key': 'mmm'}
# 更新广播变量
broadcast_v["key"] = "yyy"
# 再次广播
brVar =  sc.broadcast(broadcast_v)
# 删除广播(销毁)
brVar.destroy()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

共享变量-累加器

# 累加器 SparkContext.accumulator(v)
print("accumulator " + "=============================================")
rdd = sc.range(1,101)
#创建累加器,初始值0
acc = sc.accumulator(0)
def f(x):
    global acc
    if x % 2 == 0:
        acc += 1 # 不能 -=
rdd_counter =  rdd.map(f)
print(acc.value) # 0
print(rdd_counter.count()) # 100
# 保证多次正确获取累加器值 (用行动算子来触发执行)
rdd_counter.persist()
print(rdd_counter.count()) # 100
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/556901
推荐阅读
相关标签
  

闽ICP备14008679号