赞
踩
第一步:PySpark 应用程序从初始化开始,SparkSession
这是 PySpark 的入口点
- from pyspark.sql import SparkSession
-
- spark = SparkSession.builder.getOrCreate()
第二步:创建DataFrame,三种方式
DataFrame是在Spark 1.3中正式引入的一种以RDD为基础的不可变的分布式数据集,类似于传统数据库的二维表格,数据在其中以列的形式被组织存储。如果熟悉Pandas,其与Pandas DataFrame是非常类似的东西。
- #从行列表创建 PySpark DataFrame
- from datetime import datetime, date
- import pandas as pd
- from pyspark.sql import Row
-
- df = spark.createDataFrame([
- Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
- Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
- Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
- ])
- #从 pandas DataFrame 创建 PySpark DataFrame
- pandas_df = pd.DataFrame({
- 'a': [1, 2, 3],
- 'b': [2., 3., 4.],
- 'c': ['string1', 'string2', 'string3'],
- 'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
- 'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
- })
- df = spark.createDataFrame(pandas_df)
- #从包含元组列表的 RDD 创建 PySpark DataFrame
- rdd = spark.sparkContext.parallelize([
- (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
- (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
- (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
- ])
- df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])

工作中读取数据的方式
- #普通读取csv为DataFrames数据
- # 读取csv为DataFrame
- traffic = spark.read.csv('Pokemon.csv', header='true')
- # 创建临时表
- traffic.createOrReplaceTempView("traffic")
-
- #通过pandas辅助读取csv
- import pandas as pd
-
- df = pd.read_csv('Pokemon.csv')
- traffic = spark.createDataFrame(df)
- traffic.createOrReplaceTempView("traffic")
备注由于Pokemon.csv这个文件中有空值,所以spark.createDataFrame()会失败的,但是使用第种方式读取就行了
RDD是一个抽象类,支持多种类型,弹性分布式数据集,其特点:一个RDD由多个分区/分片组成,对RDD进行一个函数操作,会对RDD的所有分区都执行相同函数操作,一个RDD依赖于其他RDD,RDD1->RDD2->RDD3->RDD4->RDD5,若RDD1中某节点数据丢失,后面的RDD会根据前面的信息进行重新计算,对于Key-Value的RDD可以制定一个partitioner,告诉他如何分片。常用hash/range,移动数据不如移动计算,注:移动数据,不如移动计算。考虑磁盘IO和网络资源传输等
- from pyspark import SparkConf, SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
- sc = SparkContext(conf=conf)
- '''
- map:
- map(func)
- 将func函数作用到数据集的每个元素上,生成一个新的分布式数据集返回
- '''
- print("***************************map***************************")
- def my_map():
- # 创建一个序列
- data = [1,2,3,4,5]
- # 将序列转换为RDD
- rdd1 = sc.parallelize(data)
- # 使用函数对RDD进行作用,生成RDD2
- rdd2 = rdd1.map(lambda x:x*2)
- # 使用collect()讲结果输出
- print(rdd2.collect())
-
- my_map()
-
- def my_map2():
- a = sc.parallelize(["dog","tiger","lion","cat","panter","eagle"])
- b = a.map(lambda x:(x,1)) #进来一个x,返回一个(x,1)的形式
- print(b.collect())
- my_map2()
- print("***************************filter***************************")
- def my_filter():
- #给一个数据
- data = [1,2,3,4,5]
- rdd1 = sc.parallelize(data)
- mapRdd = rdd1.map(lambda x:x**2)
- filterRdd = mapRdd.filter(lambda x:x>5)
- print(filterRdd.collect())
- '''
- filter:
- filter(func)
- 返回所有func返回值为true的元素,生成一个新的分布式数据集返回
- '''
- def my_filter():
- data = [1,2,3,4,5]
- rdd1 = sc.parallelize(data)
- mapRdd = rdd1.map(lambda x:x*2)
- filterRdd = mapRdd.filter(lambda x:x > 5)
- print(filterRdd.collect())
- print(sc.parallelize(data).map(lambda x:x*2).filter(lambda x:x>5).collect())
- my_filter()
- print("***************************flatMap()***************************")
- #Wordcount第一步:
- def my_flatMap():
- #flatMap,将东西压扁/拆开 后做map
- data = ["hello spark","hello world","hello world"]
- rdd = sc.parallelize(data)
- print(rdd.flatMap(lambda line:line.split(" ")).collect())
- my_flatMap()
- print("***************************groupBy()***************************")
- def my_groupBy():
- data = ["hello spark","hello world","hello world"]
- rdd = sc.parallelize(data)
- mapRdd = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))
- groupByRdd = mapRdd.groupByKey()
- print(groupByRdd.collect())
- print(groupByRdd.map(lambda x:{x[0]:list(x[1])}).collect())
-
- my_groupBy()
-
- print("***************************reduceByKey()***************************")
- #出现Wordcount结果
- def my_reduceByKey():
- data = ["hello spark", "hello world", "hello world"]
- rdd = sc.parallelize(data)
- mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
- reduceByKeyRdd = mapRdd.reduceByKey(lambda a,b:a+b)
- print(reduceByKeyRdd.collect())
- my_reduceByKey()
-
- print("***************************sortByKey()***************************")
- #将Wordcount结果中数字出现的次数进行降序排列
- def my_sort():
- data = ["hello spark", "hello world", "hello world"]
- rdd = sc.parallelize(data)
- mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
- reduceByKeyRdd = mapRdd.reduceByKey(lambda a, b: a + b)
- #reduceByKeyRdd.sortByKey().collect() 此时是按照字典在排序
- #reduceByKeyRdd.sortByKey(False).collect()
- #先对对键与值互换位置,再排序,再换位置回来
- reduceByKey=reduceByKeyRdd.map(lambda x:(x[1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0])).collect()
- print(reduceByKey)
- my_sort()
-
- print("***************************union()***************************")
- def my_union():
- a = sc.parallelize([1,2,3])
- b = sc.parallelize([3,4,5])
- U = a.union(b).collect()
- print(U)
- my_union()
-
- print("***************************union_distinct()***************************")
- def my_distinct():
- #这个和数学并集一样了
- a = sc.parallelize([1, 2, 3])
- b = sc.parallelize([3, 4, 2])
- D = a.union(b).distinct().collect()
- print(D)
- my_distinct()
-
- print("***************************join()***************************")
- def my_join():
- a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])
- b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
- J = a.fullOuterJoin(b).collect
- print(J)
- my_join()
-
- sc.stop()
-
- '''
- Spark Core核心算子回顾
- -- Transformation算子编程:
- map、filter、groupByKey、flatMap、reduceByKey、sortByKey、join等
- '''

Spark Streaming
任务1:PySpark数据处理
任务2:PySpark数据统计
任务3:PySpark分组聚合
任务5:SparkML基础:分类模型
任务6:SparkML基础:回归模型
任务7:SparkML:聚类模型
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。