当前位置:   article > 正文

【 Spark编程基础 】实验3_实验三spark编程基础实验报告

实验三spark编程基础实验报告

准备工作

启动Hadoop集群 & Spark

• 启动Hadoop集群start-all.sh
• 启动Sparkcd /usr/local/spark/spark-2.3.3-bin-hadoop2.6/# ./sbin/start-all.sh

实验数据说明。

• 数据为1970年到2016年,每年各球队的球员比赛数据统计,数据文件的格式如图1所示:
在这里插入图片描述

图1. NBA球员评估数据

• 篮球数据缩写说明如图2所示:
在这里插入图片描述
图2.篮球数据缩写说明

• 获取数据集:

数据集路径:https://staticfile.eec-cn.com/dataSet/systemLib/80c034eedda84db5aeec2a1558c51cc6.zip

• 将下载的数据集,解压缩到/home/data/目录下

mkdir /home/data
cd /home/data
wget https://staticfile.eec-cn.com/dataSet/systemLib/80c034eedda84db5aeec2a1558c51cc6.zip
unzip 80c034eedda84db5aeec2a1558c51cc6.zip
  • 1
  • 2
  • 3
  • 4

#1 集群启动

• 启动Hadoop集群

cd /opt/module/hadoop
./sbin/start-all.sh
  • 1
  • 2

• 启动Spark

 cd /opt/module/spark
 ./sbin/start-all.sh
  • 1
  • 2

#2 上传实验数据到HDFS

• 创建数据目录

cd /opt/module/hadoop
hadoop fs -mkdir -p /home/student/data
  • 1
  • 2

• 上传数据集

hadoop fs -put /home/data/nba_data/* /home/student/data
  • 1

#3 分析实现过程

启动IDEA,创建Python文件

  • 启动IDEA
cd /opt/module/idea-IU-223.8836.41
./bin/idea.sh
  • 1
  • 2
  • 登录自己的JetBrain账号 /

  • 创建Python项目

SparkSession

from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if __name__ == '__main__':
    conf = SparkConf().setAppName("spark sql demo").setMaster("local[*]")
    sparkSession = SparkSession.builder.config(conf=conf).getOrCreate()
    # 处理有列明文件
    ds = sparkSession.read.format('csv').option("header", "true").load(
        "hdfs://node1:8020/home/student/data/leagues_NBA_2016_per_game_per_game.csv")
    # 查看表
    ds.show()
    # 查看表结构
    ds.printSchema()
    
    # 查看某一列 类似于MySQL: select Player,Tm from people
    ds.select("Player", "Tm").show()
    # 查看多列并作计算
    # 基于当前列进行加1
    ds.select("Player", ds.Age + 1).show()
    # 设置过滤条件
    ds.filter("Age > 21").show()
    # 做聚合操作
    ds.groupBy("Tm").count().show()

    # 上述多个条件进行组合 select ta.age,count(*) from (select name,age+1 as "age" from people) as ta where ta.age>21 group by ta.age
    ds.select("Player", (ds.Age + 1).alias("Age")).filter("Age > 21").groupBy("Tm").count().show()

    # 直接使用spark SQL进行查询
    # 先注册为临时表
    ds.createOrReplaceTempView("nba")
    ds.cache()
    sqlDF = sparkSession.sql("SELECT * FROM nba")
    sqlDF.show()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 查看表
    在这里插入图片描述
  • 查看表结构
    在这里插入图片描述
  • 查看某一列
    在这里插入图片描述
  • 查看多列并计算
    在这里插入图片描述
  • 设置过滤条件
    在这里插入图片描述
  • 聚合操作
    在这里插入图片描述

SparkContext(可选)

from pyspark import SparkConf, SparkContext, SQLContext, Row
from pyspark.sql.types import IntegerType, StructType, StructField, StringType
​
if __name__ == '__main__':
    conf = SparkConf().setAppName("HiveDemo").setMaster("local")
    sc = SparkContext(conf=conf)
    # Spark1.0中访问SparkSQL的方式,现在还保留,为了向后兼容,Spark2.0以后用SparkSession可以代替
    sqlContext = SQLContext(sc)
    # 设置conf,配置AppName,运行的Master(这里设置为本地模式
    # 创建一个sc的SQLContext对象
    # 创建一个sqlcontext对象(也可以是SQLContext的子类对象,如 HiveContext)
    # 加载数据源
    datas = sc.textFile("hdfs://node1:8020/home/student/data/leagues_NBA_2016_per_game_per_game.csv")
    # RDD转换为DataFrame有两种方式:(这里使用了第二种)
    # 使用反射方式推断元数据
    # 使用编程接口来创建DataFrame.
    rowRDD = datas.map(lambda line: line.split(",")).map(lambda data: Row(int(data[0]), data[1], data[2]))
    # 创建出元素为ROW的RDD# 流程简介:从原始的RDD创建一个元素为row的RDD;接下来创建一个structType,来代表ROW,最后将动态定义的
    # 元数据应用到RDD(ROW)上
    structType = StructType([
        StructField("Rk", IntegerType(), True),
        StructField("Player", StringType(), True),
        StructField("Age", IntegerType(), True),
        StructField("Tm", StringType(), True)
        # 通过编程的方式动态的构造元数据
    ])
    # 通过sqlContext的createDataFrame方法,创建DataFrame,
    # 将row类型的RDD和数据结构structType结合到一起
    stuDF = sqlContext.createDataFrame(rowRDD, structType)
    stuDF.show()
    # show方法可以把里面的数据显示出来
    stuDF.registerTempTable("nba")
    # 注册为临时表,这样就可以使用SQL语句了.
    sqlContext.sql("select Player from nba where Tm='SAC'").show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

SparkSession和SparkContext关系
在这里插入图片描述

#4 扩展功能

  1. 合并nba所有csv数据(已实现)
  2. 清洗数据,讲cvs文件中多余列名去掉(已实现)

#5 问题发现

处理有列名文件和无列名文件

  • 有列名文件:
data = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.122:9000/20200107/cust.csv")
  • 1
  • 无列名文件:
data = spark.read.format("csv").load("hdfs://192.168.56.122:9000/20200107/cust.csv")
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/819839
推荐阅读
相关标签
  

闽ICP备14008679号