当前位置:   article > 正文

09-Spark基础+PySpark案例_spark pytorch

spark pytorch

一、配置Scala

Spark是用Scala开发的,所以进行Spark开发的语言就是Scala,所以先用IDEA安装Scala插件。

1、安装Scala插件

在IDEA中,File—Settings—Plugins找到Scala,Install。

安装完会提示重启IDEA,重启即可。

安装完之后,点击File—Project Structure—Global Libraries

点击Download,

加载一段时间后会要求选择Scala版本,我们选个老一点的2.12.11,然后就会自动下载。

2、创建Maven工程

创建Maven工程:

 由于Spark的学习包括很多模块,所以我们可以把spark项目当做一个父项目,先把原来的src文件夹删掉,新建一个个模块,作为不同模块的子项目。

新建一个Module,

在src/main/java中新建一个包: com.wolf.bigdata.spark.core

在包里新建一个Scala类Test,类型选object

写入测试内容:

  1. package com.wolf.bigdata.spark.core
  2. object Test {
  3. def main(args: Array[String]): Unit = {
  4. println("hello spark")
  5. }
  6. }

运行输出结果:

说明Scala配置成功。 

二、WordCount案例

1、案例分析

2、Spark环境

说明:这里暂时先用Maven进行配置,以后再进行环境搭建

在spark.core包中新建一个包叫wc

在包里新建一个Scala-class叫Scala01_WordCount,类型为object

Spark编程一般可以分为三大步骤:

  1. // 1、建立和Spark框架的连接
  2. // 2、执行业务操作
  3. // 3、关闭连接

2.1 添加Spark依赖关系

在pom.xml中添加如下依赖:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-core_2.12</artifactId>
  5. <version>3.0.0</version>
  6. </dependency>
  7. </dependencies>

为了实现后续功能,原则上还要添加如下两个依赖:

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-sql_2.12</artifactId>
  4. <version>3.0.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.spark</groupId>
  8. <artifactId>spark-mllib_2.12</artifactId>
  9. <version>3.0.0</version>
  10. </dependency>

还有一些其他依赖,比如spark-hive、spark-streaming,暂时用不到,就先不添加了。 

3、功能实现

在spark根目录下创建文件夹datas,用来存放要读取的文件

编写Scala01_WordCount代码:

  1. package com.wolf.bigdata.spark.core.wc
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Scala01_WordCount {
  4. def main(args: Array[String]): Unit = {
  5. // 1、建立和Spark框架的连接
  6. val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
  7. val sc = new SparkContext(sparConf)
  8. // 2、执行业务操作
  9. // (1) read file(s), get lines
  10. val lines = sc.textFile("datas")
  11. // (2) split lines, get words
  12. // flatMap
  13. val words = lines.flatMap(_.split(" "))
  14. // (3) group words for counting eg: "hello world"-> (hello), (world)
  15. val wordGroup = words.groupBy(word => word)
  16. // (4) convert data after grouping [convert (a,a,a) to (a,3)]
  17. val wordToCount = wordGroup.map{
  18. case(word,list)=>{
  19. (word,list.size)
  20. }
  21. }
  22. // (5) show result in cmd
  23. val array = wordToCount.collect()
  24. array.foreach(println)
  25. // 3、关闭连接
  26. sc.stop()
  27. }
  28. }

运行,显示结果:

4、另一种实现方式

  1. package com.wolf.bigdata.spark.core.wc
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Scala01_WordCount_another {
  4. def main(args: Array[String]): Unit = {
  5. // 1、建立和Spark框架的连接
  6. val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
  7. val sc = new SparkContext(sparConf)
  8. // 2、执行业务操作
  9. // (1) read file(s), get lines
  10. val lines = sc.textFile("datas")
  11. // (2) split lines, get words
  12. // flatMap
  13. val words = lines.flatMap(_.split(" "))
  14. // (3) convert word->(word,1)
  15. val wordToOne = words.map(
  16. word => (word, 1)
  17. )
  18. // (4) group by the "word" in (word,1) (because t._1 means the first num in the tuple)
  19. val wordGroup = wordToOne.groupBy(t => t._1)
  20. val wordToCount = wordGroup.map{
  21. case(word,list)=>{
  22. list.reduce(
  23. (t1, t2) => {
  24. (t1._1, t1._2 + t2._2)
  25. }
  26. )
  27. }
  28. }
  29. // (5) show result in cmd
  30. val array = wordToCount.collect()
  31. array.foreach(println)
  32. // 3、关闭连接
  33. sc.stop()
  34. }
  35. }

5、利用Spark进行实现

