赞
踩
Spark是用Scala开发的,所以进行Spark开发的语言就是Scala,所以先用IDEA安装Scala插件。
在IDEA中,File—Settings—Plugins找到Scala,Install。
安装完会提示重启IDEA,重启即可。
安装完之后,点击File—Project Structure—Global Libraries
点击Download,
加载一段时间后会要求选择Scala版本,我们选个老一点的2.12.11,然后就会自动下载。
创建Maven工程:
由于Spark的学习包括很多模块,所以我们可以把spark项目当做一个父项目,先把原来的src文件夹删掉,新建一个个模块,作为不同模块的子项目。
新建一个Module,
在src/main/java中新建一个包: com.wolf.bigdata.spark.core
在包里新建一个Scala类Test,类型选object
写入测试内容:
- package com.wolf.bigdata.spark.core
-
- object Test {
-
- def main(args: Array[String]): Unit = {
-
- println("hello spark")
- }
-
- }
运行输出结果:
说明Scala配置成功。
说明:这里暂时先用Maven进行配置,以后再进行环境搭建
在spark.core包中新建一个包叫wc
在包里新建一个Scala-class叫Scala01_WordCount,类型为object
Spark编程一般可以分为三大步骤:
- // 1、建立和Spark框架的连接
-
- // 2、执行业务操作
-
- // 3、关闭连接
在pom.xml中添加如下依赖:
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.12</artifactId>
- <version>3.0.0</version>
- </dependency>
- </dependencies>
为了实现后续功能,原则上还要添加如下两个依赖:
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.12</artifactId>
- <version>3.0.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-mllib_2.12</artifactId>
- <version>3.0.0</version>
- </dependency>
还有一些其他依赖,比如spark-hive、spark-streaming,暂时用不到,就先不添加了。
在spark根目录下创建文件夹datas,用来存放要读取的文件
编写Scala01_WordCount代码:
- package com.wolf.bigdata.spark.core.wc
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Scala01_WordCount {
-
- def main(args: Array[String]): Unit = {
-
- // 1、建立和Spark框架的连接
- val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
- val sc = new SparkContext(sparConf)
-
- // 2、执行业务操作
- // (1) read file(s), get lines
- val lines = sc.textFile("datas")
-
- // (2) split lines, get words
- // flatMap
- val words = lines.flatMap(_.split(" "))
- // (3) group words for counting eg: "hello world"-> (hello), (world)
- val wordGroup = words.groupBy(word => word)
- // (4) convert data after grouping [convert (a,a,a) to (a,3)]
- val wordToCount = wordGroup.map{
- case(word,list)=>{
- (word,list.size)
- }
- }
- // (5) show result in cmd
- val array = wordToCount.collect()
- array.foreach(println)
- // 3、关闭连接
- sc.stop()
-
- }
- }

运行,显示结果:
- package com.wolf.bigdata.spark.core.wc
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Scala01_WordCount_another {
-
- def main(args: Array[String]): Unit = {
-
- // 1、建立和Spark框架的连接
- val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
- val sc = new SparkContext(sparConf)
-
- // 2、执行业务操作
- // (1) read file(s), get lines
- val lines = sc.textFile("datas")
-
- // (2) split lines, get words
- // flatMap
- val words = lines.flatMap(_.split(" "))
- // (3) convert word->(word,1)
- val wordToOne = words.map(
- word => (word, 1)
- )
- // (4) group by the "word" in (word,1) (because t._1 means the first num in the tuple)
- val wordGroup = wordToOne.groupBy(t => t._1)
-
- val wordToCount = wordGroup.map{
- case(word,list)=>{
- list.reduce(
- (t1, t2) => {
- (t1._1, t1._2 + t2._2)
- }
- )
- }
- }
- // (5) show result in cmd
- val array = wordToCount.collect()
- array.foreach(println)
- // 3、关闭连接
- sc.stop()
-
- }
- }

