This post covers the data sources and the process of loading the data into the database.
We use Scrapy to crawl movie data from Douban, then use the MovieLens dataset to generate a matching ratings file: only the first 2791 rows of movie.csv are kept, and the MovieLens ratings are matched against the movie IDs in movie.csv, so every movie we load ends up with corresponding rating data.

(1) Movie data
The table schema is:

mid,title,desc,minute,year,language,genres,actors,director
(2) Rating data
userID,mid,score,timestamp
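The loader code further down refers to Movie, Rating, and MongoConfig case classes that are not shown in the post; here is a minimal sketch matching the two schemas above (the exact field names are assumptions, chosen to agree with how the loader parses and indexes the data):

```scala
// Mirrors the movie.csv schema; everything except mid is kept as String,
// matching how the loader parses each column with .trim only.
case class Movie(mid: Int, title: String, desc: String, minute: String,
                 year: String, language: String, genres: String,
                 actors: String, director: String)

// uid = user ID, mid = movie ID (assumed names; the loader builds
// indexes on "uid" and "mid" in the Rating collection)
case class Rating(uid: Int, mid: Int, score: Double, timestamp: Int)

// Connection settings passed implicitly to the storage function
case class MongoConfig(uri: String, db: String)
```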
We use MongoDB as the storage database.
Next, we deploy MongoDB on a cloud server, connect to it remotely from the local machine, and load the files into the database.
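The deployment boils down to letting mongod accept remote connections and creating the root account used in the connection URI; a minimal sketch, assuming MongoDB is already installed on the server (the IP and password are placeholders, and the config keys shown are the standard mongod.conf options):

```shell
# Allow remote connections: in /etc/mongod.conf set
#   net:
#     bindIp: 0.0.0.0
#     port: 27017

# Create the account referenced by the URI mongodb://root:123456@<server-public-IP>:27017
mongo admin --eval 'db.createUser({user: "root", pwd: "123456", roles: [{role: "root", db: "admin"}]})'

# Enable auth (security.authorization: enabled in mongod.conf), then restart
sudo systemctl restart mongod

# From the local machine, verify the remote connection works
mongo "mongodb://root:123456@<server-public-IP>:27017/admin" --eval 'db.runCommand({ping: 1})'
```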
The relevant Maven dependency versions are as follows.
Note: the Spark dependency version must match the version running on the Spark cluster.
Scala: 2.11.8
Spark: 2.3.0
<properties>
    <scala.version>2.11.8</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- spark-sql is required for SparkSession/DataFrame -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- MongoDB Spark connector and Casbah driver used by the loader -->
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah-core_2.11</artifactId>
        <version>3.1.1</version>
    </dependency>
</dependencies>
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

// Main data-loading program
object DataLoader {

  val MONGODB_MOVIE_COLLECTION = "Movie"
  val MONGODB_RATING_COLLECTION = "Rating"

  val config = Map(
    "spark.cores" -> "local[*]",
    "mongo.uri" -> "mongodb://root:123456@<server-public-IP>:27017/recommender",
    "mongo.db" -> "recommender"
  )

  // File locations
  val MOVIE_DATA_PATH = "F:\\1-project\\offline\\src\\main\\resources\\file\\movie.csv"
  val RATING_DATA_PATH = "F:\\1-project\\offline\\src\\main\\resources\\file\\ratings.csv"

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    // Load the movie data and convert it to a DataFrame
    val movieRDD = spark.sparkContext.textFile(MOVIE_DATA_PATH)
    val movieDF = movieRDD.map( item => {
      val attr = item.split(",")
      Movie(attr(0).toInt, attr(1).trim, attr(2).trim, attr(3).trim, attr(4).trim,
        attr(5).trim, attr(6).trim, attr(7).trim, attr(8).trim)
    }).toDF()

    // Load the rating data and convert it to a DataFrame
    val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
    val ratingDF = ratingRDD.map( item => {
      val attr = item.split(",")
      Rating(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
    }).toDF()

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // Save the data to MongoDB
    storeDataInMongoDB(movieDF, ratingDF)

    spark.stop()
  }
}
def storeDataInMongoDB(movieDF: DataFrame, ratingDF: DataFrame)(implicit mongoConfig: MongoConfig): Unit = {
  // Open a new MongoDB connection
  val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))

  // Write the DataFrames into the corresponding MongoDB collections
  movieDF.write
    .option("uri", mongoConfig.uri)
    .option("collection", MONGODB_MOVIE_COLLECTION)
    .mode("overwrite")
    .format("com.mongodb.spark.sql")
    .save()

  ratingDF.write
    .option("uri", mongoConfig.uri)
    .option("collection", MONGODB_RATING_COLLECTION)
    .mode("overwrite")
    .format("com.mongodb.spark.sql")
    .save()

  // Create indexes on the collections
  mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
  mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
  mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("mid" -> 1))

  mongoClient.close()
}
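Once the data is loaded, downstream offline jobs can read the collections back through the same connector; a minimal sketch, assuming a SparkSession `spark` and a `MongoConfig` like the one above are in scope:

```scala
// Read the Rating collection back into a DataFrame for later offline jobs;
// the connector adds an _id column alongside the stored fields.
val ratingDF = spark.read
  .option("uri", mongoConfig.uri)
  .option("collection", "Rating")
  .format("com.mongodb.spark.sql")
  .load()
```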