当前位置:   article > 正文

PySpark数据分析_hadoop:pyspark对使用算子对数据进行数据分析和可视化

hadoop:pyspark对使用算子对数据进行数据分析和可视化

Spark SQL是 Apache Spark 用于处理结构化数据的模块。

第一步:PySpark 应用程序从初始化开始,SparkSession这是 PySpark 的入口点

  1. from pyspark.sql import SparkSession
  2. spark = SparkSession.builder.getOrCreate()

第二步:创建DataFrame,三种方式

DataFrame是在Spark 1.3中正式引入的一种以RDD为基础的不可变的分布式数据集,类似于传统数据库的二维表格,数据在其中以列的形式被组织存储。如果熟悉Pandas,其与Pandas DataFrame是非常类似的东西。

  1. #从行列表创建 PySpark DataFrame
  2. from datetime import datetime, date
  3. import pandas as pd
  4. from pyspark.sql import Row
  5. df = spark.createDataFrame([
  6. Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
  7. Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
  8. Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
  9. ])
  10. #从 pandas DataFrame 创建 PySpark DataFrame
  11. pandas_df = pd.DataFrame({
  12. 'a': [1, 2, 3],
  13. 'b': [2., 3., 4.],
  14. 'c': ['string1', 'string2', 'string3'],
  15. 'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
  16. 'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
  17. })
  18. df = spark.createDataFrame(pandas_df)
  19. #从包含元组列表的 RDD 创建 PySpark DataFrame
  20. rdd = spark.sparkContext.parallelize([
  21. (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
  22. (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
  23. (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
  24. ])
  25. df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])

工作中读取数据的方式

  1. #普通读取csv为DataFrames数据
  2. # 读取csv为DataFrame
  3. traffic = spark.read.csv('Pokemon.csv', header='true')
  4. # 创建临时表
  5. traffic.createOrReplaceTempView("traffic")
  6. #通过pandas辅助读取csv
  7. import pandas as pd
  8. df = pd.read_csv('Pokemon.csv')
  9. traffic = spark.createDataFrame(df)
  10. traffic.createOrReplaceTempView("traffic")

备注由于Pokemon.csv这个文件中有空值,所以spark.createDataFrame()会失败的,但是使用第种方式读取就行了

SparkSQL基础语法 

Spark RDD 

RDD是一个抽象类,支持多种类型,弹性分布式数据集,其特点:一个RDD由多个分区/分片组成,对RDD进行一个函数操作,会对RDD的所有分区都执行相同函数操作,一个RDD依赖于其他RDD,RDD1->RDD2->RDD3->RDD4->RDD5,若RDD1中某节点数据丢失,后面的RDD会根据前面的信息进行重新计算,对于Key-Value的RDD可以制定一个partitioner,告诉他如何分片。常用hash/range,移动数据不如移动计算,注:移动数据,不如移动计算。考虑磁盘IO和网络资源传输等

  1. from pyspark import SparkConf, SparkContext
  2. if __name__ == '__main__':
  3. conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
  4. sc = SparkContext(conf=conf)
  5. '''
  6. map:
  7. map(func)
  8. 将func函数作用到数据集的每个元素上,生成一个新的分布式数据集返回
  9. '''
  10. print("***************************map***************************")
  11. def my_map():
  12. # 创建一个序列
  13. data = [1,2,3,4,5]
  14. # 将序列转换为RDD
  15. rdd1 = sc.parallelize(data)
  16. # 使用函数对RDD进行作用,生成RDD2
  17. rdd2 = rdd1.map(lambda x:x*2)
  18. # 使用collect()讲结果输出
  19. print(rdd2.collect())
  20. my_map()
  21. def my_map2():
  22. a = sc.parallelize(["dog","tiger","lion","cat","panter","eagle"])
  23. b = a.map(lambda x:(x,1)) #进来一个x,返回一个(x,1)的形式
  24. print(b.collect())
  25. my_map2()
  26. print("***************************filter***************************")
  27. def my_filter():
  28. #给一个数据
  29. data = [1,2,3,4,5]
  30. rdd1 = sc.parallelize(data)
  31. mapRdd = rdd1.map(lambda x:x**2)
  32. filterRdd = mapRdd.filter(lambda x:x>5)
  33. print(filterRdd.collect())
  34. '''
  35. filter:
  36. filter(func)
  37. 返回所有func返回值为true的元素,生成一个新的分布式数据集返回
  38. '''
  39. def my_filter():
  40. data = [1,2,3,4,5]
  41. rdd1 = sc.parallelize(data)
  42. mapRdd = rdd1.map(lambda x:x*2)
  43. filterRdd = mapRdd.filter(lambda x:x > 5)
  44. print(filterRdd.collect())
  45. print(sc.parallelize(data).map(lambda x:x*2).filter(lambda x:x>5).collect())
  46. my_filter()
  47. print("***************************flatMap()***************************")
  48. #Wordcount第一步:
  49. def my_flatMap():
  50. #flatMap,将东西压扁/拆开 后做map
  51. data = ["hello spark","hello world","hello world"]
  52. rdd = sc.parallelize(data)
  53. print(rdd.flatMap(lambda line:line.split(" ")).collect())
  54. my_flatMap()
  55. print("***************************groupBy()***************************")
  56. def my_groupBy():
  57. data = ["hello spark","hello world","hello world"]
  58. rdd = sc.parallelize(data)
  59. mapRdd = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))
  60. groupByRdd = mapRdd.groupByKey()
  61. print(groupByRdd.collect())
  62. print(groupByRdd.map(lambda x:{x[0]:list(x[1])}).collect())
  63. my_groupBy()
  64. print("***************************reduceByKey()***************************")
  65. #出现Wordcount结果
  66. def my_reduceByKey():
  67. data = ["hello spark", "hello world", "hello world"]
  68. rdd = sc.parallelize(data)
  69. mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
  70. reduceByKeyRdd = mapRdd.reduceByKey(lambda a,b:a+b)
  71. print(reduceByKeyRdd.collect())
  72. my_reduceByKey()
  73. print("***************************sortByKey()***************************")
  74. #将Wordcount结果中数字出现的次数进行降序排列
  75. def my_sort():
  76. data = ["hello spark", "hello world", "hello world"]
  77. rdd = sc.parallelize(data)
  78. mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
  79. reduceByKeyRdd = mapRdd.reduceByKey(lambda a, b: a + b)
  80. #reduceByKeyRdd.sortByKey().collect() 此时是按照字典在排序
  81. #reduceByKeyRdd.sortByKey(False).collect()
  82. #先对对键与值互换位置,再排序,再换位置回来
  83. reduceByKey=reduceByKeyRdd.map(lambda x:(x[1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0])).collect()
  84. print(reduceByKey)
  85. my_sort()
  86. print("***************************union()***************************")
  87. def my_union():
  88. a = sc.parallelize([1,2,3])
  89. b = sc.parallelize([3,4,5])
  90. U = a.union(b).collect()
  91. print(U)
  92. my_union()
  93. print("***************************union_distinct()***************************")
  94. def my_distinct():
  95. #这个和数学并集一样了
  96. a = sc.parallelize([1, 2, 3])
  97. b = sc.parallelize([3, 4, 2])
  98. D = a.union(b).distinct().collect()
  99. print(D)
  100. my_distinct()
  101. print("***************************join()***************************")
  102. def my_join():
  103. a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])
  104. b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
  105. J = a.fullOuterJoin(b).collect
  106. print(J)
  107. my_join()
  108. sc.stop()
  109. '''
  110. Spark Core核心算子回顾
  111. -- Transformation算子编程:
  112. map、filter、groupByKey、flatMap、reduceByKey、sortByKey、join等
  113. '''

Spark Streaming

任务1:PySpark数据处理 
任务2:PySpark数据统计
任务3:PySpark分组聚合
任务5:SparkML基础:分类模型 
任务6:SparkML基础:回归模型 
任务7:SparkML:聚类模型 
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/406737
推荐阅读
相关标签
  

闽ICP备14008679号