Spark框架提供了更多的功能,比如,可以将分组和聚合使用一个方法进行实现

  1. package com.wolf.bigdata.spark.core.wc
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Scala03_WordCount_Spark {
  4. def main(args: Array[String]): Unit = {
  5. // 1、建立和Spark框架的连接
  6. val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
  7. val sc = new SparkContext(sparConf)
  8. // 2、执行业务操作
  9. // (1) read file(s), get lines
  10. val lines = sc.textFile("datas")
  11. // (2) split lines, get words
  12. // flatMap
  13. val words = lines.flatMap(_.split(" "))
  14. // (3) convert word->(word,1)
  15. val wordToOne = words.map(
  16. word => (word, 1)
  17. )
  18. // (4) Spark-> reduceByKey
  19. val wordToCount = wordToOne.reduceByKey(_ + _)
  20. // (5) show result in cmd
  21. val array = wordToCount.collect()
  22. array.foreach(println)
  23. // 3、关闭连接
  24. sc.stop()
  25. }
  26. }

6、编写日志输出格式化文件

在resources中添加log4j.properties文件,写入内容如下

  1. log4j.rootCategory=ERROR, console
  2. log4j.appender.console=org.apache.log4j.ConsoleAppender
  3. log4j.appender.console.target=System.err
  4. log4j.appender.console.layout=org.apache.log4j.PatternLayout
  5. log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd
  6. HH:mm:ss} %p %c{1}: %m%n
  7. # Set the default spark-shell log level to ERROR. When running the spark-shell,
  8. the
  9. # log level for this class is used to overwrite the root logger's log level, so
  10. that
  11. # the user can have different defaults for the shell and regular Spark apps.
  12. log4j.logger.org.apache.spark.repl.Main=ERROR
  13. # Settings to quiet third party logs that are too verbose
  14. log4j.logger.org.spark_project.jetty=ERROR
  15. log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
  16. log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
  17. log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
  18. log4j.logger.org.apache.parquet=ERROR
  19. log4j.logger.parquet=ERROR
  20. # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent
  21. UDFs in SparkSQL with Hive support
  22. log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
  23. log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

再次运行,只会打印错误的日志,正确的日志不会打印出来。(比赛的时候不要写这个)

三、Spark运行环境

1、Local模式

Local模式是不需要其他任何节点资源就可以在本地执行 Spark 代码的环境,一般用于教学,调试,演示等,之前在 IDEA 中运行代码的环境我们称之为开发环境,不太一样。

1.1 解压缩文件

tar -zxvf spark-2.4.5-bin-hadoop2.6.tgz -C /opt/module

重命名

  1. cd /opt/module/
  2. mv spark-2.4.5-bin-hadoop2.6 spark-local

1.2 启动local环境

进入解压后的路径,执行以下命令

bin/spark-shell 

启动成功后,可以输入网址进行 Web UI 监控页面访问

http://hadoop102:4040

 

 1.3 命令行工具简单使用

在解压缩文件夹下的 data 目录中,添加 word.txt 文件。在命令行工具中执行如下代码指 令(和 IDEA 中代码简化版一致)

sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

1.4 退出local模式

按键 Ctrl+C 或输入 Scala 指令:

:quit

1.5 提交应用

2、Standalone模式

local 本地模式毕竟只是用来进行练习演示的,真实工作中还是要将应用提交到对应的 集群中去执行,这里我们来看看只使用 Spark 自身节点运行的集群模式,也就是我们所谓的 独立部署(Standalone)模式。Spark 的 Standalone 模式体现了经典的 master-slave 模式。

集群规划: 

 2.1 解压缩文件

tar -zxvf spark-2.4.5-bin-hadoop2.6.tgz -C /opt/module

重命名

  1. cd /opt/module/
  2. mv spark-2.4.5-bin-hadoop2.6 spark-standalone

2.2 修改配置文件

(1)进入解压缩后路径的 conf 目录,修改 slaves.template 文件名为 slaves

mv slaves.template slaves

(2)修改 slaves 文件,添加 work 节点

vim slaves
  1. hadoop102
  2. hadoop103
  3. hadoop104

(3)修改 spark-env.sh.template 文件名为 spark-env.sh

mv spark-env.sh.template spark-env.sh

(4)修改 spark-env.sh 文件,添加 JAVA_HOME 环境变量和集群对应的 master 节点

  1. export JAVA_HOME=/opt/module/jdk1.8.0_171
  2. SPARK_MASTER_HOST=hadoop102
  3. SPARK_MASTER_PORT=7077

注:这里的7077在Hadoop3中对应着8020

(5)分发spark-standalone目录到103和104集群上

  1. scp -r /opt/module/spark-standalone/ wolf@hadoop103:/opt/module/
  2. scp -r /opt/module/spark-standalone/ wolf@hadoop104:/opt/module/

2.3 启动集群

sbin/start-all.sh

查看三台服务器的运行进程:

查看 Master 资源监控 Web UI 界面:

http://hadoop102:8080

 2.4 提交应用

2.5 配置历史服务

由于 spark-shell 停止掉后,集群监控 linux1:4040 页面就看不到历史任务的运行情况,所以开发时都配置历史服务器记录任务运行情况。 

(1)修改 spark-defaults.conf.template 文件名为 spark-defaults.conf

mv spark-defaults.conf.template spark-defaults.conf

