赞
踩
作为数据挖掘工程师,以后必不可免要用到并行计算,pyspark是python操作spark的API,本人因此入了坑。
见我另一篇博客:https://blog.csdn.net/qq_23860475/article/details/90476197
Spark 允许用户读取、转换和 聚合数据,可以轻松地训练和部署复杂的统计模型。Spark 支持Java、Scala、Python、R和SQL通过相应API进行访问。Spark还提供有几个已经实现并调优过的算法、统计模型和框架,如用于机器学习的MLlib和ML,用于图形处理的GraphX和GraphFrames,用于处理实时流数据的Spark Streaming。
使用Spark核心API的应用以SparkContext对象作为程序主入口,而Spark SQL应用则以SparkSession对象作为程序主入口,在Spark2.0发布之前,Spark SQL应用使用的专用主入口是SQLContext和HiveContext。SparkSession把它们封装为一个简洁而统一的入口。
SparkSession和sparkContext初始化:
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
RDD(Resilient Distributed Dataset)即弹性分布式数据集。RDD是Spark编程中最基本的数据对象,无论是最初加载的数据集,还是任何中间结果的数据集,或是最终的结果数据集,都是RDD。大多数Spark应用从外部加载数据到RDD,然后对已有的RDD进行操作来创建新的RDD,这些操作就是转化操作(返回指向新RDD的指针)。这个过程不断重复,直到需要进行输出操作为止,这种操作则是行动操作(在运行计算后向驱动程序返回值)。
RDD本质上是对象分布在各节点上的集合,用来表示Spark程序中的数据。在pyspark中,RDD是由分布在各节点上python对象组成的,这里的对象可以是列表、元组、字典等。如果使用Scala或Java的API,RDD则分别有Scala或Java对象组成。
对弹性分布式数据集这一术语进行拆解描述:
弹性:RDD是由弹性的,意思是说如果Spark中一个执行任务的节点丢失了,数据集依然可以被重建起来。这是因为Spark有每个RDD的谱系,也就是从头构建RDD的步骤。
分布式:RDD是分布式的,RDD中的数据被分到至少一个分区中,在集群上跨工作节点分布式地作为对象集合保存在内存中。
数据集:RDD是由记录组成的数据集。记录是数据集中可以唯一区分的数据的集合。一条记录可以是由几个字段组成的,这类似于关系型数据库里面表中的行,或是文件中的一行文本,或其他的一些格式中类似的结构。RDD的各分区包含不同的一部分记录,可以独立进行操作。
常见的RDD基础操作属性函数如下表所示:
操作类型 | 函数名 | 作用 |
转化操作 | map() | 参数是函数,函数应用于RDD每一个元素,返回值是新的RDD |
flatMap() | 参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD | |
filter() | 参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD | |
distinct() | 没有参数,将RDD里的元素进行去重操作 | |
union() | 参数是RDD,生成包含两个RDD所有元素的新RDD | |
intersection() | 参数是RDD,求出两个RDD的共同元素 | |
subtract() | 参数是RDD,将原RDD里和参数RDD里相同的元素去掉 | |
cartesian() | 参数是RDD,求两个RDD的笛卡儿积 | |
行动操作 | collect() | 返回RDD所有元素 |
count() | RDD里元素个数 | |
countByValue() | 各元素在RDD中出现次数 | |
reduce() | 并行整合所有RDD数据,例如求和操作 | |
fold(0)(func) | 和reduce功能一样,不过fold带有初始值 | |
aggregate(0)(seqOp,combop) | 和reduce功能一样,但是返回的RDD数据类型和原RDD不一样 | |
foreach(func) | 对RDD每个元素都是使用特定函数 |
文件系统协议与URL结构
文件系统 | URL结构 |
本地文件系统 | file:///path |
HDFS* | hdfs:///hdfs_path |
Amazon S3* | s3://bucket/path |
OpenStack Swift | swift://container.PROVIDER/path |
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- #读取整个目录下的内容
- logs=sc.textFile("hdfs:///demo/data/website-Logs/")
- #读取单个文件
- logs=sc.textFile("hdfs:///demo/data/website-Logs/IB_websitelogLog_001.txt")
- #使用通配符读取文件
- logs=sc.textFile("hdfs:///demo/data/website-Logs/*_001.txt"")
- #把整个目录的内容加载为键值对
- logs=sc.wholeTextFiles("hdfs:///demo/data/website-Logs/")
- spark.stop()
注意:使用textFile()方法读入一个目录下的所有文件时,每个文件的每一行都成为了一条单独的记录,而该行数据属于哪个文件的信息没有保留。用wholeTextFile()方法来读取包含多文件的整个目录,每个文件会作为一条记录,其中文件名是记录的键,而文件的内容是记录的值。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- #通过parallelize
- rdd= sc.parallelize([1,2,3,4,5,6,7])
- #通过range
- rdd=sc.range(1,8,1,2)#sc.range(start,end=None,step=1,numslices=None),numslices指定所需分区数量
- spark.stop()
将RDD里的元素以列表形式返回
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd= sc.parallelize([1,2,3,4,5,6,7])
- print(rdd.collect())
- spark.stop()
运行结果:
map()类似于Python中的map,针对RDD对应的列表的每一个元素,进行map()函数里面的lambda函数(这个函数是map函数的一个参数)对应的操作,返回的仍然是一个RDD对象。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([1,2,3,4,5,6,7])
- rdd2=rdd1.map(lambda x:x**2)
- print(rdd2.collect())
- spark.stop()
运行结果:
reduce()是针对RDD对应的列表中的元素,递归地选择第一个和第二个元素进行操作,操作的结果作为一个元素用来替换这两个元素,注意,reduce返回的是一个Python可以识别的对象,非RDD对象。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([1,2,3,4,5,6,7])
- rdd2=rdd1.reduce(lambda x,y:x+y)
- print(rdd2)
- spark.stop()
运行结果:
reduce()最终只返回一个值,reduceByKey()和reduceByKeyLocally()均是将Key相同的元素合并。
区别在于,reduce()和reduceByKeyLocally()函数均是将RDD转化为非RDD对象,而reduceByKey()将RDD对象转化为另一个RDD对象,需要collect()函数才能输出。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([[1,10],[1,1],[2,100],[2,1]])
- rdd2=rdd1.reduceByKey(lambda x,y:x+y)
- rdd3=rdd1.reduceByKeyLocally(lambda x,y:x+y)
- print(rdd2.collect())
- print(rdd3)
- spark.stop()
运行结果:
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([1,2,3,4,5,6,7])
- rdd2=rdd1.flatMap(lambda x:(x,x**2))
- print(rdd2.collect())
- spark.stop()
运行结果:
可以看出map和flatMap的区别,前者是用单个元素替换原来的单个元素,后者直接用多个元素替换单个元素。
filter()用于删除/过滤,即删除不满足条件的元素,这个条件一lambda函数的形式作为参数传入filter()函数中,返回rdd对象。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([1,2,3,4,5,6,7])
- rdd2=rdd1.filter(lambda x:x%2==0)#过滤掉奇数,保留偶数
- print(rdd2.collect())
- spark.stop()
运行结果:
distinct()用于去重,没有参数,返回RDD。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([1,1,3,3,4,4,5])
- rdd2=rdd1.distinct()
- print(rdd2.collect())
- spark.stop()
运行结果:
用于匹配元素,类似sql中的leftjoin。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([[1,10],[1,1],[2,100],[2,1]])
- rdd2= sc.parallelize([[1,11],[1,12],[2,101],[2,102]])
- rdd3=rdd1.join(rdd2)
- print(rdd3.collect())
- spark.stop()
运行结果:
union()求两个RDD对象的所有元素的并,不去掉重复元素。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([1,2,3,4])
- rdd2= sc.parallelize([3,4,5,6])
- rdd3=rdd1.union(rdd2)
- print(rdd3.collect())
- spark.stop()
运行结果:
intersection()求交集
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([1,2,3,4])
- rdd2= sc.parallelize([3,4,5,6])
- rdd3=rdd1.intersection(rdd2)
- print(rdd3.collect())
- spark.stop()
运行结果:
sortByKey()按键值进行排序
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([('B',2),('A',1),('C',3)])
- rdd2= rdd1.sortByKey()
- print(rdd2.collect())
- spark.stop()
运行结果:
根据函数进行排序。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize(['mouse','cat','dog'])
- def f(x):return x[0]
- rdd2= rdd1.sortBy(f)
- print(rdd2.collect())
- spark.stop()
运行结果:
glom() 显示分区结果。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([1,2,3,4,5,6,7],3)
- rdd2= rdd1.glom()
- print(rdd2.collect())
- spark.stop()
运行结果:
cartesian()组合元素。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize(['A','B'])
- rdd2= sc.parallelize(['C','D'])
- rdd3= rdd1.cartesian(rdd2)
- print(rdd3.collect())
- spark.stop()
运行结果:
groupBy()对元素按给定条件(函数)进行分组。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([1,2,3,4,5,6,7])
- rdd2= rdd1.groupBy(lambda x:x%2).collect()
- result=[(x,sorted(y)) for x,y in rdd2]
- print(result)
- spark.stop()
运行结果:
foreach()对应所有元素应用给定函数,不返回值
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([1,2,3,4,5,6,7])
- def f(x):return print(x)
- rdd1.foreach(f)
- spark.stop()
foreachPartition()对RDD每个分区应用给定函数
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd1= sc.parallelize([1,2,3,4,5,6,7])
- def f(iterator):
- for x in iterator:
- print(x)
- rdd1.foreachPartition(f)
- spark.stop()
DataFrame是Spark RDD的抽象。然而,DataFrame有别于原生RDD,区别在于DataFrame维护表结构,并且对许多常见的SQL函数和关系型操作符提供了原生支持。而DataFrame和RDD的相似之处包括它们都是作为DAG求值,都使用惰性求值,并且都提供了谱系和容错性。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- sc=spark.sparkContext
- rdd= sc.parallelize([('John',30),('Mary',78)])
- dataframe=spark.createDataFrame(rdd,['name','age'])
- print(dataframe.collect())
- spark.stop()
运行结果:
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=[('John',30),('Mary',78)]
- dataframe=spark.createDataFrame(data,['name','age'])
- print(dataframe.collect())
- spark.stop()
运行结果:
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=spark.read.json(".../data.json")
- spark.stop()
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=spark.read.csv(path="file:///.../data.csv",schema=None,sep=",",header=True)
- spark.stop()
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- #读取整个目录下的内容
- df=spark.read.text("hdfs:///demo/data/website-Logs/")
- #读取单个文件
- df=spark.read.text("hdfs:///demo/data/website-Logs/IB_websitelogLog_001.txt")
- spark.stop()
获取字段名。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=[('John',30),('Mary',78),('Jones',20),('Mike',10)]
- dataframe=spark.createDataFrame(data,['name','age'])
- print(dataframe.columns)
- spark.stop()
运行结果:
返回字段名跟数据类型。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=[('John',30),('Mary',78),('Jones',20),('Mike',10)]
- dataframe=spark.createDataFrame(data,['name','age'])
- print(dataframe.dtypes)
- spark.stop()
take(n)返回前n行数据。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=[('John',30),('Mary',78),('Jones',20),('Mike',10)]
- dataframe=spark.createDataFrame(data,['name','age'])
- dataSub=dataframe.take(2)
- print(dataSub)
- spark.stop()
运行结果:
show(n)将前n行打印到控制台上,与collect()和take(n)不同,show()并不把结果返回到变量。show()命令输出格式比较好看,它会以表格形式呈现结果,并且包含表头,可读性很高。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=[('John',30),('Mary',78),('Jones',20),('Mike',10)]
- dataframe=spark.createDataFrame(data,['name','age'])
- dataSub=dataframe.show(2)
- spark.stop()
运行结果:
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=[('John',30),('Mary',78),('Jones',20),('Mike',10)]
- dataframe=spark.createDataFrame(data,['name','age'])
- newData=dataframe.select("name","age").filter("age>20")
- newData.show()
- spark.stop()
运行结果:
删除列
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=[('John',30),('Mary',78),('Jones',20),('Mike',10)]
- dataframe=spark.createDataFrame(data,['name','age'])
- newData=dataframe.drop('name')
- newData.show()
- spark.stop()
运行结果:
删除重复行,当一行数据的所有列的值都与另一行相同,我们就把它看作重复的行。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=[('John',30),('Mary',78),('Jones',20),('Mike',10),('Mike',10)]
- dataframe=spark.createDataFrame(data,['name','age'])
- newData=dataframe.distinct()
- newData.show()
- spark.stop()
运行结果:
registerTempTable()创建临时表
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=[('John',30),('Mary',78),('Jones',20),('Mike',10)]
- dataframe=spark.createDataFrame(data,['name','age'])
- dataframe.registerTempTable('stuInfo')#创建并登陆临时表
- spark.sql("select * from stuInfo where age>20").show()#使用sql语句查询
- spark.stop()
运行结果:
join()将当前DataFrame与其他DataFrame做连接操作,on参数指定一个列、一组列、或者一个表达式,用于连接操作的求值。how参数指定要执行的连接类型。有效的值包括inner(默认值)、outer、left_outer、right_outer和leftsemi。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data1=[('John',30),('Mary',78),('Jones',20),('Mike',10)]
- data2=[('John',1),('Mary',0),('Jones',0),('Mike',1)]
- df1=spark.createDataFrame(data1,['name','age'])
- df2=spark.createDataFrame(data2,['name','sex'])
- df3=df1.join(df2,on='name',how='left_outer')#按name字段匹配
- df3.show()
- spark.stop()
运行结果:
orderBy()根据指定列进行排序。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=[('John',30),('Mary',78),('Jones',20),('Mike',10)]
- df1=spark.createDataFrame(data,['name','age'])
- df2=df1.orderBy(['age'],ascending=True)
- df2.show()
- spark.stop()
运行结果:
groupBy() 按照指定列进行分组。
- from pyspark.sql import SparkSession
- spark=SparkSession.builder.master("local").appName("test").getOrCreate()
- data=[('John',30),('John',78),('Mike',20),('Mike',10)]
- df1=spark.createDataFrame(data,['name','age'])
- df2=df1.groupBy(['name']).sum('age')
- df2.show()
- spark.stop()
运行结果:
如果对你有帮助,请点下赞,予人玫瑰手有余香!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。