Overview
Apache Griffin is positioned as a data quality monitoring tool for big data. It supports batch data sources (Hive tables, text files, Avro files) and the streaming data source Kafka. However, projects that store their data in relational databases such as MySQL or Oracle also need a configurable data quality monitoring tool, so extending Griffin with a MySQL data source gives those projects one more option for data quality monitoring.
Code Structure
The previous article on Apache Griffin already covered Griffin's features, execution flow, and architecture. This article walks through the code structure and a simple way to extend its data sources. Let's start with the code layout:
The code is split into three parts: measure, service, and ui. measure contains the Spark job code that runs the scheduled checks; service is a Spring Boot application that provides the web configuration and monitoring interface; ui holds the AngularJS front-end code and assets.
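In the source tree this corresponds roughly to the following top-level layout (abridged; only the modules discussed here are shown):

```
griffin/
├── measure/   # Spark-based measurement jobs (Scala)
├── service/   # Spring Boot web service for job configuration and monitoring
└── ui/        # AngularJS front end
```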
The code for extending a data source lives mainly in the measure module. Using the batch demo in the Griffin project that reads an Avro data source, the following sections show how Griffin reads its configuration and selects a data source:
- Batch Avro data source configuration and the execution environment configuration of the measure module
Environment configuration file: env-batch.json
- {
- # spark configuration
- "spark": {
- "log.level": "WARN",
- "config": {
- "spark.master": "local[*]"
- }
- },
-
- # sinks for the comparison results: console, hdfs, elasticsearch
- "sinks": [
- {
- "type": "CONSOLE",
- "config": {
- "max.log.lines": 10
- }
- },
- {
- "type": "HDFS",
- "config": {
- "path": "hdfs://localhost/griffin/batch/persist",
- "max.persist.lines": 10000,
- "max.lines.per.file": 10000
- }
- },
- {
- "type": "ELASTICSEARCH",
- "config": {
- "method": "post",
- "api": "http://10.148.181.248:39200/griffin/accuracy",
- "connection.timeout": "1m",
- "retry": 10
- }
- }
- ],
-
- "griffin.checkpoint": []
- }
Data source and rule configuration file: config-batch.json
- {
- # job name
- "name": "accu_batch",
- # process type: batch or streaming
- "process.type": "batch",
- # configuration of the data source and the comparison target
- "data.sources": [
- {
- "name": "source",
- "baseline": true,
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_src.avro"
- }
- }
- ]
- }, {
- "name": "target",
- "connectors": [
- {
- "type": "avro",
- "version": "1.7",
- "config": {
- "file.name": "src/test/resources/users_info_target.avro"
- }
- }
- ]
- }
- ],
- # data quality rules; here an accuracy comparison is chosen
- "evaluate.rule": {
- "rules": [
- {
- "dsl.type": "griffin-dsl",
- "dq.type": "accuracy",
- "out.dataframe.name": "accu",
- "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code"
- }
- ]
- },
- # where the comparison results are sunk: console and elasticsearch
- "sinks": ["CONSOLE","ELASTICSEARCH"]
- }
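Before moving on to the code, it helps to know what the accuracy rule above actually computes: the number of source records that cannot be matched in the target, and from that an accuracy ratio. Conceptually (this is an illustration of the idea, not the SQL Griffin generates internally), it behaves like a LEFT JOIN miss count:

```scala
// Illustration only: what an accuracy comparison boils down to.
// Assumes a SparkSession named sparkSession with the two data sources
// registered as temporary views "source" and "target".
val miss = sparkSession.sql(
  """
    |SELECT COUNT(*) AS miss
    |FROM source s LEFT JOIN target t
    |  ON  s.user_id = t.user_id
    |  AND upper(s.first_name) = upper(t.first_name)
    |  AND s.last_name = t.last_name
    |  AND s.address   = t.address
    |  AND s.email     = t.email
    |  AND s.phone     = t.phone
    |  AND s.post_code = t.post_code
    |WHERE t.user_id IS NULL
  """.stripMargin).head().getLong(0)

val total = sparkSession.table("source").count()
val accuracy = if (total == 0) 1.0 else (total - miss).toDouble / total
```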
- Entry point of the measure module, with brief notes
- package org.apache.griffin.measure
-
- import scala.reflect.ClassTag
- import scala.util.{Failure, Success, Try}
-
- import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
- import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
- import org.apache.griffin.measure.configuration.enums._
- import org.apache.griffin.measure.launch.DQApp
- import org.apache.griffin.measure.launch.batch.BatchDQApp
- import org.apache.griffin.measure.launch.streaming.StreamingDQApp
-
-
- /**
- * application entrance
- */
- object Application extends Loggable {
-
- def main(args: Array[String]): Unit = {
- info(args.toString)
- if (args.length < 2) {
- error("Usage: class <env-param> <dq-param>")
- sys.exit(-1)
- }
-
- // runtime parameters: the paths of env-batch.json and config-batch.json
- val envParamFile = args(0)
- val dqParamFile = args(1)
-
- info(envParamFile)
- info(dqParamFile)
-
- // read param files
- val envParam = readParamFile[EnvConfig](envParamFile) match {
- case Success(p) => p
- case Failure(ex) =>
- error(ex.getMessage, ex)
- sys.exit(-2)
- }
- val dqParam = readParamFile[DQConfig](dqParamFile) match {
- case Success(p) => p
- case Failure(ex) =>
- error(ex.getMessage, ex)
- sys.exit(-2)
- }
-
- // combine the environment config and the job config into the griffin config
- val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
-
- // choose the application type from the job configuration
- // process.type in config-batch.json resolves to batch here
- val procType = ProcessType(allParam.getDqConfig.getProcType)
- val dqApp: DQApp = procType match {
- case BatchProcessType => BatchDQApp(allParam)
- case StreamingProcessType => StreamingDQApp(allParam)
- case _ =>
- error(s"${procType} is unsupported process type!")
- sys.exit(-4)
- }
-
- startup
-
- // initialize the griffin job execution environment
- // see the next code block: the main logic creates the sparkSession and registers griffin's custom spark UDFs
- dqApp.init match {
- case Success(_) =>
- info("process init success")
- case Failure(ex) =>
- error(s"process init error: ${ex.getMessage}", ex)
- shutdown
- sys.exit(-5)
- }
-
- // run the job; with this configuration it is a batch job
- val success = dqApp.run match {
- case Success(result) =>
- info("process run result: " + (if (result) "success" else "failed"))
- result
-
- case Failure(ex) =>
- error(s"process run error: ${ex.getMessage}", ex)
-
- if (dqApp.retryable) {
- throw ex
- } else {
- shutdown
- sys.exit(-5)
- }
- }
-
- // close the job
- dqApp.close match {
- case Success(_) =>
- info("process end success")
- case Failure(ex) =>
- error(s"process end error: ${ex.getMessage}", ex)
- shutdown
- sys.exit(-5)
- }
-
- shutdown
- // exit the program
- if (!success) {
- sys.exit(-5)
- }
- }
-
- private def readParamFile[T <: Param](file: String)(implicit m : ClassTag[T]): Try[T] = {
- val paramReader = ParamReaderFactory.getParamReader(file)
- paramReader.readConfig[T]
- }
-
- private def startup(): Unit = {
- }
-
- private def shutdown(): Unit = {
- }
-
- }
The batch job processing class
- package org.apache.griffin.measure.launch.batch
-
- import java.util.Date
-
- import scala.util.Try
-
- import org.apache.spark.SparkConf
- import org.apache.spark.sql.{SparkSession, SQLContext}
-
- import org.apache.griffin.measure.configuration.dqdefinition._
- import org.apache.griffin.measure.configuration.enums._
- import org.apache.griffin.measure.context._
- import org.apache.griffin.measure.datasource.DataSourceFactory
- import org.apache.griffin.measure.job.builder.DQJobBuilder
- import org.apache.griffin.measure.launch.DQApp
- import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
-
-
- case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
-
- val envParam: EnvConfig = allParam.getEnvConfig
- val dqParam: DQConfig = allParam.getDqConfig
-
- val sparkParam = envParam.getSparkParam
- val metricName = dqParam.getName
- // val dataSourceParams = dqParam.dataSources
- // val dataSourceNames = dataSourceParams.map(_.name)
- val sinkParams = getSinkParams
-
- var sqlContext: SQLContext = _
-
- implicit var sparkSession: SparkSession = _
-
- def retryable: Boolean = false
-
- // init: create the sparkSession and register griffin's custom UDFs
- def init: Try[_] = Try {
- // build spark 2.0+ application context
- val conf = new SparkConf().setAppName(metricName)
- conf.setAll(sparkParam.getConfig)
- conf.set("spark.sql.crossJoin.enabled", "true")
- sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
- sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
- sqlContext = sparkSession.sqlContext
-
- // register udf
- GriffinUDFAgent.register(sqlContext)
- }
-
- // the method that runs the job
- def run: Try[Boolean] = Try {
- // start time
- val startTime = new Date().getTime
-
- val measureTime = getMeasureTime
- val contextId = ContextId(measureTime)
-
- // get data sources
- // build the data sources from the data.sources section of config-batch.json; here two avro-backed sources, source and target, are read
- val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
- // initialize each data source
- dataSources.foreach(_.init)
-
- // create the griffin execution context
- val dqContext: DQContext = DQContext(
- contextId, metricName, dataSources, sinkParams, BatchProcessType
- )(sparkSession)
-
- // start the sinks; per the configuration, results go to the console and elasticsearch
- val applicationId = sparkSession.sparkContext.applicationId
- dqContext.getSink().start(applicationId)
-
- // build the data quality comparison job
- val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)
-
- // execute the comparison job, following the steps configured on the web side; the demo mainly runs the rule SQL from the configuration and writes the results to the sinks
- val result = dqJob.execute(dqContext)
-
- // log the end time of this run
- val endTime = new Date().getTime
- dqContext.getSink().log(endTime, s"process using time: ${endTime - startTime} ms")
-
- // clean up the griffin context
- dqContext.clean()
-
- // emit the finish marker
- dqContext.getSink().finish()
-
- result
- }
-
- def close: Try[_] = Try {
- sparkSession.close()
- sparkSession.stop()
- }
-
- }
-
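For reference, the contract that BatchDQApp (and StreamingDQApp) fulfils can be inferred from how Application drives it. A sketch of the DQApp trait, reconstructed from those call sites rather than copied from the Griffin source, looks roughly like this:

```scala
import scala.util.Try

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig}

// Sketch only: the shape implied by Application.main and BatchDQApp above.
// The real trait also provides helpers such as getMeasureTime and getSinkParams.
trait DQApp extends Loggable {
  val envParam: EnvConfig // environment configuration (spark, sinks, checkpoint)
  val dqParam: DQConfig   // job configuration (data sources, rules, sinks)

  def init: Try[_]        // build the execution environment (SparkSession, UDFs, ...)
  def run: Try[Boolean]   // execute the data quality job, returning success or failure
  def close: Try[_]       // release resources
  def retryable: Boolean  // whether the caller should rethrow and retry on failure
}
```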
At this point the execution order of the measure code has been outlined. If you read it closely, you will see that the flow is not complicated and the logic is quite clear.
The data source creation that this article focuses on happens in the BatchDQApp class, in the line `val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)`.
Let's look at the code of the DataSourceFactory class:
- package org.apache.griffin.measure.datasource
-
- import scala.util.Success
-
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.streaming.StreamingContext
-
- import org.apache.griffin.measure.Loggable
- import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
- import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
- import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}
-
-
- object DataSourceFactory extends Loggable {
-
- def getDataSources(sparkSession: SparkSession,
- ssc: StreamingContext,
- dataSources: Seq[DataSourceParam]
- ): Seq[DataSource] = {
- dataSources.zipWithIndex.flatMap { pair =>
- val (param, index) = pair
- getDataSource(sparkSession, ssc, param, index)
- }
- }
-
- private def getDataSource(sparkSession: SparkSession,
- ssc: StreamingContext,
- dataSourceParam: DataSourceParam,
- index: Int
- ): Option[DataSource] = {
- val name = dataSourceParam.getName
- val connectorParams = dataSourceParam.getConnectors
- val timestampStorage = TimestampStorage()
-
- // streaming data cache
- val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
- sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)
-
- // build the data connectors
- val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
- // obtain each connector from the connector factory
- DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
- timestampStorage, streamingCacheClientOpt) match {
- case Success(connector) => Some(connector)
- case _ => None
- }
- }
-
- Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
- }
-
- }
DataConnectorFactory, the data connector factory
- package org.apache.griffin.measure.datasource.connector
-
- import scala.util.Try
-
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.streaming.StreamingContext
-
- import org.apache.griffin.measure.Loggable
- import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
- import org.apache.griffin.measure.datasource.TimestampStorage
- import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
- import org.apache.griffin.measure.datasource.connector.batch._
- import org.apache.griffin.measure.datasource.connector.streaming._
-
-
- object DataConnectorFactory extends Loggable {
-
- val HiveRegex = """^(?i)hive$""".r
- val AvroRegex = """^(?i)avro$""".r
- val TextDirRegex = """^(?i)text-dir$""".r
-
- val KafkaRegex = """^(?i)kafka$""".r
-
- val CustomRegex = """^(?i)custom$""".r
-
- /**
- * create data connector
- * @param sparkSession spark env
- * @param ssc spark streaming env
- * @param dcParam data connector param
- * @param tmstCache same tmst cache in one data source
- * @param streamingCacheClientOpt for streaming cache
- * @return data connector
- */
- def getDataConnector(sparkSession: SparkSession,
- ssc: StreamingContext,
- dcParam: DataConnectorParam,
- tmstCache: TimestampStorage,
- streamingCacheClientOpt: Option[StreamingCacheClient]
- ): Try[DataConnector] = {
- val conType = dcParam.getType
- val version = dcParam.getVersion
- Try {
- // map the configured type to a data connector implementation
- conType match {
- case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
- case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
- case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
- case CustomRegex() => getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
- case KafkaRegex() =>
- getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
- case _ => throw new Exception("connector creation error!")
- }
- }
- }
-
- private def getStreamingDataConnector(sparkSession: SparkSession,
- ssc: StreamingContext,
- dcParam: DataConnectorParam,
- tmstCache: TimestampStorage,
- streamingCacheClientOpt: Option[StreamingCacheClient]
- ): StreamingDataConnector = {
- if (ssc == null) throw new Exception("streaming context is null!")
- val conType = dcParam.getType
- val version = dcParam.getVersion
- conType match {
- case KafkaRegex() =>
- getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
- case _ => throw new Exception("streaming connector creation error!")
- }
- }
-
- // build a custom data connector from a user-supplied class
- private def getCustomConnector(session: SparkSession,
- context: StreamingContext,
- param: DataConnectorParam,
- storage: TimestampStorage,
- maybeClient: Option[StreamingCacheClient]): DataConnector = {
- val className = param.getConfig("class").asInstanceOf[String]
- val cls = Class.forName(className)
- if (classOf[BatchDataConnector].isAssignableFrom(cls)) {
- val ctx = BatchDataConnectorContext(session, param, storage)
- val meth = cls.getDeclaredMethod("apply", classOf[BatchDataConnectorContext])
- meth.invoke(null, ctx).asInstanceOf[BatchDataConnector]
- } else if (classOf[StreamingDataConnector].isAssignableFrom(cls)) {
- val ctx = StreamingDataConnectorContext(session, context, param, storage, maybeClient)
- val meth = cls.getDeclaredMethod("apply", classOf[StreamingDataConnectorContext])
- meth.invoke(null, ctx).asInstanceOf[StreamingDataConnector]
- } else {
- throw new ClassCastException(s"$className should extend BatchDataConnector or StreamingDataConnector")
- }
- }
-
- private def getKafkaDataConnector(sparkSession: SparkSession,
- ssc: StreamingContext,
- dcParam: DataConnectorParam,
- tmstCache: TimestampStorage,
- streamingCacheClientOpt: Option[StreamingCacheClient]
- ): KafkaStreamingDataConnector = {
- val KeyType = "key.type"
- val ValueType = "value.type"
- val config = dcParam.getConfig
- val keyType = config.getOrElse(KeyType, "java.lang.String").toString
- val valueType = config.getOrElse(ValueType, "java.lang.String").toString
-
- (keyType, valueType) match {
- case ("java.lang.String", "java.lang.String") =>
- KafkaStreamingStringDataConnector(
- sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
- case _ =>
- throw new Exception("not supported type kafka data connector")
- }
- }
- }
By now the way data sources are created should be clear: the connector type in the data source configuration is mapped to a connector implementation, which produces the corresponding data at runtime. The demo uses the avro data source, so let's look next at the implementation of AvroBatchDataConnector:
- package org.apache.griffin.measure.datasource.connector.batch
-
- import org.apache.spark.sql.{DataFrame, SparkSession}
-
- import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
- import org.apache.griffin.measure.context.TimeRange
- import org.apache.griffin.measure.datasource.TimestampStorage
- import org.apache.griffin.measure.utils.HdfsUtil
- import org.apache.griffin.measure.utils.ParamUtil._
-
- /**
- * batch data connector for avro file
- */
- case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
- dcParam: DataConnectorParam,
- timestampStorage: TimestampStorage
- ) extends BatchDataConnector {
-
- val config = dcParam.getConfig
-
- val FilePath = "file.path"
- val FileName = "file.name"
-
- val filePath = config.getString(FilePath, "")
- val fileName = config.getString(FileName, "")
-
- val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
-
- private def pathPrefix(): Boolean = {
- filePath.nonEmpty
- }
-
- private def fileExist(): Boolean = {
- HdfsUtil.existPath(concreteFileFullPath)
- }
-
- def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
- val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
- val dfOpt = Some(df)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } catch {
- case e: Throwable =>
- error(s"load avro file ${concreteFileFullPath} fails", e)
- None
- }
- val tmsts = readTmst(ms)
- (dfOpt, TimeRange(ms, tmsts))
- }
- }
-
Following the code, you can see that AvroBatchDataConnector implements the DataConnector interface; its core is the data method, which loads the data from a file via `val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)`.
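As a side note, if you are running on Spark 2.4 or later, Avro support ships with Spark as the spark-avro module and is exposed under the shorter `avro` format name, so the same read could be written as below (this assumes `org.apache.spark:spark-avro` is on the classpath; Griffin's own code uses the Databricks package shown above):

```scala
// Equivalent read on Spark 2.4+ with the spark-avro module on the classpath.
val df = sparkSession.read.format("avro").load(concreteFileFullPath)
```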
Similarly, let's look at how Griffin's default data source, Hive, is implemented:
- package org.apache.griffin.measure.datasource.connector.batch
-
- import org.apache.spark.sql.{DataFrame, SparkSession}
-
- import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
- import org.apache.griffin.measure.context.TimeRange
- import org.apache.griffin.measure.datasource.TimestampStorage
- import org.apache.griffin.measure.utils.ParamUtil._
-
- /**
- * batch data connector for hive table
- */
- case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
- dcParam: DataConnectorParam,
- timestampStorage: TimestampStorage
- ) extends BatchDataConnector {
-
- val config = dcParam.getConfig
-
- val Database = "database"
- val TableName = "table.name"
- val Where = "where"
-
- val database = config.getString(Database, "default")
- val tableName = config.getString(TableName, "")
- val whereString = config.getString(Where, "")
-
- val concreteTableName = s"${database}.${tableName}"
- val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
-
- def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
- val dtSql = dataSql
- info(dtSql)
- val df = sparkSession.sql(dtSql)
- val dfOpt = Some(df)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } catch {
- case e: Throwable =>
- error(s"load hive table ${concreteTableName} fails: ${e.getMessage}", e)
- None
- }
- val tmsts = readTmst(ms)
- (dfOpt, TimeRange(ms, tmsts))
- }
-
-
- private def tableExistsSql(): String = {
- // s"SHOW TABLES LIKE '${concreteTableName}'" // this is hive sql, but not work for spark sql
- s"tableName LIKE '${tableName}'"
- }
-
- private def metaDataSql(): String = {
- s"DESCRIBE ${concreteTableName}"
- }
-
- private def dataSql(): String = {
- val tableClause = s"SELECT * FROM ${concreteTableName}"
- if (wheres.length > 0) {
- val clauses = wheres.map { w =>
- s"${tableClause} WHERE ${w}"
- }
- clauses.mkString(" UNION ALL ")
- } else tableClause
- }
-
- }
-
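To make the handling of the `where` option concrete, here is the same string-building logic as dataSql(), run with made-up values (table name and partition values are hypothetical):

```scala
// Worked example of dataSql() above, with illustrative values.
val database = "default"
val tableName = "users"
val whereString = "dt = '20200101', dt = '20200102'"

val concreteTableName = s"${database}.${tableName}"
val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)

val tableClause = s"SELECT * FROM ${concreteTableName}"
val dtSql =
  if (wheres.length > 0) wheres.map(w => s"${tableClause} WHERE ${w}").mkString(" UNION ALL ")
  else tableClause
// dtSql: SELECT * FROM default.users WHERE dt = '20200101'
//        UNION ALL SELECT * FROM default.users WHERE dt = '20200102'
```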
The Hive connector's implementation looks quite simple: it reads the database, table name, and optional where clauses from the configuration, builds a SELECT statement, executes it with `val df = sparkSession.sql(dtSql)`, and returns the resulting data. If you are familiar with Spark, you have probably already guessed that extending Griffin with a MySQL data source is not hard, because Spark SQL can read from MySQL through its JDBC source.
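To see what Spark already provides (this is the standard DataFrameReader JDBC API; the connection details below are placeholders), reading a MySQL table takes just a few lines once the MySQL JDBC driver is on the classpath:

```scala
// Plain Spark JDBC read from MySQL; url, table and credentials are placeholders.
val df = sparkSession.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")
  .option("dbtable", "users")
  .option("user", "griffin")
  .option("password", "******")
  .option("driver", "com.mysql.jdbc.Driver")
  .load()
```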
Approach to Extending a MySQL Data Source
- Add a mysql connector configuration to the configuration file
- Add the corresponding data source mapping in DataConnectorFactory
- Add a MySQLBatchDataConnector class that implements the BatchDataConnector interface
- Consider how to read data that is sharded across multiple databases and tables
For various reasons, the full implementation and a demo will be left for a follow-up post; in the meantime, a rough sketch is given below as a starting point.
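As a preview of the third step above, here is a minimal, untested sketch of what a MySQLBatchDataConnector could look like, modeled on the HiveBatchDataConnector shown earlier and on Spark's JDBC reader. The `mysql` type, the config keys, and the class itself are my own naming for illustration, not code from the Griffin project:

```scala
package org.apache.griffin.measure.datasource.connector.batch

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * Sketch of a batch data connector for a MySQL table, read through Spark's JDBC source.
 * Assumed connector config (keys are illustrative):
 *   { "type": "mysql", "config": { "url": "jdbc:mysql://host:3306/db",
 *     "table.name": "users", "user": "...", "password": "...", "where": "..." } }
 */
case class MySQLBatchDataConnector(@transient sparkSession: SparkSession,
                                   dcParam: DataConnectorParam,
                                   timestampStorage: TimestampStorage
                                  ) extends BatchDataConnector {

  val config = dcParam.getConfig

  val url = config.getString("url", "")
  val tableName = config.getString("table.name", "")
  val user = config.getString("user", "")
  val password = config.getString("password", "")
  val whereString = config.getString("where", "")

  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val df = sparkSession.read
        .format("jdbc")
        .option("url", url)
        .option("dbtable", tableName)
        .option("user", user)
        .option("password", password)
        .option("driver", "com.mysql.jdbc.Driver")
        .load()
      // sharded databases/tables (step 4 above) could be handled here by
      // unioning several such reads before pre-processing
      val filtered = if (whereString.nonEmpty) df.where(whereString) else df
      preProcess(Some(filtered), ms)
    } catch {
      case e: Throwable =>
        error(s"load mysql table ${tableName} fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
}
```

On the factory side (step 2), DataConnectorFactory would additionally need a `MySqlRegex = """^(?i)mysql$""".r` pattern and a `case MySqlRegex() => MySQLBatchDataConnector(sparkSession, dcParam, tmstCache)` branch, mirroring the existing hive/avro cases.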