(2)修改 spark-default.conf 文件,配置日志存储路径

  1. spark.eventLog.enabled true
  2. spark.eventLog.dir hdfs://hadoop102:9000/directory

注意:需要启动 hadoop 集群,HDFS 上的 directory 目录需要提前存在。

hadoop fs -mkdir /directory

(3)修改 spark-env.sh 文件, 添加日志配置

  1. export SPARK_HISTORY_OPTS="
  2. -Dspark.history.ui.port=18080
  3. -Dspark.history.fs.logDirectory=hdfs://hadoop102:9000/directory
  4. -Dspark.history.retainedApplications=30"

(4)分发文件到103和104上

  1. scp -r /opt/module/spark-standalone/conf wolf@hadoop103:/opt/module/spark-standalone/
  2. scp -r /opt/module/spark-standalone/conf wolf@hadoop104:/opt/module/spark-standalone/

(5)重新启动Spark集群和历史服务

在Spark目录中

  1. sbin/start-all.sh
  2. sbin/start-history-server.sh

(6)重新执行任务

(7)可以去web端查看历史任务

http://hadoop102:18080

2.6 配置高可用(HA)

这一部分需要用到Zookeeper,我们以后再来看。

3、Yarn模式

独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是也要记住,Spark 主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。(貌似不是考点,先跳过了)

 四、Spark运行架构

注意:该部分是纯理论部分

 由上图可以看出,对于 Spark 框架有两个核心组件:

1、Driver

2、Executor

 

3、Master&Worker

 4、ApplicationMaster

 

五、Spark-Core编程

 中间的学习部分看不完了,先从案例入手。

案例一:各省份广告点击数前三名

1、数据准备

pri_user_adv.log:时间戳 省份 城市 用户 广告 ,字段以空格分隔,放在了代码根目录下的datas文件夹中。

2、需求描述

统计出每个省份、每个广告被点击数量排行的Top3

3、需求分析

4、功能实现

代码思路

具体实现

  1. package com.wolf.bigdata.spark.core.rdd
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark_RDD_Req {
  4. def main(args: Array[String]): Unit = {
  5. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  6. val sc = new SparkContext(sparkConf)
  7. // 1.get data
  8. // raw data:time province city user ad.
  9. val dataRDD = sc.textFile("datas/pri_user_adv.log")
  10. // 2.process data
  11. // from raw data to : ((province,ad),1)
  12. val mapRDD = dataRDD.map(
  13. line => {
  14. val datas = line.split(" ")
  15. ((datas(1) , datas(4)),1)
  16. }
  17. )
  18. // 3.reduce(sum)
  19. // from new data to : ((province,ad),sum)
  20. val reduceRDD = mapRDD.reduceByKey(_+_)
  21. // 4.convert result
  22. // from ((province,ad),sum) to (province,(ad,sum))
  23. val newMapRDD = reduceRDD.map{
  24. case((prv,ad),sum)=>{
  25. (prv,(ad,sum))
  26. }
  27. }
  28. // 5. group by province
  29. // (provinceA,[(adA,sumA),(adB,sumB)...])
  30. val groupRDD = newMapRDD.groupByKey()
  31. // 6. sort in group
  32. // according num , pick out 3
  33. val resultRDD = groupRDD.mapValues(
  34. iter=>{
  35. iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
  36. }
  37. )
  38. // 7. print in cmd
  39. resultRDD.collect().foreach(println)
  40. sc.stop()
  41. }
  42. }

运行结果

案例二:电商统计用户行为信息

 1、数据准备

 

 2、需求描述

按照每个品类的点击、下单、支付的量来统计热门品类。 

 鞋 点击数 下单数 支付数

衣服 点击数 下单数 支付数

电脑 点击数 下单数 支付数 

本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下 单数;下单数再相同,就比较支付数。 

3、需求分析 

实现方案一

分别统计每个品类点击的次数,下单的次数和支付的次数:

(品类,点击总数)(品类,下单总数)(品类,支付总数) 

实现方案二

一次性统计每个品类点击的次数,下单的次数和支付的次数:

(品类,(点击总数,下单总数,支付总数)) 

实现方案三

使用累加器的方式聚合数据 

4、具体实现