Spark框架提供了更多的功能,比如,可以将分组和聚合使用一个方法进行实现
- package com.wolf.bigdata.spark.core.wc
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Scala03_WordCount_Spark {
-
- def main(args: Array[String]): Unit = {
-
- // 1、建立和Spark框架的连接
- val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
- val sc = new SparkContext(sparConf)
-
- // 2、执行业务操作
- // (1) read file(s), get lines
- val lines = sc.textFile("datas")
-
- // (2) split lines, get words
- // flatMap
- val words = lines.flatMap(_.split(" "))
- // (3) convert word->(word,1)
- val wordToOne = words.map(
- word => (word, 1)
- )
-
- // (4) Spark-> reduceByKey
- val wordToCount = wordToOne.reduceByKey(_ + _)
-
- // (5) show result in cmd
- val array = wordToCount.collect()
- array.foreach(println)
- // 3、关闭连接
- sc.stop()
-
- }
- }

在resources中添加log4j.properties文件,写入内容如下
log4j.rootCategory=ERROR, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Set the default spark-shell log level to ERROR. When running the spark-shell, the # log level for this class is used to overwrite the root logger's log level, so that # the user can have different defaults for the shell and regular Spark apps. log4j.logger.org.apache.spark.repl.Main=ERROR # Settings to quiet third party logs that are too verbose log4j.logger.org.spark_project.jetty=ERROR log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
再次运行,只会打印错误的日志,正确的日志不会打印出来。(比赛的时候不要写这个)
Local模式是不需要其他任何节点资源就可以在本地执行 Spark 代码的环境,一般用于教学,调试,演示等,之前在 IDEA 中运行代码的环境我们称之为开发环境,不太一样。
tar -zxvf spark-2.4.5-bin-hadoop2.6.tgz -C /opt/module
重命名
- cd /opt/module/
- mv spark-2.4.5-bin-hadoop2.6 spark-local
进入解压后的路径,执行以下命令
bin/spark-shell
启动成功后,可以输入网址进行 Web UI 监控页面访问
http://hadoop102:4040
在解压缩文件夹下的 data 目录中,添加 word.txt 文件。在命令行工具中执行如下代码指 令(和 IDEA 中代码简化版一致)
sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
按键 Ctrl+C 或输入 Scala 指令:
:quit
local 本地模式毕竟只是用来进行练习演示的,真实工作中还是要将应用提交到对应的 集群中去执行,这里我们来看看只使用 Spark 自身节点运行的集群模式,也就是我们所谓的 独立部署(Standalone)模式。Spark 的 Standalone 模式体现了经典的 master-slave 模式。
tar -zxvf spark-2.4.5-bin-hadoop2.6.tgz -C /opt/module
重命名
- cd /opt/module/
- mv spark-2.4.5-bin-hadoop2.6 spark-standalone
(1)进入解压缩后路径的 conf 目录,修改 slaves.template 文件名为 slaves
mv slaves.template slaves
(2)修改 slaves 文件,添加 work 节点
vim slaves
- hadoop102
- hadoop103
- hadoop104
(3)修改 spark-env.sh.template 文件名为 spark-env.sh
mv spark-env.sh.template spark-env.sh
(4)修改 spark-env.sh 文件,添加 JAVA_HOME 环境变量和集群对应的 master 节点
- export JAVA_HOME=/opt/module/jdk1.8.0_171
- SPARK_MASTER_HOST=hadoop102
- SPARK_MASTER_PORT=7077
注:这里的7077在Hadoop3中对应着8020
(5)分发spark-standalone目录到103和104集群上
- scp -r /opt/module/spark-standalone/ wolf@hadoop103:/opt/module/
- scp -r /opt/module/spark-standalone/ wolf@hadoop104:/opt/module/
sbin/start-all.sh
查看三台服务器的运行进程:
查看 Master 资源监控 Web UI 界面:
http://hadoop102:8080
由于 spark-shell 停止掉后,集群监控 linux1:4040 页面就看不到历史任务的运行情况,所以开发时都配置历史服务器记录任务运行情况。
(1)修改 spark-defaults.conf.template 文件名为 spark-defaults.conf
mv spark-defaults.conf.template spark-defaults.conf
(2)修改 spark-default.conf 文件,配置日志存储路径
- spark.eventLog.enabled true
- spark.eventLog.dir hdfs://hadoop102:9000/directory
注意:需要启动 hadoop 集群,HDFS 上的 directory 目录需要提前存在。
hadoop fs -mkdir /directory
(3)修改 spark-env.sh 文件, 添加日志配置
- export SPARK_HISTORY_OPTS="
- -Dspark.history.ui.port=18080
- -Dspark.history.fs.logDirectory=hdfs://hadoop102:9000/directory
- -Dspark.history.retainedApplications=30"
(4)分发文件到103和104上
- scp -r /opt/module/spark-standalone/conf wolf@hadoop103:/opt/module/spark-standalone/
- scp -r /opt/module/spark-standalone/conf wolf@hadoop104:/opt/module/spark-standalone/
(5)重新启动Spark集群和历史服务
在Spark目录中
- sbin/start-all.sh
- sbin/start-history-server.sh
(6)重新执行任务
(7)可以去web端查看历史任务
http://hadoop102:18080
这一部分需要用到Zookeeper,我们以后再来看。
独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是也要记住,Spark 主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。(貌似不是考点,先跳过了)
注意:该部分是纯理论部分
由上图可以看出,对于 Spark 框架有两个核心组件:
中间的学习部分看不完了,先从案例入手。
pri_user_adv.log:时间戳 省份 城市 用户 广告 ,字段以空格分隔,放在了代码根目录下的datas文件夹中。
统计出每个省份、每个广告被点击数量排行的Top3
代码思路
具体实现
- package com.wolf.bigdata.spark.core.rdd
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark_RDD_Req {
-
- def main(args: Array[String]): Unit = {
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
- val sc = new SparkContext(sparkConf)
-
- // 1.get data
- // raw data:time province city user ad.
- val dataRDD = sc.textFile("datas/pri_user_adv.log")
-
- // 2.process data
- // from raw data to : ((province,ad),1)
- val mapRDD = dataRDD.map(
- line => {
- val datas = line.split(" ")
- ((datas(1) , datas(4)),1)
- }
- )
-
- // 3.reduce(sum)
- // from new data to : ((province,ad),sum)
- val reduceRDD = mapRDD.reduceByKey(_+_)
-
- // 4.convert result
- // from ((province,ad),sum) to (province,(ad,sum))
- val newMapRDD = reduceRDD.map{
- case((prv,ad),sum)=>{
- (prv,(ad,sum))
- }
- }
- // 5. group by province
- // (provinceA,[(adA,sumA),(adB,sumB)...])
- val groupRDD = newMapRDD.groupByKey()
- // 6. sort in group
- // according num , pick out 3
- val resultRDD = groupRDD.mapValues(
- iter=>{
- iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
- }
- )
- // 7. print in cmd
- resultRDD.collect().foreach(println)
-
- sc.stop()
- }
-
- }

