When using Spark to interact with MySQL, you typically run into several different situations, each requiring a different API approach during development. The language used here is Scala.
Obtaining a SparkSession connected to Hive:
```scala
// The conHive method lives in the DBConUtil class
def conHive(appName: String): SparkSession = {
  SparkSession.builder()
    //.master("local[2]")
    .appName(appName)
    .config("spark.sql.broadcastTimeout", "36000")
    //.config("spark.default.parallelism", 1000)
    .config("hive.exec.dynamic.partition", true)
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .enableHiveSupport()
    .getOrCreate()
}

val spark: SparkSession = DBConUtil.conHive("test")
```
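With Hive support enabled, the returned session can query Hive tables directly. A minimal sketch, assuming a Hive table `db1.user` with these columns exists (the database, table, and column names are hypothetical):

```scala
// Read from an existing Hive table into a DataFrame
val hiveDf = spark.sql("select name, age from db1.user where age > 20")
hiveDf.show(10)
```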
Obtaining a SparkSession the usual way and reading from MySQL over JDBC:
```scala
import java.util.Properties

val spark: SparkSession = SparkSession
  .builder()
  .appName("test")
  .master("local[*]")
  .getOrCreate()

val properties = new Properties()
properties.setProperty("user", "mysqldb")
properties.setProperty("password", "pwd")
val url = "jdbc:mysql://ip:3306/test?characterEncoding=utf8&useSSL=true"
val df = spark.read.jdbc(url, "table1", properties)
  .select("name", "age", "sex")
  .where("age > 20")
```
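For larger tables, the JDBC source can also read in parallel by splitting on a numeric column. A minimal sketch, assuming `table1` has a numeric `id` column roughly in the range shown (the column name and bounds are assumptions):

```scala
// Parallel JDBC read: split into 8 partitions on the numeric column "id"
val dfParallel = spark.read.jdbc(
  url,
  "table1",
  "id",      // partition column (assumed numeric)
  1L,        // lowerBound
  1000000L,  // upperBound
  8,         // numPartitions
  properties)
  .select("name", "age", "sex")
  .where("age > 20")
```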
Writing to MySQL wholesale means taking the filtered DataFrame or Dataset and writing it directly to MySQL through the API Spark provides. So the first step is to build the DataFrame or Dataset, and then write it out directly:
```scala
val properties = new Properties()
properties.setProperty("user", mysqlUser)
properties.setProperty("password", mysqlPwd)
df.repartition(80).write.mode(SaveMode.Append)
  .option("driver", "com.mysql.jdbc.Driver")
  .jdbc(mysqlUrl, mysqlRetTable, properties)
```
The available save modes are listed below; a short sketch of switching modes follows the list:
- SaveMode.ErrorIfExists (the default): if the table already exists in the database, an exception is thrown and no data is written.
- SaveMode.Append: if the table exists, the data is appended to it; if it does not exist, the table is created first and the data is then inserted.
- SaveMode.Overwrite: overwrite mode; in effect the existing table and all of its data are dropped, the table is recreated, and the new data is inserted.
- SaveMode.Ignore: if the table does not exist, it is created and the data is written; if the table already exists, the write is silently skipped and no error is raised.
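Switching modes only changes the `.mode(...)` call on the writer. A minimal sketch (the target table name `test_overwrite` is hypothetical):

```scala
// Overwrite: drops and recreates the target table, then inserts the new rows
df.write.mode(SaveMode.Overwrite)
  .option("driver", "com.mysql.jdbc.Driver")
  .jdbc(mysqlUrl, "test_overwrite", properties)

// Ignore: writes only if the table does not exist yet, otherwise silently skips
df.write.mode(SaveMode.Ignore)
  .option("driver", "com.mysql.jdbc.Driver")
  .jdbc(mysqlUrl, "test_overwrite", properties)
```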
Sometimes a Spark job needs to update certain fields of one or more rows in MySQL. The Spark API does not offer this, so you have to write plain JDBC code yourself to update, or batch-update, MySQL records:
```scala
val connection: Connection = JdbcTemplateUtil.getConnection(
  "jdbc:mysql://ip:3306/url_analyse?characterEncoding=utf8&useSSL=false",
  "mysqldb", "pwd")
JdbcTemplateUtil.executeSql(connection, "insert into test01(id,test) values(?,?)", Array("117", "aa"))

// Batch insert
// var arrayBuffer = new ArrayBuffer[Array[String]]()
// arrayBuffer += Array("220","bb")
// arrayBuffer += Array("330","cc")
// arrayBuffer += Array("440","dd")
// JdbcTemplateUtil.executeBatchSql(connection,"insert into test01(id,test) values(?,?)",arrayBuffer)
```
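The sample call above is an INSERT, while the use case described is an UPDATE; the same helper handles UPDATE statements as well. A minimal sketch, with hypothetical column values:

```scala
// Update the "test" column of the row whose id is 117 (values are illustrative)
JdbcTemplateUtil.executeSql(connection,
  "update test01 set test = ? where id = ?",
  Array("updated", "117"))
```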
```scala
import java.sql.{Connection, DriverManager}
import scala.collection.mutable.ArrayBuffer

object JdbcTemplateUtil {
  /**
   * Execute a single statement
   * @param sql    parameterized SQL
   * @param params values bound to the placeholders, in order
   */
  def executeSql(conn: Connection, sql: String, params: Array[String]): Unit = {
    try {
      val ps = conn.prepareStatement(sql)
      if (params != null) {
        for (i <- params.indices)
          ps.setString(i + 1, params(i))
      }
      ps.executeUpdate()
      ps.close()
    } catch {
      case e: Exception => println(">>>Execute Sql Exception..." + e)
    }
  }

  /**
   * Execute a batch of statements in a single transaction
   * @param sql       parameterized SQL
   * @param paramList one Array of bind values per statement
   */
  def executeBatchSql(conn: Connection, sql: String, paramList: ArrayBuffer[Array[String]]): Unit = {
    try {
      val ps = conn.prepareStatement(sql)
      conn.setAutoCommit(false)
      for (params: Array[String] <- paramList) {
        if (params != null) {
          for (i <- params.indices) ps.setString(i + 1, params(i))
          ps.addBatch()
        }
      }
      ps.executeBatch()
      conn.commit()
      ps.close()
      conn.close()
    } catch {
      case e: Exception => println(">>>Execute Batch Sql Exception..." + e)
    }
  }

  /**
   * Obtain a MySQL connection
   * @param url  JDBC URL
   * @param user database user
   * @param pwd  database password
   * @return an open Connection
   */
  def getConnection(url: String, user: String, pwd: String): Connection = {
    //classOf[com.mysql.cj.jdbc.Driver]
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection(url, user, pwd)
  }
}
```
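When the rows to update come out of a DataFrame, a common pattern is to open one connection per partition on the executors and push the updates through the helper above. A minimal sketch under that assumption (the table, column names, and column types are hypothetical):

```scala
import org.apache.spark.sql.Row
import scala.collection.mutable.ArrayBuffer

// Hypothetical example: df has string columns "test" and "id"; update test01 per partition
df.select("test", "id").rdd.foreachPartition { rows: Iterator[Row] =>
  val conn = JdbcTemplateUtil.getConnection(
    "jdbc:mysql://ip:3306/test?characterEncoding=utf8&useSSL=false", "mysqldb", "pwd")
  val batch = new ArrayBuffer[Array[String]]()
  rows.foreach { r =>
    // bind order matches the placeholders: set test = ?, where id = ?
    batch += Array(r.getAs[String]("test"), r.getAs[String]("id"))
  }
  // executeBatchSql commits the batch and closes the connection itself
  JdbcTemplateUtil.executeBatchSql(conn, "update test01 set test = ? where id = ?", batch)
}
```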