Introduction
1. Most of the functions that can be called on an RDD can also be called on a DataFrame.
2. The APIs in this part are mainly used for personal testing and debugging.
Categories
Code
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
from pyspark import StorageLevel
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # todo:0-Set the system environment variables
    os.environ['JAVA_HOME'] = '/export/server/jdk'
    os.environ['HADOOP_HOME'] = '/export/server/hadoop'
    os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'

    # todo:1-Build the SparkSession
    spark = SparkSession \
        .builder \
        .appName("MovieApp") \
        .master("local[2]") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    # todo:2-Process the data: read, transform, save
    people_df = spark.read.json("../datas/resources/people.json")

    # Basic operators: count/collect/take/first/head/tail/foreach/foreachPartition/distinct/union/unionAll/coalesce/repartition
    count = people_df.count()
    print(f"Total number of rows: {count}")
    collect = people_df.collect()
    print(f"Content as a list: {collect}")
    take = people_df.take(2)
    print(f"First two rows: {take}")
    first = people_df.first()
    print(f"First row: {first}")
    head = people_df.head(2)
    print(f"First two rows of the table: {head}")
    tail = people_df.tail(1)
    print(f"Last row: {tail}")
    people_df.foreach(lambda row: print(row))
    people_df.foreachPartition(lambda part: print(*part))

    print("Result of union")
    people_df_other = spark.read.json("../datas/resources/people.json")
    people_df.union(people_df_other).show()
    print("Result of unionAll")
    people_df.unionAll(people_df_other).show()
    print("Result of distinct")
    people_df.unionAll(people_df_other).distinct().show()

    print(f"Original number of partitions: {people_df.rdd.getNumPartitions()}")
    print(f"Partitions after coalesce: {people_df.coalesce(1).rdd.getNumPartitions()}")
    print(f"Partitions after repartition: {people_df.repartition(4).rdd.getNumPartitions()}")

    # Persistence operators
    people_df.cache()
    people_df.persist(StorageLevel.MEMORY_AND_DISK_2)
    people_df.unpersist(blocking=True)

    # Other operators
    columns = people_df.columns
    print(f"All column names: {columns}")
    schema = people_df.schema
    print(f"Schema of all columns: {schema}")
    rdd = people_df.rdd
    rdd.foreach(lambda x: print(x))
    people_df.printSchema()

    # todo:3-Close the SparkSession
    spark.stop()
Introduction
DSL style simply turns each SQL keyword we used before into a function call; there is no essential difference from the SQL style. For example:
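A minimal sketch of the same query in both styles (the view t1, the DataFrame dataFrame1, and the columns name/age are placeholders used only for illustration):

from pyspark.sql.functions import col, count

# SQL style: the keywords live inside a SQL string
spark.sql("select name, count(*) as cnt from t1 where age > 20 group by name").show()

# DSL style: each keyword becomes a DataFrame method call
dataFrame1.where(col('age') > 20) \
    .groupBy(col('name')) \
    .agg(count("*").alias('cnt')) \
    .show()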
Categories
Application: converting the movie-rating case to DSL style
# Imports
import datetime
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, count, avg, round
from pyspark.sql.types import Row, StructField, StructType, StringType, DoubleType, IntegerType, LongType

if __name__ == "__main__":
    # Configure the JDK path (the directory extracted earlier)
    os.environ['JAVA_HOME'] = '/export/server/jdk'
    os.environ['HADOOP_HOME'] = '/export/server/hadoop'
    os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
    os.environ['YARN_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
    os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'

    # Get the SparkSession object
    spark = SparkSession.builder\
        .appName("wordcount_sparksql")\
        .master("local[2]")\
        .config("spark.sql.shuffle.partitions", 2)\
        .getOrCreate()

    # 1. Read each file into an RDD
    # 2. Split each element and convert the RDDs into tuple RDDs
    rdd1 = spark.sparkContext.textFile("/spark/spark_sql/ratings.dat")\
        .map(lambda x: re.split("::", x))\
        .map(lambda x: (x[0], x[1], int(x[2]), int(x[3])))
    rdd2 = spark.sparkContext.textFile("/spark/spark_sql/movies.dat") \
        .map(lambda x: re.split("::", x)) \
        .map(lambda x: (x[0], x[1], x[2]))

    # 3. Convert the tuple RDDs into DataFrames
    dataFrame1 = rdd1.toDF(['user_id', 'movie_id', 'grade', 'time'])
    dataFrame2 = rdd2.toDF(['movie_id', 'movie_name', 'category'])

    # 4. Implement the requirement in DSL style
    """
    # join: dataFrame2 is the other table, movie_id is the join key, leftouter is the join type
    # select: pick columns
    # groupBy: group
    # agg: aggregate, multiple aggregations allowed
    # where: filter conditions (there is no having)
    # orderBy: sort, multiple sort keys allowed
    """
    dataFrame3 = dataFrame1.join(dataFrame2, dataFrame1.movie_id == dataFrame2.movie_id, 'leftouter')\
        .select(col('movie_name'), col('grade'))\
        .groupBy(col('movie_name'))\
        .agg(
            count("*").alias("movie_grade_cnt"),
            round(avg(col('grade')), 2).alias('movie_grade_avg')
        )\
        .where(col('movie_grade_cnt') > 2000) \
        .orderBy(col('movie_grade_cnt').desc(), col('movie_grade_avg').asc())

    dataFrame3.show()

    # Close spark
    spark.stop()
1. We want to read a structured data file from an external source directly into a DataFrame, without first converting it to an RDD.
2. If the file we read is not structured, we can first load it as an RDD, transform it, and then convert it to a DataFrame.
Syntax
# Method 1: can only read text files
dataFrame1 = spark.read.text("../datas/resources/people.txt")
# Method 2: general-purpose (recommended!)
dataFrame1 = spark.read.format("text").load("../datas/resources/people.txt")
# Method 3: general-purpose
dataFrame1 = spark.read.load(path="../datas/resources/people.txt", format="text")
dataFrame1.printSchema()
dataFrame1.show()
Conclusion:
1. Avoid the text method whenever possible, because it reads every line of the file indiscriminately and cannot turn the data into proper columns.
Characteristics
By default, text reading puts each whole line into a single column (named value), which usually does not meet the requirement.
Code
dataFrame1 = spark.read.text("/spark/spark_sql/input/people.txt")
dataFrame1.show()
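Because the whole line lands in a single column named value, a follow-up split is usually needed. A minimal sketch, assuming each line of people.txt looks like "Michael, 29":

from pyspark.sql.functions import split, col, trim

dataFrame2 = dataFrame1.select(
    split(col('value'), ',').getItem(0).alias('name'),                    # text before the comma
    trim(split(col('value'), ',').getItem(1)).cast('int').alias('age')    # text after the comma, cast to int
)
dataFrame2.show()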
Syntax
# Method 1: can only read CSV files
dataFrame1 = spark.read.csv("../datas/resources/people.txt")
# Method 2: general-purpose (recommended!)
dataFrame1 = spark.read.format("csv").load("../datas/resources/people.txt")
# Method 3: general-purpose
dataFrame1 = spark.read.load(path="../datas/resources/people.txt", format="csv")
dataFrame1.printSchema()
dataFrame1.show()
Options (option)
.option('sep', ',')              # field delimiter
.option('header', 'true')        # use the first row as the column names
.option('inferSchema', 'true')   # infer the schema automatically
.load("/spark/spark_sql/input/people.txt")   # path of the file to load
代码
Method 1: no header, the default delimiter is a comma
# my_schema = StructType([
#     StructField("name", StringType(), True),
#     StructField("age", IntegerType(), True)
# ])
my_schema = "name string,age int"  # equivalent to defining a table: give each column a name and a type
dataFrame1 = spark.read.csv("/spark/spark_sql/input/people.txt", schema=my_schema)
dataFrame1.show()
# The default CSV delimiter is a comma
Method 2: with a header, delimiter specified manually
dataFrame1 = (spark.read.format("csv")
    .option('sep', ',')             # specify the delimiter
    .option('header', 'true')       # treat the first row as the header
    .option('inferSchema', 'true')  # infer the column types automatically
    .load("/spark/spark_sql/input/people.txt"))  # path of the file to read
dataFrame1.printSchema()
dataFrame1.show()
# Conclusion: with this method Spark infers the schema automatically; if the inferred schema is not what you expect, you must specify the schema manually
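A sketch of that manual override: keep the header option so the first row is still skipped, but take names and types from an explicit schema (column names/types below are assumed for the same people.txt):

my_schema = "name string,age int"   # the explicit schema takes priority over inference
dataFrame1 = (spark.read.format("csv")
    .option('sep', ',')
    .option('header', 'true')       # first line is still treated as a header and skipped
    .schema(my_schema)              # names and types come from the explicit schema
    .load("/spark/spark_sql/input/people.txt"))
dataFrame1.printSchema()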
Method 3: no header, delimiter specified manually
my_schema = "user_id string,movie_id string,grade int,time int"
dataFrame1 = spark.read.format("csv") \
.option('sep', '::') \
.schema(my_schema)\
.load("/spark/spark_sql/input/ratings.dat")
dataFrame1.printSchema()
dataFrame1.show()
# Conclusion: this method can read files with any delimiter, but the schema must be specified manually
Introduction
1. Parquet is a columnar storage format; like ORC, it is one of the commonly used storage formats.
2. Parquet files are not human-readable; Parquet is Spark's default storage format.
3. Parquet files carry their schema internally, so no parameters need to be specified when reading them.
Syntax
# Method 1: can only read .parquet files
dataFrame1 = spark.read.parquet("../datas/resources/users.parquet")
# Method 2: general-purpose (recommended!)
dataFrame1 = spark.read.format("parquet").load("../datas/resources/users.parquet")
# Method 3: general-purpose
dataFrame1 = spark.read.load(path="../datas/resources/users.parquet", format="parquet")
dataFrame1.printSchema()
dataFrame1.show()
Code
dataFrame1 = spark.read.format("parquet").load("/spark/spark_sql/input/users.parquet")
dataFrame1.show()
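Since the schema is embedded in the files, a quick round-trip check (the scratch path below is made up) shows that nothing has to be redeclared when reading the data back:

out = "/spark/spark_sql/output/users_copy"                       # hypothetical scratch directory
dataFrame1.write.mode("overwrite").format("parquet").save(out)   # the schema is written together with the data
spark.read.parquet(out).printSchema()                            # comes back with the same schema, no options needed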
Introduction
1. JSON is a semi-structured data format.
2. JSON files carry their own schema information.
Syntax
# Method 1: can only read JSON files
dataFrame1 = spark.read.json("../datas/resources/people.json")
# Method 2: general-purpose (recommended!)
dataFrame1 = spark.read.format("json").load("../datas/resources/people.json")
# Method 3: general-purpose
dataFrame1 = spark.read.load(path="../datas/resources/people.json", format="json")
dataFrame1.printSchema()
dataFrame1.show()
Code
# ------------------ Use the default column names -----------
dataFrame1 = spark.read.format("json").load("/spark/spark_sql/input/people.json")
dataFrame1.printSchema()
dataFrame1.show()

# ------------------ Rename the columns ---------------------
dataFrame1 = spark.read.format("json")\
    .load("/spark/spark_sql/input/people.json")\
    .withColumnRenamed('age', 'age2')\
    .withColumnRenamed('name', 'name2')
dataFrame1.printSchema()
dataFrame1.show()
# Conclusion: JSON files carry their own schema information, so no schema needs to be specified
# Imports
import datetime
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, count, avg, round
from pyspark.sql.types import Row, StructField, StructType, StringType, DoubleType, IntegerType, LongType

if __name__ == "__main__":
    # Configure the JDK path (the directory extracted earlier)
    os.environ['JAVA_HOME'] = '/export/server/jdk'
    os.environ['HADOOP_HOME'] = '/export/server/hadoop'
    os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
    os.environ['YARN_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
    os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'

    # Get the SparkSession object
    spark = SparkSession.builder\
        .appName("wordcount_sparksql")\
        .master("local[2]")\
        .config("spark.sql.shuffle.partitions", 2)\
        .getOrCreate()

    # ============ 1 - Read an external file with text and convert it to a DataFrame ====================
    """
    dataFrame1 = spark.read.text("/spark/spark_sql/input/people.txt")
    dataFrame1.show()
    """

    # ============ 2 - Read an external file with CSV and convert it to a DataFrame ====================
    # my_schema = StructType([
    #     StructField("name", StringType(), True),
    #     StructField("age", IntegerType(), True)
    # ])
    """
    my_schema = "name string,age int"  # equivalent to defining a table: column names and types
    dataFrame1 = spark.read.csv("/spark/spark_sql/input/people.txt", schema=my_schema)
    dataFrame1.show()
    """
    """
    dataFrame1 = spark.read.format("csv")\
        .option('sep', ',')\
        .option('header', 'true')\
        .option('inferSchema', 'true')\
        .load("/spark/spark_sql/input/people.txt")
    dataFrame1.printSchema()
    dataFrame1.show()
    """
    """
    my_schema = "user_id string,movie_id string,grade int,time int"
    dataFrame1 = spark.read.format("csv") \
        .option('sep', '::') \
        .schema(my_schema)\
        .load("/spark/spark_sql/input/ratings.dat")
    dataFrame1.printSchema()
    dataFrame1.show()
    """

    # ============ 3 - Read an external file with Parquet and convert it to a DataFrame ====================
    """
    dataFrame1 = spark.read.format("parquet").load("/spark/spark_sql/input/users.parquet")
    dataFrame1.show()
    """

    # ============ 4 - Read an external file with JSON and convert it to a DataFrame ====================
    dataFrame1 = spark.read.format("json")\
        .load("/spark/spark_sql/input/people.json")\
        .withColumnRenamed('age', 'age2')\
        .withColumnRenamed('name', 'name2')
    dataFrame1.printSchema()
    dataFrame1.show()

    # Close spark
    spark.stop()
After the analysis is finished, we can save the results to a storage system, such as the local file system or HDFS.
# Method 1: write as text — too many restrictions, not recommended
dataFrame1.write.mode("append").format("text").save("/spark/spark_sql/output")

# Method 2: write as CSV — recommended
dataFrame3.write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", "\t")\
    .save("/spark/spark_sql/output")

# Method 3: write as JSON
dataFrame3.write\
    .mode("overwrite")\
    .format("json")\
    .save("/spark/spark_sql/output")

# Method 4: write as Parquet
dataFrame3.write\
    .mode("overwrite")\
    .format("parquet")\
    .save("/spark/spark_sql/output")

# Write modes (mode)
# --append: append mode — if data already exists, keep appending to it
# --overwrite: overwrite mode — if data already exists, replace it with the latest data
# --error/errorifexists: raise an error if the target already exists (the default mode)
# --ignore: do nothing if data already exists
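To make the mode semantics concrete, here is a small self-contained sketch (the demo DataFrame and scratch path are made up and not part of the case study):

out = "/spark/spark_sql/output_mode_demo"   # hypothetical scratch directory
df = spark.range(3)                         # demo DataFrame with ids 0, 1, 2

df.write.mode("overwrite").format("parquet").save(out)   # first write: 3 rows
df.write.mode("append").format("parquet").save(out)      # append: the directory now holds 6 rows
print(spark.read.parquet(out).count())                    # expected: 6

df.write.mode("overwrite").format("parquet").save(out)   # overwrite: back to 3 rows
print(spark.read.parquet(out).count())                    # expected: 3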
# Imports
import datetime
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, count, avg, round
from pyspark.sql.types import Row, StructField, StructType, StringType, DoubleType, IntegerType, LongType

if __name__ == "__main__":
    # Configure the JDK path (the directory extracted earlier)
    os.environ['JAVA_HOME'] = '/export/server/jdk'
    os.environ['HADOOP_HOME'] = '/export/server/hadoop'
    os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
    os.environ['YARN_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
    os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'

    # Get the SparkSession object
    spark = SparkSession.builder\
        .appName("wordcount_sparksql")\
        .master("local[2]")\
        .config("spark.sql.shuffle.partitions", 2)\
        .getOrCreate()

    # 1. Read each file into an RDD
    # 2. Split each element and convert the RDDs into tuple RDDs
    rdd1 = spark.sparkContext.textFile("/spark/spark_sql/ratings.dat")\
        .map(lambda x: re.split("::", x))\
        .map(lambda x: (x[0], x[1], int(x[2]), int(x[3])))
    rdd2 = spark.sparkContext.textFile("/spark/spark_sql/movies.dat") \
        .map(lambda x: re.split("::", x)) \
        .map(lambda x: (x[0], x[1], x[2]))

    # 3. Convert the tuple RDDs into DataFrames
    dataFrame1 = rdd1.toDF(['user_id', 'movie_id', 'grade', 'time'])
    dataFrame2 = rdd2.toDF(['movie_id', 'movie_name', 'category'])

    # 4. Register the DataFrames as temporary views
    dataFrame1.createOrReplaceTempView('t1_movie_grade')  # movie rating table
    dataFrame2.createOrReplaceTempView('t2_movie_name')   # movie name table

    # 5. Write the Spark SQL
    # Among movies rated more than 2000 times, find the ones with the highest average rating;
    # show the movie name, average rating, and number of ratings
    dataFrame3 = spark.sql("""
        select
            movie_name,
            count(*) as movie_grade_cnt,
            round(avg(grade), 2) as movie_grade_avg
        from t1_movie_grade as t1
        left join t2_movie_name as t2 on t1.movie_id = t2.movie_id
        group by movie_name
        having movie_grade_cnt > 2000
        order by movie_grade_cnt desc
    """)

    # Save the result to HDFS
    # -------------- Method 1: text -----------------
    # dataFrame3.write.mode("overwrite").format("text").save("/spark/spark_sql/output")

    # -------------- Method 2: CSV -----------------
    # option("sep", "\t") specifies the delimiter of the output file
    """
    dataFrame3.write\
        .mode("overwrite")\
        .format("csv")\
        .option("sep", "\t")\
        .save("/spark/spark_sql/output")
    """

    # -------------- Method 3: JSON -----------------
    """
    dataFrame3.write\
        .mode("overwrite")\
        .format("json")\
        .save("/spark/spark_sql/output")
    """

    # -------------- Method 4: Parquet -----------------
    dataFrame3.write\
        .mode("overwrite")\
        .format("parquet")\
        .save("/spark/spark_sql/output")

    # Close spark
    spark.stop()
We want Spark SQL to read data from MySQL.
# Extension
To connect to MySQL, a framework must have four pieces of information: the MySQL host address, the database and table, the username, and the password.
1. Prepare the MySQL driver mysql-connector-java-5.1.32.jar.
2. Put the driver jar into the ${ANACONDA_HOME}/Lib/site-packages/pyspark/jars directory.
${ANACONDA_HOME} is the directory where your Anaconda is installed.
On Linux
# Enter the following directory on each of the three hosts
cd /export/server/anaconda3/lib/python3.8/site-packages/pyspark/jars
# Upload the jar mysql-connector-java-5.1.32.jar to this directory
rz
# Imports
import datetime
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, count, avg, round
from pyspark.sql.types import Row, StructField, StructType, StringType, DoubleType, IntegerType, LongType

if __name__ == "__main__":
    # Configure the JDK path (the directory extracted earlier)
    os.environ['JAVA_HOME'] = '/export/server/jdk'
    os.environ['HADOOP_HOME'] = '/export/server/hadoop'
    os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
    os.environ['YARN_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
    os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'

    # Get the SparkSession object
    spark = SparkSession.builder\
        .appName("wordcount_sparksql")\
        .master("local[2]")\
        .config("spark.sql.shuffle.partitions", 2)\
        .getOrCreate()

    # Set the log level to WARN
    spark.sparkContext.setLogLevel("WARN")

    # Spark SQL reads data from MySQL
    # ================== Connect Spark SQL and MySQL ===========================
    # MySQL host address
    url = "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true"
    # MySQL database and table name
    table_info = "db_company.emp"
    # MySQL username, password, and driver
    prop = {'user': 'root', 'password': '123456', 'driver': 'com.mysql.jdbc.Driver'}

    # Load the MySQL table into a DataFrame
    dataFrame1 = spark.read.jdbc(url=url, table=table_info, properties=prop)
    # dataFrame1.show()

    # Register the DataFrame as a Spark temporary view
    dataFrame1.createOrReplaceTempView("t1")

    # Query the view with Spark SQL
    dataFrame2 = spark.sql("""
        select * from t1 order by sal desc limit 10
    """)
    dataFrame2.show()

    # Close spark
    spark.stop()
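The script above only reads from MySQL. Writing a result back uses the same URL and properties; a minimal sketch (the target table db_company.emp_top10 is hypothetical, and the call belongs before spark.stop()):

dataFrame2.write.jdbc(
    url=url,                        # same JDBC URL as the read above
    table="db_company.emp_top10",   # hypothetical target table, created if it does not exist
    mode="overwrite",               # same modes as file sinks: append/overwrite/error/ignore
    properties=prop                 # same user/password/driver properties
)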
1. We want Spark SQL to analyze tables that already exist in Hive, with Spark as the underlying execution engine.
2. Spark locates the data by reading Hive's metadata, so Hive's MetaStore service must be running.
# Start HDFS on node1
start-dfs.sh
# Start the Hive MetaStore service on node1
start-metastore.sh
# Note: if you are using your own VM, take the start-metastore.sh script provided by the instructor, change its permissions to 777, put it in the /export/server/hive/bin/hive directory, and then start it with /export/server/hive/bin/hive/start-metastore.sh
Local mode
# On node1, enter the configuration directory and create the configuration file
cd /export/server/spark-local/conf
vim hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node1.itcast.cn:9083</value>
</property>
</configuration>
YARN mode
# On node1, node2, and node3, enter the configuration directory and create the configuration file (it does not exist yet and must be created)
cd /export/server/spark-yarn/conf
vim hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node1.itcast.cn:9083</value>
</property>
</configuration>
1. Start the PySpark shell
# Local mode
/export/server/spark-local/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2
# YARN mode
/export/server/spark-yarn/bin/pyspark --master yarn --conf spark.sql.shuffle.partitions=2
2. Run the following commands
# ----------------- Test 1: run Spark SQL --------------------
# Everything you see in this shell is Hive's data
# List
spark.sql("show databases").show()
spark.sql("show tables in db_hive").show()

# DQL analysis
spark.sql("""
    select
        d.dname,
        round(avg(e.sal), 2) as avg_sal
    from db_hive.emp e
    join db_hive.dept d on e.deptno = d.deptno
    group by d.dname
    order by avg_sal desc
""").show()

# DDL: create a table
spark.sql("create table db_hive.tb_test(word string)")

# ----------------- Test 2: load a Hive table into a DataFrame --------------------
# Read the Hive table and build a DataFrame
hiveData = spark.read.table("db_hive.emp")
hiveData.printSchema()
hiveData.show()
# Imports
import datetime
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, count, avg, round
from pyspark.sql.types import Row, StructField, StructType, StringType, DoubleType, IntegerType, LongType

if __name__ == "__main__":
    # Configure the JDK path (the directory extracted earlier)
    os.environ['JAVA_HOME'] = '/export/server/jdk'
    os.environ['HADOOP_HOME'] = '/export/server/hadoop'
    os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
    os.environ['YARN_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
    os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'

    # Build the SparkSession with the Hive parameters
    """
    # Hive's data warehouse directory
    .config("spark.sql.warehouse.dir", 'hdfs://node1.itcast.cn:8020/user/hive/warehouse')\
    # Address of Hive's metastore service
    .config("hive.metastore.uris", "thrift://node1.itcast.cn:9083")\
    # Enable the Spark-Hive integration
    .enableHiveSupport()\
    """
    spark = SparkSession \
        .builder \
        .appName("SparkSQLAppName") \
        .master("local[2]") \
        .config("spark.sql.shuffle.partitions", 2) \
        .config("spark.sql.warehouse.dir", 'hdfs://node1.itcast.cn:8020/user/hive/warehouse')\
        .config("hive.metastore.uris", "thrift://node1.itcast.cn:9083")\
        .enableHiveSupport()\
        .getOrCreate()

    # Analyze the Hive tables directly
    # Average salary per department
    dataFrame1 = spark.sql("""
        select
            deptno,
            round(avg(sal), 2) as sal_avg
        from db_hive.emp
        group by deptno
        order by sal_avg desc
    """)
    dataFrame1.show()

    # Close spark
    spark.stop()
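Because Hive support is enabled, results can also be saved back as a Hive table. A minimal sketch (the table name db_hive.emp_avg_sal is made up; place the call before spark.stop()):

dataFrame1.write \
    .mode("overwrite") \
    .saveAsTable("db_hive.emp_avg_sal")   # hypothetical table, stored under the warehouse directory configured above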
1. We want to type SQL directly in a DataGrip window, with Spark as the underlying execution engine.
2. For DataGrip to connect to Spark, the ThriftServer service must be started in advance. This service is similar to Hive's hiveserver2; its main job is to parse the SQL that DataGrip submits.
# Start the ThriftServer service on node1
# --------------- Local mode ------------------------
/export/server/spark-local/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10001 \
--hiveconf hive.server2.thrift.bind.host=node1.itcast.cn \
--master local[2] \
--conf spark.sql.shuffle.partitions=2
# --------------- YARN mode ------------------------
/export/server/spark-yarn/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10001 \
--hiveconf hive.server2.thrift.bind.host=node1.itcast.cn \
--master yarn \
--conf spark.sql.shuffle.partitions=2
1. Create a Project (skip this step if you already have one).
2. Configure the connection
show databases;
select deptno,round(avg(sal),2) as sal_avg
from db_hive.emp group by deptno order by sal_avg desc
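DataGrip is not the only possible client: anything that speaks the HiveServer2/Thrift protocol can submit SQL to this service. A minimal Python sketch, assuming the pyhive package is installed (host and port follow the start command above):

from pyhive import hive

conn = hive.Connection(host='node1.itcast.cn', port=10001, username='root')  # ThriftServer speaks the HiveServer2 protocol
cursor = conn.cursor()
cursor.execute("select deptno, round(avg(sal), 2) as sal_avg from db_hive.emp group by deptno")
for row in cursor.fetchall():
    print(row)
conn.close()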
1. When writing Spark SQL, if no SQL function or DSL function can meet your needs, you still have one last resort: user-defined functions (UDFs).
2. A user-defined function is simply a function you define to match your own requirements.
udf_variable = spark.udf.register(udf_name, processing_logic)
# Definition: spark.udf.register()
udf_variable: used to call the UDF in DSL style
udf_name: used to call the UDF in SQL
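A minimal sketch of the two calling conventions (to_upper is made up for illustration; t1 and dataFrame1 refer to the registered view and DataFrame in the example further below):

upper_dsl = spark.udf.register('to_upper', lambda s: s.upper())  # 'to_upper' is the SQL name, upper_dsl is the DSL handle

# Call by name in SQL:
spark.sql("select to_upper(name) from t1").show()
# Call by variable in DSL:
dataFrame1.select(upper_dsl(col('name'))).show()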
01 周杰伦 150/175 -> 150斤/175cm
02 周杰 130/185
03 周华健 148/178
-------- converts to ---------------
01 周杰伦 150斤/175cm
02 周杰 130斤/185cm
03 周华健 148斤/178cm
# Imports
import datetime
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, count, avg, round
from pyspark.sql.types import Row, StructField, StructType, StringType, DoubleType, IntegerType, LongType

if __name__ == "__main__":
    # Configure the JDK path (the directory extracted earlier)
    os.environ['JAVA_HOME'] = '/export/server/jdk'
    os.environ['HADOOP_HOME'] = '/export/server/hadoop'
    os.environ['HADOOP_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
    os.environ['YARN_CONF_DIR'] = '/export/server/hadoop/etc/hadoop'
    os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'

    # Create the SparkSession object
    spark = SparkSession.builder \
        .appName("wordcount_sparksql") \
        .master("local[2]") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    # Read the file and convert it into a DataFrame
    dataFrame1 = spark.read.format("csv")\
        .option('sep', '\t')\
        .load('/spark/spark_sql/input/music.tsv')\
        .withColumnRenamed('_c0', 'id')\
        .withColumnRenamed('_c1', 'name')\
        .withColumnRenamed('_c2', 'info')

    # Register dataFrame1 as a temporary view
    dataFrame1.createOrReplaceTempView("t1")

    # ------------- Define the UDF: start --------------------
    def func1(x):
        # x looks like: 150/175
        parts = re.split("\\/", x)
        return parts[0] + '斤' + '/' + parts[1] + 'cm'

    yyy = spark.udf.register('xxx', lambda x: func1(x))
    # ------------- Define the UDF: end --------------------

    # Use the UDF in SQL
    # spark.sql("""
    #     select
    #         id,
    #         name,
    #         xxx(info) as info
    #     from t1
    # """).show()

    # Use the UDF in DSL
    dataFrame1.select(
        col('id'),
        col('name'),
        yyy(col('info')).alias("info")
    ).show()

    # Close spark
    spark.stop()