运行结果
按照每个品类的点击、下单、支付的量来统计热门品类。
鞋 点击数 下单数 支付数
衣服 点击数 下单数 支付数
电脑 点击数 下单数 支付数
本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下 单数;下单数再相同,就比较支付数。
实现方案一
分别统计每个品类点击的次数,下单的次数和支付的次数:
(品类,点击总数)(品类,下单总数)(品类,支付总数)
实现方案二
一次性统计每个品类点击的次数,下单的次数和支付的次数:
(品类,(点击总数,下单总数,支付总数))
实现方案三
使用累加器的方式聚合数据
方案一的代码:
- package com.wolf.bigdata.spark.core.req
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark01_Top10CateMethod1 {
- def main(args: Array[String]): Unit = {
-
- val sparConf = new SparkConf().setMaster("local[*]").setAppName("Top10CateMethod1")
- val sc = new SparkContext(sparConf)
-
- // 1.get raw data
- val actionRDD = sc.textFile("datas/user_visit_action.txt")
-
- // 2.get click count (ID,click_count)
- val clickActionRDD = actionRDD.filter(
- action => {
- val datas = action.split("_") //each action is divided by "_"
- datas(6) != "-1"
- }
- )
- val clickCountRDD = clickActionRDD.map(
- action => {
- val datas = action.split(("_"))
- (datas(6), 1)
- }
- ).reduceByKey(_ + _)
- // 3.get order count (ID,order_count)
- val orderActionRDD = actionRDD.filter(
- action => {
- val datas = action.split("_") //each action is divided by "_"
- datas(8) != "null"
- }
- ) // orderid=>1,2,3 ,but we want [(1,1),(2,1),(3,1)] ,so flat it
- val orderCountRDD = orderActionRDD.flatMap(
- action => {
- val datas = action.split("_")
- val cid = datas(8)
- val cids = cid.split(",")
- cids.map(id => (id, 1))
- }
- ).reduceByKey(_ + _) // [(1,num1),(2,num2)..]
-
- // 4. get pay count (ID,pay_count)
- val payActionRDD = actionRDD.filter(
- action => {
- val datas = action.split("_") //each action is divided by "_"
- datas(10) != "null"
- }
- ) // payid=>1,2,3 ,but we want [(1,1),(2,1),(3,1)] ,so flat it
- val payCountRDD = payActionRDD.flatMap(
- action => {
- val datas = action.split("_")
- val cid = datas(10)
- val cids = cid.split(",")
- cids.map(id => (id, 1))
- }
- ).reduceByKey(_ + _) // [(1,num1),(2,num2)..]
- // 5. sort , get top 10 ***
- // click_count > order_count > pay_count
- // tuple sort:firstly compare the first, and then compare the second...
- // from [(ID,click_count),(ID,order_count),(ID,pay_count)] to (ID,(click_count,order_count,pay_count))
- // join(x) zip(x) leftOuterJoin(x) cogroup(v)
- // cogroup = connect + group
- val cogroupRDD = clickCountRDD.cogroup(orderCountRDD, payCountRDD)
- val analysisRDD = cogroupRDD.mapValues{
- case(clickIter,orderIter,payIter)=>{
- var clickCnt = 0
- val iter1 = clickIter.iterator
- if(iter1.hasNext){
- clickCnt = iter1.next()
- }
- var orderCnt = 0
- val iter2 = orderIter.iterator
- if (iter2.hasNext) {
- orderCnt = iter2.next()
- }
- var payCnt = 0
- val iter3 = payIter.iterator
- if (iter3.hasNext) {
- payCnt = iter3.next()
- }
- (clickCnt,orderCnt,payCnt)
- }
- }
-
- val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
-
- // 6. collect result ,and print in cmd
- resultRDD.foreach(println)
-
- }
- }