方案一的代码:

  1. package com.wolf.bigdata.spark.core.req
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark01_Top10CateMethod1 {
  4. def main(args: Array[String]): Unit = {
  5. val sparConf = new SparkConf().setMaster("local[*]").setAppName("Top10CateMethod1")
  6. val sc = new SparkContext(sparConf)
  7. // 1.get raw data
  8. val actionRDD = sc.textFile("datas/user_visit_action.txt")
  9. // 2.get click count (ID,click_count)
  10. val clickActionRDD = actionRDD.filter(
  11. action => {
  12. val datas = action.split("_") //each action is divided by "_"
  13. datas(6) != "-1"
  14. }
  15. )
  16. val clickCountRDD = clickActionRDD.map(
  17. action => {
  18. val datas = action.split(("_"))
  19. (datas(6), 1)
  20. }
  21. ).reduceByKey(_ + _)
  22. // 3.get order count (ID,order_count)
  23. val orderActionRDD = actionRDD.filter(
  24. action => {
  25. val datas = action.split("_") //each action is divided by "_"
  26. datas(8) != "null"
  27. }
  28. ) // orderid=>1,2,3 ,but we want [(1,1),(2,1),(3,1)] ,so flat it
  29. val orderCountRDD = orderActionRDD.flatMap(
  30. action => {
  31. val datas = action.split("_")
  32. val cid = datas(8)
  33. val cids = cid.split(",")
  34. cids.map(id => (id, 1))
  35. }
  36. ).reduceByKey(_ + _) // [(1,num1),(2,num2)..]
  37. // 4. get pay count (ID,pay_count)
  38. val payActionRDD = actionRDD.filter(
  39. action => {
  40. val datas = action.split("_") //each action is divided by "_"
  41. datas(10) != "null"
  42. }
  43. ) // payid=>1,2,3 ,but we want [(1,1),(2,1),(3,1)] ,so flat it
  44. val payCountRDD = payActionRDD.flatMap(
  45. action => {
  46. val datas = action.split("_")
  47. val cid = datas(10)
  48. val cids = cid.split(",")
  49. cids.map(id => (id, 1))
  50. }
  51. ).reduceByKey(_ + _) // [(1,num1),(2,num2)..]
  52. // 5. sort , get top 10 ***
  53. // click_count > order_count > pay_count
  54. // tuple sort:firstly compare the first, and then compare the second...
  55. // from [(ID,click_count),(ID,order_count),(ID,pay_count)] to (ID,(click_count,order_count,pay_count))
  56. // join(x) zip(x) leftOuterJoin(x) cogroup(v)
  57. // cogroup = connect + group
  58. val cogroupRDD = clickCountRDD.cogroup(orderCountRDD, payCountRDD)
  59. val analysisRDD = cogroupRDD.mapValues{
  60. case(clickIter,orderIter,payIter)=>{
  61. var clickCnt = 0
  62. val iter1 = clickIter.iterator
  63. if(iter1.hasNext){
  64. clickCnt = iter1.next()
  65. }
  66. var orderCnt = 0
  67. val iter2 = orderIter.iterator
  68. if (iter2.hasNext) {
  69. orderCnt = iter2.next()
  70. }
  71. var payCnt = 0
  72. val iter3 = payIter.iterator
  73. if (iter3.hasNext) {
  74. payCnt = iter3.next()
  75. }
  76. (clickCnt,orderCnt,payCnt)
  77. }
  78. }
  79. val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
  80. // 6. collect result ,and print in cmd
  81. resultRDD.foreach(println)
  82. }
  83. }

运行结果

 方案一有两个问题:

1、actionRDD被重复使用多次

2、cogroup方法可能存在Shuffle,影响性能

问题1的解决方法:把actionRDD放在缓存中

代码实现:

 actionRDD.cache()

问题2的解决方法:

当前数据的样子是这样的:(ID,click_count),(ID,order_count),(ID,pay_count)

我们可以不使用cogroup,直接把数据变成:(ID,(click_count,0,0)),(ID,(0,order_count,0)),(ID,(0,0,pay_count)),然后聚合,这样就不需要用cogroup了。

