当前位置:   article > 正文

头歌:Spark案例剖析 - 谷歌网页排名引擎PageRank实战

头歌:Spark案例剖析 - 谷歌网页排名引擎PageRank实战

第1关:海量数据导入:SparkSQL大数据导入处理

任务描述


工欲善其事必先利其器,大数据分析中最重要的是熟练掌握数据导入工具的使用方法。Spark SQL是Spark自带的数据库,本关你将应用Spark SQL的数据导入工具实现文本数据的导入。其中,graphx-wiki-vertices.txt文件中含有网页及其id数据,graphx-wiki-edges.txt文件中含有网页及其连接网页id数据。

相关知识


Spark与Spark SQL介绍
Spark由Spark SQL,Spark Mlib,Spark Streaming以及Spark GraphX等模块构成。我们通常从Spark SQL或者Spark Streaming中导入数据,再利用Spark Mlib或者Spark GraphX等模块进行数据处理分析。

Spark SQL摆脱了对Hive的依赖性,比起之前的Hive无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便。当使用其他编程语言运行Spark SQL时,将返回数据类型为Dataset或者DataFrame的结果。你还可以使用命令行或通过JDBC/ODBC与Spark SQL界面进行交互。Spark SQL有两个分支,sqlContext和hiveContext,sqlContext目前只支持SQL语法解析器;hiveContext现在支持SQL语法解析器和hivesql语法解析器,用户可以通过配置切换成SQL语法解析器,来运行hiveSQL不支持的语法。下面我们重点介绍Spark SQL的初始化,数据库的使用,外部数据的导入,从而将网页数据导入数据库中方便之后处理。

初始化SparkSQL


利用Scala语言使用Spark SQL首先应该导入Spark SQL的相关包,如下例所示:

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.sql._


也可通过bin/spark-sql命令,通过类似MySQL命令行交互的方式直接访问SparkSQL数据库。

查看使用数据库


在spark-sql中查询使用数据库和基本的关系数据库使用方法类似。

  1. show databases;
  2. use mydatabase


通过上述命令即可查询现有数据库并使用数据库。

从文本文件中导入数据
CREATE TABLE vertices(ID BigInt,Title String) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n';
LOAD DATA LOCAL IN PATH 'path/file.txt' INTO TABLE vertices;
上述命令行完成了两样事,创建表vertices并将本地的file.txt导入表中。表中数据按照tab缩进以及换行符进行区分。语句1创建表vertices并创建两个字段ID和Title,其中ID的字段属性为BigInt,Title属性为String。语句二将路径中的file.txt文件导入表中。

利用Scala语言进行数据库操作
val spark = SparkSession.builder.master("local").appName("tester").enableHiveSupport().getOrCreate()
spark.sql("use default")
上述代码通过SparkSession的enableHiveSupport()方法支持Hive数据库的基本使用。通过spark.sql()方法起到对数据库的操作。

编程要求


本关任务主要是利用Spark SQL创建表vertices以及edges,并将本地graphx-wiki-vertices.txt和graphx-wiki-edges.txt两个文件的数据分别导入到表中。

评测说明


评测耗时说明
本实训目前是基于Spark单机模式的运行方式,完成以上评测流程所需时间较长(全过程耗时约118秒),请耐心等待

补全下述代码,使之编译通过。

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.sql._
  4. object SparkSQLHive {
  5.     def main(args: Array[String]) = {
  6.         val sparkConf = new SparkConf().setAppName("PageRank")
  7.         val sc = new SparkContext(sparkConf)
  8.         val spark = SparkSession.builder.master("local").appName("tester").enableHiveSupport().getOrCreate()
  9.         spark.sql("use default")
  10.         import spark.implicits._
  11.         //drop table if it exists
  12.         spark.sql("DROP TABLE IF EXISTS vertices"
  13.         spark.sql("DROP TABLE IF EXISTS edges")
  14.         //create table here
  15.         spark.sql("CREATE TABLE IF NOT EXISTS vertices(ID BigInt,Title String)ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n&
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/529515
推荐阅读
相关标签
  

闽ICP备14008679号