
Big Data with Spark (Part 7) --- Spark Machine Learning: Naive Bayes, Wine Rating and Classification Case Study, Spam Filtering Case Study, E-commerce Product Recommendation, and Movie Recommendation Case Study

I. Introduction to Spark Machine Learning
------------------------------------------------------------------
    1. Supervised learning
        a. There is a training dataset: well-formed, labeled data
        b. From the dataset, an inference function is produced
        c. The inference function is applied to new data to produce predictions
        d. Common supervised tasks: spam classification of e-mail, labeling web pages by content, speech recognition, etc.
        e. Common supervised algorithms:
            neural networks
            SVMs
            Bayesian classifiers, etc.

    2. Unsupervised learning
        a. No labeled training data
        b. Analyze the available data to find patterns and trends, clustering and grouping the data
        c. Common techniques:
            k-means
            self-organizing maps (SOM)
            hierarchical clustering, etc.

    3. Recommendation
        Collaborative filtering: based on past purchase, click, and rating behavior, recommend the closest matches ("you may also like", "people you may know")


II. The Naive Bayes Algorithm
-----------------------------------------------
    1. Formula
        P(B|A) = P(A|B) * P(B) / P(A)

    2. Meaning of the terms
        A: event A
        B: event B
        P(B|A): probability of B given that A has occurred (a conditional probability; the posterior)
        P(A|B): probability of A given that B has occurred (a conditional probability; the likelihood)
        P(B): probability of B on its own (the prior)
        P(A): probability of A on its own (the evidence, i.e. the marginal probability)
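
    3. A worked example (illustrative numbers, not from any dataset in this article):
        Suppose 20% of all mail is spam, so P(B) = 0.2. The word "loan" appears in 50%
        of spam, P(A|B) = 0.5, and in 2% of legitimate mail. Then
        P(A) = 0.5 * 0.2 + 0.02 * 0.8 = 0.116, and the probability that a mail
        containing "loan" is spam is P(B|A) = 0.5 * 0.2 / 0.116 ≈ 0.86.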


III. The Spark Machine Learning Library
----------------------------------------------------
    [Estimator]
        An estimator: an algorithm that operates on a DataFrame.
        It runs on a DataFrame containing features and a label (the outcome) and trains
        on that data to create a model. The model is then used for later prediction.

    [Transformer]
        A DataFrame transformer.
        Transforms a DataFrame containing features into a DataFrame that also contains
        predictions. The model created by an Estimator is itself a Transformer.

    [Parameter]
        The configuration used by Estimators and Transformers, usually tied to the
        specific ML algorithm. Spark exposes a uniform parameter API across algorithms.

    [Pipeline]
        Chains Estimators and Transformers together into a machine learning workflow
        (see the sketch below).
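
    A minimal sketch of how these pieces fit together (column names and parameters are
    illustrative; the spam-filter case in section VI uses the same pattern):

        import org.apache.spark.ml.Pipeline
        import org.apache.spark.ml.classification.LogisticRegression
        import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

        // Two Transformers and one Estimator, chained into a Pipeline
        val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
        val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
        val lr = new LogisticRegression().setMaxIter(10)
        val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

        // fit() trains the Estimator stages and returns a PipelineModel, itself a
        // Transformer that can be applied to new data:
        //   val model = pipeline.fit(trainingDF)
        //   val predictions = model.transform(testDF)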


IV. Case Study: Wine Rating [Linear Regression] and Classification [Logistic Regression]
----------------------------------------------------------
    1. Dataset download: http://archive.ics.uci.edu/ml/datasets/Wine+Quality

    2. Wine rating with linear regression --- Scala version
        
  import org.apache.spark.ml.linalg.Vectors
  import org.apache.spark.ml.regression.LinearRegression
  import org.apache.spark.sql.SparkSession

  // Linear regression: estimate the wine quality score (0-10)
  object SparkScalaML1 {

    // The case class lives at object level; defining it inside main can break
    // the implicit encoder resolution needed for DataFrame conversion
    case class Wine(FixedAcidity: Double, VolatileAcidity: Double,
                    CitricAcid: Double, ResidualSugar: Double, Chlorides: Double,
                    FreeSulfurDioxide: Double, TotalSulfurDioxide: Double, Density: Double,
                    PH: Double, Sulphates: Double, Alcohol: Double, Quality: Double)

    def main(args: Array[String]): Unit = {
      val sess = SparkSession.builder()
        .appName("ml")
        .master("local[4]")
        .getOrCreate()
      val sc = sess.sparkContext

      // Data location
      val dataDir = "file:///D:/share/spark/ml/winequality-white.csv"

      // Parse each semicolon-separated line into a Wine record
      val wineDataRDD = sc.textFile(dataDir)
        .map(_.split(";"))
        .map(w => Wine(w(0).toDouble, w(1).toDouble, w(2).toDouble, w(3).toDouble,
          w(4).toDouble, w(5).toDouble, w(6).toDouble, w(7).toDouble,
          w(8).toDouble, w(9).toDouble, w(10).toDouble, w(11).toDouble))

      // Convert the RDD into a (label, features) DataFrame
      import sess.implicits._
      val trainingDF = wineDataRDD.map(w => (w.Quality,
        Vectors.dense(w.FixedAcidity, w.VolatileAcidity, w.CitricAcid,
          w.ResidualSugar, w.Chlorides, w.FreeSulfurDioxide, w.TotalSulfurDioxide,
          w.Density, w.PH, w.Sulphates, w.Alcohol))).toDF("label", "features")
      trainingDF.show()
      println("=============================================")

      // Create the linear regression estimator
      val lr = new LinearRegression()
      // Set the maximum number of iterations
      lr.setMaxIter(10)
      // Fit the training data to produce a model
      val model = lr.fit(trainingDF)

      // Create a labeled test DataFrame
      val testDF = sess.createDataFrame(Seq(
        (5.0, Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0, 0.9968, 3.2, 0.68, 9.8)),
        (5.0, Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4)),
        (7.0, Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0, 0.9968, 3.36, 0.57, 9.5))
      )).toDF("label", "features")

      // Show the test data
      testDF.show()
      println("========================")

      // Predict on the labeled test data to gauge model quality
      testDF.createOrReplaceTempView("test")
      val tested = model.transform(testDF).select("features", "label", "prediction")
      tested.show()
      println("========================")

      // Predict on unlabeled data (features only)
      val predictDF = sess.sql("SELECT features FROM test")
      val predicted = model.transform(predictDF).select("features", "prediction")
      predicted.show()
    }
  }
    3. Java implementation =====> TODO

    4. Analysis of the ML steps in the Wine case
        a. Read the training data and build the training DataFrame
        b. Create the LinearRegression estimator
        c. Fit the model on the training data, completing the estimation pipeline
        d. Read test data that carries the true ratings and build the test DataFrame
           [the test data must include the correct ratings so the model's predictions can be checked]
        e. Transform the test data with the model to produce a new DataFrame and output
           the predicted ratings (see the evaluation sketch after this list)
        f. Once the model is satisfactory, transform production data with it to rate new samples
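
    The evaluation sketch for step e, using spark.ml's RegressionEvaluator (the
    evaluator is my addition, not part of the original code; `tested` is the DataFrame
    produced in the code above):

        import org.apache.spark.ml.evaluation.RegressionEvaluator

        // Compare the "label" and "prediction" columns of the tested DataFrame
        val evaluator = new RegressionEvaluator()
          .setLabelCol("label")
          .setPredictionCol("prediction")
          .setMetricName("rmse")   // root-mean-square error; lower is better
        val rmse = evaluator.evaluate(tested)
        println(s"RMSE on the labeled test data = $rmse")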

    5. Wine classification with logistic regression --- good/bad as 1/0
        
  import org.apache.spark.ml.classification.LogisticRegression
  import org.apache.spark.ml.linalg.Vectors
  import org.apache.spark.sql.SparkSession

  /**
   * Logistic regression: classify wine as good/bad (1/0)
   */
  object SparkML2 {

    // Case class at object level (defining it inside main can break the
    // implicit encoder resolution needed for DataFrame conversion)
    case class Wine(FixedAcidity: Double, VolatileAcidity: Double,
                    CitricAcid: Double, ResidualSugar: Double, Chlorides: Double,
                    FreeSulfurDioxide: Double, TotalSulfurDioxide: Double, Density: Double,
                    PH: Double, Sulphates: Double, Alcohol: Double, Quality: Double)

    def main(args: Array[String]): Unit = {
      val sess = SparkSession.builder()
        .appName("ml")
        .master("local[4]")
        .getOrCreate()
      val sc = sess.sparkContext

      // Data location
      val dataDir = "file:///D:/share/spark/ml/winequality-white.csv"

      // Parse each semicolon-separated line into a Wine record
      val wineDataRDD = sc.textFile(dataDir)
        .map(_.split(";"))
        .map(w => Wine(w(0).toDouble, w(1).toDouble, w(2).toDouble, w(3).toDouble,
          w(4).toDouble, w(5).toDouble, w(6).toDouble, w(7).toDouble,
          w(8).toDouble, w(9).toDouble, w(10).toDouble, w(11).toDouble))

      // Convert to a DataFrame, binarizing the label: quality < 7 => 0.0, else 1.0
      import sess.implicits._
      val trainingDF = wineDataRDD.map(w => (if (w.Quality < 7) 0d else 1d,
        Vectors.dense(w.FixedAcidity, w.VolatileAcidity, w.CitricAcid,
          w.ResidualSugar, w.Chlorides, w.FreeSulfurDioxide, w.TotalSulfurDioxide,
          w.Density, w.PH, w.Sulphates, w.Alcohol))).toDF("label", "features")
      trainingDF.show()
      println("=============================================")

      // Create the logistic regression estimator: 10 iterations, regularization 0.01
      val lr = new LogisticRegression()
      lr.setMaxIter(10).setRegParam(0.01)
      // Fit the training data to produce a model
      val model = lr.fit(trainingDF)

      // Labeled test DataFrame; labels are binarized with the same quality < 7 rule
      // as the training data (the raw quality scores of these samples were 5, 5 and 7)
      val testDF = sess.createDataFrame(Seq(
        (0.0, Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0, 0.9968, 3.2, 0.68, 9.8)),
        (0.0, Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4)),
        (1.0, Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0, 0.9968, 3.36, 0.57, 9.5))
      )).toDF("label", "features")

      // Show the test data
      testDF.show()
      println("========================")

      // Predict on the labeled test data to gauge model quality
      testDF.createOrReplaceTempView("test")
      val tested = model.transform(testDF).select("features", "label", "prediction")
      tested.show()
      println("========================")

      // Predict on unlabeled data (features only)
      val predictDF = sess.sql("SELECT features FROM test")
      val predicted = model.transform(predictDF).select("features", "prediction")
      predicted.show()
    }
  }
    6. Java implementation =====> TODO


V. Saving and Loading Models
------------------------------------------------------
    1. Persisting a model (save)
        // Create the logistic regression estimator
        val lr = new LogisticRegression()
        // Set the maximum number of iterations
        lr.setMaxIter(10)
        // Fit the training data to produce the model
        val model = lr.fit(trainingDF)
        model.save("file:///d:/share/spark/model")

    2. Loading the model back
        // Load with the model class matching what was saved: a LogisticRegression
        // model is loaded with LogisticRegressionModel (the original text used
        // LinearRegressionModel, which only fits a saved LinearRegression model)
        val loadmodel = LogisticRegressionModel.load("file:///d:/share/spark/model")
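
    3. Persisting a whole pipeline
        The same pattern extends to a fitted PipelineModel (a sketch; `model` here
        stands for the PipelineModel produced by the spam filter in section VI below,
        and the path is illustrative):

        // Save every fitted stage of the pipeline, then load it back later
        model.write.overwrite().save("file:///d:/share/spark/pipeline-model")
        val reloaded = org.apache.spark.ml.PipelineModel.load("file:///d:/share/spark/pipeline-model")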


VI. Spam Filtering [Hashed Term Frequency + Logistic Regression]
------------------------------------------------------
    
  import org.apache.spark.ml.Pipeline
  import org.apache.spark.ml.classification.LogisticRegression
  import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
  import org.apache.spark.sql.SparkSession

  /**
   * Spam filtering
   */
  object SparkMLSpamFilter {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName("ml")
        .master("local[4]")
        .getOrCreate()

      // Training data: (email, message, label); 1.0 = spam, 0.0 = ham
      val training = spark.createDataFrame(Seq(
        ("you@example.com", "hope you are well", 0.0),
        ("raj@example.com", "nice to hear from you", 0.0),
        ("thomas@example.com", "happy holidays", 0.0),
        ("mark@example.com", "see you tomorrow", 0.0),
        ("xyz@example.com", "save money", 1.0),
        ("top10@example.com", "low interest rate", 1.0),
        ("marketing@example.com", "cheap loan", 1.0)
      )).toDF("email", "message", "label")
      training.show()

      // Tokenizer: lower-cases the input and splits on whitespace into a new column
      val tokenizer = new Tokenizer().setInputCol("message").setOutputCol("words")
      // Hashed term frequency: number of buckets, input column, output column
      val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol("words").setOutputCol("features")
      // Logistic regression: 10 iterations, regularization 0.01
      val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
      // Build the pipeline: tokenize -> hash -> classify
      val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
      // Fit the pipeline on the training data to produce a PipelineModel
      val model = pipeline.fit(training)

      // Test data (unlabeled)
      val test = spark.createDataFrame(Seq(
        ("you@example.com", "how are you"),
        ("jain@example.com", "hope doing well"),
        ("caren@example.com", "want some money"),
        ("zhou@example.com", "secure loan"),
        ("ted@example.com", "need loan")
      )).toDF("email", "message")
      test.show()

      // Predict on the test data and show the results
      val prediction = model.transform(test).select("email", "message", "prediction")
      prediction.show()
      println("==================================")

      // Intermediate stages, run individually: tokenization...
      val wordsDF = tokenizer.transform(training)
      wordsDF.show()
      println("==================================")
      // ...and hashed term frequencies
      val featurizedDF = hashingTF.transform(wordsDF)
      featurizedDF.show()
      println("==================================")
      featurizedDF.createOrReplaceTempView("featurized")
      val selectedFeaturizedFieldsDF = spark.sql("SELECT words, features FROM featurized")
      selectedFeaturizedFieldsDF.show()
    }
  }
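
    To see how confident the classifier is, the fitted PipelineModel also emits
    "rawPrediction" and "probability" columns; a sketch of inspecting them (this
    selection is my addition, not in the original code):

        model.transform(test)
          .select("message", "probability", "prediction")
          .show(false)
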
VII. Recommendation [Alternating Least Squares]
-------------------------------------------------------
    1. Alternating Least Squares (ALS)
        Factorizes the user-item rating matrix into two low-rank factor matrices and
        minimizes the sum of squared differences between the observed ratings and the
        dot products of the corresponding user and item factors.
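
        In symbols (a standard formulation of the ALS objective; the notation is mine,
        not from the original text): with user factors u_i and item factors v_j of
        dimension `rank`, ALS minimizes

            \min_{U,V} \sum_{(i,j) \in \Omega} \left( r_{ij} - u_i^{\top} v_j \right)^2
                     + \lambda \left( \sum_i \lVert u_i \rVert^2 + \sum_j \lVert v_j \rVert^2 \right)

        where \Omega is the set of observed (user, item) ratings. Holding U fixed,
        solving for V is an ordinary least-squares problem, and vice versa; the two
        steps alternate until convergence. \lambda is the regularization parameter
        (0.01 in the code below).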

    2. Training data [test.data]; each line is user,item,rating
        1,0,1.0
        1,1,2.0
        1,2,5.0
        1,3,5.0
        1,4,5.0
        2,0,1.0
        2,1,2.0
        2,2,5.0
        2,5,5.0
        2,6,4.5
        3,1,2.5
        3,2,5.0
        3,3,4.0
        3,4,3.0
        4,0,5.0
        4,1,5.0
        4,2,5.0
        4,3,0.0

    3. Scala implementation
        
  package test.spark.examples.mllib

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.recommendation.ALS
  import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
  import org.apache.spark.mllib.recommendation.Rating

  /**
   * Recommendation with ALS
   */
  object RecomendationDemo {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("CollaborativeFilteringExample").setMaster("local[*]")
      val sc = new SparkContext(conf)

      // Load and parse the data
      val data = sc.textFile("file:///D:/share/spark/ml/data/mllib/als/test.data")
      // Convert each line into a Rating object
      val ratings = data.map(_.split(',') match {
        case Array(user, item, rate) =>
          Rating(user.toInt, item.toInt, rate.toDouble)
      })
      println("========== ratings: original dataset ===============")
      ratings.collect().foreach(println)

      // Build the model with ALS
      val rank = 10          // number of latent factors
      val numIterations = 10
      val model = ALS.train(ratings, rank, numIterations, 0.01)

      // Check the model's accuracy against the training data:
      // strip the ratings to get (user, product) pairs
      val usersProducts = ratings.map { case Rating(user, product, rate) =>
        (user, product)
      }
      println("========== usersProducts: test data ===============")
      usersProducts.collect().foreach(println)

      // Predict a rating for every (user, product) pair
      val predictions =
        model.predict(usersProducts).map { case Rating(user, product, rate) =>
          ((user, product), rate)
        }
      println("========== predictions ===============")
      predictions.collect().foreach(println)

      // Join actual and predicted ratings for comparison
      val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
        ((user, product), rate)
      }.join(predictions)
      println("========== ratings + predictions: actual vs predicted ===============")
      ratesAndPreds.collect().foreach(println)

      // Recommend 5 products to user 2
      val res = model.recommendProducts(2, 5)
      println("========== res: recommendations for user 2 ===============")
      res.foreach(println)

      // Compute the mean squared error
      val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
        val err = r1 - r2
        err * err
      }.mean()
      println("Mean Squared Error = " + MSE)

      // Saving and loading the model
      model.save(sc, "target/tmp/myCollaborativeFilter")
      val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
    }
  }
    4. Java implementation
        
  1. package test.spark.examples.mllib;
  2. import scala.Tuple2;
  3. import org.apache.spark.api.java.*;
  4. import org.apache.spark.api.java.function.Function;
  5. import org.apache.spark.mllib.recommendation.ALS;
  6. import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
  7. import org.apache.spark.mllib.recommendation.Rating;
  8. import org.apache.spark.SparkConf;
  9. public class RecomendationDemoJava {
  10. public static void main(String[] args) {
  11. // $example on$
  12. SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example").setMaster("local[*]");
  13. JavaSparkContext jsc = new JavaSparkContext(conf);
  14. //加载和解析数据
  15. String path = "file:///D:/share/spark/ml/data/mllib/als/test.data";
  16. JavaRDD<String> data = jsc.textFile(path);
  17. JavaRDD<Rating> ratings = data.map(
  18. new Function<String, Rating>() {
  19. public Rating call(String s) {
  20. String[] sarray = s.split(",");
  21. return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
  22. Double.parseDouble(sarray[2]));
  23. }
  24. }
  25. );
  26. System.out.println("========== ratings:原始数据集 ===============");
  27. ratings.collect().forEach( x -> System.out.println(x));
  28. //使用最小二乘法构建模型
  29. int rank = 10;
  30. int numIterations = 10;
  31. MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
  32. //通过测试数据,测试模型的准确性
  33. //准备测试数据,去掉评分
  34. JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
  35. new Function<Rating, Tuple2<Object, Object>>() {
  36. public Tuple2<Object, Object> call(Rating r) {
  37. return new Tuple2<Object, Object>(r.user(), r.product());
  38. }
  39. }
  40. );
  41. System.out.println(("========== usersProducts:测试数据 ==============="));
  42. userProducts.collect().forEach(x -> System.out.println(x._1 + ":" + x._2));
  43. //对usersProducts进行预测,产生rate
  44. JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
  45. model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
  46. new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
  47. public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
  48. return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
  49. }
  50. }
  51. ));
  52. System.out.println(("========== predictions:预测结果 ==============="));
  53. predictions.collect().forEach(x-> System.out.println(x._1 + ":" + x._2));
  54. System.out.println(("========== 给2号客户推荐5款商品 ==============="));
  55. Rating[] ratings1 = model.recommendProducts(2, 5);
  56. for(Rating r : ratings1)
  57. {
  58. System.out.println(r.user() + ":" + r.product() + ":" + r.rating());
  59. }
  60. JavaRDD<Tuple2<Double, Double>> ratesAndPreds =
  61. JavaPairRDD.fromJavaRDD(ratings.map(
  62. new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
  63. public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
  64. return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
  65. }
  66. }
  67. )).join(predictions).values();
  68. System.out.println(("========== ratings + predictions:对比真实和预测结果 ==============="));
  69. ratesAndPreds.collect().forEach(x -> System.out.println(x._1 + ":" + x._2));
  70. //计算误差
  71. double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(
  72. new Function<Tuple2<Double, Double>, Object>() {
  73. public Object call(Tuple2<Double, Double> pair) {
  74. Double err = pair._1() - pair._2();
  75. return err * err;
  76. }
  77. }
  78. ).rdd()).mean();
  79. System.out.println("Mean Squared Error = " + MSE);
  80. // 模型的保存和加载
  81. //model.save(jsc.sc(), "target/tmp/myCollaborativeFilter");
  82. //MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(),
  83. //"target/tmp/myCollaborativeFilter");
  84. // $example off$
  85. jsc.stop();
  86. }
  87. }
    5. "You may also like": the recommendation APIs
        /******* Recommend n products to one user (here: top 8 products for user 5) ********/
        val res = model.recommendProducts(5, 8)

        /******* Recommend one product to n users (here: top 5 users for product 3) ********/
        val res = model.recommendUsers(3, 5)

        /******* Recommend the top n products to every user (here: top 3) ********/
        val res = model.recommendProductsForUsers(3)
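
        The bulk variant returns an RDD of (userId, Array[Rating]) pairs; a sketch of
        printing it (my addition, not in the original):

        // Top-3 products for every user
        model.recommendProductsForUsers(3).collect().foreach { case (user, recs) =>
          println(s"user $user -> " + recs.map(r => s"${r.product}:${r.rating}").mkString(", "))
        }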


VIII. Movie Recommendation Case Study
---------------------------------------------------------------
    1. Raw data; each line is userId::movieId::rating::timestamp
        0::2::3::1424380312
        0::3::1::1424380312
        0::5::2::1424380312
        0::9::4::1424380312
        0::11::1::1424380312
        0::12::2::1424380312
        1::15::1::1424380312
        2::17::1::1424380312
        2::19::1::1424380312
        ...

    2. Scala implementation
        
  import org.apache.spark.SparkConf
  import org.apache.spark.ml.evaluation.RegressionEvaluator
  import org.apache.spark.ml.recommendation.ALS
  import org.apache.spark.sql.SparkSession

  object MovieRecc {

    // Rating case class [note: never define a case class inside main; the implicit
    // encoder resolution for the DataFrame conversion will fail]
    case class Rating0(userId: Int, movieId: Int, rating: Float, timestamp: Long)

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
      conf.setAppName("movieRecomm")
      conf.setMaster("local[4]")
      val spark = SparkSession.builder().config(conf).getOrCreate()
      import spark.implicits._

      // Parse one line of "userId::movieId::rating::timestamp" into a Rating0
      def parseRating(str: String): Rating0 = {
        val fields = str.split("::")
        assert(fields.size == 4)
        Rating0(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
      }

      // Build a DataFrame of ratings
      val ratings = spark.sparkContext.textFile("file:///D:\\share\\spark\\ml\\data\\mllib\\als\\sample_movielens_ratings.txt")
      val ratings0 = ratings.map(parseRating)
      val df = ratings0.toDF()
      println("========= df: source data ==========================")
      df.collect().foreach(println)

      // Randomly split the data into an array of two DataFrames:
      // the first is training (99%), the second test (1%)
      val Array(training, test) = df.randomSplit(Array(0.99, 0.01))

      // Configure the ALS estimator
      val als = new ALS().setMaxIter(5)
        .setRegParam(0.01)
        .setUserCol("userId")
        .setItemCol("movieId")
        .setRatingCol("rating")

      // Fit the training data to produce the recommendation model
      val model = als.fit(training)

      // Predict ratings for the test set
      val predictions = model.transform(test)
      println("========= predictions ==========================")
      predictions.collect().foreach(println)
    }
  }
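
    The code above imports RegressionEvaluator but never uses it; a sketch of the usual
    follow-up (my addition). Note that randomSplit can put users or movies into the
    test set that the model never saw, which yields NaN predictions; either call
    als.setColdStartStrategy("drop") (Spark 2.2+) or drop the NaN rows first:

        val evaluator = new RegressionEvaluator()
          .setMetricName("rmse")
          .setLabelCol("rating")
          .setPredictionCol("prediction")
        val rmse = evaluator.evaluate(predictions.na.drop())
        println(s"Root-mean-square error = $rmse")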
    3. Java implementation

  package mllib;

  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.ml.recommendation.ALS;
  import org.apache.spark.ml.recommendation.ALSModel;
  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.SparkSession;

  public class MovieReccJava {
      public static void main(String[] args) {
          SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("movie");
          SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

          // Parse the ratings file into a bean RDD, then a DataFrame.
          // Rating0 is assumed to be a public JavaBean (getters/setters) with a
          // static parseRating(String) factory; it is not shown in the original.
          JavaRDD<Rating0> ratingsRDD = spark
              .read().textFile("file:///D:\\share\\spark\\ml\\data\\mllib\\als\\sample_movielens_ratings.txt").javaRDD()
              .map(Rating0::parseRating);
          Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating0.class);

          // Randomly split into training (99%) and test (1%)
          Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.99, 0.01});
          Dataset<Row> training = splits[0];
          Dataset<Row> test = splits[1];
          System.out.println("========= training data ==========================");
          training.show();
          System.out.println("========= test data ==========================");
          test.show();

          // Build the recommendation model using ALS on the training data
          ALS als = new ALS()
              .setMaxIter(5)
              .setRegParam(0.01)
              .setUserCol("userId")
              .setItemCol("movieId")
              .setRatingCol("rating");
          ALSModel model = als.fit(training);

          // Predict ratings for the test set
          Dataset<Row> predictions = model.transform(test);
          System.out.println("========= predictions ==========================");
          predictions.show();
      }
  }

 
