赞
踩
创建:RDD = sc.parallelize(data) or sc.textFile(file path)
RDD = sc.textFile(json file path).map(lambda s : json.loads(s))
Transformation:
针对键值对RDD
rdd = sc.parallelize(data,2)设置2个分区
广播变量broadcastvar = sc.broadcast([1, 2, 3])
DataFrame创建:读取文件创建
>>> spark=SparkSession.builder.getOrCreate() >>> df = **spark.read.json**("file:///usr/local/spark/examples/src/main/resources/people.json") >>> df.show() `>>> df.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true) // 选择多列 >>> df.select(df.name,df.age + 1).show() +-------+---------+ | name|(age + 1)| +-------+---------+ |Michael| null| | Andy| 31| | Justin| 20| +-------+---------+ // 条件过滤 >>> df.filter(df.age > 20).show() // 分组聚合 >>> df.groupBy("age").count().show() // 排序 >>> df.sort(df.age.desc()).show() //多列排序 >>> df.sort(df.age.desc(), df.name.asc()).show() //对列进行重命名 >>> df.select(df.name.alias("username"),df.age).show()``
DataFrame创建:RDD转换—Spark官网提供了两种方法来实现从RDD转换得到DataFrame,第一种方法是,利用映射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上。
利用映射机制推断RDD模式:会用到toDF()方法
>>> from pyspark.sql.types import Row
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。