赞
踩
启动Hadoop集群 & Spark
• 启动Hadoop集群start-all.sh
• 启动Sparkcd /usr/local/spark/spark-2.3.3-bin-hadoop2.6/# ./sbin/start-all.sh
实验数据说明。
• 数据为1970年到2016年,每年各球队的球员比赛数据统计,数据文件的格式如图1所示:
图1. NBA球员评估数据
• 篮球数据缩写说明如图2所示:
图2.篮球数据缩写说明
• 获取数据集:
数据集路径:https://staticfile.eec-cn.com/dataSet/systemLib/80c034eedda84db5aeec2a1558c51cc6.zip
• 将下载的数据集,解压缩到/home/data/目录下
mkdir /home/data
cd /home/data
wget https://staticfile.eec-cn.com/dataSet/systemLib/80c034eedda84db5aeec2a1558c51cc6.zip
unzip 80c034eedda84db5aeec2a1558c51cc6.zip
• 启动Hadoop集群
cd /opt/module/hadoop
./sbin/start-all.sh
• 启动Spark
cd /opt/module/spark
./sbin/start-all.sh
• 创建数据目录
cd /opt/module/hadoop
hadoop fs -mkdir -p /home/student/data
• 上传数据集
hadoop fs -put /home/data/nba_data/* /home/student/data
启动IDEA,创建Python文件
cd /opt/module/idea-IU-223.8836.41
./bin/idea.sh
登录自己的JetBrain账号 /
创建Python项目
SparkSession
from pyspark import SparkConf from pyspark.sql import SparkSession import pyspark.sql.functions as F if __name__ == '__main__': conf = SparkConf().setAppName("spark sql demo").setMaster("local[*]") sparkSession = SparkSession.builder.config(conf=conf).getOrCreate() # 处理有列明文件 ds = sparkSession.read.format('csv').option("header", "true").load( "hdfs://node1:8020/home/student/data/leagues_NBA_2016_per_game_per_game.csv") # 查看表 ds.show() # 查看表结构 ds.printSchema() # 查看某一列 类似于MySQL: select Player,Tm from people ds.select("Player", "Tm").show() # 查看多列并作计算 # 基于当前列进行加1 ds.select("Player", ds.Age + 1).show() # 设置过滤条件 ds.filter("Age > 21").show() # 做聚合操作 ds.groupBy("Tm").count().show() # 上述多个条件进行组合 select ta.age,count(*) from (select name,age+1 as "age" from people) as ta where ta.age>21 group by ta.age ds.select("Player", (ds.Age + 1).alias("Age")).filter("Age > 21").groupBy("Tm").count().show() # 直接使用spark SQL进行查询 # 先注册为临时表 ds.createOrReplaceTempView("nba") ds.cache() sqlDF = sparkSession.sql("SELECT * FROM nba") sqlDF.show()
SparkContext(可选)
from pyspark import SparkConf, SparkContext, SQLContext, Row from pyspark.sql.types import IntegerType, StructType, StructField, StringType if __name__ == '__main__': conf = SparkConf().setAppName("HiveDemo").setMaster("local") sc = SparkContext(conf=conf) # Spark1.0中访问SparkSQL的方式,现在还保留,为了向后兼容,Spark2.0以后用SparkSession可以代替 sqlContext = SQLContext(sc) # 设置conf,配置AppName,运行的Master(这里设置为本地模式 # 创建一个sc的SQLContext对象 # 创建一个sqlcontext对象(也可以是SQLContext的子类对象,如 HiveContext) # 加载数据源 datas = sc.textFile("hdfs://node1:8020/home/student/data/leagues_NBA_2016_per_game_per_game.csv") # RDD转换为DataFrame有两种方式:(这里使用了第二种) # 使用反射方式推断元数据 # 使用编程接口来创建DataFrame. rowRDD = datas.map(lambda line: line.split(",")).map(lambda data: Row(int(data[0]), data[1], data[2])) # 创建出元素为ROW的RDD # 流程简介:从原始的RDD创建一个元素为row的RDD;接下来创建一个structType,来代表ROW,最后将动态定义的 # 元数据应用到RDD(ROW)上 structType = StructType([ StructField("Rk", IntegerType(), True), StructField("Player", StringType(), True), StructField("Age", IntegerType(), True), StructField("Tm", StringType(), True) # 通过编程的方式动态的构造元数据 ]) # 通过sqlContext的createDataFrame方法,创建DataFrame, # 将row类型的RDD和数据结构structType结合到一起 stuDF = sqlContext.createDataFrame(rowRDD, structType) stuDF.show() # show方法可以把里面的数据显示出来 stuDF.registerTempTable("nba") # 注册为临时表,这样就可以使用SQL语句了. sqlContext.sql("select Player from nba where Tm='SAC'").show()
SparkSession和SparkContext关系
处理有列名文件和无列名文件
data = spark.read.format("csv").option("header","true").load("hdfs://192.168.56.122:9000/20200107/cust.csv")
data = spark.read.format("csv").load("hdfs://192.168.56.122:9000/20200107/cust.csv")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。