赞
踩
这章主要是关于 Spark的SQL操作,如何把Spark与HIve连接起来,可以参考下面的文字链接: Spark SQL 操作 MySQL数据库和 Hive数据仓库
接着Hive时候的操作,继续。
要启动,hadoop集群、hive服务、spark。
/usr/hadoop/hadoop-2.7.3/sbin/start-all.sh
hive --service metastore
/usr/spark/spark-2.4.0-bin-hadoop2.7/sbin/start-all.sh
进入数据库,导入sql包,定义变量存储sql(" ")的结果,分组groupby(订单),再count统计(),show显示出来。
import spark.sql sql("use badou") val orders=sql("select * from orders") val products=sql("select * from products") val priors=sql("select * from priors") priors.show(10) // 查看有order_id中product_id --方式一: priors.groupBy("product_id").count().show --方式二: val proCnt = priors.groupBy("product_id").count() proCnt.show(10) show 默认显示20条数据 show(10):显示指定的条数 show(1,false) 显示的记录数 和针对字符过长进行格式化显示
cache方法:加载到内存
val proCnt = priors.groupBy("product_id").count().cache //没有执行,只是加载到内存中
proCnt.show(10) //第一次运行会慢
proCnt.show(100) //再次运行就直接内存中读取非常快
proCnt.unpersist //在内存中直接移除
场景:当一个商品被重复购买,重复购买的比率越高(这类商品可以理解为消耗品,抽纸,洗发水等等),那下一次购买的可能性很高。
预测:购买这些商品的用户,下一次最容易购买哪些商品。
要求:1、对orders的col列:"eval_set"=="test"
过滤,输出结果。
1、先把orders存到内存,这样查找时候省时间
orders.cache
orders.show(10) //执行cache操作
【注意】:进行条件:等于是 “===”
--1、filter过滤:
orders.filter(col("eval_set")==="test").show(5)
--2、where过滤:
orders.where(col("eval_set")==="test").show(5)
对上面的结果再进行过滤,只显示周二的结果:filter(col("order_dow")==="1")
--两个过滤:where+filter
orders.where(col("eval_set")==="test").filter(col("order_dow")===1).show(10)
再次购买的列在priors表里
val priors=sql("select * from priors")
priors.cache
priors.show(10)
可以发现:reordered=1,代表重新购买。统计商品重新购买的次数
限制列读取 priors表 priors.select(col("product_id"),col("reordered")).show(5) 方式一:简陋版 priors.filter(col("reordered")==="1").groupBy("product_id").count().show(10) 方式二: priors.selectExpr("product_id","cast(reordered as int)").filter(col("reordered")===1).groupBy("product_id").sum().show(10) 方式三: priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").sum("reordered").show(10) 方式四:理想版 priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered")).show(10) 方式五:我们需求版 priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"),avg("reordered")).show(5)
我们发现结果的列显示sum(reordered),avg(reordered) 我们想重命名一下,用withColumnRenamed("old_name","new_name")
进行重命名。
priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"),avg("reordered")).withColumnRenamed("sum(reordered)","sum_re").withColumnRenamed("avg(reordered)","avg_re")show(5)
sum(重新购买的商品) / count (全部商品)
1、重复购买的商品量
val productSumRe = priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"))
2、总的商品量
val proCnt = priors.groupBy("product_id").count()
方式一: scala priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"),avg("reordered")).show(5) 方式二:SQL --join连接表,表1.join(表2, "表都有的列名") val jCnt = proCnt.join(productSumRe, "product_id") jCnt.show(5) jCnt.selectExpr("*", "sum_re/count as mean_re").show(5) --------------------*******------------------- 方式三:udf:User Defined Function,用户自定义函数。 withColumn表示增加一列,withColumn("new_col","运算的结果") udf((x_1:,x_2)=>) import org.apache.spark.sql.functions._ val avg_udf = udf((sm:Long,cnt:Long)=>sm.toDouble/cnt.toDouble) // 实现sum/count jCnt.withColumn("mean_re", avg_udf(col("sum_re"),col("count"))).show(5) --对avg_udf格式化:sum_re/count 结果给 mean_re。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。