
Using Flink CDC to Sync MySQL Data and Table Schemas into Apache Doris in Real Time

Background

  1. Existing database: MySQL
  2. Data: many databases and tables; each enterprise user gets its own database, and the tables differ from enterprise to enterprise, so they cannot be aggregated. Users can also change their tables at will: adding, dropping, or modifying columns, and adding new tables.
  3. Analysis: user-defined analysis, with chart cards built by drag and drop. It must be real time: results appear as soon as the user clicks confirm, and the filters involved cannot be predicted in advance.
  4. Problem: as the business grows and the number of enterprise users increases, the load on MySQL keeps rising, and some chart cards already load too slowly [plain MySQL SQL].

Sync Workflow

  1. A script reads the enterprises that need to be synced from MySQL, then fetches the tables to sync for each of them, and stores the result in Doris table A with the columns member_id and table (sync_table in the code below).
  2. A script reads table A from Doris, fetches the corresponding schemas from MySQL, converts them into Doris CREATE TABLE statements, and executes those statements against Doris.
  3. Cancel the Flink job and restart it (a restart is only needed when a new database is added; new tables do not require a restart):
    1. On every restart, connect to Doris table A, fetch the databases, and assemble databaseList and tableList. tableList uses regular expressions such as database1.*, database2.* to watch every table in each database, so a table newly created in MySQL is automatically picked up for syncing.
    2. Doris does not yet support applying schema changes while data is being synced (reportedly this will land in 1.2+). However, CDC can capture the DDL SQL, so we can connect to Doris over JDBC and execute it there; because the SQL dialects differ slightly, the statements have to be converted first. For new MySQL tables, the captured CREATE statement can likewise be used to create the table in Doris.
    3. When loading data into Doris, loading too fast causes load failures (error -235). This can be avoided by limiting how much binlog is read at a time and using window aggregation to load in batches (see the sketch after this list).
          For example, if the rows {"id":1,"name":"小明"} and {"id":2,"name":"小红"} need to be loaded into table B, issuing two separate PUTs is clearly wasteful; combining them into a JSON array [{"id":1,"name":"小明"},{"id":2,"name":"小红"}] loads both in a single request.
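
To make item 3 concrete, here is a minimal, self-contained sketch of the JSON-array batching idea. The object and method names are illustrative and not part of the project; the implementation the job actually uses is DorisStreamLoad.scala further below.

package com.xxxx.sketch

import net.sf.json.{JSONArray, JSONObject}

object StreamLoadBatchSketch {
  // Combine individual row-level JSON strings into one JSON-array string so that
  // Doris stream load receives a single PUT instead of one request per row.
  def toJsonArray(rows: Seq[String]): String = {
    val arr = new JSONArray()
    rows.foreach(row => arr.add(JSONObject.fromObject(row)))
    arr.toString // e.g. [{"id":1,"name":"小明"},{"id":2,"name":"小红"}]
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq("""{"id":1,"name":"小明"}""", """{"id":2,"name":"小红"}""")
    // The resulting string is sent with the header strip_outer_array=true,
    // so Doris treats it as multiple rows within one load request.
    println(toJsonArray(rows))
  }
}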

Code

        The Python code is not reproduced here; see git: GitHub - xiaofeicn/MysqlToDorisTable

        Flink CDC

          The Flink job needs to be aware of new tables: on each daily restart it reads Doris table A and assembles the databaseList and tableList parameters. The code, with comments, is below.

        FlinkCDCMysql2Doris.scala

package com.xxxx.mysql2doris

import org.apache.flink.streaming.api.TimeCharacteristic
import com.xxxx.util.{DorisStreamLoad, FlinkCDCSyncETL, KafkaUtil, PropertiesManager, PropertiesUtil, SinkDoris, SinkSchema}
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import com.xxxx.util.KafkaUtil.proper
import net.sf.json.JSONObject
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, DataStreamUtils}
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
import org.slf4j.{Logger, LoggerFactory}
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.kafka.connect.json.JsonConverterConfig
import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters.asScalaIteratorConverter

object FlinkCDCMysql2Doris {

  PropertiesManager.initUtil()
  val props: PropertiesUtil = PropertiesManager.getUtil
  val log: Logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism
    env.setParallelism(props.parallelism)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    /**
     * Checkpoint settings
     */
    // Enable checkpointing and set the trigger interval in milliseconds (disabled by default)
    env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint timeout (default: 10 minutes)
    env.getCheckpointConfig.setCheckpointTimeout(600000)
    /** Minimum pause between two checkpoints (in ms). This prevents long-running checkpoints
     * (e.g. caused by large state) from piling up and constantly triggering new ones,
     * which would consume a lot of compute resources and hurt overall job performance. */
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
    // By default only one checkpoint can be in flight at a time;
    // allowing several concurrent checkpoints can improve overall checkpointing efficiency
    // env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    /** Externalized checkpoints:
     * checkpoint data is retained in external storage when the job is cancelled,
     * so the job can later be restored from it */
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    /** Prefer the latest checkpoint for recovery even when a more recent savepoint exists */
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
    // Number of tolerable checkpoint failures (default 0: no checkpoint failure is tolerated)
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)

    /**
     * Restart strategy
     */
    // Restart up to 3 times, waiting 10000 ms after each failure
    // env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(3, TimeUnit.MINUTES), Time.of(30, TimeUnit.SECONDS)))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))

    /**
     * Read the sync configuration (database, table) from Doris table A
     */
    val inputMysql = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://%s:%d/%s".format(props.doris_host, props.doris_port, props.sync_config_db))
      .setUsername(props.doris_user)
      .setPassword(props.doris_password)
      .setQuery("select member_id,sync_table from %s.%s".format(props.sync_config_db, props.sync_config_table))
      .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
      .finish()).uid("inputMysql")

    val databaseName: DataStream[String] = inputMysql.map(line => line.getField(0).toString).uid("databaseName")
    // Wildcard matching: watch every table under each database
    val tableName: DataStream[String] = inputMysql.map(line => line.getField(0).toString + ".*").uid("tableName")

    val producer = KafkaUtil.getProducer

    val databaseIter = databaseName.executeAndCollect().asScala
    val databaseList = databaseIter.toSet.mkString(",")
    val tableIter = tableName.executeAndCollect().asScala
    val tableList = tableIter.toSet.mkString(",")
    println("databaseList:", databaseList)
    println("tableList:", tableList)

    val customConverterConfigs = new java.util.HashMap[String, Object] {
      put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric")
    }

    /**
     * MySQL source for Doris
     */
    val mySqlSource = MySqlSource.builder[String]()
      .hostname(props.rds_host)
      .port(props.rds_port)
      .databaseList(databaseList)
      .tableList(tableList)
      .username(props.rds_user)
      .password(props.rds_password)
      .serverId("11110")
      .splitSize(props.split_size)
      .fetchSize(props.fetch_size)
      // .startupOptions(StartupOptions.latest())
      // Full snapshot first, then binlog
      .startupOptions(StartupOptions.initial())
      .includeSchemaChanges(true)
      // Newly created tables are added to the sync job, as long as they match the tableList patterns
      .scanNewlyAddedTableEnabled(true)
      .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs)).build()

    val dataStreamSource: DataStreamSource[String] = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")

    // Split into DDL events (schema history records) and DML events; CHANGE COLUMN statements are skipped
    val ddlSqlStream: DataStream[String] = dataStreamSource.filter(line => line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("ddlSqlStream")
    val dmlStream: DataStream[String] = dataStreamSource.filter(line => !line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("dmlStream")

    val ddlDataStream = FlinkCDCSyncETL.ddlFormat(ddlSqlStream)
    val dmlDataStream = FlinkCDCSyncETL.binLogETL(dmlStream)
    // ddlDataStream.print()

    // The producer is used to notify the analysis tasks after data has been synced
    val dorisStreamLoad = new DorisStreamLoad(props, producer)
    ddlDataStream.addSink(new SinkSchema(props)).name("ALTER TABLE TO DORIS").uid("SinkSchema")
    dmlDataStream.addSink(new SinkDoris(dorisStreamLoad)).name("Data TO DORIS").uid("SinkDoris")

    env.execute("Flink CDC Mysql To Doris With Initial")
  }

  case class dataLine(merge_type: String, db: String, table: String, data: String)
}
FlinkCDCBinLogETL.scala
package com.xxxx.util

import net.sf.json.JSONObject
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.tuple.Tuple4
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.mutable.ArrayBuffer

object FlinkCDCSyncETL {

  def binLogETL(dataStreamSource: DataStream[String]): DataStream[Tuple4[String, String, String, String]] = {
    /**
     * Map each binlog operation type to the corresponding Doris load (merge) type
     */
    val tupleData: DataStream[Tuple4[String, String, String, String]] = dataStreamSource.map(line => {
      var data: JSONObject = null
      var mergetype = "APPEND"
      val lineObj = JSONObject.fromObject(line)
      val source = lineObj.getJSONObject("source")
      val db = source.getString("db")
      val table = source.getString("table")
      if ("d" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("before")
        mergetype = "DELETE"
      } else if ("u" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
        mergetype = "MERGE"
      } else if ("c" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
      } else if ("r" == lineObj.getString("op")) {
        data = lineObj.getJSONObject("after")
        mergetype = "APPEND"
      }
      new Tuple4[String, String, String, String](mergetype, db, table, data.toString)
    }).returns(TypeInformation.of(new TypeHint[Tuple4[String, String, String, String]] {}))

    /**
     * Window aggregation: concatenate the JSON of records that share the same
     * load type, db and table into one long string
     */
    val byKeyData: DataStream[Tuple4[String, String, String, String]] = tupleData.keyBy(0, 1, 2)
      .timeWindow(Time.milliseconds(1000))
      .reduce((itemFirst, itemSecond) => new Tuple4(itemFirst.f0, itemFirst.f1, itemFirst.f2, itemFirst.f3 + "=-=-=" + itemSecond.f3))
    byKeyData
  }

  def ddlFormat(ddlDataStream: DataStream[String]): DataStream[String] = {
    val ddlStrDataStream: DataStream[String] = ddlDataStream.map(line => {
      try {
        val lineObj = JSONObject.fromObject(line)
        val historyRecord = JSONObject.fromObject(lineObj.getString("historyRecord"))
        val tableChangesArray = historyRecord.getJSONArray("tableChanges")
        val tableChanges = JSONObject.fromObject(tableChangesArray.getJSONObject(0))
        val tableChangeType = tableChanges.getString("type")
        var ddlSql = ""
        val table = tableChanges.optJSONObject("table")
        val primaryKeyColumnNames = table.getString("primaryKeyColumnNames").replace("[", "").replace("]", "").replace("\"", "")
        val columnsArray = table.getJSONArray("columns")
        // CREATE TABLE conversion
        if (tableChangeType == "CREATE") {
          val tableName = tableChanges.getString("id").replace("\"", "")
          val columnsArrayBuffer = ArrayBuffer[String]()
          columnsArray.forEach(line => {
            val columnJson = JSONObject.fromObject(line)
            val name = columnJson.getString("name")
            val typeName = columnJson.getString("typeName")
            val length = columnJson.optInt("length", 1)
            val scale = columnJson.optInt("scale", 2)
            val lastColumnType = matchColumnType(typeName, length, scale)
            val lastColumn = s"$name $lastColumnType"
            columnsArrayBuffer += lastColumn
          })
          // Re-order the columns so the primary-key columns come first,
          // avoiding the error "Key columns should be a ordered prefix of the schema"
          val keys = primaryKeyColumnNames.split(",")
          for (indexOfCol <- 0 until keys.length) {
            val col = keys(indexOfCol)
            var columnFormat = ""
            columnsArrayBuffer.foreach(column => {
              if (column.startsWith(col)) {
                columnFormat = column
              }
            })
            val index = columnsArrayBuffer.indexOf(columnFormat)
            columnsArrayBuffer.remove(index)
            columnsArrayBuffer.insert(indexOfCol, columnFormat)
          }
          val header = s"CREATE TABLE IF NOT EXISTS $tableName ("
          val end = s""") UNIQUE KEY($primaryKeyColumnNames) DISTRIBUTED BY HASH($primaryKeyColumnNames) BUCKETS 10 PROPERTIES ("replication_allocation" = "tag.location.default: 1")"""
          ddlSql = header + columnsArrayBuffer.mkString(",") + end
        } else if (tableChangeType == "ALTER") {
          var ddl = historyRecord.getString("ddl").replace("\r\n", " ")
          println(ddl)
          if (ddl.startsWith("RENAME")) {
            ddl = ddl.replace("`", "")
            val arr = ddl.split(" ")
            ddlSql = s"ALTER TABLE ${arr(2)} RENAME ${arr(4)}"
          } else if (ddl.contains("DROP COLUMN")) {
            ddlSql = ddl
          } else {
            val dbTableName = tableChanges.getString("id").replace("\"", "")
            val addColName = ddl.split(" ")(5).replace("`", "")
            var colTpe = ""
            columnsArray.forEach(line => {
              val columnJson = JSONObject.fromObject(line)
              if (columnJson.getString("name") == addColName) {
                val typeName = columnJson.getString("typeName")
                val length = columnJson.optInt("length", 1)
                val scale = columnJson.optInt("scale", 2)
                colTpe = matchColumnType(typeName, length, scale)
              }
            })
            if (ddl.contains("ADD COLUMN")) {
              ddlSql = s"ALTER TABLE $dbTableName ADD COLUMN $addColName $colTpe"
            } else {
              ddlSql = s"ALTER TABLE $dbTableName MODIFY COLUMN $addColName $colTpe"
            }
          }
        }
        println(ddlSql)
        ddlSql
      }
      catch {
        case ex: Exception =>
          println(ex)
          "select 1"
      }
    })
    ddlStrDataStream
  }

  def showCapital(x: Option[String]): String = x match {
    case Some(s) => s
    case None => "?"
  }

  def matchColumnType(columnType: String, length: Int, scale: Int): String = {
    var returnColumnType = "VARCHAR(255)"
    columnType match {
      case "INT UNSIGNED" => returnColumnType = s"INT($length)"
      case "INT" => returnColumnType = s"INT($length)"
      case "TINYINT" => returnColumnType = s"TINYINT($length)"
      case "VARCHAR" => returnColumnType = s"VARCHAR(${length * 3})"
      case "BIGINT" => returnColumnType = s"BIGINT($length)"
      case "TINYTEXT" => returnColumnType = "TINYTEXT"
      case "LONGTEXT" => returnColumnType = "STRING"
      case "TEXT" => returnColumnType = "STRING"
      case "DECIMAL" => returnColumnType = s"DECIMAL($length,$scale)"
      case "VARBINARY" => returnColumnType = "STRING"
      case "TIMESTAMP" => returnColumnType = "STRING"
      case "ENUM" => returnColumnType = "TINYINT"
      case "MEDIUMINT" => returnColumnType = "INT"
      case "SMALLINT" => returnColumnType = "SMALLINT"
      case "MEDIUMTEXT" => returnColumnType = "STRING"
      case _ => returnColumnType = "STRING"
    }
    returnColumnType
  }
}
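
As a quick illustration of how the type mapping behaves, the following hypothetical snippet calls matchColumnType directly; the length and scale values are made-up examples of what Debezium might report for a column.

import com.xxxx.util.FlinkCDCSyncETL

object TypeMappingCheck {
  def main(args: Array[String]): Unit = {
    println(FlinkCDCSyncETL.matchColumnType("VARCHAR", 50, 0))  // VARCHAR(150): tripled, presumably to leave room for multi-byte UTF-8
    println(FlinkCDCSyncETL.matchColumnType("DECIMAL", 10, 2))  // DECIMAL(10,2)
    println(FlinkCDCSyncETL.matchColumnType("LONGTEXT", 0, 0))  // STRING
    println(FlinkCDCSyncETL.matchColumnType("ENUM", 1, 0))      // TINYINT
  }
}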
DorisStreamLoad.scala
package com.xxxx.util

import net.sf.json.JSONObject
import net.sf.json.JSONArray
import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpPut
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{DefaultRedirectStrategy, HttpClientBuilder, HttpClients}
import org.apache.http.util.EntityUtils
import org.slf4j.{Logger, LoggerFactory}
import org.apache.commons.codec.binary.Base64
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util.UUID

class DorisStreamLoad(props: PropertiesUtil, producer: KafkaProducer[String, String]) extends Serializable {

  lazy val httpClientBuilder: HttpClientBuilder = HttpClients.custom.setRedirectStrategy(new DefaultRedirectStrategy() {
    override protected def isRedirectable(method: String): Boolean = {
      // If the connection target is an FE node, the 307 redirect must be followed.
      true
    }
  })

  def loadJson(jsonData: String, mergeType: String, db: String, table: String): Unit = {
    val loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"
    // Split the window-aggregated string back into individual rows and build one JSON array
    val arr = jsonData.split("=-=-=")
    val jsonArray = new JSONArray()
    for (line <- arr) {
      try {
        val js = JSONObject.fromObject(line)
        jsonArray.add(js)
      } catch {
        case e: Exception =>
          println(e)
          println(line)
      }
    }
    val jsonArrayStr = jsonArray.toString()
    val client = httpClientBuilder.build
    val loadUrlStr = String.format(loadUrlPattern, props.doris_load_host, db, table)
    try {
      val put = new HttpPut(loadUrlStr)
      put.removeHeaders(HttpHeaders.CONTENT_LENGTH)
      put.removeHeaders(HttpHeaders.TRANSFER_ENCODING)
      put.setHeader(HttpHeaders.EXPECT, "100-continue")
      put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader)
      val label = UUID.randomUUID.toString
      // Stream-load properties can be set in the headers; here we set label, merge_type, format, etc.
      put.setHeader("label", label)
      put.setHeader("merge_type", mergeType)
      // put.setHeader("two_phase_commit", "true")
      put.setHeader("column_separator", ",")
      put.setHeader("format", "json")
      put.setHeader("strip_outer_array", "true")
      put.setHeader("exec_mem_limit", "6442450944")
      val entity = new StringEntity(jsonArrayStr, "UTF-8")
      put.setEntity(entity)
      val response = client.execute(put)
      try {
        var loadResult = ""
        if (response.getEntity != null) {
          loadResult = EntityUtils.toString(response.getEntity)
        }
        val statusCode = response.getStatusLine.getStatusCode
        if (statusCode != 200) {
          throw new IOException("Stream load failed. status: %s load result: %s".format(statusCode, loadResult))
        }
      } finally {
        if (response != null) response.close()
      }
    } finally {
      if (client != null) client.close()
    }
  }

  /**
   * Build the authentication header; Doris uses Basic Auth here.
   */
  def basicAuthHeader: String = {
    val tobeEncode = props.doris_user + ":" + props.doris_password
    val encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8))
    "Basic " + new String(encoded)
  }
}
SinkDoris.scala
package com.xxxx.util

import org.apache.flink.api.java.tuple.Tuple4
import net.sf.json.JSONObject
import org.apache.flink.streaming.api.functions.sink.SinkFunction

class SinkDoris(dorisStreamLoad: DorisStreamLoad) extends SinkFunction[Tuple4[String, String, String, String]] {

  /**
   * Connections could be established in open() so they are not created and released
   * on every invoke() call.
   */
  // def open(parameters: Configuration): Unit = {
  //   super.open(parameters)
  // }

  /**
   * invoke() is called once per element; value is (merge_type, db, table, data).
   */
  override def invoke(value: Tuple4[String, String, String, String]): Unit = {
    dorisStreamLoad.loadJson(value.f3, value.f0, value.f1, value.f2)
    // Notify the analysis tasks via Kafka that this db/table has new data
    val producer = KafkaUtil.getProducer
    val json = new JSONObject()
    json.put("db", value.f1)
    json.put("table", value.f2)
    KafkaUtil.sendKafkaMsg(producer, json.toString, "sync_table")
  }
}
SinkSchema.scala
package com.xxxx.util

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql.{Connection, DriverManager, PreparedStatement}

class SinkSchema(props: PropertiesUtil) extends RichSinkFunction[String] {

  var conn: Connection = _
  var ps: PreparedStatement = _
  // MysqlPool / MysqlManager are small JDBC connection-pool helpers not shown in this post
  var mysqlPool: MysqlPool = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    mysqlPool = MysqlManager.getMysqlPool
    conn = mysqlPool.getConnection
    conn.setAutoCommit(false)
  }

  override def close(): Unit = {
    super.close()
    if (conn != null) {
      conn.close()
    }
    if (ps != null) {
      ps.close()
    }
  }

  override def invoke(sql: String, context: SinkFunction.Context): Unit = {
    super.invoke(sql, context)
    // Execute the converted DDL against Doris; the "select 1" fallback from ddlFormat is harmless here
    if (sql != null && sql.nonEmpty) {
      ps = conn.prepareStatement(sql)
      try {
        ps.execute()
      } catch {
        case ex: Exception => println(ex)
      }
      conn.commit()
    }
  }
}
PropertiesUtil.scala
package com.xxxx.util

import java.io.FileInputStream
import java.util.Properties

/**
 * PropertiesUtil: loads job configuration from a properties file
 */
class PropertiesUtil extends Serializable {

  private val props = new Properties()

  var doris_host = ""
  var doris_port = 0
  var doris_user = ""
  var doris_password = ""
  var database_list = ""
  var table_list = ""
  var mysql_host = ""
  var mysql_port = 0
  var mysql_user = ""
  var mysql_password = ""
  var doris_load_host = ""
  var rds_host = ""
  var rds_port = 0
  var rds_user = ""
  var rds_password = ""
  var rds_database = ""
  // var sync_database_select_sql = ""
  // var sync_table_select_sql = ""
  // var sync_config_host = ""
  // var sync_config_port = 0
  // var sync_config_user = ""
  // var sync_config_password = ""
  var sync_config_db = ""
  var sync_config_table = ""
  var sync_redis_table = ""
  var address_table = ""
  var parallelism = 0
  var split_size = 0
  var fetch_size = 0
  var bootstrap_servers = ""
  var topic = ""
  var group_id = ""
  var offset_mode = ""
  // redis
  var redis_max_total: Int = 0
  var redis_max_idle: Int = 0
  var redis_min_idle: Int = 0
  var redis_host = ""
  var redis_port: Int = 0
  var redis_timeout: Int = 0
  var redis_password = ""
  var redis_db_index: Int = 0
  var prefix = "0"

  def init(filePath: String): Unit = {
    props.load(new FileInputStream(filePath))
    // doris
    doris_host = props.getProperty("doris.host")
    doris_port = props.getProperty("doris.port").toInt
    doris_user = props.getProperty("doris.user")
    doris_password = props.getProperty("doris.password")
    database_list = props.getProperty("database.list")
    table_list = props.getProperty("table.list")
    mysql_host = props.getProperty("mysql.host")
    mysql_port = props.getProperty("mysql.port").toInt
    mysql_user = props.getProperty("mysql.user")
    mysql_password = props.getProperty("mysql.password")
    doris_load_host = props.getProperty("doris.load.host")
    rds_host = props.getProperty("rds.host")
    rds_port = props.getProperty("rds.port").toInt
    rds_user = props.getProperty("rds.user")
    rds_password = props.getProperty("rds.password")
    rds_database = props.getProperty("rds.database")
    sync_config_db = props.getProperty("sync.config.db")
    sync_config_table = props.getProperty("sync.config.table")
    sync_redis_table = props.getProperty("sync.redis.table")
    address_table = props.getProperty("address.table")
    parallelism = props.getProperty("parallelism").toInt
    split_size = props.getProperty("split.size").toInt
    fetch_size = props.getProperty("fetch.size").toInt
    bootstrap_servers = props.getProperty("bootstrap.servers")
    topic = props.getProperty("topic")
    group_id = props.getProperty("group.id")
    offset_mode = props.getProperty("offset.mode")
    // redis
    redis_max_total = props.getProperty("redis.max.total").toInt
    redis_max_idle = props.getProperty("redis.max.idle").toInt
    redis_min_idle = props.getProperty("redis.min.idle").toInt
    redis_host = props.getProperty("redis.redis.host")
    redis_port = props.getProperty("redis.redis.port").toInt
    redis_timeout = props.getProperty("redis.redis.timeout").toInt
    redis_password = props.getProperty("redis.password")
    redis_db_index = props.getProperty("redis.db.index").toInt
    prefix = props.getProperty("redis.key.prefix")
  }

  def stringToInt(prop: String): Int = {
    try {
      prop.toInt
    } catch {
      case ex: Exception => 0
    }
  }
}

// Lazy singleton: only initialized when actually used
object PropertiesManager {

  @volatile private var propertiesUtil: PropertiesUtil = _

  def getUtil: PropertiesUtil = {
    propertiesUtil
  }

  def initUtil(): Unit = {
    var filePath = "config.properties"
    // filePath = this.getClass.getResource("/").toString.replace("file:", "") + "config.properties"
    filePath = "/opt/flink-1.13.6/job/mysql2doris/config.properties"
    if (propertiesUtil == null) {
      propertiesUtil = new PropertiesUtil
    }
    propertiesUtil.init(filePath)
    // propertiesUtil.evn = evn
  }
}
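
For reference, a config.properties along the following lines would satisfy the keys read above. Every value here is a placeholder; only the key names are taken from the code.

doris.host=10.0.0.1
doris.port=9030
doris.user=root
doris.password=xxxx
database.list=db_1001
table.list=db_1001.*
mysql.host=10.0.0.2
mysql.port=3306
mysql.user=root
mysql.password=xxxx
doris.load.host=10.0.0.1:8030
rds.host=10.0.0.3
rds.port=3306
rds.user=root
rds.password=xxxx
rds.database=source_db
sync.config.db=sync_config
sync.config.table=table_a
sync.redis.table=sync_redis
address.table=address
parallelism=1
split.size=1024
fetch.size=1024
bootstrap.servers=10.0.0.4:9092
topic=sync_table
group.id=mysql2doris
offset.mode=latest
redis.max.total=8
redis.max.idle=8
redis.min.idle=0
redis.redis.host=10.0.0.5
redis.redis.port=6379
redis.redis.timeout=3000
redis.password=xxxx
redis.db.index=0
redis.key.prefix=0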

If you have any questions, please leave a comment or join the 857 tech community (857技术社区).
