赞
踩
# -*- coding: utf-8 -*- """ (C) rgc All rights reserved create time '2021/5/30 20:06' Usage: # 每个元素进行操作 map: 对rdd每个元素(包括key,value) 进行 数据转换操作,rdd的元素个数不能改变 mapValues: 对rdd每个元素的values进行 数据转换操作,rdd的元素个数不能改变; 其实 每个元素 格式大致为: RDD( ('key1',[val1,val2]),('key2',[val11,val21]) ) 而 mapValues就是 通过一个函数对 每个元素的values作为参数传给函数;如 sum([val1,val2]); 所以 mapValues是对每个元素的values整体进行的操作;而不像reduceByKey是对每个元素的values进行迭代操作; """ # 构建spark from pyspark.conf import SparkConf from pyspark.context import SparkContext conf = SparkConf() # 使用本地模式;且 executor设置为1个方便debug conf.setMaster('local[1]').setAppName('rgc') sc = SparkContext(conf=conf) rdd = sc.parallelize([2, 1, 3, 4, 4], 1) def map_func(x: int) -> tuple: """ 将每个元素转为元祖 :param x: rdd中每个元素 :return: """ return (x, 1) # map操作 map_rdd = rdd.map(map_func).cache() print(map_rdd.collect()) # [(2, 1), (1, 1), (3, 1), (4, 1), (4, 1)] # mapValues操作 # 数据结构转换方式:[(2, 1), (1, 1), (3, 1), (4, 1), (4, 1)] => [(2,[1]),(1,[1]),(3,[1]),(4,[1,1])] temp_rdd = rdd.map(map_func).groupByKey() print(temp_rdd) map_values_rdd = temp_rdd.mapValues(len) print(map_values_rdd.collect()) # [(2, 1), (1, 1), (3, 1), (4, 2)] def map_values_func(x): """ 对values进行集合去重操作 :param x: :return: """ print(list(x), 'groupByKey结果中 每个元素的values转为List') return set(x) # 对每个元素的values进行 集合并集操作 map_values_rdd2 = temp_rdd.mapValues(map_values_func) print(map_values_rdd2.collect()) # [(2, {1}), (1, {1}), (3, {1}), (4, {1})]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。