改进后的方案一代码:

  1. package com.wolf.bigdata.spark.core.req
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark01_Top10CateMethod2 {
  4. def main(args: Array[String]): Unit = {
  5. val sparConf = new SparkConf().setMaster("local[*]").setAppName("Top10CateMethod1")
  6. val sc = new SparkContext(sparConf)
  7. // 1.get raw data
  8. val actionRDD = sc.textFile("datas/user_visit_action.txt")
  9. actionRDD.cache()
  10. // 2.get click count (ID,click_count)
  11. val clickActionRDD = actionRDD.filter(
  12. action => {
  13. val datas = action.split("_") //each action is divided by "_"
  14. datas(6) != "-1"
  15. }
  16. )
  17. val clickCountRDD = clickActionRDD.map(
  18. action => {
  19. val datas = action.split(("_"))
  20. (datas(6), 1)
  21. }
  22. ).reduceByKey(_ + _)
  23. // 3.get order count (ID,order_count)
  24. val orderActionRDD = actionRDD.filter(
  25. action => {
  26. val datas = action.split("_") //each action is divided by "_"
  27. datas(8) != "null"
  28. }
  29. ) // orderid=>1,2,3 ,but we want [(1,1),(2,1),(3,1)] ,so flat it
  30. val orderCountRDD = orderActionRDD.flatMap(
  31. action => {
  32. val datas = action.split("_")
  33. val cid = datas(8)
  34. val cids = cid.split(",")
  35. cids.map(id => (id, 1))
  36. }
  37. ).reduceByKey(_ + _) // [(1,num1),(2,num2)..]
  38. // 4. get pay count (ID,pay_count)
  39. val payActionRDD = actionRDD.filter(
  40. action => {
  41. val datas = action.split("_") //each action is divided by "_"
  42. datas(10) != "null"
  43. }
  44. ) // payid=>1,2,3 ,but we want [(1,1),(2,1),(3,1)] ,so flat it
  45. val payCountRDD = payActionRDD.flatMap(
  46. action => {
  47. val datas = action.split("_")
  48. val cid = datas(10)
  49. val cids = cid.split(",")
  50. cids.map(id => (id, 1))
  51. }
  52. ).reduceByKey(_ + _) // [(1,num1),(2,num2)..]
  53. // 5. sort , get top 10 ***
  54. // click_count > order_count > pay_count
  55. // tuple sort:firstly compare the first, and then compare the second...
  56. // from [(ID,click_count),(ID,order_count),(ID,pay_count)] to
  57. // (ID,(click_count,0,0)),(ID,(0,order_count,0)),(ID,(0,0,pay_count))
  58. // then reduce
  59. val rdd1 = clickCountRDD.map {
  60. case (cid, cnt) => {
  61. (cid, (cnt, 0, 0))
  62. }
  63. }
  64. val rdd2 = orderCountRDD.map {
  65. case (cid, cnt) => {
  66. (cid, (0, cnt, 0))
  67. }
  68. }
  69. val rdd3 = payCountRDD.map {
  70. case (cid, cnt) => {
  71. (cid, (0, 0, cnt))
  72. }
  73. }
  74. // union together and then reduce
  75. val sourceRDD = rdd1.union(rdd2).union(rdd3)
  76. // reduce
  77. val analysisRDD = sourceRDD.reduceByKey(
  78. (t1, t2) => {
  79. (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
  80. }
  81. )
  82. val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
  83. // 6. collect result ,and print in cmd
  84. resultRDD.foreach(println)
  85. }
  86. }

运行结果:

改进后仍存在的问题:存在大量的Shuffle操作(reduceByKey)

其实,我们可以改变思路,进而优化代码:

方案二的代码:

  1. package com.wolf.bigdata.spark.core.req
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark01_Top10CateMethod3 {
  4. def main(args: Array[String]): Unit = {
  5. val sparConf = new SparkConf().setMaster("local[*]").setAppName("Top10CateMethod1")
  6. val sc = new SparkContext(sparConf)
  7. // 1.get raw data
  8. val actionRDD = sc.textFile("datas/user_visit_action.txt")
  9. // 2.convert data structure
  10. // if click: (ID,(1,0,0))
  11. // if order: (ID,(0,1,0))
  12. // if pay: (ID,(0,0,1))
  13. val flatRDD = actionRDD.flatMap(
  14. action => {
  15. val datas = action.split("_")
  16. if (datas(6) != "-1") {
  17. // click
  18. List((datas(6), (1, 0, 0)))
  19. }
  20. else if (datas(8) != "null") {
  21. // order
  22. val ids = datas(8).split(",")
  23. ids.map(id => (id, (0, 1, 0)))
  24. }
  25. else if (datas(10) != "null") {
  26. //pay
  27. val ids = datas(10).split(",")
  28. ids.map(id => (id, (0, 0, 1)))
  29. }
  30. else {
  31. Nil
  32. }
  33. }
  34. )
  35. // 3. reduce the same ID
  36. // (ID,(cnt1,cnt2,cnt3)
  37. val analysisRDD = flatRDD.reduceByKey(
  38. (t1, t2) => {
  39. (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
  40. }
  41. )
  42. // 4.sort descending,Top 10
  43. val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
  44. // 5. collect result ,and print in cmd
  45. resultRDD.foreach(println)
  46. }
  47. }

这样只需要一个reduceByKey,也即只需要一个Shuffle操作

运行结果

方案三用到了自定义累加器,以后再学习。

六、Spark MLlib

这里不做具体学习,只是写一点案例的代码。

Spark MLlib

引入依赖

在pom中引入如下依赖

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-core_2.12</artifactId>
  4. <version>3.0.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.spark</groupId>
  8. <artifactId>spark-sql_2.12</artifactId>
  9. <version>3.0.0</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.spark</groupId>
  13. <artifactId>spark-mllib_2.12</artifactId>
  14. <version>3.0.0</version>
  15. </dependency>

代码

  1. package com.wolf.bigdata.spark.core.req
  2. import org.apache.spark.sql.SparkSession
  3. import org.apache.spark.sql.{Row, DataFrame}
  4. import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
  5. import org.apache.spark.ml.recommendation.ALS
  6. object Test01 {
  7. def main(args: Array[String]): Unit = {
  8. // 创建SparkSession: 首先,创建一个SparkSession,这是与Spark进行交互的入口点。
  9. val spark = SparkSession.builder()
  10. .appName("MovieRecommendation")
  11. .master("local[*]") // 这里设置本地模式,[*] 表示使用所有 CPU 核心
  12. .getOrCreate()
  13. // 读取数据: 使用Spark DataFrame API读取用户-电影评分数据。
  14. val data = spark.read
  15. .option("header", "true")
  16. .option("inferSchema", "true")
  17. .csv("datas/capter5_2ml.csv")
  18. // 创建ALS模型: 配置ALS模型的参数
  19. val als = new ALS()
  20. .setMaxIter(5)
  21. .setRegParam(0.01)
  22. .setUserCol("userId")
  23. .setItemCol("movieId")
  24. .setRatingCol("rating")
  25. // 训练模型: 使用训练数据对ALS模型进行训练
  26. val model = als.fit(data)
  27. // 生成用户推荐: 使用recommendForUserSubset方法为指定用户生成电影推荐。
  28. // 请替换user_ids为你要生成推荐的用户ID列表
  29. val user_ids = Seq(1, 2, 3)
  30. // val user_df = user_ids.toDF("userId")
  31. val userRows = user_ids.map(id => Row(id))
  32. val userSchema = List(StructField("userId", IntegerType, nullable = false))
  33. val user_df = spark.createDataFrame(spark.sparkContext.parallelize(userRows), StructType(userSchema))
  34. val user_recommendations = model.recommendForUserSubset(user_df, 10)
  35. // 生成电影推荐: 使用recommendForItemSubset方法为指定电影生成用户推荐。
  36. // 请替换movie_ids为你要生成推荐的电影ID列表
  37. val movie_ids = Seq(1, 2, 3, 4, 5)
  38. // val movie_df = movie_ids.toDF("movieId")
  39. val movieRows = movie_ids.map(id => Row(id))
  40. val movieSchema = List(StructField("movieId", IntegerType, nullable = false))
  41. val movie_df = spark.createDataFrame(spark.sparkContext.parallelize(movieRows), StructType(movieSchema))
  42. val movie_recommendations = model.recommendForItemSubset(movie_df, 10)
  43. // 结果输出: 将生成的推荐结果输出,例如,打印到控制台。
  44. // 输出用户推荐
  45. // user_recommendations.show()
  46. user_recommendations.take(10).foreach(println)
  47. // 输出电影推荐
  48. // movie_recommendations.show()
  49. movie_recommendations.take(10).foreach(println)
  50. // 关闭SparkSession: 最后,确保关闭SparkSession以释放资源。
  51. spark.stop()
  52. }
  53. }

案例

使用Spark MLlib中的使用ALS算法给每个用户推荐某个商品。

注意:对原始数据添加列名,方便后续操作

要求

1、创建SparkSession

 首先,创建一个SparkSession,这是与Spark进行交互的入口点。

  1. import org.apache.spark.sql.SparkSession
  2. val spark = SparkSession.builder()
  3. .appName("MovieRecommendation")
  4. .master("local[*]") // 这里设置本地模式,[*] 表示使用所有 CPU 核心
  5. .getOrCreate()

2、读取数据

使用Spark DataFrame API读取用户-电影评分数据。

  1. val data = spark.read
  2. .option("header", "true")
  3. .option("inferSchema", "true")
  4. .csv("datas/capter5_2ml.csv")

3、创建ALS模型并配置ALS模型的参数

  1. import org.apache.spark.ml.recommendation.ALS
  2. val als = new ALS()
  3. .setMaxIter(5)
  4. .setRegParam(0.01)
  5. .setUserCol("userId")
  6. .setItemCol("movieId")
  7. .setRatingCol("rating")

4、训练模型

使用训练数据对ALS模型进行训练

val model = als.fit(data)

5、生成用户推荐

使用recommendForUserSubset方法为指定用户生成电影推荐。

  1. import org.apache.spark.sql.{Row, DataFrame}
  2. import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
  3. val user_ids = Seq(1, 2, 3)
  4. val userRows = user_ids.map(id => Row(id))
  5. val userSchema = List(StructField("userId", IntegerType, nullable = false))
  6. val user_df = spark.createDataFrame(spark.sparkContext.parallelize(userRows), StructType(userSchema))
  7. val user_recommendations = model.recommendForUserSubset(user_df, 10)

6、生成电影推荐

使用recommendForItemSubset方法为指定电影生成用户推荐。

  1. val movie_ids = Seq(1, 2, 3, 4, 5)
  2. val movieRows = movie_ids.map(id => Row(id))
  3. val movieSchema = List(StructField("movieId", IntegerType, nullable = false))
  4. val movie_df = spark.createDataFrame(spark.sparkContext.parallelize(movieRows), StructType(movieSchema))
  5. val movie_recommendations = model.recommendForItemSubset(movie_df, 10)

7、结果输出

将生成的推荐结果输出,例如,打印到控制台。

  1. // 输出用户推荐
  2. // user_recommendations.show()
  3. user_recommendations.take(10).foreach(println)
  4. // 输出电影推荐
  5. // movie_recommendations.show()
  6. movie_recommendations.take(10).foreach(println)

8、关闭Session

spark.stop()

完整代码

  1. package com.wolf.bigdata.spark.core.req
  2. import org.apache.spark.sql.SparkSession
  3. import org.apache.spark.sql.{Row, DataFrame}
  4. import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
  5. import org.apache.spark.ml.recommendation.ALS
  6. object Test01 {
  7. def main(args: Array[String]): Unit = {
  8. // 创建SparkSession: 首先,创建一个SparkSession,这是与Spark进行交互的入口点。
  9. val spark = SparkSession.builder()
  10. .appName("MovieRecommendation")
  11. .master("local[*]") // 这里设置本地模式,[*] 表示使用所有 CPU 核心
  12. .getOrCreate()
  13. // 读取数据: 使用Spark DataFrame API读取用户-电影评分数据。
  14. val data = spark.read
  15. .option("header", "true")
  16. .option("inferSchema", "true")
  17. .csv("datas/capter5_2ml.csv")
  18. // 创建ALS模型: 配置ALS模型的参数
  19. val als = new ALS()
  20. .setMaxIter(5)
  21. .setRegParam(0.01)
  22. .setUserCol("userId")
  23. .setItemCol("movieId")
  24. .setRatingCol("rating")
  25. // 训练模型: 使用训练数据对ALS模型进行训练
  26. val model = als.fit(data)
  27. // 生成用户推荐: 使用recommendForUserSubset方法为指定用户生成电影推荐。
  28. // 请替换user_ids为你要生成推荐的用户ID列表
  29. val user_ids = Seq(1, 2, 3)
  30. // val user_df = user_ids.toDF("userId")
  31. val userRows = user_ids.map(id => Row(id))
  32. val userSchema = List(StructField("userId", IntegerType, nullable = false))
  33. val user_df = spark.createDataFrame(spark.sparkContext.parallelize(userRows), StructType(userSchema))
  34. val user_recommendations = model.recommendForUserSubset(user_df, 10)
  35. // 生成电影推荐: 使用recommendForItemSubset方法为指定电影生成用户推荐。
  36. // 请替换movie_ids为你要生成推荐的电影ID列表
  37. val movie_ids = Seq(1, 2, 3, 4, 5)
  38. // val movie_df = movie_ids.toDF("movieId")
  39. val movieRows = movie_ids.map(id => Row(id))
  40. val movieSchema = List(StructField("movieId", IntegerType, nullable = false))
  41. val movie_df = spark.createDataFrame(spark.sparkContext.parallelize(movieRows), StructType(movieSchema))
  42. val movie_recommendations = model.recommendForItemSubset(movie_df, 10)
  43. // 结果输出: 将生成的推荐结果输出,例如,打印到控制台。
  44. // 输出用户推荐
  45. // user_recommendations.show()
  46. user_recommendations.take(10).foreach(println)
  47. // 输出电影推荐
  48. // movie_recommendations.show()
  49. movie_recommendations.take(10).foreach(println)
  50. // 关闭SparkSession: 最后,确保关闭SparkSession以释放资源。
  51. spark.stop()
  52. }
  53. }

 结果截图

PySpark

PySpark安装

pip install pyspark

or

pip install -i https://pypi.tuna/tsinghua.edu.cn/simple pyspark

Numpy

pip install numpy -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

案例

使用Spark MLlib中的使用ALS算法给每个用户推荐某个商品。

 注意:对原始数据添加列名,方便后续操作

要求

0、创建Session,读取数据

  1. from pyspark.sql import SparkSession
  2. # 创建SparkSession
  3. spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()
  4. # 读取数据
  5. data = spark.read.csv("capter5_2ml.csv", header=True, inferSchema=True)

 ①设置迭代次数为5次,惩罚系数为0.01,得到评分的矩阵形式(2分)。

  1. from pyspark.ml.recommendation import ALS
  2. als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")

②对模型进行拟合,训练出合适的模型(2分)。

model = als.fit(data)

③为一组指定的用户生成十大电影推荐(4分)。

  1. user_ids = [1]
  2. # 创建一个DataFrame来存储指定用户的ID
  3. user_df = spark.createDataFrame([Row(userId=uid) for uid in user_ids])
  4. # 使用ALS模型生成前十名用户的电影推荐
  5. user_recommendations = model.recommendForUserSubset(user_df, 10)

④生成前十名用户推荐的一组指定的电影(4分)。

  1. # 创建一个DataFrame包含前十名用户的ID
  2. user_ids_top10 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  3. top_users_df = spark.createDataFrame([Row(userId=uid) for uid in user_ids_top10])
  4. # 使用ALS模型生成前十名用户的电影推荐
  5. top_users_recommendations = model.recommendForUserSubset(top_users_df, 10)

⑤对结果进行正确输出(1分)。

  1. from pyspark.sql import Row
  2. # 输出一组用户推荐
  3. for row in user_recommendations.collect():
  4. user_id = row.userId
  5. recommendations = [(r.movieId, r.rating) for r in row.recommendations]
  6. print(f"User {user_id} recommendations: {recommendations}")
  7. # 输出前十名用户的电影推荐
  8. for row in top_users_recommendations.collect():
  9. user_id = row.userId
  10. recommendations = [(r.movieId, r.rating) for r in row.recommendations]
  11. print(f"User {user_id} recommendations: {recommendations}")

关闭Session

spark.stop()

建模过程,建模思路描述(2分):

完整代码

  1. from pyspark.sql import SparkSession
  2. from pyspark.sql import Row
  3. from pyspark.ml.recommendation import ALS
  4. # 创建SparkSession
  5. spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()
  6. # 读取数据
  7. data = spark.read.csv("capter5_2ml.csv", header=True, inferSchema=True)
  8. # ① 设置迭代次数为5次,惩罚系数为0.01,得到评分的矩阵形式
  9. als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
  10. # ② 对模型进行拟合,训练出合适的模型
  11. model = als.fit(data)
  12. # ③ 为一组指定的用户生成十大电影推荐
  13. # 请替换user_ids为你要生成推荐的用户ID列表
  14. # user_ids = [1, 217, 381]
  15. user_ids = [1]
  16. # 创建一个DataFrame来存储用户ID
  17. user_df = spark.createDataFrame([Row(userId=uid) for uid in user_ids])
  18. user_recommendations = model.recommendForUserSubset(user_df, 10)
  19. #
  20. # # 请替换movie_ids为你要生成推荐的电影ID列表
  21. # movie_ids = [1, 3, 6, 1023, 1024]
  22. #
  23. # # 创建一个DataFrame来存储电影ID
  24. # movie_df = spark.createDataFrame([Row(movieId=mid) for mid in movie_ids])
  25. #
  26. # movie_recommendations = model.recommendForItemSubset(movie_df, 10)
  27. # ④ 生成前十名用户推荐的一组指定的电影
  28. # 创建一个DataFrame包含前十名用户的ID
  29. user_ids_top10 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  30. top_users_df = spark.createDataFrame([Row(userId=uid) for uid in user_ids_top10])
  31. # 使用ALS模型生成前十名用户的电影推荐
  32. top_users_recommendations = model.recommendForUserSubset(top_users_df, 10)
  33. # ⑤ 对结果进行正确输出
  34. # 输出1Group用户推荐
  35. for row in user_recommendations.collect():
  36. user_id = row.userId
  37. recommendations = [(r.movieId, r.rating) for r in row.recommendations]
  38. print(f"User {user_id} recommendations: {recommendations}")
  39. # # 输出电影推荐
  40. # for row in movie_recommendations.collect():
  41. # movie_id = row.movieId
  42. # recommendations = [(r.userId, r.rating) for r in row.recommendations]
  43. # print(f"Movie {movie_id} recommendations: {recommendations}")
  44. # 输出前十名用户的电影推荐
  45. for row in top_users_recommendations.collect():
  46. user_id = row.userId
  47. recommendations = [(r.movieId, r.rating) for r in row.recommendations]
  48. print(f"User {user_id} recommendations: {recommendations}")
  49. # 关闭SparkSession
  50. spark.stop()

结果截图:

问题:

Q:为什么数据中的rating最高分只有五分,但是预测出的分数有很多都超过了五分?

A(GPT生成):

在协同过滤推荐系统中,ALS(交替最小二乘法)模型的预测评分可以超过原始数据中的评分范围。这是因为ALS模型是一种基于矩阵分解的模型,它试图学习用户和物品的潜在特征向量,以最小化预测评分与实际评分之间的均方误差。这意味着模型可以生成比原始评分范围更广的预测值。

在你的情况下,原始数据的评分范围为1到5分,但ALS模型的预测评分可以超过这个范围。这并不一定是一个问题,因为ALS模型的目标是提供一组推荐,而不仅仅是复制现有的评分。模型可能会认为某些用户对某些电影的兴趣更高,因此给出了更高的预测评分。

通常,当使用ALS模型时,我们关心的是生成有序的推荐列表,而不是特定的评分值。你可以根据预测评分的高低来排序推荐列表,并向用户推荐最高评分的物品。当然,你可以选择对生成的预测评分进行某种标准化,以确保它们在1到5分的范围内,但这通常不是必要的。

一些其他问题

JAVA_HOME问题

如果PySpark没有自动检测到JAVA_HOME:

  1. import os
  2. os.environ['JAVA_HOME'] = ""

Linux下Python安装

下载、解压

安装依赖

yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel libffi-devel gcc make

  1. ./configure --prefix=/opt/module/Python-3.9.18
  2. make
  3. make install
  4. // 建立软链接
  5. sudo ln -s /opt/module/Python-3.9.18/bin/python3.9 /usr/bin/python3
  6. sudo ln -s /opt/module/Python-3.9.18/bin/pip3.9 /usr/bin/pip3
  7. // 配置环境变量
  8. vi ~/.bash_profile
  9. #配置python
  10. export PYTHON_HOME=/opt/module/Python-3.9.18
  11. export PATH=$PYTHON_HOME/bin:$PATH
  12. // 激活环境变量
  13. source ~/.bash_profile

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号