I. Spark Machine Learning Overview
------------------------------------------------------------------
1. Supervised learning
    a. There is a training data set of well-formed, labeled examples.
    b. From that data set, an inference function is learned.
    c. The learned function is applied to new data to produce predictions.
    d. Common applications: e-mail spam classification, labeling web pages by content, speech recognition, etc.
    e. Common algorithms: neural networks, SVMs, Bayesian classifiers, etc.
2. Unsupervised learning
    a. There is no labeled training data.
    b. The available data is analyzed for patterns and trends, and the records are clustered into groups.
    c. Common techniques: k-means, self-organizing maps (SOM), hierarchical clustering, etc.
3. Recommendation
    Collaborative filtering: based on earlier purchase, click, and rating behavior, recommend the closest matches ("guess what you like", "people you may know").
II. The Naive Bayes Algorithm
-----------------------------------------------
1. Formula
    P(B|A) = P(A|B) * P(B) / P(A)
2. Interpretation
    A: event A
    B: event B
    P(B|A): probability that B occurs given that A has occurred (conditional)
    P(A|B): probability that A occurs given that B has occurred (conditional)
    P(B): probability of B on its own (prior)
    P(A): probability of A on its own (prior)
    Worked example: if 20% of mail is spam (P(B) = 0.2), 60% of spam mentions "loan" (P(A|B) = 0.6), and 15% of all mail mentions "loan" (P(A) = 0.15), then the probability that a mail mentioning "loan" is spam is P(B|A) = 0.6 * 0.2 / 0.15 = 0.8.
III. The Spark Machine Learning Library
----------------------------------------------------
[Estimator]
    An algorithm that operates on a DataFrame: it runs over a DataFrame containing features and labels, trains on that data, and produces a model. The model is then used for later predictions.
[Transformer]
    A DataFrame transformer: it turns a DataFrame containing features into a DataFrame containing predictions. The model produced by an Estimator is a Transformer.
[Parameter]
    Configuration used by Estimators and Transformers, usually specific to the learning algorithm. Spark exposes a uniform Param API across algorithms.
[Pipeline]
    Chains Estimators and Transformers together into a machine-learning workflow (see the spam-filter example in section VI).
IV. Case Study: Wine Scoring [Linear Regression] and Classification [Logistic Regression]
----------------------------------------------------------
1. Data set download: http://archive.ics.uci.edu/ml/datasets/Wine+Quality
2. Linear regression for wine scoring --- Scala version
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession

// Linear regression: score wine quality on a 0-10 scale
object SparkScalaML1 {

  def main(args: Array[String]): Unit = {
    val sess = SparkSession.builder()
      .appName("ml")
      .master("local[4]")
      .getOrCreate()

    val sc = sess.sparkContext
    // Data location
    val dataDir = "file:///D:/share/spark/ml/winequality-white.csv"

    // Case class describing one record of the data set
    case class Wine(FixedAcidity: Double, VolatileAcidity: Double,
      CitricAcid: Double, ResidualSugar: Double, Chlorides: Double,
      FreeSulfurDioxide: Double, TotalSulfurDioxide: Double, Density: Double,
      PH: Double, Sulphates: Double, Alcohol: Double, Quality: Double)

    // Parse the csv (the UCI file is ';'-separated; skip its header row)
    val wineDataRDD = sc.textFile(dataDir)
      .filter(line => line.nonEmpty && line.head.isDigit)
      .map(_.split(";"))
      .map(w => Wine(w(0).toDouble, w(1).toDouble, w(2).toDouble,
        w(3).toDouble, w(4).toDouble, w(5).toDouble, w(6).toDouble,
        w(7).toDouble, w(8).toDouble, w(9).toDouble, w(10).toDouble,
        w(11).toDouble))

    // Convert the RDD into a DataFrame of (label, features)
    import sess.implicits._
    val trainingDF = wineDataRDD.map(w => (w.Quality,
      Vectors.dense(w.FixedAcidity, w.VolatileAcidity, w.CitricAcid,
        w.ResidualSugar, w.Chlorides, w.FreeSulfurDioxide, w.TotalSulfurDioxide,
        w.Density, w.PH, w.Sulphates, w.Alcohol))).toDF("label", "features")

    trainingDF.show()
    println("=============================================")

    // Create the linear regression estimator and cap the iterations
    val lr = new LinearRegression()
    lr.setMaxIter(10)
    // Fit the training data, producing a model
    val model = lr.fit(trainingDF)

    // Labeled test DataFrame (the labels are the known quality scores)
    val testDF = sess.createDataFrame(Seq(
      (5.0, Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0, 0.9968, 3.2, 0.68, 9.8)),
      (5.0, Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4)),
      (7.0, Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0, 0.9968, 3.36, 0.57, 9.5))
    )).toDF("label", "features")

    // Show the test data
    testDF.show()
    println("========================")

    // Predict on the labeled test data to judge the quality of the model
    testDF.createOrReplaceTempView("test")
    val tested = model.transform(testDF).select("features", "label", "prediction")
    tested.show()

    println("========================")
    // Predict on unlabeled data (features only)
    val predictDF = sess.sql("SELECT features FROM test")
    val predicted = model.transform(predictDF).select("features", "prediction")
    predicted.show()
  }
}
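Step e of the analysis below checks the model against labeled test data. Beyond eyeballing the tested output, the error can be quantified. A minimal sketch, assuming it is appended inside main above; RegressionEvaluator is a standard part of spark.ml, but this evaluation step is not in the original listing:

    // A sketch: quantify the fit on the labeled test set as RMSE,
    // comparing the "label" and "prediction" columns of `tested`.
    import org.apache.spark.ml.evaluation.RegressionEvaluator

    val evaluator = new RegressionEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("rmse")
    val rmse = evaluator.evaluate(tested)
    println("Root Mean Squared Error = " + rmse)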
3. Java version =====> TODO
4. Analysis of the machine-learning steps in the Wine case
    a. Read the training data and build a training DataFrame.
    b. Create a LinearRegression estimator.
    c. Fit the training data to produce a model, completing the estimation pipeline.
    d. Read test data that carries known scores and build a test DataFrame [the test data must include the correct scores so the model's predictions can be checked].
    e. Transform the test data with the model, producing a new DataFrame; extract the features, predict the scores, and print the predicted scores.
    f. Once the model checks out, transform production data with it to score that data.
5. Logistic regression for wine classification
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

/**
  * Logistic regression: classify wine as good/bad (1/0)
  */
object SparkML2 {

  def main(args: Array[String]): Unit = {
    val sess = SparkSession.builder()
      .appName("ml")
      .master("local[4]")
      .getOrCreate()

    val sc = sess.sparkContext
    // Data location
    val dataDir = "file:///D:/share/spark/ml/winequality-white.csv"

    // Case class describing one record of the data set
    case class Wine(FixedAcidity: Double, VolatileAcidity: Double,
      CitricAcid: Double, ResidualSugar: Double, Chlorides: Double,
      FreeSulfurDioxide: Double, TotalSulfurDioxide: Double, Density: Double,
      PH: Double, Sulphates: Double, Alcohol: Double, Quality: Double)

    // Parse the csv (the UCI file is ';'-separated; skip its header row)
    val wineDataRDD = sc.textFile(dataDir)
      .filter(line => line.nonEmpty && line.head.isDigit)
      .map(_.split(";"))
      .map(w => Wine(w(0).toDouble, w(1).toDouble, w(2).toDouble,
        w(3).toDouble, w(4).toDouble, w(5).toDouble, w(6).toDouble,
        w(7).toDouble, w(8).toDouble, w(9).toDouble, w(10).toDouble,
        w(11).toDouble))

    // Convert the RDD into a DataFrame, binarizing the label:
    // quality below 7 is "bad" (0), otherwise "good" (1)
    import sess.implicits._
    val trainingDF = wineDataRDD.map(w => (if (w.Quality < 7) 0d else 1d,
      Vectors.dense(w.FixedAcidity, w.VolatileAcidity, w.CitricAcid,
        w.ResidualSugar, w.Chlorides, w.FreeSulfurDioxide, w.TotalSulfurDioxide,
        w.Density, w.PH, w.Sulphates, w.Alcohol))).toDF("label", "features")

    trainingDF.show()
    println("=============================================")

    // Create the logistic regression estimator:
    // at most 10 iterations, regularization parameter 0.01
    val lr = new LogisticRegression()
    lr.setMaxIter(10).setRegParam(0.01)
    // Fit the training data, producing a model
    val model = lr.fit(trainingDF)

    // Labeled test DataFrame (labels follow the same quality < 7 => 0 rule)
    val testDF = sess.createDataFrame(Seq(
      (0.0, Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0, 0.9968, 3.2, 0.68, 9.8)),
      (0.0, Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4)),
      (1.0, Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0, 0.9968, 3.36, 0.57, 9.5))
    )).toDF("label", "features")

    // Show the test data
    testDF.show()
    println("========================")

    // Predict on the labeled test data to judge the quality of the model
    testDF.createOrReplaceTempView("test")
    val tested = model.transform(testDF).select("features", "label", "prediction")
    tested.show()

    println("========================")
    // Predict on unlabeled data (features only)
    val predictDF = sess.sql("SELECT features FROM test")
    val predicted = model.transform(predictDF).select("features", "prediction")
    predicted.show()
  }
}
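The labeled predictions above can also be scored: for a binary classifier the usual metric is the area under the ROC curve. A minimal sketch, assuming it is appended inside main above; it relies on the rawPrediction column that LogisticRegressionModel adds during transform, and is not part of the original listing:

    // A sketch: measure classification quality as area under the ROC curve.
    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

    val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")
      .setMetricName("areaUnderROC")
    val auc = evaluator.evaluate(model.transform(testDF))
    println("Area under ROC = " + auc)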
6. Java version =====> TODO
V. Saving and Loading Models
------------------------------------------------------
1. Persisting a model (save)
    // Create the logistic regression estimator
    val lr = new LogisticRegression()
    // Cap the iterations
    lr.setMaxIter(10)
    // Fit the training data, producing a model
    val model = lr.fit(trainingDF)
    model.save("file:///d:/share/spark/model")
2. Loading a model (use the model class matching what was saved)
    val loadmodel = LogisticRegressionModel.load("file:///d:/share/spark/model")
VI. Spam Filtering [Hashed Term Frequency + Logistic Regression]
------------------------------------------------------
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.SparkSession

/**
  * Spam filtering
  */
object SparkMLSpamFilter {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("ml")
      .master("local[4]")
      .getOrCreate()

    // Training data: label 1.0 marks spam
    val training = spark.createDataFrame(Seq(
      ("you@example.com", "hope you are well", 0.0),
      ("raj@example.com", "nice to hear from you", 0.0),
      ("thomas@example.com", "happy holidays", 0.0),
      ("mark@example.com", "see you tomorrow", 0.0),
      ("xyz@example.com", "save money", 1.0),
      ("top10@example.com", "low interest rate", 1.0),
      ("marketing@example.com", "cheap loan", 1.0))
    ).toDF("email", "message", "label")

    training.show()

    // Tokenizer: lowercase the input and split it on whitespace into a new column
    val tokenizer = new Tokenizer().setInputCol("message").setOutputCol("words")

    // Hashed term frequency: number of buckets, input column, output column
    val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol("words").setOutputCol("features")

    // Logistic regression: at most 10 iterations, regularization parameter 0.01
    val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)

    // Build the pipeline: tokenize -> hash term frequencies -> classify
    val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

    // Fit the pipeline to the training data, producing a model
    val model = pipeline.fit(training)

    // Test data (unlabeled)
    val test = spark.createDataFrame(Seq(
      ("you@example.com", "how are you"),
      ("jain@example.com", "hope doing well"),
      ("caren@example.com", "want some money"),
      ("zhou@example.com", "secure loan"),
      ("ted@example.com", "need loan"))
    ).toDF("email", "message")

    test.show()

    // Predict on the test data and show the result
    val prediction = model.transform(test).select("email", "message", "prediction")
    prediction.show()

    println("==================================")
    // Tokenize the training data, producing the words column
    val wordsDF = tokenizer.transform(training)
    wordsDF.show()
    println("==================================")

    // Hash the words into term-frequency feature vectors
    val featurizedDF = hashingTF.transform(wordsDF)
    featurizedDF.show()
    println("==================================")

    featurizedDF.createOrReplaceTempView("featurized")
    val selectedFeaturizedFieldsDF = spark.sql("SELECT words, features FROM featurized")
    selectedFeaturizedFieldsDF.show()
  }
}
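The save/load pattern from section V applies to fitted pipelines as well: the result of pipeline.fit is a PipelineModel that persists all of its stages at once. A minimal sketch, assuming it is appended inside main above; the path is illustrative:

    import org.apache.spark.ml.PipelineModel

    // Persist the whole fitted pipeline (tokenizer + hashingTF + classifier)
    model.write.overwrite().save("file:///d:/share/spark/spam-model")
    // Reload it later and predict directly on raw messages
    val reloaded = PipelineModel.load("file:///d:/share/spark/spam-model")
    reloaded.transform(test).select("email", "message", "prediction").show()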
VII. Recommendation [Alternating Least Squares]
-------------------------------------------------------
1. Alternating least squares (ALS)
    ALS factors the user-item rating matrix into user factor vectors x(u) and item factor vectors y(i) by minimizing the sum of squared distances between the known ratings and the predicted ratings, plus a regularization term weighted by lambda (the 0.01 passed to ALS.train below):
        sum over known (u,i) of (r(u,i) - x(u) . y(i))^2 + lambda * (|x(u)|^2 + |y(i)|^2)
2. Training data [test.data], one "user,item,rating" triple per line
    1,0,1.0
    1,1,2.0
    1,2,5.0
    1,3,5.0
    1,4,5.0
    2,0,1.0
    2,1,2.0
    2,2,5.0
    2,5,5.0
    2,6,4.5
    3,1,2.5
    3,2,5.0
    3,3,4.0
    3,4,3.0
    4,0,5.0
    4,1,5.0
    4,2,5.0
    4,3,0.0
3. Scala version
package test.spark.examples.mllib

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

/**
  * Recommendation
  */
object RecomendationDemo {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("CollaborativeFilteringExample").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Load and parse the data
    val data = sc.textFile("file:///D:/share/spark/ml/data/mllib/als/test.data")

    // Turn each line into a Rating object
    val ratings = data.map(_.split(',') match {
      case Array(user, item, rate) =>
        Rating(user.toInt, item.toInt, rate.toDouble)
    })

    println("========== ratings: the raw data set ===============")
    ratings.collect().foreach(println)

    // Build the model with alternating least squares
    val rank = 10
    val numIterations = 10
    val model = ALS.train(ratings, rank, numIterations, 0.01)

    // Check the model's accuracy against the ratings it was trained on:
    // prepare test pairs by dropping the rating
    val usersProducts = ratings.map { case Rating(user, product, rate) =>
      (user, product)
    }

    println("========== usersProducts: the test data ===============")
    usersProducts.collect().foreach(println)

    // Predict a rating for every (user, product) pair
    val predictions =
      model.predict(usersProducts).map { case Rating(user, product, rate) =>
        ((user, product), rate)
      }
    println("========== predictions: the predicted ratings ===============")
    predictions.collect().foreach(println)

    // Join actual and predicted ratings for comparison
    val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
      ((user, product), rate)
    }.join(predictions)
    println("========== ratings + predictions: actual vs. predicted ===============")
    ratesAndPreds.collect().foreach(println)

    // Recommend 5 products to user 2
    val res = model.recommendProducts(2, 5)
    println("========== res: recommendations for user 2 ===============")
    res.foreach(println)

    // Compute the mean squared error
    val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
      val err = r1 - r2
      err * err
    }.mean()
    println("Mean Squared Error = " + MSE)

    // Saving and loading the model
    model.save(sc, "target/tmp/myCollaborativeFilter")
    val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
  }
}
4. Java version
package test.spark.examples.mllib;

import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;

public class RecomendationDemoJava {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Load and parse the data
        String path = "file:///D:/share/spark/ml/data/mllib/als/test.data";
        JavaRDD<String> data = jsc.textFile(path);
        JavaRDD<Rating> ratings = data.map(
            new Function<String, Rating>() {
                public Rating call(String s) {
                    String[] sarray = s.split(",");
                    return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
                            Double.parseDouble(sarray[2]));
                }
            }
        );
        System.out.println("========== ratings: the raw data set ===============");
        ratings.collect().forEach(x -> System.out.println(x));

        // Build the model with alternating least squares
        int rank = 10;
        int numIterations = 10;
        MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);

        // Check the model's accuracy against the ratings it was trained on:
        // prepare test pairs by dropping the rating
        JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
            new Function<Rating, Tuple2<Object, Object>>() {
                public Tuple2<Object, Object> call(Rating r) {
                    return new Tuple2<Object, Object>(r.user(), r.product());
                }
            }
        );

        System.out.println("========== usersProducts: the test data ===============");
        userProducts.collect().forEach(x -> System.out.println(x._1 + ":" + x._2));

        // Predict a rating for every (user, product) pair
        JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
            model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
                new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
                    public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r) {
                        return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
                    }
                }
            ));

        System.out.println("========== predictions: the predicted ratings ===============");
        predictions.collect().forEach(x -> System.out.println(x._1 + ":" + x._2));

        System.out.println("========== recommend 5 products to user 2 ===============");
        Rating[] ratings1 = model.recommendProducts(2, 5);
        for (Rating r : ratings1) {
            System.out.println(r.user() + ":" + r.product() + ":" + r.rating());
        }

        // Join actual and predicted ratings for comparison
        JavaRDD<Tuple2<Double, Double>> ratesAndPreds =
            JavaPairRDD.fromJavaRDD(ratings.map(
                new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
                    public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r) {
                        return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
                    }
                }
            )).join(predictions).values();

        System.out.println("========== ratings + predictions: actual vs. predicted ===============");
        ratesAndPreds.collect().forEach(x -> System.out.println(x._1 + ":" + x._2));

        // Compute the mean squared error
        double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(
            new Function<Tuple2<Double, Double>, Object>() {
                public Object call(Tuple2<Double, Double> pair) {
                    Double err = pair._1() - pair._2();
                    return err * err;
                }
            }
        ).rdd()).mean();
        System.out.println("Mean Squared Error = " + MSE);

        // Saving and loading the model
        //model.save(jsc.sc(), "target/tmp/myCollaborativeFilter");
        //MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(),
        //        "target/tmp/myCollaborativeFilter");

        jsc.stop();
    }
}
5. "Guess what you like": the recommendation API
    /******* Recommend 8 products to user 5 ********/
    val res = model.recommendProducts(5, 8)
    /******* Recommend product 3 to the 5 best-matched users ********/
    val res = model.recommendUsers(3, 5)
    /******* Recommend the top 3 products to every user ********/
    val res = model.recommendProductsForUsers(3)
VIII. Movie Recommendation Case Study
---------------------------------------------------------------
1. Raw data, one "userId::movieId::rating::timestamp" record per line
    0::2::3::1424380312
    0::3::1::1424380312
    0::5::2::1424380312
    0::9::4::1424380312
    0::11::1::1424380312
    0::12::2::1424380312
    1::15::1::1424380312
    2::17::1::1424380312
    2::19::1::1424380312
    ...
2. Scala version
import org.apache.spark.SparkConf
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

object MovieRecc {

  // Rating case class [note: never define the case class inside main,
  // or the implicit Encoder resolution for toDF fails]
  case class Rating0(userId: Int, movieId: Int, rating: Float, timestamp: Long)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("movieRecomm")
    conf.setMaster("local[4]")

    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    // Parse one record of the ratings file
    def parseRating(str: String): Rating0 = {
      val fields = str.split("::")
      assert(fields.size == 4)
      Rating0(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
    }

    // Build a DataFrame of Rating0 records
    val ratings = spark.sparkContext.textFile("file:///D:\\share\\spark\\ml\\data\\mllib\\als\\sample_movielens_ratings.txt")
    val ratings0 = ratings.map(parseRating)
    val df = ratings0.toDF()
    println("========= df: the source data ==========================")
    df.collect().foreach(println)

    // Randomly split the data into an array of two parts:
    // the first element is the training set, the second the test set
    val Array(training, test) = df.randomSplit(Array(0.99, 0.01))

    // Build the ALS recommender and set its parameters
    val als = new ALS().setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("movieId")
      .setRatingCol("rating")

    // Fit the training data, producing the recommendation model
    val model = als.fit(training)

    // Predict ratings for the test set
    val predictions = model.transform(test)
    println("========= predictions ==========================")
    predictions.collect().foreach(println)
  }
}
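The listing imports RegressionEvaluator but never uses it. A minimal sketch of the missing evaluation step, assuming it is appended at the end of main above; ALS can emit NaN predictions for users or movies that only appear in the test split, so those rows are dropped before scoring:

    // A sketch: RMSE of the predicted ratings against the held-out ratings.
    val evaluator = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("rating")
      .setPredictionCol("prediction")
    val rmse = evaluator.evaluate(predictions.na.drop())
    println("Root Mean Squared Error = " + rmse)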
3. Java version
package mllib;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MovieReccJava {

    // The rating bean the listing relies on (the original omitted it; this
    // completion is modeled on Spark's JavaALSExample): a public serializable
    // JavaBean with getters, so createDataFrame can infer the schema.
    public static class Rating0 implements Serializable {
        private int userId;
        private int movieId;
        private float rating;
        private long timestamp;

        public Rating0() {}

        public Rating0(int userId, int movieId, float rating, long timestamp) {
            this.userId = userId;
            this.movieId = movieId;
            this.rating = rating;
            this.timestamp = timestamp;
        }

        public int getUserId() { return userId; }
        public int getMovieId() { return movieId; }
        public float getRating() { return rating; }
        public long getTimestamp() { return timestamp; }

        // Parse one "userId::movieId::rating::timestamp" record
        public static Rating0 parseRating(String str) {
            String[] fields = str.split("::");
            if (fields.length != 4) {
                throw new IllegalArgumentException("Each line must contain 4 fields");
            }
            return new Rating0(Integer.parseInt(fields[0]), Integer.parseInt(fields[1]),
                    Float.parseFloat(fields[2]), Long.parseLong(fields[3]));
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("movie");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        JavaRDD<Rating0> ratingsRDD = spark
                .read().textFile("file:///D:\\share\\spark\\ml\\data\\mllib\\als\\sample_movielens_ratings.txt").javaRDD()
                .map(Rating0::parseRating);

        Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating0.class);
        Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.99, 0.01});
        Dataset<Row> training = splits[0];
        Dataset<Row> test = splits[1];

        System.out.println("========= training data ==========================");
        training.show();
        System.out.println("========= test data ==========================");
        test.show();

        // Build the recommendation model using ALS on the training data
        ALS als = new ALS()
                .setMaxIter(5)
                .setRegParam(0.01)
                .setUserCol("userId")
                .setItemCol("movieId")
                .setRatingCol("rating");
        ALSModel model = als.fit(training);

        // Predict ratings for the test set
        Dataset<Row> predictions = model.transform(test);

        System.out.println("========= predictions ==========================");
        predictions.show();
    }
}