赞
踩
python 带码不在赘述,git:GitHub - xiaofeicn/MysqlToDorisTable
flink中需要感知新表,每日重启时获取doris 表A数据,并组装成databaseList,tableList的参数,代码如下,代码有注释
FlinkCDCMysql2Doris.scala
- package com.xxxx.mysql2doris
-
-
- import org.apache.flink.streaming.api.TimeCharacteristic
- import com.zbkj.util.{DorisStreamLoad, FlinkCDCSyncETL, KafkaUtil, PropertiesManager, PropertiesUtil, SinkDoris, SinkSchema}
- import com.ververica.cdc.connectors.mysql.source.MySqlSource
- import com.ververica.cdc.connectors.mysql.table.StartupOptions
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
- import com.zbkj.util.KafkaUtil.proper
- import net.sf.json.JSONObject
- import org.apache.flink.api.common.eventtime.WatermarkStrategy
- import org.apache.flink.api.common.restartstrategy.RestartStrategies
- import org.apache.flink.api.common.time.Time
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo
- import org.apache.flink.streaming.api.CheckpointingMode
- import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, DataStreamUtils}
- import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
- import org.slf4j.{Logger, LoggerFactory}
- import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
- import org.apache.flink.api.java.typeutils.RowTypeInfo
- import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
- import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
- import org.apache.kafka.connect.json.JsonConverterConfig
-
- import java.util.Properties
- import java.util.concurrent.TimeUnit
- import scala.collection.JavaConverters.asScalaIteratorConverter
-
- object FlinkCDCMysql2Doris {
-
- PropertiesManager.initUtil()
- val props: PropertiesUtil = PropertiesManager.getUtil
- val log: Logger = LoggerFactory.getLogger(this.getClass)
-
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- // 并行度
- env.setParallelism(props.parallelism)
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-
- /**
- * checkpoint的相关设置
- */
- // 启用检查点,指定触发checkpoint的时间间隔(单位:毫秒,默认500毫秒),默认情况是不开启的
- env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
- // 设定Checkpoint超时时间,默认为10分钟
- env.getCheckpointConfig.setCheckpointTimeout(600000)
-
- /** 设定两个Checkpoint之间的最小时间间隔,防止出现例如状态数据过大而导致Checkpoint执行时间过长,从而导致Checkpoint积压过多
- * 最终Flink应用密切触发Checkpoint操作,会占用了大量计算资源而影响到整个应用的性能(单位:毫秒) */
- env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
- // 默认情况下,只有一个检查点可以运行
- // 根据用户指定的数量可以同时触发多个Checkpoint,进而提升Checkpoint整体的效率
- // env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
- /** 外部检查点
- * 不会在任务正常停止的过程中清理掉检查点数据,而是会一直保存在外部系统介质中,另外也可以通过从外部检查点中对任务进行恢复 */
- env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
-
- /** 如果有更近的保存点时,是否将作业回退到该检查点 */
- env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
- // 设置可以允许的checkpoint失败数
- env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
- //设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
- env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
-
- /**
- * 重启策略的配置
- */
- // 重启3次,每次失败后等待10000毫秒
- // env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(3, TimeUnit.MINUTES), Time.of(30, TimeUnit.SECONDS)))
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
-
- /**
- * 获取同步表配置
- * database table
- */
- val inputMysql = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername("com.mysql.jdbc.Driver")
- .setDBUrl("jdbc:mysql://%s:%d/%s".format(props.doris_host, props.doris_port, props.sync_config_db))
- .setUsername(props.doris_user)
- .setPassword(props.doris_password)
- .setQuery("select member_id,sync_table from %s.%s".format(props.sync_config_db, props.sync_config_table))
- .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
- .finish()).uid("inputMysql")
-
-
- val databaseName: DataStream[String] = inputMysql.map(line => line.getField(0).toString).uid("databaseName")
- // 模糊监听
- val tableName: DataStream[String] = inputMysql.map(line => line.getField(0).toString + ".*").uid("tableName")
- val producer = KafkaUtil.getProducer
-
- val databaseIter = databaseName.executeAndCollect().asScala
- val databaseList = databaseIter.toSet.mkString(",")
-
- val tableIter = tableName.executeAndCollect().asScala
- val tableList = tableIter.toSet.mkString(",")
- println("databaseList:", databaseList)
- println("tableList:", tableList)
- val customConverterConfigs = new java.util.HashMap[String, Object] {
- put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric")
- }
- /**
- *
- * mysql source for doris
- */
- val mySqlSource = MySqlSource.builder[String]()
- .hostname(props.rds_host)
- .port(props.rds_port)
- .databaseList(databaseList)
- .tableList(tableList)
- .username(props.rds_user)
- .password(props.rds_password)
- .serverId("11110")
- .splitSize(props.split_size)
- .fetchSize(props.fetch_size)
- // .startupOptions(StartupOptions.latest())
- // 全量读取
- .startupOptions(StartupOptions.initial())
- .includeSchemaChanges(true)
- // 发现新表,加入同步任务,需要在tableList中配置
- .scanNewlyAddedTableEnabled(true)
- .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs)).build()
-
- val dataStreamSource: DataStreamSource[String] = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
- val ddlSqlStream: DataStream[String] = dataStreamSource.filter(line => line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("ddlSqlStream")
-
- val dmlStream: DataStream[String] = dataStreamSource.filter(line => !line.contains("historyRecord") && !line.contains("CHANGE COLUMN")).uid("dmlStream")
-
- val ddlDataStream = FlinkCDCSyncETL.ddlFormat(ddlSqlStream)
- val dmlDataStream = FlinkCDCSyncETL.binLogETL(dmlStream)
- // ddlDataStream.print()
-
- //producer 为了在数据同步后通知分析任务
- val dorisStreamLoad = new DorisStreamLoad(props, producer)
- ddlDataStream.addSink(new SinkSchema(props)).name("ALTER TABLE TO DORIS").uid("SinkSchema")
- dmlDataStream.addSink(new SinkDoris(dorisStreamLoad)).name("Data TO DORIS").uid("SinkDoris")
- env.execute("Flink CDC Mysql To Doris With Initial")
-
-
- }
-
- case class dataLine(merge_type: String, db: String, table: String, data: String)
-
-
- }
FlinkCDCBinLogETL.scala
- package com.xxxx.util
-
- import net.sf.json.JSONObject
- import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
- import org.apache.flink.api.java.tuple.Tuple4
- import org.apache.flink.streaming.api.datastream.DataStream
- import org.apache.flink.streaming.api.windowing.time.Time
-
- import scala.collection.mutable.ArrayBuffer
- import scala.util.matching.Regex
-
- object FlinkCDCSyncETL {
-
- def binLogETL(dataStreamSource: DataStream[String]): DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = {
- /**
- * 根据不同日志类型 匹配load doris方式
- */
- val tupleData: DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = dataStreamSource.map(line => {
- var data: JSONObject = null
- var mergetype = "APPEND"
- val lineObj = JSONObject.fromObject(line)
-
- val source = lineObj.getJSONObject("source")
- val db = source.getString("db")
- val table = source.getString("table")
- if ("d" == lineObj.getString("op")) {
- val oo = lineObj.getJSONObject("before")
- data = lineObj.getJSONObject("before")
- mergetype = "DELETE"
- } else if ("u" == lineObj.getString("op")) {
- data = lineObj.getJSONObject("after")
- mergetype = "MERGE"
- } else if ("c" == lineObj.getString("op")) {
- data = lineObj.getJSONObject("after")
- } else if ("r" == lineObj.getString("op")) {
- data = lineObj.getJSONObject("after")
- mergetype = "APPEND"
- }
- new Tuple4[String, String, String, String](mergetype, db, table, data.toString)
- }).returns(TypeInformation.of(new TypeHint[Tuple4[String, String, String, String]] {}))
- tupleData
- /**
- * 窗口聚合数据,将相同load方式,db,table的json 数据组合为长字符串,
- */
- val byKeyData: DataStream[org.apache.flink.api.java.tuple.Tuple4[String, String, String, String]] = tupleData.keyBy(0, 1, 2)
- .timeWindow(Time.milliseconds(1000))
- .reduce((itemFirst, itemSecond) => new Tuple4(itemFirst.f0, itemFirst.f1, itemFirst.f2, itemFirst.f3 + "=-=-=" + itemSecond.f3))
- byKeyData
- }
-
- def ddlFormat(ddlDataStream: DataStream[String]): DataStream[String] = {
-
- val ddlStrDataStream: DataStream[String] = ddlDataStream.map(line => {
- try {
- val lineObj = JSONObject.fromObject(line)
- val historyRecord = JSONObject.fromObject(lineObj.getString("historyRecord"))
- val tableChangesArray = historyRecord.getJSONArray("tableChanges")
- val tableChanges = JSONObject.fromObject(tableChangesArray.getJSONObject(0))
- val tableChangeType = tableChanges.getString("type")
- var ddlSql = ""
-
- val table = tableChanges.optJSONObject("table")
- val primaryKeyColumnNames = table.getString("primaryKeyColumnNames").replace("[", "").replace("]", "").replace("\"", "")
- val columnsArray = table.getJSONArray("columns")
- // 建表转换
- if (tableChangeType == "CREATE") {
- val tableName = tableChanges.getString("id").replace("\"", "")
- val columnsArrayBuffer = ArrayBuffer[String]()
- columnsArray.forEach(line => {
- val columnJson = JSONObject.fromObject(line)
- val name = columnJson.getString("name")
- val typeName = columnJson.getString("typeName")
- val length = columnJson.optInt("length", 1)
- val scale = columnJson.optInt("scale", 2)
- val lastColumnType = matchColumnType(typeName, length, scale)
- val lastColumn = s"$name $lastColumnType"
- columnsArrayBuffer.+=(lastColumn)
- })
-
- // 对列重新排序,主键依次放在最前面,避免错误Key columns should be a ordered prefix of the scheme
- val keys = primaryKeyColumnNames.split(",")
- for (indexOfCol <- 0 until keys.length) {
- val col = keys(indexOfCol)
- var columnFormat = ""
- columnsArrayBuffer.foreach(column => {
- if (column.startsWith(col)) {
- columnFormat = column
- }
-
- })
- val index = columnsArrayBuffer.indexOf(columnFormat)
- columnsArrayBuffer.remove(index)
- columnsArrayBuffer.insert(indexOfCol, columnFormat)
-
- }
-
-
- val header = s"CREATE TABLE IF NOT EXISTS $tableName ("
- val end = s""") UNIQUE KEY($primaryKeyColumnNames) DISTRIBUTED BY HASH($primaryKeyColumnNames) BUCKETS 10 PROPERTIES ("replication_allocation" = "tag.location.default: 1")"""
- ddlSql = header + columnsArrayBuffer.mkString(",") + end
- } else if (tableChangeType == "ALTER") {
- var ddl = historyRecord.getString("ddl").replace("\r\n", " ")
- println(ddl)
- if (ddl.startsWith("RENAME")) {
- ddl = ddl.replace("`", "")
- val arr = ddl.split("")
- ddlSql = s"ALTER TABLE ${arr(2)} RENAME ${arr(4)}"
- } else if (ddl.contains("DROP COLUMN")) {
-
- ddlSql = ddl
- } else {
- val dbTableName = tableChanges.getString("id").replace("\"", "")
- val addColName = ddl.split(" ")(5).replace("`", "")
- var colTpe = ""
- columnsArray.forEach(line => {
-
- val columnJson = JSONObject.fromObject(line)
-
- if (columnJson.getString("name") == addColName) {
- val typeName = columnJson.getString("typeName")
- val length = columnJson.optInt("length", 1)
- val scale = columnJson.optInt("scale", 2)
- colTpe = matchColumnType(typeName, length, scale)
- }
-
- })
- if (ddl.contains("ADD COLUMN")) {
-
- ddlSql = s"ALTER TABLE $dbTableName ADD COLUMN $addColName $colTpe"
- } else {
-
- ddlSql = s"ALTER TABLE $dbTableName MODIFY COLUMN $addColName $colTpe"
- }
-
- }
- }
-
- println(ddlSql)
- ddlSql
- }
- catch {
- case ex: Exception => println(ex)
- "select 1"
- }
-
- })
- ddlStrDataStream
- }
-
- def showCapital(x: Option[String]): String = x match {
- case Some(s) => s
- case None => "?"
- }
-
- def matchColumnType(columnType: String, length: Int, scale: Int): String = {
- var returnColumnType = "VARCHAR(255)"
- columnType match {
- case "INT UNSIGNED" => returnColumnType = s"INT($length)"
- case "INT" => returnColumnType = s"INT($length)"
- case "TINYINT" => returnColumnType = s"TINYINT($length)"
- case "VARCHAR" => returnColumnType = s"VARCHAR(${length * 3})"
- case "BIGINT" => returnColumnType = s"BIGINT(${length})"
- case "TINYTEXT" => returnColumnType = s"TINYTEXT"
- case "LONGTEXT" => returnColumnType = s"STRING"
- case "TEXT" => returnColumnType = s"STRING"
- case "DECIMAL" => returnColumnType = s"DECIMAL($length,$scale)"
- case "VARBINARY" => returnColumnType = s"STRING"
- case "TIMESTAMP" => returnColumnType = s"STRING"
- case "ENUM" => returnColumnType = s"TINYINT"
- case "MEDIUMINT" => returnColumnType = s"INT"
- case "SMALLINT" => returnColumnType = s"SMALLINT"
- case "MEDIUMTEXT" => returnColumnType = s"STRING"
- case _ => returnColumnType = s"STRING"
- }
- returnColumnType
- }
-
- }
DorisStreamLoad.scala
- package com.xxxx.util
-
- import net.sf.json.JSONObject
- import net.sf.json.JSONArray
- import org.apache.http.HttpHeaders
- import org.apache.http.client.methods.HttpPut
- import org.apache.http.entity.StringEntity
- import org.apache.http.entity.BufferedHttpEntity
- import org.apache.http.impl.client.{DefaultRedirectStrategy, HttpClientBuilder, HttpClients}
- import org.apache.http.util.EntityUtils
- import org.slf4j.{Logger, LoggerFactory}
- import org.apache.commons.codec.binary.Base64
- import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-
- import java.io.IOException
- import java.nio.charset.StandardCharsets
- import java.util.UUID
-
- class DorisStreamLoad(props: PropertiesUtil,producer:KafkaProducer[String, String]) extends Serializable {
-
-
- lazy val httpClientBuilder: HttpClientBuilder = HttpClients.custom.setRedirectStrategy(new DefaultRedirectStrategy() {
- override protected def isRedirectable(method: String): Boolean = {
- // If the connection target is FE, you need to deal with 307 redirect。
- true
- }
- })
-
-
- def loadJson(jsonData: String, mergeType: String, db: String, table: String): Unit = try {
- val loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"
- val arr = jsonData.split("=-=-=")
- val jsonArray = new JSONArray()
- for (line <- arr) {
- try {
- val js = JSONObject.fromObject(line)
- jsonArray.add(js)
- } catch {
- case e: Exception =>
- println(e)
- println(line)
- }
-
- }
- val jsonArrayStr = jsonArray.toString()
- val client = httpClientBuilder.build
- val loadUrlStr = String.format(loadUrlPattern, props.doris_load_host, db, table)
- try {
- val put = new HttpPut(loadUrlStr)
- put.removeHeaders(HttpHeaders.CONTENT_LENGTH)
- put.removeHeaders(HttpHeaders.TRANSFER_ENCODING)
- put.setHeader(HttpHeaders.EXPECT, "100-continue")
- put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader)
- val label = UUID.randomUUID.toString
- // You can set stream load related properties in the Header, here we set label and column_separator.
- put.setHeader("label", label)
- put.setHeader("merge_type", mergeType)
- // put.setHeader("two_phase_commit", "true")
- put.setHeader("column_separator", ",")
- put.setHeader("format", "json")
- put.setHeader("strip_outer_array", "true")
- put.setHeader("exec_mem_limit", "6442450944")
- val entity = new StringEntity(jsonArrayStr, "UTF-8")
-
- put.setEntity(entity)
-
- try {
- val response = client.execute(put)
- try {
- var loadResult = ""
- if (response.getEntity != null) {
- loadResult = EntityUtils.toString(response.getEntity)
- }
- val statusCode = response.getStatusLine.getStatusCode
- if (statusCode != 200) {
- throw new IOException("Stream load failed. status: %s load result: %s".format(statusCode, loadResult))
- }
-
- } finally if (response != null) {
- response.close()
- }
- }
- }
- finally
- if (client != null) client.close()
- }
-
- /**
- * Construct authentication information, the authentication method used by doris here is Basic Auth
- *
- */
- def basicAuthHeader: String = {
- val tobeEncode = props.doris_user + ":" + props.doris_password
- val encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8))
- "Basic " + new String(encoded)
- }
-
-
- }
SinkDoris.scala
- package com.xxxx.util
-
-
- import org.apache.flink.api.java.tuple.Tuple4
- import com.zbkj.mysql2doris.FlinkCDCMysql2Doris.dataLine
- import net.sf.json.JSONObject
- import org.apache.flink.streaming.api.functions.sink.SinkFunction
-
-
- class SinkDoris(dorisStreamLoad:DorisStreamLoad) extends SinkFunction[Tuple4[String, String, String, String]] {
-
- // val dorisStreamLoad:DorisStreamLoadT=null
- /**
- * 在open()方法中建立连接,这样不用每次invoke的时候都要建立连接和释放连接。
- */
- // def open(parameters: Configuration): Unit = {
- // super
- // super.open(parameters);
- //
- // }
-
- /**
- * 每个元素的插入都要调用一次invoke()方法进行插入操作
- */
- override def invoke(value:Tuple4[String, String, String, String]): Unit = {
-
- dorisStreamLoad.loadJson(value.f3,value.f0,value.f1,value.f2)
- val producer = KafkaUtil.getProducer
- val json = new JSONObject()
- json.put("db",value.f2)
- json.put("table",value.f3)
- KafkaUtil.sendKafkaMsg(producer, json.toString, "sync_table")
-
- }
-
- }
SinkSchema.scala
- package com.xxxx.util
-
- import org.apache.flink.configuration.Configuration
- import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
-
- import java.sql.{Connection, DriverManager, PreparedStatement}
-
- class SinkSchema(props:PropertiesUtil) extends RichSinkFunction[String] {
- var conn: Connection = _
- var ps : PreparedStatement = _
- var mysqlPool: MysqlPool = _
-
- override def open(parameters: Configuration): Unit = {
- super.open(parameters)
- mysqlPool = MysqlManager.getMysqlPool
- conn = mysqlPool.getConnection
- conn.setAutoCommit(false)
- }
-
- override def close(): Unit = {
- super.close()
- if (conn != null) {
- conn.close()
- }
- if (ps != null) {
- ps.close()
- }
- }
-
- override def invoke(sql: String, context: SinkFunction.Context): Unit = {
- super.invoke(sql, context)
- if (sql !="" && sql.nonEmpty){
- ps = conn.prepareStatement(sql)
- try {
- ps.execute()
- }catch {
- case ex:Exception=>println(ex)
- }
-
- conn.commit()
-
- }
- // conn.close()
- }
-
- }
PropertiesUtil.scala
- package com.xxxx.util
-
- import java.io.FileInputStream
- import java.util.Properties
-
- /**
- * propertiesUtil
- *
- */
- class PropertiesUtil extends Serializable {
-
-
- private val props = new Properties()
-
- var doris_host = ""
- var doris_port = 0
- var doris_user = ""
- var doris_password = ""
-
- var database_list = ""
- var table_list = ""
-
- var mysql_host = ""
- var mysql_port = 0
- var mysql_user = ""
- var mysql_password = ""
- var doris_load_host = ""
-
- var rds_host = ""
- var rds_port = 0
- var rds_user = ""
- var rds_password = ""
- var rds_database = ""
-
- // var sync_database_select_sql = ""
- // var sync_table_select_sql = ""
- // var sync_config_host = ""
- // var sync_config_port = 0
- // var sync_config_user = ""
- // var sync_config_password = ""
- var sync_config_db = ""
- var sync_config_table = ""
- var sync_redis_table = ""
- var address_table = ""
-
- var parallelism = 0
- var split_size = 0
- var fetch_size = 0
-
- var bootstrap_servers = ""
- var topic = ""
- var group_id = ""
- var offset_mode = ""
-
- // reids
- var redis_max_total: Int = 0
- var redis_max_idle: Int = 0
- var redis_min_idle: Int = 0
- var redis_host = ""
- var redis_port: Int = 0
- var redis_timeout: Int = 0
- var redis_password = ""
- var redis_db_index: Int = 0
- var prefix = "0"
-
-
- def init(filePath: String): Unit = {
- props.load(new FileInputStream(filePath))
-
- // hdfs
- doris_host = props.getProperty("doris.host")
- doris_port = props.getProperty("doris.port").toInt
- doris_user = props.getProperty("doris.user")
- doris_password = props.getProperty("doris.password")
-
- database_list = props.getProperty("database.list")
- table_list = props.getProperty("table.list")
-
- mysql_host = props.getProperty("mysql.host")
- mysql_port = props.getProperty("mysql.port").toInt
- mysql_user = props.getProperty("mysql.user")
- mysql_password = props.getProperty("mysql.password")
- doris_load_host = props.getProperty("doris.load.host")
-
-
- rds_host = props.getProperty("rds.host")
- rds_port = props.getProperty("rds.port").toInt
- rds_user = props.getProperty("rds.user")
- rds_password = props.getProperty("rds.password")
- rds_database = props.getProperty("rds.database")
- sync_config_db = props.getProperty("sync.config.db")
- sync_config_table = props.getProperty("sync.config.table")
- sync_redis_table = props.getProperty("sync.redis.table")
- address_table = props.getProperty("address.table")
-
-
- parallelism = props.getProperty("parallelism").toInt
- split_size = props.getProperty("split.size").toInt
- fetch_size = props.getProperty("fetch.size").toInt
-
- bootstrap_servers = props.getProperty("bootstrap.servers")
- topic = props.getProperty("topic")
- group_id = props.getProperty("group.id")
- offset_mode = props.getProperty("offset.mode")
-
-
- // reids
- redis_max_total = props.getProperty("redis.max.total").toInt
- redis_max_idle = props.getProperty("redis.max.idle").toInt
- redis_min_idle = props.getProperty("redis.min.idle").toInt
- redis_host = props.getProperty("redis.redis.host")
- redis_port = props.getProperty("redis.redis.port").toInt
- redis_timeout = props.getProperty("redis.redis.timeout").toInt
- redis_password = props.getProperty("redis.password")
- redis_db_index = props.getProperty("redis.db.index").toInt
-
- prefix = props.getProperty("redis.key.prefix")
-
-
- }
-
- def stringToInt(prop: String): Int = {
- try {
- prop.toInt
- } catch {
- case ex: Exception => {
- 0
- }
- }
- }
- }
-
- //惰性单例,真正计算时才初始化对象
- object PropertiesManager {
- @volatile private var propertiesUtil: PropertiesUtil = _
-
- def getUtil: PropertiesUtil = {
- propertiesUtil
- }
-
- def initUtil(): Unit = {
- var filePath = "config.properties"
- // filePath = this.getClass.getResource("/").toString.replace("file:", "") + "config.properties"
- filePath = "/opt/flink-1.13.6/job/mysql2doris/config.properties"
- if (propertiesUtil == null) {
- propertiesUtil = new PropertiesUtil
- }
- propertiesUtil.init(filePath)
- // propertiesUtil.evn = evn
- }
- }
若有疑问请留言或者 加入857技术社区
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。