当前位置:   article > 正文

【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)_maven spark读取sql

maven spark读取sql

需要源码和依赖请点赞关注收藏后评论区留言私信~~~

一、Dataframe操作

步骤如下

1)利用IntelliJ IDEA新建一个maven工程,界面如下

2)修改pom.XML添加相关依赖包

3)在工程名处点右键,选择Open Module Settings

4)配置Scala Sdk,界面如下

5)新建文件夹scala,界面如下:

6) 将文件夹scala设置成Source Root,界面如下:

 

7) 新建scala类,界面如下:

 此类主要功能是读取D盘下的people.txt文件,使用编程方式操作DataFrame,相关代码如下

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.sql.{DataFrame, SparkSession}
  3. case class Person(name:String,age:Long)
  4. object sparkSqlSchema {
  5. def main(args: Array[String]): Unit = {
  6. //创建Spark运行环境
  7. val spark = SparkSession.builder().appName("sparkSqlSchema").master("local").getOrCreate()
  8. val sc = spark.sparkContext;
  9. //读取文件
  10. val data: RDD[Array[String]] = sc.textFile("D:/people.txt"). map (x => x.split(","));
  11. //将RDD与样例类关联
  12. val personRdd: RDD[Person] = data. map (x => Person(x(0),x(1).toLong))
  13. //手动导入隐式转换
  14. import spark.implicits._
  15. val personDF: DataFrame = personRdd.toDF
  16. //显示DataFrame的数据
  17. personDF.show()
  18. //显示DataFrame的schema信息
  19. personDF.printSchema()
  20. //显示DataFrame记录数
  21. println(personDF.count())
  22. //显示DataFrame的所有字段
  23. personDF.columns.foreach(println)
  24. //取出DataFrame的第一行记录
  25. println(personDF.head())
  26. //显示DataFrame中name字段的所有值
  27. personDF.select("name").show()
  28. //过滤出DataFrame中年龄大于20的记录
  29. personDF.filter($"age" > 20).show()
  30. //统计DataFrame中年龄大于20的人数
  31. println(personDF.filter($"age" > 20).count())
  32. //统计DataFrame中按照年龄进行分组,求每个组的人数
  33. personDF.groupBy("age").count().show()
  34. //将DataFrame注册成临时表
  35. personDF.createOrReplaceTempView("t_person")
  36. //传入sql语句,进行操作
  37. spark.sql("select * from t_person").show()
  38. spark.sql("select * from t_person where name='王五'").show()
  39. spark.sql("select * from t_person order by age desc").show()
  40. //DataFrame转换成Dataset
  41. var ds=personDF.as[Person]
  42. ds.show()
  43. //关闭操作
  44. sc.stop()
  45. spark.stop()
  46. }
  47. }

二、Spark SQL读写MySQL数据库

下面的代码使用JDBC连接MySQL数据库,并进行读写操作 主要步骤如下

1:新建数据库

2:新建表

3:添加依赖包

4:新建类

5:查看运行结果

代码如下

  1. import java.util.Properties
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.sql.{DataFrame, SaveMode,SparkSession}
  4. object sparkSqlMysql {
  5. def main(args: Array[String]): Unit = {
  6. //创建sparkSession对象
  7. val spark: SparkSession = SparkSession.builder()
  8. .appName("sparkSqlMysql")
  9. .master("local")
  10. .getOrCreate()
  11. val sc = spark.sparkContext
  12. //读取数据
  13. val data: RDD[Array[String]] = sc.textFile("D:/people.txt").map(x => x.split(","));
  14. //RDD关联Person
  15. val personRdd: RDD[Person] = data.map(x => Person(x(0), x(1).toLong))
  16. //导入隐式转换
  17. import spark.implicits._
  18. //将RDD转换成DataFrame
  19. val personDF: DataFrame = personRdd.toDF()
  20. personDF.show()
  21. //创建Properties对象,配置连接mysql的用户名和密码
  22. val prop =new Properties()
  23. prop.setProperty("user","root")
  24. prop.setProperty("password","123456")
  25. //将personDF写入MySQL
  26. personDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark?useUnicode=true&characterEncoding=utf8","person",prop)
  27. //从数据库里读取数据
  28. val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/spark", "person", prop)
  29. mysqlDF.show()
  30. spark.stop()
  31. }
  32. }

三、Spark SQL读写Hive

下面的示例程序连接Hive,并读写Hive下的表 主要步骤如下

1:在pom.xml中添加Hive依赖包

2:连接Hive

3:新建表

4:向Hive表写入数据,新scala类sparksqlToHIVE,主要功能是读取D盘下的people.txt文件,使用编程方式操作DataFrame,然后插入到HIVE的表中。

5:查看运行结果

代码如下

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.sql.{DataFrame,SparkSession}
  3. object sparksqlToHIVE {
  4. def main(args: Array[String]): Unit = {
  5. //设置访问用户名,主要用于访问HDFS下的Hive warehouse目录
  6. System.setProperty("HADOOP_USER_NAME", "root")
  7. //创建sparkSession
  8. val spark: SparkSession = SparkSession.builder()
  9. .appName("sparksqlToHIVE")
  10. .config("executor-cores",1)
  11. .master("local")
  12. .enableHiveSupport() //开启支持Hive
  13. .getOrCreate()
  14. val sc = spark.sparkContext
  15. //读取文件
  16. val data: RDD[Array[String]] = sc.textFile("D:/people.txt"). map (x => x.split(","));
  17. //将RDD与样例类关联
  18. val personRdd: RDD[Person] = data. map (x => Person(x(0),x(1).toLong))
  19. //手动导入隐式转换
  20. import spark.implicits._
  21. val personDF: DataFrame = personRdd.toDF
  22. //显示DataFrame的数据
  23. personDF.show()
  24. //将DataFrame注册成临时表t_person
  25. personDF.createOrReplaceTempView("t_person")
  26. //显示临时表t_person的数据
  27. spark.sql("select * from t_person").show()
  28. //使用Hive中bigdata的数据库
  29. spark.sql("use bigdata")
  30. //将临时表t_person的数据插入使用Hive中bigdata数据库下的person表中
  31. spark.sql("insert into person select * from t_person")
  32. //显示用Hive中bigdata数据库下的person表数据
  33. spark.sql("select * from person").show()
  34. spark.stop()
  35. }
  36. }

创作不易 觉得有帮助请点赞关注收藏~~~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/693350
推荐阅读
相关标签
  

闽ICP备14008679号