赞
踩
spark3推荐使用sparksession来创建spark会话,然后利用使用sparksession创建出来的application来创建dataframe。
下面是两种创建方式,效果是相同的:
conf = SparkConf().setAppName('featureEngineering').setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
Samples = spark.read.format('csv').option('header', 'true').load(ResourcesPath)
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
对于所有的spark功能,sparksession类都是入口,所以创建基础的sparksession只需要使用sparksession.builder()。
if __name__ == '__main__':
movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\smallratings.csv"
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
movieSamples.select("movieId").show()
collect方法将已经存储的dataframe数据从存储器中收集回来,并返回一个数组,包括datafame中的所有行。但是,当数据集很大或者分区数据集很大时,很容易让驱动器崩溃。数据收集到驱动器中进行计算,就不是分布式并行计算了,而是串行计算,会更慢,所以,除了常看小数据,一般吧建议使用。
count方法用来计算数据集dataframe中行的个数,返回dataframe集合的行数。
if __name__ == '__main__':
movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\smallratings.csv"
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
print(movieSamples.count())
limit()限制输出,只保留Top_N,不是Action操作。
if __name__ == '__main__':
movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\smallratings.csv"
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
movieSamples.limit(5).show()
效果和下面的代码是一样的
if __name__ == '__main__':
movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\smallratings.csv"
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
movieSamples.show(5)
distinct方法用来去除数据集中的重复项,返回一个不包含重复记录的dataframe。这里的重复项指的是两行的数据完全相同。
该方法和dropDuplicates()方法不传入指定字段时的结果相同。
if __name__ == '__main__':
movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\test.csv"
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
movieSamples1=movieSamples.distinct()
movieSamples1.show()
filter方法是一个常用的方法,用条件来过滤数据集,如果想选择某列中大于或小于某数的数据,就可以使用filter方法。
if __name__ == '__main__':
movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\test.csv"
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
movieSamples1=movieSamples.filter("id>3")
movieSamples1.show()
flatMap是对dataframe中的数据进行整体操作的一个特殊方法。flatMap方法首先将函数应用于此数据集的所有元素,然后将结果展平,从而返回一个新的数据集。
map方法可以对dataframe数据集中的数据进行逐个操作,他与flatMap的不同之处在于,flatMap是将数据集中的数据作为一个整体去处理,之后再对其中的数据做计算,map则是直接对数据集中的数据做单独处理。
在这里插入代码片
groupBy 方法是将传入的数据进行分组,依据是作为参数传入的计算方法。一般与agg配合使用,例如groupBy(“id”).agg({“vale”:“max”}表示按照id进行分组,在每一组中选出Vale最大的值。max可替换成其他的函数,比如max,min,mean,sum,count等等。
if __name__ == '__main__':
movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\test.csv"
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
movieSamples.groupBy("id").agg({"vale":"max"}).show()
drop方法从数据集中删除某列,然后返回dataFrame类型。
if __name__ == '__main__':
movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\smallratings.csv"
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
movieSamples.drop("moiveId")
sort方法对已有的dataframe重新排序,并将重新排序后的数据生成一个新的dataframe
if __name__ == '__main__':
movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\test.csv"
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
movieSamples.sort("id").show()
数据类型转换
F.col("movieId").cast(IntegerType())
F.avg
F.count
MinMaxScaler(inputCol="avgRatingVec", outputCol="scaleAvgRating")
pipelineStage = [ratingScaler]
featurePipeline = Pipeline(stages=pipelineStage)
movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。