
Spark on Hive: Connecting to Hive Remotely from Spark and Operating on Hive Tables

Contents

1. Start the Hadoop cluster

2. Start MySQL

3. Enable remote connections to Hive

4. Copy hive-site.xml and the Hive jars into Spark's conf and jars directories

5. Restart spark-shell

6. Query a table in the database

7. Connect to Hive from IDEA

8. Read Hive databases from Spark

9. Operate on a specific Hive table from Spark

10. Save Spark-processed data to Hive: three approaches

11. Connect Spark to MySQL and operate on MySQL tables

When you operate on Hive from Spark, queries run on Spark's in-memory engine rather than on MapReduce, so they execute noticeably faster.

1. Start the Hadoop cluster

  start-dfs.sh
  start-yarn.sh

2. Start MySQL

mysql -uroot -p

3. Enable remote connections to Hive

  nohup hive --service hiveserver2 &
  nohup hive --service metastore &

4. Copy hive-site.xml and the Hive jars into Spark's conf and jars directories

  [root@lxm147 conf]# pwd
  /opt/soft/spark312/conf
  [root@lxm147 conf]# cp /opt/soft/hive312/conf/hive-site.xml /opt/soft/spark312/conf/
  [root@lxm147 jars]# pwd
  /opt/soft/spark312/jars
  [root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-beeline-3.1.2.jar ./
  [root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-cli-3.1.2.jar ./
  [root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-exec-3.1.2.jar ./
  [root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-jdbc-3.1.2.jar ./
  [root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-metastore-3.1.2.jar ./
  [root@lxm147 jars]# cp /opt/soft/hive312/lib/mysql-connector-java-8.0.29.jar ./

The contents of hive-site.xml are as follows:

The following property must be included, otherwise the connection will fail:
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://lxm147:9083</value>
</property>

  <?xml version="1.0" encoding="UTF-8" standalone="no"?>
  <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  <configuration>
    <property>
      <name>hive.metastore.warehouse.dir</name>
      <value>/opt/soft/hive312/warehouse</value>
      <description></description>
    </property>
    <property>
      <name>hive.metastore.db.type</name>
      <value>mysql</value>
      <description>Database used for the metastore</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://192.168.180.141:3306/hive147?createDatabaseIfNotExist=true</value>
      <description>JDBC URL of the MySQL metastore database</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.cj.jdbc.Driver</value>
      <description></description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>root</value>
      <description></description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>root</value>
      <description></description>
    </property>
    <property>
      <name>hive.metastore.schema.verification</name>
      <value>false</value>
      <description>Disable schema verification</description>
    </property>
    <property>
      <name>hive.cli.print.current.db</name>
      <value>true</value>
      <description>Show the current database name in the CLI prompt</description>
    </property>
    <property>
      <name>hive.cli.print.header</name>
      <value>true</value>
      <description>Print column headers in query output</description>
    </property>
    <property>
      <name>hive.server2.active.passive.ha.enable</name>
      <value>true</value>
    </property>
    <property>
      <name>hive.metastore.local</name>
      <value>false</value>
      <description>Controls whether to connect to a remote metastore server or start a new metastore server in the Hive client JVM</description>
    </property>
    <property>
      <name>hive.metastore.uris</name>
      <value>thrift://lxm147:9083</value>
    </property>
    <property>
      <name>hive.zookeeper.quorum</name>
      <value>192.168.180.147</value>
    </property>
    <property>
      <name>hbase.zookeeper.quorum</name>
      <value>192.168.180.147</value>
    </property>
    <property>
      <name>hive.aux.jars.path</name>
      <value>file:///opt/soft/hive312/lib/hive-hbase-handler-3.1.2.jar,file:///opt/soft/hive312/lib/zookeeper-3.4.6.jar,file:///opt/soft/hive312/lib/hbase-client-2.3.5.jar,file:///opt/soft/hive312/lib/hbase-common-2.3.5-tests.jar,file:///opt/soft/hive312/lib/hbase-server-2.3.5.jar,file:///opt/soft/hive312/lib/hbase-common-2.3.5.jar,file:///opt/soft/hive312/lib/hbase-protocol-2.3.5.jar,file:///opt/soft/hive312/lib/htrace-core-3.2.0-incubating.jar</value>
    </property>
  </configuration>

5. Restart spark-shell

[root@lxm147 jars]# spark-shell

6. Query a table in the database

  scala> spark.table("db_hive2.login_events").show
  2023-04-04 13:33:35,138 WARN conf.HiveConf: HiveConf of name hive.metastore.local does not exist
  2023-04-04 13:33:35,138 WARN conf.HiveConf: HiveConf of name hive.server2.active.passive.ha.enable does not exist
  2023-04-04 13:33:35,139 WARN conf.HiveConf: HiveConf of name hive.metastore.db.type does not exist
  +-------+-------------------+
  |user_id|     login_datetime|
  +-------+-------------------+
  |    100|2021-12-01 19:00:00|
  |    100|2021-12-01 19:30:00|
  |    100|2021-12-02 21:01:00|
  |    100|2021-12-03 11:01:00|
  |    101|2021-12-01 19:05:00|
  |    101|2021-12-01 21:05:00|
  |    101|2021-12-03 21:05:00|
  |    101|2021-12-05 15:05:00|
  |    101|2021-12-06 19:05:00|
  |    102|2021-12-01 19:55:00|
  |    102|2021-12-01 21:05:00|
  |    102|2021-12-02 21:57:00|
  |    102|2021-12-03 19:10:00|
  |    104|2021-12-04 21:57:00|
  |    104|2021-12-02 22:57:00|
  |    105|2021-12-01 10:01:00|
  +-------+-------------------+
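If you prefer SQL over the table API, the same data can be queried with spark.sql from the same shell. A minimal sketch, assuming the db_hive2.login_events table shown above:

  // Equivalent query through the SQL interface
  spark.sql("SELECT user_id, login_datetime FROM db_hive2.login_events").show()
  // Limit the output when the table is large
  spark.sql("SELECT * FROM db_hive2.login_events LIMIT 10").show()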

7. Connect to Hive from IDEA

  import org.apache.spark.sql.SparkSession

  object SparkGive {
    def main(args: Array[String]): Unit = {
      // Create the session, pointing it at the remote Hive metastore
      val spark: SparkSession = SparkSession.builder().appName("sparkhive")
        .master("local[*]")
        .config("hive.metastore.uris", "thrift://192.168.180.147:9083")
        .enableHiveSupport()
        .getOrCreate()

      // Close the session
      spark.close()
    }
  }
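For enableHiveSupport() to compile and run inside an IDEA project, the spark-hive module must be on the classpath (the MySQL driver is only needed for the JDBC example in section 11). A minimal build.sbt sketch; the versions are assumptions inferred from the paths above (spark312, mysql-connector-java-8.0.29) and should be matched to your cluster:

  // build.sbt (versions are assumptions; align them with your cluster)
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-sql"  % "3.1.2",
    "org.apache.spark" %% "spark-hive" % "3.1.2",
    "mysql"            %  "mysql-connector-java" % "8.0.29"
  )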

8. Read Hive databases from Spark

  // List the databases
  spark.sql("show databases").show()
  /*
  +---------+
  |namespace|
  +---------+
  |  atguigu|
  |  bigdata|
  | db_hive2|
  |  default|
  |   lalian|
  |     mydb|
  | shopping|
  +---------+
  */
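The Catalog API is a programmatic alternative to the SQL statement above; a short sketch:

  // Programmatic equivalent using the Catalog API
  spark.catalog.listDatabases().show(false)
  // Switch the current database and list its tables
  spark.catalog.setCurrentDatabase("shopping")
  spark.catalog.listTables().show(false)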

9. Operate on a specific Hive table from Spark

  import org.apache.spark.sql.{DataFrame, Dataset, Row}

  // Read a specific table from a specific database
  val ecd = spark.table("shopping.ext_customer_details")
  ecd.show()
  +-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
  |customer_id|first_name| last_name| email|gender| address|country| language| job| credit_type| credit_no|
  +-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
  |customer_id|first_name| last_name| email|gender| address|country| language| job| credit_type| credit_no|
  | 1| Spencer| Raffeorty|sraffeorty0@dropb...| Male| 9274 Lyons Court| China| Khmer|Safety Technician...| jcb| 3589373385487669|
  | 2| Cherye| Poynor| cpoynor1@51.la|Female|1377 Anzinger Avenue| China| Czech| Research Nurse| instapayment| 6376594861844533|
  | 3| Natasha| Abendroth|nabendroth2@scrib...|Female| 2913 Evergreen Lane| China| Yiddish|Budget/Accounting...| visa| 4041591905616356|
  | 4| Huntley| Seally| hseally3@prlog.org| Male| 694 Del Sol Lane| China| Albanian|Environmental Spe...| laser| 677118310740263477|
  | 5| Druci| Coad| dcoad4@weibo.com|Female| 16 Debs Way| China| Hebrew| Teacher| jcb| 3537287259845047|
  +-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+

  import spark.implicits._
  import org.apache.spark.sql.functions._
  // Filter out the header row that was loaded as data
  val cuDF: Dataset[Row] = ecd.filter($"customer_id" =!= "customer_id")
  cuDF.show()
  +-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
  |customer_id|first_name| last_name| email|gender| address|country| language| job| credit_type| credit_no|
  +-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
  | 1| Spencer| Raffeorty|sraffeorty0@dropb...| Male| 9274 Lyons Court| China| Khmer|Safety Technician...| jcb| 3589373385487669|
  | 2| Cherye| Poynor| cpoynor1@51.la|Female|1377 Anzinger Avenue| China| Czech| Research Nurse| instapayment| 6376594861844533|
  | 3| Natasha| Abendroth|nabendroth2@scrib...|Female| 2913 Evergreen Lane| China| Yiddish|Budget/Accounting...| visa| 4041591905616356|
  | 4| Huntley| Seally| hseally3@prlog.org| Male| 694 Del Sol Lane| China| Albanian|Environmental Spe...| laser| 677118310740263477|
  | 5| Druci| Coad| dcoad4@weibo.com|Female| 16 Debs Way| China| Hebrew| Teacher| jcb| 3537287259845047|
  +-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+

  // Count how many customers each country has
  val userDF: DataFrame = cuDF.groupBy("country").agg(count("customer_id").as("userNum"))
  userDF.printSchema()
  /*
  root
   |-- country: string (nullable = true)
   |-- userNum: long (nullable = false)*/
  userDF.show()
  /*
  +-------------+-------+
  |      country|userNum|
  +-------------+-------+
  |        China|    426|
  |United States|     51|
  |       Canada|     23|
  +-------------+-------+*/
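The header filter and the per-country count can also be written as a single SQL statement, which some readers may find easier to follow; a sketch against the same table:

  // Same result with one SQL query (header row removed in the WHERE clause)
  val userDF2: DataFrame = spark.sql(
    """
      |SELECT country, count(customer_id) AS userNum
      |FROM shopping.ext_customer_details
      |WHERE customer_id <> 'customer_id'
      |GROUP BY country
      |""".stripMargin)
  userDF2.show()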

10. Save Spark-processed data to Hive: three approaches

  // Save the computed result to Hive
  // userDF.write.saveAsTable("shopping.userDF")
  // userDF.write.mode("append").saveAsTable("shopping.userDF")
  // The SaveMode enumeration can also be used
  userDF.write.mode(SaveMode.Append).saveAsTable("shopping.userDF")
  // The processed data can also be written to MySQL and visualized with a BI tool
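After the write, the table can be checked from the same session; a short sketch (SaveMode comes from org.apache.spark.sql.SaveMode, and Overwrite replaces the table instead of appending):

  // Verify the saved table
  spark.table("shopping.userDF").show()
  // Replace the table on every run instead of appending
  // userDF.write.mode(SaveMode.Overwrite).saveAsTable("shopping.userDF")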

11. Connect Spark to MySQL and operate on MySQL tables

  import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
  import java.util.Properties

  object DataFrameMysql {
    def main(args: Array[String]): Unit = {
      // Create the session
      val spark: SparkSession = SparkSession.builder().appName("dataframemysql")
        .master("local[*]")
        .getOrCreate()
      import spark.implicits._
      import org.apache.spark.sql.functions._

      // Connect Spark to MySQL; note that the database name is part of the URL
      val url = "jdbc:mysql://192.168.180.141:3306/mysql50"
      val user = "root"
      val pwd = "<database password>"
      val driver = "com.mysql.cj.jdbc.Driver"
      val properties = new Properties()
      properties.setProperty("user", user)
      properties.setProperty("password", pwd)
      properties.setProperty("driver", driver)

      // Read the tables under the database into DataFrames
      val studentDF: DataFrame = spark.read.jdbc(url, "student", properties)
      val scDF: DataFrame = spark.read.jdbc(url, "sc", properties)
      val courseDF: DataFrame = spark.read.jdbc(url, "course", properties)
      val teacherDF: DataFrame = spark.read.jdbc(url, "teacher", properties)
      studentDF.show()
      scDF.show()
      courseDF.show()
      teacherDF.show()

      // Write scDF back to MySQL as a new table
      scDF.write.mode(SaveMode.Overwrite).jdbc(url, "sc2", properties)

      spark.close()
    }
  }
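The option-style JDBC reader is an equivalent alternative to the read.jdbc shorthand used above; a minimal sketch that reuses the same url, user, pwd, and driver values inside main():

  // Equivalent option-style JDBC read (same connection values as above)
  val studentDF2: DataFrame = spark.read
    .format("jdbc")
    .option("url", url)
    .option("dbtable", "student")
    .option("user", user)
    .option("password", pwd)
    .option("driver", driver)
    .load()
  studentDF2.show()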