ALS runs more efficiently when user and product IDs are numeric values that fit in an Int (Int.MaxValue = 2147483647). Let's verify that our data (user_artist_data.txt) meets this requirement:
import org.apache.spark.sql.functions._ // min/max aggregate functions
import spark.implicits._ // toDF and the $-column syntax

val rawUserArtistData = spark.read.textFile("hdfs://...")
val userArtistDF = rawUserArtistData.map { line =>
  val Array(user, artist, _*) = line.split(' ') // space-separated fields; ignore the play count here
  (user.toInt, artist.toInt)
}.toDF("user", "artist")
// If the maxima printed below are under Int.MaxValue, Int IDs are safe
userArtistDF.agg(min("user"), max("user"), min("artist"), max("artist")).show()
Next, read the second file, artist_data.txt, which maps the opaque numeric IDs to artist names. Each line contains an artist ID and the artist's name, separated by a tab:
val rawArtistData = spark.read.textFile("hdfs://..")
val artistByID = rawArtistData.flatMap { line =>
  val (id, name) = line.span(_ != '\t') // split at the first tab
  if (name.isEmpty) {
    None // skip lines with no tab at all
  } else {
    try {
      Some((id.toInt, name.trim)) // trim drops the leading tab and whitespace
    } catch {
      case _: NumberFormatException => None // skip lines whose ID is not a number
    }
  }
}.toDF("id", "name")
The third file, artist_alias.txt, maps misspelled or nonstandard artist IDs to the artist's canonical ID. Each line holds two tab-separated IDs, mapping a "bad" artist ID to a "good" one. Because the mapping is small and consulted per record, we collect it into a Map rather than keeping it as a dataset of ID pairs:
val rawArtistAlias = spark.read.textFile("hdfs://...")
val artistAlias = rawArtistAlias.flatMap { line =>
  val Array(artist, alias) = line.split('\t')
  if (artist.isEmpty) {
    None // some lines are missing the first ID; skip them
  } else {
    Some((artist.toInt, alias.toInt))
  }
}.collect().toMap // small enough to collect to the driver as a Map
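The training set trainData used below is built by applying these aliases to the raw play counts through a broadcast variable. A minimal sketch of such a buildCounts helper, following the book's approach (the name matches the later call; the body is assumed here):

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Dataset}

val bArtistAlias = spark.sparkContext.broadcast(artistAlias)

def buildCounts(
    rawUserArtistData: Dataset[String],
    bArtistAlias: Broadcast[Map[Int,Int]]): DataFrame = {
  rawUserArtistData.map { line =>
    val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
    // Replace a "bad" artist ID with its canonical alias, if one exists
    val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
    (userID, finalArtistID, count)
  }.toDF("user", "artist", "count")
}

val trainData = buildCounts(rawUserArtistData, bArtistAlias)
trainData.cache()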
import org.apache.spark.ml.recommendation._
import scala.util.Random
val model = new ALS().
  setSeed(Random.nextLong()). // set a random seed
  setImplicitPrefs(true). // use the implicit-feedback variant of ALS
  setRank(10).
  setRegParam(0.01).
  setAlpha(1.0).
  setMaxIter(5).
  setUserCol("user").
  setItemCol("artist").
  setRatingCol("count").
  setPredictionCol("prediction").
  fit(trainData)
val userID = 2093760
val existingArtistIDs = trainData
  .filter($"user" === userID) // find the rows for user 2093760
  .select("artist").as[Int].collect() // collect that user's artist IDs as Ints
artistByID.filter($"id" isin (existingArtistIDs:_*)).show() // look up their names; _* is varargs syntax
def makeRecommendations(
    model: ALSModel,
    userID: Int,
    howMany: Int): DataFrame = {
  val toRecommend = model.itemFactors.
    select($"id".as("artist")).
    withColumn("user", lit(userID)) // pair every artist ID with the target user ID
  model.transform(toRecommend).
    select("artist", "prediction").
    orderBy($"prediction".desc).
    limit(howMany) // score every artist and keep the highest-scoring ones
}
val topRecommendations = makeRecommendations(model,userID,5)
topRecommendations.show()
val recommendedArtistIDs = topRecommendations.select("artist").as[Int].collect()
artistByID.filter($"id" isin (recommendedArtistIDs:_*)).show()
To evaluate the recommender, we compute a mean AUC over users: for each user, the probability that a held-out "positive" artist is ranked above a randomly sampled "negative" one.

import org.apache.spark.broadcast.Broadcast
import scala.collection.mutable.ArrayBuffer

def areaUnderCurve(
    positiveData: DataFrame,
    bAllArtistIDs: Broadcast[Array[Int]],
    predictFunction: (DataFrame => DataFrame)): Double = {

  // What this actually computes is AUC, per user. The result is actually something
  // that might be called "mean AUC".

  // Take held-out data as the "positive".
  // Make predictions for each of them, including a numeric score
  val positivePredictions = predictFunction(positiveData.select("user", "artist")).
    withColumnRenamed("prediction", "positivePrediction")

  // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
  // small AUC problems, and it would be inefficient, when a direct computation is available.

  // Create a set of "negative" products for each user. These are randomly chosen
  // from among all of the other artists, excluding those that are "positive" for the user.
  val negativeData = positiveData.select("user", "artist").as[(Int,Int)].
    groupByKey { case (user, _) => user }.
    flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
      val random = new Random()
      val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
      val negative = new ArrayBuffer[Int]()
      val allArtistIDs = bAllArtistIDs.value
      var i = 0
      // Make at most one pass over all artists to avoid an infinite loop.
      // Also stop when number of negative equals positive set size
      while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
        val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
        // Only add new distinct IDs
        if (!posItemIDSet.contains(artistID)) {
          negative += artistID
        }
        i += 1
      }
      // Return the set with user ID added back
      negative.map(artistID => (userID, artistID))
    }.toDF("user", "artist")

  // Make predictions on the "negative" data too
  val negativePredictions = predictFunction(negativeData).
    withColumnRenamed("prediction", "negativePrediction")

  // Join positive predictions to negative predictions by user, only.
  // This will result in a row for every possible pairing of positive and negative
  // predictions within each user.
  val joinedPredictions = positivePredictions.join(negativePredictions, "user").
    select("user", "positivePrediction", "negativePrediction").cache()

  // Count the number of pairs per user
  val allCounts = joinedPredictions.
    groupBy("user").agg(count(lit("1")).as("total")).
    select("user", "total")
  // Count the number of correctly ordered pairs per user
  val correctCounts = joinedPredictions.
    filter($"positivePrediction" > $"negativePrediction").
    groupBy("user").agg(count("user").as("correct")).
    select("user", "correct")

  // Combine these, compute their ratio, and average over all users
  val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
    select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
    agg(mean("auc")).
    as[Double].first()

  joinedPredictions.unpersist()

  meanAUC
}

val allData = buildCounts(rawUserArtistData, bArtistAlias) // defined above
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()

val allArtistIDs = allData.select("artist").as[Int].distinct().collect() // deduplicate and collect to the driver
val bAllArtistIDs = spark.sparkContext.broadcast(allArtistIDs)

val model = new ALS().
  setSeed(Random.nextLong()).
  setImplicitPrefs(true).
  setRank(10).setRegParam(0.01).
  setAlpha(1.0).setMaxIter(5).
  setUserCol("user").setItemCol("artist").
  setRatingCol("count").setPredictionCol("prediction").
  fit(trainData)

areaUnderCurve(cvData, bAllArtistIDs, model.transform)
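To see what the pairwise computation is doing, here is a toy check of the AUC definition for a single user, with hypothetical scores that are not from the dataset:

// Hypothetical scores: two positive and two negative artists for one user
val pos = Seq(0.9, 0.4)
val neg = Seq(0.5, 0.1)
// AUC = fraction of (positive, negative) pairs where the positive scores higher
val pairsCorrect = (for (p <- pos; n <- neg) yield if (p > n) 1 else 0).sum
val toyAUC = pairsCorrect.toDouble / (pos.size * neg.size) // 3 of 4 pairs => 0.75

A useful sanity check is a non-personalized baseline that "predicts" each artist's global play count as its score for every user; a trained model should beat its AUC: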
def predictMostListened(train: DataFrame)(allData: DataFrame): DataFrame = {
  // Each artist's total play count in the training set becomes its "prediction"
  val listenCounts = train.groupBy("artist").
    agg(sum("count").as("prediction")).
    select("artist", "prediction")
  // Attach that same score to every (user, artist) pair being evaluated
  allData.
    join(listenCounts, Seq("artist"), "left_outer").
    select("user", "artist", "prediction")
}
val mostListenedAUC = areaUnderCurve(cvData, bAllArtistIDs, predictMostListened(trainData))
println(mostListenedAUC)
val evaluations =
  for (rank <- Seq(5, 30);
       regParam <- Seq(1.0, 0.0001);
       alpha <- Seq(1.0, 40.0))
  yield {
    // This desugars into three nested loops: rank outermost,
    // then regParam, then alpha innermost
    val model = new ALS().
      setSeed(Random.nextLong()).
      setImplicitPrefs(true).
      setRank(rank).setRegParam(regParam).
      setAlpha(alpha).setMaxIter(20).
      setUserCol("user").setItemCol("artist").
      setRatingCol("count").setPredictionCol("prediction").
      fit(trainData)

    val auc = areaUnderCurve(cvData, bAllArtistIDs, model.transform)

    model.userFactors.unpersist() // free the model's resources immediately
    model.itemFactors.unpersist()

    (auc, (rank, regParam, alpha))
  }

evaluations.sorted.reverse.foreach(println) // sort by the first element (AUC), descending, and print
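A natural follow-up, sketched here rather than taken from the text above, is to pick the best-scoring combination and retrain on the full dataset:

// Take the winning (AUC, (rank, regParam, alpha)) entry from the grid above
val (bestAUC, (bestRank, bestRegParam, bestAlpha)) = evaluations.sorted.reverse.head
val finalModel = new ALS().
  setSeed(Random.nextLong()).
  setImplicitPrefs(true).
  setRank(bestRank).setRegParam(bestRegParam).
  setAlpha(bestAlpha).setMaxIter(20).
  setUserCol("user").setItemCol("artist").
  setRatingCol("count").setPredictionCol("prediction").
  fit(allData) // train on all of the data once hyperparameters are chosen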