运行结果
方案一有两个问题:
1、actionRDD被重复使用多次
2、cogroup方法可能存在Shuffle,影响性能
问题1的解决方法:把actionRDD放在缓存中
代码实现:
actionRDD.cache()
问题2的解决方法:
当前数据的样子是这样的:(ID,click_count),(ID,order_count),(ID,pay_count)
我们可以不使用cogroup,直接把数据变成:(ID,(click_count,0,0)),(ID,(0,order_count,0)),(ID,(0,0,pay_count)),然后聚合,这样就不需要用cogroup了。
改进后的方案一代码:
- package com.wolf.bigdata.spark.core.req
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark01_Top10CateMethod2 {
- def main(args: Array[String]): Unit = {
-
- val sparConf = new SparkConf().setMaster("local[*]").setAppName("Top10CateMethod1")
- val sc = new SparkContext(sparConf)
-
- // 1.get raw data
- val actionRDD = sc.textFile("datas/user_visit_action.txt")
- actionRDD.cache()
- // 2.get click count (ID,click_count)
- val clickActionRDD = actionRDD.filter(
- action => {
- val datas = action.split("_") //each action is divided by "_"
- datas(6) != "-1"
- }
- )
- val clickCountRDD = clickActionRDD.map(
- action => {
- val datas = action.split(("_"))
- (datas(6), 1)
- }
- ).reduceByKey(_ + _)
- // 3.get order count (ID,order_count)
- val orderActionRDD = actionRDD.filter(
- action => {
- val datas = action.split("_") //each action is divided by "_"
- datas(8) != "null"
- }
- ) // orderid=>1,2,3 ,but we want [(1,1),(2,1),(3,1)] ,so flat it
- val orderCountRDD = orderActionRDD.flatMap(
- action => {
- val datas = action.split("_")
- val cid = datas(8)
- val cids = cid.split(",")
- cids.map(id => (id, 1))
- }
- ).reduceByKey(_ + _) // [(1,num1),(2,num2)..]
-
- // 4. get pay count (ID,pay_count)
- val payActionRDD = actionRDD.filter(
- action => {
- val datas = action.split("_") //each action is divided by "_"
- datas(10) != "null"
- }
- ) // payid=>1,2,3 ,but we want [(1,1),(2,1),(3,1)] ,so flat it
- val payCountRDD = payActionRDD.flatMap(
- action => {
- val datas = action.split("_")
- val cid = datas(10)
- val cids = cid.split(",")
- cids.map(id => (id, 1))
- }
- ).reduceByKey(_ + _) // [(1,num1),(2,num2)..]
- // 5. sort , get top 10 ***
- // click_count > order_count > pay_count
- // tuple sort:firstly compare the first, and then compare the second...
- // from [(ID,click_count),(ID,order_count),(ID,pay_count)] to
- // (ID,(click_count,0,0)),(ID,(0,order_count,0)),(ID,(0,0,pay_count))
- // then reduce
- val rdd1 = clickCountRDD.map {
- case (cid, cnt) => {
- (cid, (cnt, 0, 0))
- }
- }
- val rdd2 = orderCountRDD.map {
- case (cid, cnt) => {
- (cid, (0, cnt, 0))
- }
- }
- val rdd3 = payCountRDD.map {
- case (cid, cnt) => {
- (cid, (0, 0, cnt))
- }
- }
- // union together and then reduce
- val sourceRDD = rdd1.union(rdd2).union(rdd3)
- // reduce
- val analysisRDD = sourceRDD.reduceByKey(
- (t1, t2) => {
- (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
- }
- )
- val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
-
- // 6. collect result ,and print in cmd
- resultRDD.foreach(println)
-
- }
- }

运行结果:
改进后仍存在的问题:存在大量的Shuffle操作(reduceByKey)
其实,我们可以改变思路,进而优化代码:
方案二的代码:
- package com.wolf.bigdata.spark.core.req
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object Spark01_Top10CateMethod3 {
- def main(args: Array[String]): Unit = {
-
- val sparConf = new SparkConf().setMaster("local[*]").setAppName("Top10CateMethod1")
- val sc = new SparkContext(sparConf)
-
- // 1.get raw data
- val actionRDD = sc.textFile("datas/user_visit_action.txt")
-
- // 2.convert data structure
- // if click: (ID,(1,0,0))
- // if order: (ID,(0,1,0))
- // if pay: (ID,(0,0,1))
- val flatRDD = actionRDD.flatMap(
- action => {
- val datas = action.split("_")
- if (datas(6) != "-1") {
- // click
- List((datas(6), (1, 0, 0)))
- }
- else if (datas(8) != "null") {
- // order
- val ids = datas(8).split(",")
- ids.map(id => (id, (0, 1, 0)))
- }
- else if (datas(10) != "null") {
- //pay
- val ids = datas(10).split(",")
- ids.map(id => (id, (0, 0, 1)))
- }
- else {
- Nil
- }
- }
- )
-
- // 3. reduce the same ID
- // (ID,(cnt1,cnt2,cnt3)
- val analysisRDD = flatRDD.reduceByKey(
- (t1, t2) => {
- (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
- }
- )
-
- // 4.sort descending,Top 10
- val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
-
- // 5. collect result ,and print in cmd
- resultRDD.foreach(println)
-
- }
- }

这样只需要一个reduceByKey,也即只需要一个Shuffle操作
运行结果
方案三用到了自定义累加器,以后再学习。
这里不做具体学习,只是写一点案例的代码。
在pom中引入如下依赖
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.12</artifactId>
- <version>3.0.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.12</artifactId>
- <version>3.0.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-mllib_2.12</artifactId>
- <version>3.0.0</version>
- </dependency>
- package com.wolf.bigdata.spark.core.req
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.sql.{Row, DataFrame}
- import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
- import org.apache.spark.ml.recommendation.ALS
-
- object Test01 {
- def main(args: Array[String]): Unit = {
- // 创建SparkSession: 首先,创建一个SparkSession,这是与Spark进行交互的入口点。
- val spark = SparkSession.builder()
- .appName("MovieRecommendation")
- .master("local[*]") // 这里设置本地模式,[*] 表示使用所有 CPU 核心
- .getOrCreate()
- // 读取数据: 使用Spark DataFrame API读取用户-电影评分数据。
- val data = spark.read
- .option("header", "true")
- .option("inferSchema", "true")
- .csv("datas/capter5_2ml.csv")
- // 创建ALS模型: 配置ALS模型的参数
-
- val als = new ALS()
- .setMaxIter(5)
- .setRegParam(0.01)
- .setUserCol("userId")
- .setItemCol("movieId")
- .setRatingCol("rating")
- // 训练模型: 使用训练数据对ALS模型进行训练
- val model = als.fit(data)
- // 生成用户推荐: 使用recommendForUserSubset方法为指定用户生成电影推荐。
- // 请替换user_ids为你要生成推荐的用户ID列表
- val user_ids = Seq(1, 2, 3)
- // val user_df = user_ids.toDF("userId")
- val userRows = user_ids.map(id => Row(id))
- val userSchema = List(StructField("userId", IntegerType, nullable = false))
- val user_df = spark.createDataFrame(spark.sparkContext.parallelize(userRows), StructType(userSchema))
- val user_recommendations = model.recommendForUserSubset(user_df, 10)
- // 生成电影推荐: 使用recommendForItemSubset方法为指定电影生成用户推荐。
- // 请替换movie_ids为你要生成推荐的电影ID列表
- val movie_ids = Seq(1, 2, 3, 4, 5)
- // val movie_df = movie_ids.toDF("movieId")
- val movieRows = movie_ids.map(id => Row(id))
- val movieSchema = List(StructField("movieId", IntegerType, nullable = false))
- val movie_df = spark.createDataFrame(spark.sparkContext.parallelize(movieRows), StructType(movieSchema))
- val movie_recommendations = model.recommendForItemSubset(movie_df, 10)
- // 结果输出: 将生成的推荐结果输出,例如,打印到控制台。
- // 输出用户推荐
- // user_recommendations.show()
- user_recommendations.take(10).foreach(println)
- // 输出电影推荐
- // movie_recommendations.show()
- movie_recommendations.take(10).foreach(println)
-
- // 关闭SparkSession: 最后,确保关闭SparkSession以释放资源。
- spark.stop()
-
- }
-
- }

使用Spark MLlib中的使用ALS算法给每个用户推荐某个商品。
注意:对原始数据添加列名,方便后续操作
首先,创建一个SparkSession,这是与Spark进行交互的入口点。
- import org.apache.spark.sql.SparkSession
-
- val spark = SparkSession.builder()
- .appName("MovieRecommendation")
- .master("local[*]") // 这里设置本地模式,[*] 表示使用所有 CPU 核心
- .getOrCreate()
使用Spark DataFrame API读取用户-电影评分数据。
- val data = spark.read
- .option("header", "true")
- .option("inferSchema", "true")
- .csv("datas/capter5_2ml.csv")
- import org.apache.spark.ml.recommendation.ALS
-
- val als = new ALS()
- .setMaxIter(5)
- .setRegParam(0.01)
- .setUserCol("userId")
- .setItemCol("movieId")
- .setRatingCol("rating")
使用训练数据对ALS模型进行训练
val model = als.fit(data)
使用recommendForUserSubset方法为指定用户生成电影推荐。
- import org.apache.spark.sql.{Row, DataFrame}
- import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
-
- val user_ids = Seq(1, 2, 3)
- val userRows = user_ids.map(id => Row(id))
- val userSchema = List(StructField("userId", IntegerType, nullable = false))
- val user_df = spark.createDataFrame(spark.sparkContext.parallelize(userRows), StructType(userSchema))
- val user_recommendations = model.recommendForUserSubset(user_df, 10)
使用recommendForItemSubset方法为指定电影生成用户推荐。
- val movie_ids = Seq(1, 2, 3, 4, 5)
- val movieRows = movie_ids.map(id => Row(id))
- val movieSchema = List(StructField("movieId", IntegerType, nullable = false))
- val movie_df = spark.createDataFrame(spark.sparkContext.parallelize(movieRows), StructType(movieSchema))
- val movie_recommendations = model.recommendForItemSubset(movie_df, 10)
将生成的推荐结果输出,例如,打印到控制台。
- // 输出用户推荐
- // user_recommendations.show()
- user_recommendations.take(10).foreach(println)
- // 输出电影推荐
- // movie_recommendations.show()
- movie_recommendations.take(10).foreach(println)
spark.stop()
- package com.wolf.bigdata.spark.core.req
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.sql.{Row, DataFrame}
- import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
- import org.apache.spark.ml.recommendation.ALS
-
- object Test01 {
- def main(args: Array[String]): Unit = {
- // 创建SparkSession: 首先,创建一个SparkSession,这是与Spark进行交互的入口点。
- val spark = SparkSession.builder()
- .appName("MovieRecommendation")
- .master("local[*]") // 这里设置本地模式,[*] 表示使用所有 CPU 核心
- .getOrCreate()
- // 读取数据: 使用Spark DataFrame API读取用户-电影评分数据。
- val data = spark.read
- .option("header", "true")
- .option("inferSchema", "true")
- .csv("datas/capter5_2ml.csv")
- // 创建ALS模型: 配置ALS模型的参数
-
- val als = new ALS()
- .setMaxIter(5)
- .setRegParam(0.01)
- .setUserCol("userId")
- .setItemCol("movieId")
- .setRatingCol("rating")
- // 训练模型: 使用训练数据对ALS模型进行训练
- val model = als.fit(data)
- // 生成用户推荐: 使用recommendForUserSubset方法为指定用户生成电影推荐。
- // 请替换user_ids为你要生成推荐的用户ID列表
- val user_ids = Seq(1, 2, 3)
- // val user_df = user_ids.toDF("userId")
- val userRows = user_ids.map(id => Row(id))
- val userSchema = List(StructField("userId", IntegerType, nullable = false))
- val user_df = spark.createDataFrame(spark.sparkContext.parallelize(userRows), StructType(userSchema))
- val user_recommendations = model.recommendForUserSubset(user_df, 10)
- // 生成电影推荐: 使用recommendForItemSubset方法为指定电影生成用户推荐。
- // 请替换movie_ids为你要生成推荐的电影ID列表
- val movie_ids = Seq(1, 2, 3, 4, 5)
- // val movie_df = movie_ids.toDF("movieId")
- val movieRows = movie_ids.map(id => Row(id))
- val movieSchema = List(StructField("movieId", IntegerType, nullable = false))
- val movie_df = spark.createDataFrame(spark.sparkContext.parallelize(movieRows), StructType(movieSchema))
- val movie_recommendations = model.recommendForItemSubset(movie_df, 10)
- // 结果输出: 将生成的推荐结果输出,例如,打印到控制台。
- // 输出用户推荐
- // user_recommendations.show()
- user_recommendations.take(10).foreach(println)
- // 输出电影推荐
- // movie_recommendations.show()
- movie_recommendations.take(10).foreach(println)
-
- // 关闭SparkSession: 最后,确保关闭SparkSession以释放资源。
- spark.stop()
-
- }
-
- }

pip install pyspark
or
pip install -i https://pypi.tuna/tsinghua.edu.cn/simple pyspark
Numpy
pip install numpy -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
使用Spark MLlib中的使用ALS算法给每个用户推荐某个商品。
注意:对原始数据添加列名,方便后续操作
- from pyspark.sql import SparkSession
- # 创建SparkSession
- spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()
-
- # 读取数据
- data = spark.read.csv("capter5_2ml.csv", header=True, inferSchema=True)
- from pyspark.ml.recommendation import ALS
- als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(data)
- user_ids = [1]
- # 创建一个DataFrame来存储指定用户的ID
- user_df = spark.createDataFrame([Row(userId=uid) for uid in user_ids])
- # 使用ALS模型生成前十名用户的电影推荐
- user_recommendations = model.recommendForUserSubset(user_df, 10)
- # 创建一个DataFrame包含前十名用户的ID
- user_ids_top10 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
- top_users_df = spark.createDataFrame([Row(userId=uid) for uid in user_ids_top10])
-
- # 使用ALS模型生成前十名用户的电影推荐
- top_users_recommendations = model.recommendForUserSubset(top_users_df, 10)
- from pyspark.sql import Row
- # 输出一组用户推荐
- for row in user_recommendations.collect():
- user_id = row.userId
- recommendations = [(r.movieId, r.rating) for r in row.recommendations]
- print(f"User {user_id} recommendations: {recommendations}")
-
- # 输出前十名用户的电影推荐
- for row in top_users_recommendations.collect():
- user_id = row.userId
- recommendations = [(r.movieId, r.rating) for r in row.recommendations]
- print(f"User {user_id} recommendations: {recommendations}")
spark.stop()
- from pyspark.sql import SparkSession
- from pyspark.sql import Row
- from pyspark.ml.recommendation import ALS
-
- # 创建SparkSession
- spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()
-
- # 读取数据
- data = spark.read.csv("capter5_2ml.csv", header=True, inferSchema=True)
-
- # ① 设置迭代次数为5次,惩罚系数为0.01,得到评分的矩阵形式
- als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
-
- # ② 对模型进行拟合,训练出合适的模型
- model = als.fit(data)
-
- # ③ 为一组指定的用户生成十大电影推荐
- # 请替换user_ids为你要生成推荐的用户ID列表
- # user_ids = [1, 217, 381]
- user_ids = [1]
- # 创建一个DataFrame来存储用户ID
- user_df = spark.createDataFrame([Row(userId=uid) for uid in user_ids])
-
- user_recommendations = model.recommendForUserSubset(user_df, 10)
-
- #
- # # 请替换movie_ids为你要生成推荐的电影ID列表
- # movie_ids = [1, 3, 6, 1023, 1024]
- #
- # # 创建一个DataFrame来存储电影ID
- # movie_df = spark.createDataFrame([Row(movieId=mid) for mid in movie_ids])
- #
- # movie_recommendations = model.recommendForItemSubset(movie_df, 10)
-
- # ④ 生成前十名用户推荐的一组指定的电影
- # 创建一个DataFrame包含前十名用户的ID
- user_ids_top10 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
- top_users_df = spark.createDataFrame([Row(userId=uid) for uid in user_ids_top10])
-
- # 使用ALS模型生成前十名用户的电影推荐
- top_users_recommendations = model.recommendForUserSubset(top_users_df, 10)
-
- # ⑤ 对结果进行正确输出
- # 输出1Group用户推荐
- for row in user_recommendations.collect():
- user_id = row.userId
- recommendations = [(r.movieId, r.rating) for r in row.recommendations]
- print(f"User {user_id} recommendations: {recommendations}")
-
- # # 输出电影推荐
- # for row in movie_recommendations.collect():
- # movie_id = row.movieId
- # recommendations = [(r.userId, r.rating) for r in row.recommendations]
- # print(f"Movie {movie_id} recommendations: {recommendations}")
-
- # 输出前十名用户的电影推荐
- for row in top_users_recommendations.collect():
- user_id = row.userId
- recommendations = [(r.movieId, r.rating) for r in row.recommendations]
- print(f"User {user_id} recommendations: {recommendations}")
- # 关闭SparkSession
- spark.stop()

Q:为什么数据中的rating最高分只有五分,但是预测出的分数有很多都超过了五分?
A(GPT生成):
在协同过滤推荐系统中,ALS(交替最小二乘法)模型的预测评分可以超过原始数据中的评分范围。这是因为ALS模型是一种基于矩阵分解的模型,它试图学习用户和物品的潜在特征向量,以最小化预测评分与实际评分之间的均方误差。这意味着模型可以生成比原始评分范围更广的预测值。
在你的情况下,原始数据的评分范围为1到5分,但ALS模型的预测评分可以超过这个范围。这并不一定是一个问题,因为ALS模型的目标是提供一组推荐,而不仅仅是复制现有的评分。模型可能会认为某些用户对某些电影的兴趣更高,因此给出了更高的预测评分。
通常,当使用ALS模型时,我们关心的是生成有序的推荐列表,而不是特定的评分值。你可以根据预测评分的高低来排序推荐列表,并向用户推荐最高评分的物品。当然,你可以选择对生成的预测评分进行某种标准化,以确保它们在1到5分的范围内,但这通常不是必要的。
如果PySpark没有自动检测到JAVA_HOME:
- import os
-
- os.environ['JAVA_HOME'] = ""
下载、解压
安装依赖
yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel libffi-devel gcc make
- ./configure --prefix=/opt/module/Python-3.9.18
- make
- make install
- // 建立软链接
- sudo ln -s /opt/module/Python-3.9.18/bin/python3.9 /usr/bin/python3
- sudo ln -s /opt/module/Python-3.9.18/bin/pip3.9 /usr/bin/pip3
-
- // 配置环境变量
- vi ~/.bash_profile
-
- #配置python
- export PYTHON_HOME=/opt/module/Python-3.9.18
- export PATH=$PYTHON_HOME/bin:$PATH
- // 激活环境变量
- source ~/.bash_profile
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。