
Extending Apache Griffin with a MySQL Data Source


Overview

Apache Griffin is positioned as a data quality monitoring tool for big data. It supports Hive, text files, and Avro files as batch data sources, and Kafka as a streaming data source. Projects that keep their data in relational databases such as MySQL or Oracle also need a configurable data quality monitoring tool, so extending Griffin with a MySQL data source gives such projects one more option for data quality monitoring.

Code Structure

The previous article on Apache Griffin already covered Griffin's features, execution flow, and architecture. This article focuses on the code structure and a simple way to extend the data sources. First, a look at the code structure:

The code is organized into three parts: measure, service, and ui. measure contains the Spark job code; service is the Spring Boot code that provides the web-side configuration and monitoring interface; ui contains the AngularJS front-end code and resources.

The code for extending data sources lives mainly in the measure module. Using the batch demo in the Griffin project that reads an Avro data source as an example, the following walks through how Griffin reads its configuration and selects a data source:

  1. Batch Avro data source configuration and the measure module's runtime environment configuration

Environment configuration file: env-batch.json

{
  # spark configuration
  "spark": {
    "log.level": "WARN",
    "config": {
      "spark.master": "local[*]"
    }
  },
  # sinks for the comparison results: console, hdfs, elasticsearch
  "sinks": [
    {
      "type": "CONSOLE",
      "config": {
        "max.log.lines": 10
      }
    },
    {
      "type": "HDFS",
      "config": {
        "path": "hdfs://localhost/griffin/batch/persist",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    },
    {
      "type": "ELASTICSEARCH",
      "config": {
        "method": "post",
        "api": "http://10.148.181.248:39200/griffin/accuracy",
        "connection.timeout": "1m",
        "retry": 10
      }
    }
  ],
  "griffin.checkpoint": []
}

Data source configuration file: config-batch.json

{
  # job name
  "name": "accu_batch",
  # process type: batch or streaming
  "process.type": "batch",
  # data source and comparison target configuration
  "data.sources": [
    {
      "name": "source",
      "baseline": true,
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "src/test/resources/users_info_src.avro"
          }
        }
      ]
    }, {
      "name": "target",
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.name": "src/test/resources/users_info_target.avro"
          }
        }
      ]
    }
  ],
  # data quality rules; accuracy comparison is used here
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code"
      }
    ]
  },
  # where to send the comparison results: console and elasticsearch
  "sinks": ["CONSOLE","ELASTICSEARCH"]
}

  2. The measure module's entry point and a brief walkthrough

package org.apache.griffin.measure

import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp

/**
 * application entrance
 */
object Application extends Loggable {

  def main(args: Array[String]): Unit = {
    info(args.toString)
    if (args.length < 2) {
      error("Usage: class <env-param> <dq-param>")
      sys.exit(-1)
    }

    // runtime parameter files: env-batch.json and config-batch.json
    val envParamFile = args(0)
    val dqParamFile = args(1)
    info(envParamFile)
    info(dqParamFile)

    // read param files
    val envParam = readParamFile[EnvConfig](envParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        sys.exit(-2)
    }
    val dqParam = readParamFile[DQConfig](dqParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        sys.exit(-2)
    }

    // combine the environment config and the data quality config into the griffin config
    val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)

    // choose the application type from the data quality config:
    // process.type in config-batch.json resolves to batch here
    val procType = ProcessType(allParam.getDqConfig.getProcType)
    val dqApp: DQApp = procType match {
      case BatchProcessType => BatchDQApp(allParam)
      case StreamingProcessType => StreamingDQApp(allParam)
      case _ =>
        error(s"${procType} is unsupported process type!")
        sys.exit(-4)
    }

    startup

    // initialize the griffin job execution environment
    // (see the next code block: the main logic creates the SparkSession and registers griffin's custom spark UDFs)
    dqApp.init match {
      case Success(_) =>
        info("process init success")
      case Failure(ex) =>
        error(s"process init error: ${ex.getMessage}", ex)
        shutdown
        sys.exit(-5)
    }

    // run the job; with this configuration it is a batch job
    val success = dqApp.run match {
      case Success(result) =>
        info("process run result: " + (if (result) "success" else "failed"))
        result
      case Failure(ex) =>
        error(s"process run error: ${ex.getMessage}", ex)
        if (dqApp.retryable) {
          throw ex
        } else {
          shutdown
          sys.exit(-5)
        }
    }

    // close the job
    dqApp.close match {
      case Success(_) =>
        info("process end success")
      case Failure(ex) =>
        error(s"process end error: ${ex.getMessage}", ex)
        shutdown
        sys.exit(-5)
    }

    shutdown

    // exit the application
    if (!success) {
      sys.exit(-5)
    }
  }

  private def readParamFile[T <: Param](file: String)(implicit m : ClassTag[T]): Try[T] = {
    val paramReader = ParamReaderFactory.getParamReader(file)
    paramReader.readConfig[T]
  }

  private def startup(): Unit = {
  }

  private def shutdown(): Unit = {
  }
}

The batch job class:

package org.apache.griffin.measure.launch.batch

import java.util.Date

import scala.util.Try

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, SQLContext}

import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent

case class BatchDQApp(allParam: GriffinConfig) extends DQApp {

  val envParam: EnvConfig = allParam.getEnvConfig
  val dqParam: DQConfig = allParam.getDqConfig

  val sparkParam = envParam.getSparkParam
  val metricName = dqParam.getName
  // val dataSourceParams = dqParam.dataSources
  // val dataSourceNames = dataSourceParams.map(_.name)
  val sinkParams = getSinkParams

  var sqlContext: SQLContext = _
  implicit var sparkSession: SparkSession = _

  def retryable: Boolean = false

  // initialization: create the SparkSession and register griffin's custom UDFs
  def init: Try[_] = Try {
    // build spark 2.0+ application context
    val conf = new SparkConf().setAppName(metricName)
    conf.setAll(sparkParam.getConfig)
    conf.set("spark.sql.crossJoin.enabled", "true")
    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
    sqlContext = sparkSession.sqlContext

    // register udf
    GriffinUDFAgent.register(sqlContext)
  }

  // job execution method
  def run: Try[Boolean] = Try {
    // start time
    val startTime = new Date().getTime

    val measureTime = getMeasureTime
    val contextId = ContextId(measureTime)

    // get data sources
    // build the data sources from the data.sources section of config-batch.json;
    // here both the source and target connectors read avro files
    val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
    // initialize the data sources
    dataSources.foreach(_.init)

    // create the griffin execution context
    val dqContext: DQContext = DQContext(
      contextId, metricName, dataSources, sinkParams, BatchProcessType
    )(sparkSession)

    // start the sinks configured for this job (console and elasticsearch)
    val applicationId = sparkSession.sparkContext.applicationId
    dqContext.getSink().start(applicationId)

    // build the data quality comparison job
    val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)

    // execute the job: it runs the steps configured on the web side;
    // in the demo it mainly executes the rule sql and writes the result to the sinks
    val result = dqJob.execute(dqContext)

    // log the end time of this run
    val endTime = new Date().getTime
    dqContext.getSink().log(endTime, s"process using time: ${endTime - startTime} ms")

    // clean up the griffin context
    dqContext.clean()

    // emit the finish mark
    dqContext.getSink().finish()

    result
  }

  def close: Try[_] = Try {
    sparkSession.close()
    sparkSession.stop()
  }
}

At this point the execution order of the measure code has been briefly explained. If you read it carefully, you will notice that the execution flow is not complicated and the code logic is quite clear.

The data source creation this article cares about happens in the BatchDQApp class, in the line val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources).

Let's look at the DataSourceFactory class:

package org.apache.griffin.measure.datasource

import scala.util.Success

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}

object DataSourceFactory extends Loggable {

  def getDataSources(sparkSession: SparkSession,
                     ssc: StreamingContext,
                     dataSources: Seq[DataSourceParam]
                    ): Seq[DataSource] = {
    dataSources.zipWithIndex.flatMap { pair =>
      val (param, index) = pair
      getDataSource(sparkSession, ssc, param, index)
    }
  }

  private def getDataSource(sparkSession: SparkSession,
                            ssc: StreamingContext,
                            dataSourceParam: DataSourceParam,
                            index: Int
                           ): Option[DataSource] = {
    val name = dataSourceParam.getName
    val connectorParams = dataSourceParam.getConnectors
    val timestampStorage = TimestampStorage()

    // cache client for streaming data
    val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
      sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)

    // get the data connectors
    val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
      // ask the connector factory for a connector
      DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
        timestampStorage, streamingCacheClientOpt) match {
        case Success(connector) => Some(connector)
        case _ => None
      }
    }

    Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
  }
}

DataConnectorFactory, the data connector factory:

package org.apache.griffin.measure.datasource.connector

import scala.util.Try

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.datasource.connector.batch._
import org.apache.griffin.measure.datasource.connector.streaming._

object DataConnectorFactory extends Loggable {

  val HiveRegex = """^(?i)hive$""".r
  val AvroRegex = """^(?i)avro$""".r
  val TextDirRegex = """^(?i)text-dir$""".r
  val KafkaRegex = """^(?i)kafka$""".r
  val CustomRegex = """^(?i)custom$""".r

  /**
   * create data connector
   * @param sparkSession             spark env
   * @param ssc                      spark streaming env
   * @param dcParam                  data connector param
   * @param tmstCache                same tmst cache in one data source
   * @param streamingCacheClientOpt  for streaming cache
   * @return   data connector
   */
  def getDataConnector(sparkSession: SparkSession,
                       ssc: StreamingContext,
                       dcParam: DataConnectorParam,
                       tmstCache: TimestampStorage,
                       streamingCacheClientOpt: Option[StreamingCacheClient]
                      ): Try[DataConnector] = {
    val conType = dcParam.getType
    val version = dcParam.getVersion
    Try {
      // map the connector type from the config to a connector implementation
      conType match {
        case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
        case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
        case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
        case CustomRegex() => getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
        case KafkaRegex() =>
          getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
        case _ => throw new Exception("connector creation error!")
      }
    }
  }

  private def getStreamingDataConnector(sparkSession: SparkSession,
                                        ssc: StreamingContext,
                                        dcParam: DataConnectorParam,
                                        tmstCache: TimestampStorage,
                                        streamingCacheClientOpt: Option[StreamingCacheClient]
                                       ): StreamingDataConnector = {
    if (ssc == null) throw new Exception("streaming context is null!")
    val conType = dcParam.getType
    val version = dcParam.getVersion
    conType match {
      case KafkaRegex() =>
        getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
      case _ => throw new Exception("streaming connector creation error!")
    }
  }

  // creation of custom (user-defined) data connectors
  private def getCustomConnector(session: SparkSession,
                                 context: StreamingContext,
                                 param: DataConnectorParam,
                                 storage: TimestampStorage,
                                 maybeClient: Option[StreamingCacheClient]): DataConnector = {
    val className = param.getConfig("class").asInstanceOf[String]
    val cls = Class.forName(className)
    if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
      val ctx = BatchDataConnectorContext(session, param, storage)
      val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext])
      meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
    } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
      val ctx = StreamingDataConnectorContext(session, context, param, storage, maybeClient)
      val meth = cls.getDeclaredMethod("apply", classOf[StreamingDataConnectorContext])
      meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
    } else {
      throw new ClassCastException(s"$className should extend BatchDataConnector or StreamingDataConnector")
    }
  }

  private def getKafkaDataConnector(sparkSession: SparkSession,
                                    ssc: StreamingContext,
                                    dcParam: DataConnectorParam,
                                    tmstCache: TimestampStorage,
                                    streamingCacheClientOpt: Option[StreamingCacheClient]
                                   ): KafkaStreamingDataConnector = {
    val KeyType = "key.type"
    val ValueType = "value.type"
    val config = dcParam.getConfig
    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
    val valueType = config.getOrElse(ValueType, "java.lang.String").toString

    (keyType, valueType) match {
      case ("java.lang.String", "java.lang.String") =>
        KafkaStreamingStringDataConnector(
          sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
      case _ =>
        throw new Exception("not supported type kafka data connector")
    }
  }
}
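One branch of the factory worth pointing out is custom: a connector class can be plugged in by its fully qualified name without touching DataConnectorFactory at all, as long as the class extends BatchDataConnector (or StreamingDataConnector) and its companion object exposes an apply method taking a BatchDataConnectorContext (or StreamingDataConnectorContext). A hedged example of what such a connector entry could look like — the class name and the extra config keys below are hypothetical:

{
  "type": "custom",
  "config": {
    # fully qualified name of the user-defined connector class (hypothetical)
    "class": "com.example.griffin.connector.MySQLBatchDataConnector",
    # any other keys are interpreted by the connector itself
    "database": "test_db",
    "table.name": "user_info_src"
  }
}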

By now the way data sources are created should be clear: the factory maps each configured connector type to an implementation, and the corresponding data is read at runtime. The demo uses the avro data source, so let's look at the implementation of AvroBatchDataConnector:

package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.HdfsUtil
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * batch data connector for avro file
 */
case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
                                  dcParam: DataConnectorParam,
                                  timestampStorage: TimestampStorage
                                 ) extends BatchDataConnector {

  val config = dcParam.getConfig

  val FilePath = "file.path"
  val FileName = "file.name"

  val filePath = config.getString(FilePath, "")
  val fileName = config.getString(FileName, "")

  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName

  private def pathPrefix(): Boolean = {
    filePath.nonEmpty
  }

  private def fileExist(): Boolean = {
    HdfsUtil.existPath(concreteFileFullPath)
  }

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
      val dfOpt = Some(df)
      val preDfOpt = preProcess(dfOpt, ms)
      preDfOpt
    } catch {
      case e: Throwable =>
        error(s"load avro file ${concreteFileFullPath} fails", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}

Following the code, AvroBatchDataConnector implements the DataConnector interface; its core is the data method, which loads data from a file with val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath).

Similarly, let's look at how Griffin's default Hive data source is implemented:

package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * batch data connector for hive table
 */
case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
                                  dcParam: DataConnectorParam,
                                  timestampStorage: TimestampStorage
                                 ) extends BatchDataConnector {

  val config = dcParam.getConfig

  val Database = "database"
  val TableName = "table.name"
  val Where = "where"

  val database = config.getString(Database, "default")
  val tableName = config.getString(TableName, "")
  val whereString = config.getString(Where, "")

  val concreteTableName = s"${database}.${tableName}"
  val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val dtSql = dataSql
      info(dtSql)
      val df = sparkSession.sql(dtSql)
      val dfOpt = Some(df)
      val preDfOpt = preProcess(dfOpt, ms)
      preDfOpt
    } catch {
      case e: Throwable =>
        error(s"load hive table ${concreteTableName} fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }

  private def tableExistsSql(): String = {
    // s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
    s"tableName LIKE '${tableName}'"
  }

  private def metaDataSql(): String = {
    s"DESCRIBE ${concreteTableName}"
  }

  private def dataSql(): String = {
    val tableClause = s"SELECT * FROM ${concreteTableName}"
    if (wheres.length > 0) {
      val clauses = wheres.map { w =>
        s"${tableClause} WHERE ${w}"
      }
      clauses.mkString(" UNION ALL ")
    } else tableClause
  }
}

The Hive connector is quite simple: it builds a query from the database, table name, and where clauses in the configuration, runs it with val df = sparkSession.sql(dtSql), and returns the DataFrame to be compared. If you are familiar with Spark, you have probably realized by now that extending Griffin with a MySQL data source is not hard, because Spark SQL can read MySQL through its JDBC data source.
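To back that claim up with a minimal sketch (this is plain Spark, not Griffin code; the URL, table, and credentials are placeholders, and the MySQL JDBC driver jar has to be on the Spark classpath):

// read a MySQL table into a DataFrame through Spark's built-in JDBC data source
val df = sparkSession.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test_db")
  .option("dbtable", "user_info_src")
  .option("user", "griffin")
  .option("password", "griffin_pwd")
  .option("driver", "com.mysql.jdbc.Driver")
  .load()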

Approach to Extending a MySQL Data Source

  1. Add a MySQL connector configuration to the configuration file (a config sketch follows this list)
  2. Add the corresponding type mapping in DataConnectorFactory
  3. Add a new MySQLBatchDataConnector that implements the BatchDataConnector interface
  4. Work out how to read sources that are sharded across databases and tables
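
For step 1, the connector entry for a MySQL source might look like the sketch below. Note that the type name "mysql" and all of the config keys are choices the new connector would have to define and parse itself; none of them exist in Griffin today:

"connectors": [
  {
    "type": "mysql",
    "config": {
      "url": "jdbc:mysql://localhost:3306/test_db",
      "table.name": "user_info_src",
      "user": "griffin",
      "password": "griffin_pwd",
      "where": "status = 1"
    }
  }
]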

For various reasons, the implementation code and a demo will be covered in a follow-up post.
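In the meantime, here is a minimal, untested sketch of steps 2 and 3, modeled on the Avro and Hive connectors above. It simply puts the JDBC read shown earlier behind the BatchDataConnector interface; the config keys mirror the connector sketch above, the driver class assumes MySQL Connector/J, and the sharding question from step 4 is not addressed:

package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * batch data connector for a mysql table (sketch only)
 */
case class MySQLBatchDataConnector(@transient sparkSession: SparkSession,
                                   dcParam: DataConnectorParam,
                                   timestampStorage: TimestampStorage
                                  ) extends BatchDataConnector {

  val config = dcParam.getConfig

  // config keys, mirroring the connector config sketch above (not existing Griffin options)
  val Url = "url"
  val TableName = "table.name"
  val User = "user"
  val Password = "password"
  val Where = "where"

  val url = config.getString(Url, "")
  val tableName = config.getString(TableName, "")
  val user = config.getString(User, "")
  val password = config.getString(Password, "")
  val whereString = config.getString(Where, "")

  // push the optional where clause down to mysql as a sub-query
  val dbtable =
    if (whereString.nonEmpty) s"(SELECT * FROM ${tableName} WHERE ${whereString}) AS t"
    else tableName

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val df = sparkSession.read
        .format("jdbc")
        .option("url", url)
        .option("dbtable", dbtable)
        .option("user", user)
        .option("password", password)
        .option("driver", "com.mysql.jdbc.Driver")   // assumes MySQL Connector/J on the classpath
        .load()
      preProcess(Some(df), ms)
    } catch {
      case e: Throwable =>
        error(s"load mysql table ${tableName} fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}

Step 2 would then be a small addition to DataConnectorFactory alongside the existing patterns, e.g. a val MySqlRegex = """^(?i)mysql$""".r and a case MySqlRegex() => MySQLBatchDataConnector(sparkSession, dcParam, tmstCache) branch in getDataConnector.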

by 赖泽坤@vipshop.com

Related resources: https://github.com/apache/griffin

Reposted from: https://my.oschina.net/u/939952/blog/3067750
