当前位置:   article > 正文

【Spark】spark对mysql的操作_spark支持mysql

spark支持mysql

目录

一、前言

二、使用技巧

1、读取mysql满足条件的行记录

2、整体写入mysql的操作

3、更新mysql的某行记录


一、前言

        使用spark技术和mysql交互的时候往往会遇到以下几种情况,需要编写不同的api方式来满足开发过程中的不同需求,这里使用的语言为scala变成语言;

  • 读取mysql满足条件的行记录
  • 整体写入mysql的操作
  • 更新mysql的某行记录

二、使用技巧

1、读取mysql满足条件的行记录

  • 首先需要初始化SparkSession对象,这里比较常用通过连接hive的api获取、同理其他方式获取也可以;

        连接hive获取:

  1. //conHive方法在DBConUtil类中;
  2. def conHive(appName:String):SparkSession={
  3. SparkSession.builder()
  4. //.master("local[2]")
  5. .appName(appName)
  6. .config("spark.sql.broadcastTimeout","36000")
  7. // .config("spark.default.parallelism",1000)
  8. .config("hive.exec.dynamici.partition", true)
  9. .config("hive.exec.dynamic.partition.mode", "nonstrict")
  10. .enableHiveSupport()
  11. .getOrCreate()
  12. }
  13. val spark: SparkSession = DBConUtil.conHive("test")

        其他方式获取:

  1. val spark: SparkSession = SparkSession
  2. .builder()
  3. .appName("test")
  4. .master("local[*]")
  5. .getOrCreate()
  • 然后使用初始化好的SparkSession对象进行mysql数据库数据的读取操作;
  1. val properties = new Properties()
  2. properties.setProperty("user","mysqldb")
  3. properties.setProperty("password","pwd")
  4. val url="jdbc:mysql://ip:3306/test?characterEncoding=utf8&useSSL=true"
  5. var df= spark.read.jdbc(url,"table1",properties)
  6. .select("name","age","sex")
  7. .where("age>20")

2、整体写入mysql的操作

        这里的整体写入mysql的操作的含义是将条件筛选之后的DataFram或着DataSet直接写入mysql,调用的是spark官方提供的api。所以首先要创建出来一个DataFram或者DataSet数据集,接下来就是直接写入;

  1. val properties = new Properties()
  2. properties.setProperty("user", mysqlUser)
  3. properties.setProperty("password", mysqlPwd)
  4. df.repartition(80).write.mode(SaveMode.Append).option("driver","com.mysql.jdbc.Driver")
  5. .jdbc(mysqlUrl, mysqlRetTable, properties)

存储模式主要包含如下几种:

  1. SaveMode.ErrorIfExists【默认】模式,该模式下,如果数据库中已经存在该表,则会直接报异常,导致数据不能存入数据库;
  2. SaveMode.Append 如果表已经存在,则追加在该表中;若该表不存在,则会先创建表,再插入数据;
  3. SaveMode.Overwrite 重写模式,其实质是先将已有的表及其数据全都删除,再重新创建该表,最后插入新的数据;
  4. SaveMode.Ignore 若表不存在,则创建表,并存入数据;在表存在的情况下,直接跳过数据的存储,不会报错。

3、更新mysql的某行记录

        有时候在写spark程序的时候需要对mysql中的单行或者多行的某些字段进行更新操作,spark api并没有提供这些操作,这里需要自己写原生的JDBC操作更新或者批量更新mysql记录;

  1. val connection: Connection = JdbcTemplateUtil.
  2. getConnection("jdbc:mysql://ip:3306/url_analyse?characterEncoding=utf8&useSSL=false",
  3. "mysqldb", "pwd")
  4. JdbcTemplateUtil.executeSql(connection,"insert into test01(id,test) values(?,?)",Array("117","aa"))
  5. //批量插入
  6. // var arrayBuffer = new ArrayBuffer[Array[String]]()
  7. // arrayBuffer += Array("220","bb")
  8. // arrayBuffer += Array("330","cc")
  9. // arrayBuffer += Array("440","dd")
  10. // JdbcTemplateUtil.executeBatchSql(connection,"insert into test01(id,test) values(?,?)",arrayBuffer)
  1. import com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
  2. import java.sql.{Connection, DriverManager}
  3. import scala.collection.mutable.ArrayBuffer
  4. object JdbcTemplateUtil {
  5. /**
  6. * 单条操作
  7. * @param sql
  8. * @param params
  9. */
  10. def executeSql(conn: Connection, sql: String, params: Array[String]): Unit = {
  11. try {
  12. val ps = conn.prepareStatement(sql)
  13. if (params != null) {
  14. for (i <- params.indices)
  15. ps.setString(i + 1, params(i))
  16. }
  17. val update = ps.executeUpdate()
  18. ps.close()
  19. } catch {
  20. case e: Exception => println(">>>Execute Sql Exception..." + e)
  21. }
  22. }
  23. /**
  24. * 批量操作
  25. * @param sql
  26. * @param paramList
  27. */
  28. def executeBatchSql(conn: Connection, sql: String, paramList: ArrayBuffer[Array[String]]): Unit = {
  29. try {
  30. val ps = conn.prepareStatement(sql)
  31. conn.setAutoCommit(false)
  32. for (params: Array[String] <- paramList) {
  33. if (params != null) {
  34. for (i <- params.indices) ps.setString(i + 1, params(i))
  35. ps.addBatch()
  36. }
  37. }
  38. ps.executeBatch()
  39. conn.commit()
  40. ps.close()
  41. conn.close()
  42. } catch {
  43. case e: Exception => println(">>>Execute Batch Sql Exception..." + e)
  44. }
  45. }
  46. /**
  47. * 获取mysql连接
  48. * @param url
  49. * @param user
  50. * @param pwd
  51. * @return
  52. */
  53. def getConnection(url:String,user:String,pwd:String):Connection={
  54. //classOf[com.mysql.cj.jdbc.Driver]
  55. Class.forName("com.mysql.jdbc.Driver")
  56. DriverManager.getConnection(url,user,pwd)
  57. }
  58. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/205340
推荐阅读
相关标签
  

闽ICP备14008679号