Apache Griffin is an open-source big data data-quality solution. It supports both batch and streaming modes of data quality checking and can measure data assets along different dimensions (for example, checking after an offline job finishes whether the row counts at the source and target match, or counting null values in a source table), thereby improving the accuracy and trustworthiness of the data.
The official website describes it as a "Big Data Quality Solution For Batch and Streaming": http://griffin.apache.org/#about_page
From the official documentation:
In other words, Apache Griffin provides a well-defined domain model for data quality that covers most common data quality problems, and it also supports user-defined data quality criteria: by extending the DSL that Apache Griffin defines, users can implement their own data quality definitions.
Official documentation:
1. Data quality definition: through the Apache Griffin UI, users define the quality dimensions they care about for their data, such as accuracy, completeness, and timeliness.
2. Metric computation: based on the defined data quality dimensions, Apache Griffin extracts the source data and computes metrics, either in streaming mode (Kafka) or in batch mode (scheduled jobs).
3. Persisting results: the computed data quality reports are written out as metrics to the configured sinks.
4. Extensibility: Apache Griffin provides a plugin mechanism so that new data quality requirements and the logic that defines them can easily be added to the platform.
The main responsibilities of each part:
1. Register data: register the data sources whose quality you want to check with Griffin.
2. Configure measure models: define models along data quality dimensions such as accuracy, completeness, timeliness, and uniqueness.
3. Configure scheduled jobs that are submitted to the Spark cluster to check the data periodically.
4. View the metrics on the web portal and analyze the data quality results.
Steps omitted (JDK 1.8 or above is required).
Steps omitted.
(1) Download Hadoop 2.7.7 from https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/core/
(2) Upload it to /opt/hadoop; if the directory does not exist, create it with mkdir /opt/hadoop (another directory also works).
tar -zxvf hadoop-2.7.7.tar.gz
vi /etc/profile
Append the Hadoop directories:
export HADOOP_HOME=/opt/hadoop/hadoop-2.7.7
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
source /etc/profile
Add the JDK environment variable for Hadoop:
cd $HADOOP_HOME/etc/hadoop
vi hadoop-env.sh
Append the JDK environment variable in this file:
export JAVA_HOME=/usr/local/jdk   # replace with your actual JDK installation path
Edit the core-site.xml file:
cd $HADOOP_HOME/etc/hadoop
vi core-site.xml
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
Edit the hdfs-site.xml file:
<configuration>
  <property>
    <name>dfs.name.dir</name>
    <value>/usr/hadoop/hdfs/name</value>
    <description>Where the NameNode stores the HDFS namespace metadata</description>
  </property>
  <property>
    <name>dfs.data.dir</name>
    <value>/usr/hadoop/hdfs/data</value>
    <description>Physical storage location of data blocks on the DataNode</description>
  </property>
  <!-- number of HDFS replicas -->
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
Configure mapred-site.xml. On a fresh installation the file is named mapred-site.xml.template; rename it to mapred-site.xml.
<configuration>
<!-- tell the MapReduce framework to use YARN -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
配置yarn-site.xml
<configuration>
<!-- reducers fetch data via mapreduce_shuffle -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
Reference: https://www.cnblogs.com/caoxb/p/11333741.html
Reference: https://blog.csdn.net/k393393/article/details/92440892
Reference: https://blog.csdn.net/fiery_heart/article/details/85265585
Only needed when using Kafka (streaming mode).
http://griffin.apache.org/docs/quickstart.html
Local walkthrough:
(1) Open the Apache Griffin web UI.
(2) Open the measure creation page.
(3) Configure the source data, the target data, and the corresponding measure result data.
(4) Follow the steps to configure the engine results.
(5) Configure the job that will execute the measure.
(6) Click Save.
Setting up the job:
(1) Settings on the job configuration page.
http://griffin.apache.org/docs/usecases.html (to be analyzed later; we do not use streaming data sources here)
https://github.com/apache/griffin — based on the griffin-0.4.0-rc0 release
My GitHub fork: https://github.com/zcswl7961/apache-griffin-expand
Source code modules:
service: Spring Boot code that provides the web configuration and the server-side data for the monitoring UI.
measure: Scala code; the Spark jobs that run the scheduled measurements.
ui: the front-end interface.
Configuration files the modules depend on:
application.properties
env: environment configuration for streaming and batch modes
(1) When the front end saves a job, it calls POST /jobs on JobController, which checks whether the job is a batch or a streaming job. For a batch job it creates a BatchJob record and then saves the corresponding local Quartz job.
It then calls jobService.addJob(triggerKey, batchJob, BATCH); to create the scheduled trigger, which executes the SparkSubmitJob Quartz job.
initParam(jd); initializes the relevant parameters, including reading the measure and jobInstance from the JobDetail and reading the livy.url configuration.
setLivyConf(); builds the parameters for the Livy submission and assigns them to the livyConfMap instance; this mainly parses sparkProperties.json and appends a raw argument.
saveJobInstance(jd); submits the Spark job through Livy and stores the execution history of the job in the local Quartz database. Inside saveJobInstance, post2Livy is called first; it builds the HttpEntity for the Livy submission.
The final Livy submission posts to the URL http://192.168.239.171:8998/batches with the following body:
{ "file":"hdfs://localhost:8020/griffin/griffin-measure.jar", "className":"org.apache.griffin.measure.Application", "name":"griffin", "queue":"default", "numExecutors":2, "executorCores":1, "driverMemory":"1g", "executorMemory":"1g", "conf":{ "spark.yarn.dist.files":"hdfs://localhost:8020/home/spark_conf/hive-site.xml" }, "files":[ ], "args":[ "{ "spark" : { "log.level" : "WARN" }, "sinks" : [ { "type" : "CONSOLE", "config" : { "max.log.lines" : 10 } }, { "type" : "HDFS", "config" : { "path" : "hdfs://localhost:8020/griffin/persist", "max.persist.lines" : 10000, "max.lines.per.file" : 10000 } }, { "type" : "ELASTICSEARCH", "config" : { "method" : "post", "api" : "http://localhost:9200/griffin/accuracy", "connection.timeout" : "1m", "retry" : 10 } } ], "griffin.checkpoint" : [ ] }", "{ "measure.type" : "griffin", "id" : 3355, "name" : "schedule-job-zcg", "owner" : "test", "description" : "test", "deleted" : false, "timestamp" : 1569477360000, "dq.type" : "ACCURACY", "sinks" : [ "ELASTICSEARCH", "HDFS" ], "process.type" : "BATCH", "data.sources" : [ { "id" : 3358, "name" : "source", "connectors" : [ { "id" : 3359, "name" : "source1569548839003", "type" : "HIVE", "version" : "1.2", "predicates" : [ ], "data.unit" : "1day", "data.time.zone" : "", "config" : { "database" : "griffin_demo", "table.name" : "demo_src", "where" : "dt='20190927' AND hour = '09'" } } ], "baseline" : false }, { "id" : 3360, "name" : "target", "connectors" : [ { "id" : 3361, "name" : "target1569548846717", "type" : "HIVE", "version" : "1.2", "predicates" : [ ], "data.unit" : "1day", "data.time.zone" : "", "config" : { "database" : "griffin_demo", "table.name" : "demo_tgt", "where" : "dt='20190927' AND hour = '09'" } } ], "baseline" : false } ], "evaluate.rule" : { "id" : 3356, "rules" : [ { "id" : 3357, "rule" : "source.id=target.id", "dsl.type" : "griffin-dsl", "dq.type" : "ACCURACY", "out.dataframe.name" : "accuracy" } ] }, "measure.type" : "griffin" }", "raw,raw" ] }
After Livy receives the job submitted by the service, it submits it to Spark. Spark then executes the job: it first reads the configured file hdfs://localhost:8020/griffin/griffin-measure.jar from HDFS and runs the class given by className.
The measure computation depends on griffin-measure.jar, which Spark loads from HDFS; its location is configured in sparkProperties.json in the Griffin source code.
Before walking through the source code, here is a brief overview of Spark and Hadoop.
Hadoop is an open-source distributed infrastructure for storing and analyzing big data.
Hadoop has two core components: HDFS and MapReduce.
HDFS behaves like a traditional hierarchical file system: you can create, delete, move, and rename files and directories, much as in a Linux file system.
Basic file operation commands (e.g. hdfs dfs -ls, -mkdir, -put, -get, -rm):
MapReduce is a software architecture proposed by Google for parallel processing of large data sets (larger than 1 TB). The concepts of "Map" and "Reduce" and their main ideas are borrowed from functional programming languages, together with features taken from vector programming languages.
A typical implementation specifies a Map function that transforms a set of key/value pairs into a new set of key/value pairs, and a concurrent Reduce function that merges all intermediate values that share the same key.
MapReduce execution example (figure).
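To make the Map and Reduce phases concrete, here is a small, self-contained Scala sketch of the classic word count, written with plain collections rather than the Hadoop API: the map phase emits (word, 1) pairs, and the reduce phase sums the values that share the same key.

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val lines = Seq("hello griffin", "hello hadoop", "hello spark")

    // "Map" phase: each line is turned into (word, 1) key-value pairs
    val mapped: Seq[(String, Int)] =
      lines.flatMap(_.split("\\s+")).map(word => (word, 1))

    // "Shuffle" + "Reduce" phase: pairs with the same key are grouped and their values summed
    val reduced: Map[String, Int] =
      mapped.groupBy(_._1).map { case (word, pairs) => (word, pairs.map(_._2).sum) }

    reduced.foreach { case (word, count) => println(s"$word -> $count") }
  }
}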
Spark is a unified analytics engine for large-scale data processing.
Spark is an alternative to MapReduce that is compatible with HDFS and Hive and fits into the Hadoop ecosystem, making up for MapReduce's shortcomings.
Spark's basic execution flow:
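As a minimal illustration of that flow (a sketch assuming a local Spark installation, not Griffin code): the driver creates a SparkSession, lazy transformations describe the computation, and an action triggers the actual execution on the executors.

import org.apache.spark.sql.SparkSession

object SparkFlowSketch {
  def main(args: Array[String]): Unit = {
    // the driver creates the SparkSession (and the underlying SparkContext)
    val spark = SparkSession.builder()
      .appName("spark-flow-sketch")
      .master("local[*]")
      .getOrCreate()

    // transformations are lazy; they only describe the computation
    val counts = spark.sparkContext
      .parallelize(Seq("hello griffin", "hello spark"))
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // the action collect() triggers job scheduling and execution on the executors
    counts.collect().foreach(println)
    spark.stop()
  }
}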
Now enter the measure module. The entry point is Application.scala, which first reads the two arguments passed to the main class:
val envParamFile = args(0)
val dqParamFile = args(1)
envParamFile: the environment configuration, including the Spark log level and the sinks to which the results are written:
{
  // Spark log level
  "spark": {
    "log.level": "WARN"
  },
  // sinks for the measurement output
  "sinks": [
    {
      "type": "CONSOLE",
      "config": { "max.log.lines": 10 }
    },
    {
      "type": "HDFS",
      "config": {
        "path": "hdfs://localhost:8020/griffin/persist",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    },
    {
      "type": "ELASTICSEARCH",
      "config": {
        "method": "post",
        "api": "http://localhost:9200/griffin/accuracy",
        "connection.timeout": "1m",
        "retry": 10
      }
    }
  ],
  "griffin.checkpoint": [ ]
}
dqParamFile: the configuration of the measurement job itself, including the data source configuration and the computation rules:
{ "measure.type":"griffin", "id":3355, "name":"schedule-job-zcg", "owner":"test", "description":"test", "deleted":false, "timestamp":1569492180000, "dq.type":"ACCURACY", "sinks":[ "ELASTICSEARCH", "HDFS" ], "process.type":"BATCH", "data.sources":[ { "id":3358, "name":"source", "connectors":[ { "id":3359, "name":"source1569548839003", "type":"HIVE", "version":"1.2", "predicates":[ ], "data.unit":"1day", "data.time.zone":"", "config":{ "database":"griffin_demo", "table.name":"demo_src", "where":"dt='20190927' AND hour = '09'" } } ], "baseline":false }, { "id":3360, "name":"target", "connectors":[ { "id":3361, "name":"target1569548846717", "type":"HIVE", "version":"1.2", "predicates":[ ], "data.unit":"1day", "data.time.zone":"", "config":{ "database":"griffin_demo", "table.name":"demo_tgt", "where":"dt='20190927' AND hour = '09'" } } ], "baseline":false } ], "evaluate.rule":{ "id":3356, "rules":[ { "id":3357, "rule":"source.id=target.id", "dsl.type":"griffin-dsl", "dq.type":"ACCURACY", "out.dataframe.name":"accuracy" } ] } }
Core code of Application.scala:
object Application extends Loggable {
  def main(args: Array[String]): Unit = {
    // info(args.toString)
    val args = new Array[String](2) // test code: hard-coded arguments for local debugging
    args(0) = "{\n \"spark\":{\n \"log.level\":\"WARN\",\n \"config\":{\n \"spark" + ".master\":\"local[*]\"\n }\n },\n \"sinks\":[\n {\n \"type\":\"CONSOLE\",\n " + " \"config\":{\n \"max.log.lines\":10\n }\n },\n {\n " + "\"type\":\"HDFS\",\n \"config\":{\n " + "\"path\":\"hdfs://localhost:8020/griffin/batch/persist\",\n \"max.persist" + ".lines\":10000,\n \"max.lines.per.file\":10000\n }\n },\n {\n " + "\"type\":\"ELASTICSEARCH\",\n \"config\":{\n \"method\":\"post\",\n " + "\"api\":\"http://192.168.239.171:9200/griffin/accuracy\",\n \"connection" + ".timeout\":\"1m\",\n \"retry\":10\n }\n }\n ],\n \"griffin" + ".checkpoint\":[\n\n ]\n}";
    args(1) = "{\n \"name\":\"accu_batch\",\n \"process.type\":\"batch\",\n \"data" + ".sources\":[\n {\n \"name\":\"source\",\n \"baseline\":true,\n " + "\"connectors\":[\n {\n \"type\":\"avro\",\n \"version\":\"1.7\"," + "\n \"config\":{\n \"file.name\":\"src/test/resources/users_info_src" + ".avro\"\n }\n }\n ]\n },\n {\n \"name\":\"target\",\n " + " \"connectors\":[\n {\n \"type\":\"avro\",\n \"version\":\"1.7\"," + "\n \"config\":{\n \"file.name\":\"src/test/resources/users_info_target" + ".avro\"\n }\n }\n ]\n }\n ],\n \"evaluate.rule\":{\n " + "\"rules\":[\n {\n \"dsl.type\":\"griffin-dsl\",\n \"dq" + ".type\":\"accuracy\",\n \"out.dataframe.name\":\"accu\",\n \"rule\":\"source" + ".user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND " + "source.last_name = target.last_name AND source.address = target.address AND source.email =" + " target.email AND source.phone = target.phone AND source.post_code = target.post_code\"\n " + " }\n ]\n },\n \"sinks\":[\n \"CONSOLE\",\n \"ELASTICSEARCH\"\n ]\n}";

    if (args.length < 2) {
      error("Usage: class <env-param> <dq-param>")
      sys.exit(-1)
    }

    val envParamFile = args(0)
    val dqParamFile = args(1)

    info(envParamFile)
    info(dqParamFile)

    // read param files
    // parse args(0) into the corresponding EnvConfig object
    val envParam = readParamFile[EnvConfig](envParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        sys.exit(-2)
    }
    // parse args(1) into the corresponding DQConfig object
    val dqParam = readParamFile[DQConfig](dqParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage, ex)
        sys.exit(-2)
    }
    val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)

    // choose process
    // pick the app to execute; for a batch job this is BatchDQApp
    val procType = ProcessType(allParam.getDqConfig.getProcType)
    val dqApp: DQApp = procType match {
      case BatchProcessType => BatchDQApp(allParam)
      case StreamingProcessType => StreamingDQApp(allParam)
      case _ =>
        error(s"${procType} is unsupported process type!")
        sys.exit(-4)
    }

    startup

    // (1) initialize the Griffin job execution environment
    // see the next code block: it creates the SparkSession and registers Griffin's custom Spark UDFs
    // dq app init
    dqApp.init match {
      case Success(_) => info("process init success")
      case Failure(ex) =>
        error(s"process init error: ${ex.getMessage}", ex)
        shutdown
        sys.exit(-5)
    }

    // dq app run
    // (2) run the measurement job; here it is a batch job
    val success = dqApp.run match {
      case Success(result) =>
        info("process run result: " + (if (result) "success" else "failed"))
        result
      case Failure(ex) =>
        error(s"process run error: ${ex.getMessage}", ex)
        if (dqApp.retryable) {
          throw ex
        } else {
          shutdown
          sys.exit(-5)
        }
    }

    // dq app end
    dqApp.close match {
      case Success(_) => info("process end success")
      case Failure(ex) =>
        error(s"process end error: ${ex.getMessage}", ex)
        shutdown
        sys.exit(-5)
    }

    shutdown

    if (!success) {
      sys.exit(-5)
    }
  }

  private def readParamFile[T <: Param](file: String)(implicit m: ClassTag[T]): Try[T] = {
    val paramReader = ParamReaderFactory.getParamReader(file)
    paramReader.readConfig[T]
  }

  private def startup(): Unit = {}

  private def shutdown(): Unit = {}
}
First, step (1), initializing the Griffin job execution environment: the init method of BatchDQApp.
def init: Try[_] = Try {
// build spark 2.0+ application context
val conf = new SparkConf().setAppName(metricName)
conf.setAll(sparkParam.getConfig)
conf.set("spark.sql.crossJoin.enabled", "true")
sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
sqlContext = sparkSession.sqlContext
// register udf
GriffinUDFAgent.register(sqlContext)
}
This code creates the SparkSession, named after the metricName taken from the name attribute in dqParam, and obtains the corresponding SQLContext.
SparkSession is a new concept introduced in Spark 2.0; it provides a unified entry point for accessing Spark's functionality.
The final call,
GriffinUDFAgent.register(sqlContext)
registers the UDFs. UDFs are the user-defined functions supported by Hive: a plain UDF is the most basic kind (similar to built-ins such as to_char or to_date), and there are two further kinds, UDAF and UDTF.
In the source code, GriffinUDFs registers the basic UDFs index_of, matches, and reg_replace, while GriffinUDAggFs registers UDAFs; that registration is currently empty and can therefore be extended (a sketch of such an extension follows the code below).
def register(sqlContext: SQLContext): Unit = {
sqlContext.udf.register("index_of", indexOf _)
sqlContext.udf.register("matches", matches _)
sqlContext.udf.register("reg_replace", regReplace _)
}
object GriffinUDAggFs {
def register(sqlContext: SQLContext): Unit = {
}
}
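Since the UDAF registration above is empty, this is a natural extension point. Below is a minimal sketch of how an extra UDF could be registered alongside Griffin's built-ins; the is_blank function is hypothetical and not part of Griffin.

import org.apache.spark.sql.SQLContext

object CustomGriffinUDFs {
  // hypothetical helper: true when the value is null or only whitespace
  private def isBlank(s: String): Boolean = s == null || s.trim.isEmpty

  def register(sqlContext: SQLContext): Unit = {
    // after registration the function can be used in SQL / griffin-dsl rules,
    // e.g. "is_blank(source.email)"
    sqlContext.udf.register("is_blank", isBlank _)
  }
}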
Next, we move on to step (2), executing the measurement job; this is where the core Spark computation happens.
def run: Try[Boolean] = Try {
  // start time
  val startTime = new Date().getTime

  val measureTime = getMeasureTime
  val contextId = ContextId(measureTime)

  // get data sources
  // build the data sources from the dq configuration, i.e. the data.sources section
  val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
  // initialize the data sources
  dataSources.foreach(_.init)

  // create dq context
  // create the Griffin execution context
  val dqContext: DQContext = DQContext(
    contextId, metricName, dataSources, sinkParams, BatchProcessType
  )(sparkSession)

  // start id
  // according to the sink configuration, results go to the console and Elasticsearch
  val applicationId = sparkSession.sparkContext.applicationId
  dqContext.getSink().start(applicationId)

  // build job
  // build the data quality comparison job
  val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)

  // dq job execute
  val result = dqJob.execute(dqContext)

  // end time
  val endTime = new Date().getTime
  dqContext.getSink().log(endTime, s"process using time: ${endTime - startTime} ms")

  // clean context
  dqContext.clean()

  // finish
  dqContext.getSink().finish()

  result
}
The run method performs several major functions.
First, look at how the data sources are built from the configuration. The parameters passed in are the SparkSession and dqParam.getDataSources, i.e. the data.sources section of the dq configuration:
def getDataSources(sparkSession: SparkSession,
ssc: StreamingContext,
dataSources: Seq[DataSourceParam]
): Seq[DataSource] = {
dataSources.zipWithIndex.flatMap { pair =>
val (param, index) = pair
getDataSource(sparkSession, ssc, param, index)
}
}
In getDataSource(), the first parameter is the SparkSession, the second is the StreamingContext (null here), the third is the data source configuration, and the fourth is the index.
private def getDataSource(sparkSession: SparkSession,
                          ssc: StreamingContext,
                          dataSourceParam: DataSourceParam,
                          index: Int
                         ): Option[DataSource] = {
  val name = dataSourceParam.getName
  val connectorParams = dataSourceParam.getConnectors
  val timestampStorage = TimestampStorage()

  // for streaming data cache
  val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
    sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)

  val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
    DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
      timestampStorage, streamingCacheClientOpt) match {
      case Success(connector) => Some(connector)
      case _ => None
    }
  }

  Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
}
It calls DataConnectorFactory.getDataConnector to obtain the corresponding DataConnector object:
the first parameter is the SparkSession;
the second parameter is null in batch mode;
the third parameter is the connectors section under data.sources in the dq configuration;
the fourth parameter is the timestamp storage;
the fifth parameter is the streaming data cache, null here.
def getDataConnector(sparkSession: SparkSession,
                     ssc: StreamingContext,
                     dcParam: DataConnectorParam,
                     tmstCache: TimestampStorage,
                     streamingCacheClientOpt: Option[StreamingCacheClient]
                    ): Try[DataConnector] = {
  val conType = dcParam.getType
  val version = dcParam.getVersion
  Try {
    // map the connector type to a concrete data connector
    conType match {
      case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
      case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
      case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
      case CustomRegex() =>
        getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
      case KafkaRegex() =>
        getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
      case _ => throw new Exception("connector creation error!")
    }
  }
}
So the data source types that Griffin's measure module supports out of the box are Hive, Avro, text directories, Kafka, and custom connectors.
Here we take the Hive data source as the example to analyze how the connector is created and how the data is read.
/**
 * batch data connector for hive table
 * It takes three parameters: 1. the SparkSession, 2. the connector configuration, 3. the TimestampStorage cache
 */
case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
                                  dcParam: DataConnectorParam,
                                  timestampStorage: TimestampStorage
                                 ) extends BatchDataConnector {

  // the config section of the connector configuration
  val config = dcParam.getConfig

  // config keys
  val Database = "database"
  val TableName = "table.name"
  val Where = "where"

  // database configured under config
  val database = config.getString(Database, "default")
  // table configured under config
  val tableName = config.getString(TableName, "")
  // where clause configured under config
  val whereString = config.getString(Where, "")

  // fully qualified table name
  val concreteTableName = s"${database}.${tableName}"
  val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)

  // run the SQL through Spark SQL and return the DataFrame together with the TimeRange
  def data(ms: Long): (Option[DataFrame], TimeRange) = {
    val dfOpt = try {
      val dtSql = dataSql
      info(dtSql)
      val df = sparkSession.sql(dtSql)
      val dfOpt = Some(df)
      val preDfOpt = preProcess(dfOpt, ms)
      preDfOpt
    } catch {
      case e: Throwable =>
        error(s"load hive table ${concreteTableName} fails: ${e.getMessage}", e)
        None
    }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }

  private def tableExistsSql(): String = {
    // s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
    s"tableName LIKE '${tableName}'"
  }

  private def metaDataSql(): String = {
    s"DESCRIBE ${concreteTableName}"
  }

  private def dataSql(): String = {
    val tableClause = s"SELECT * FROM ${concreteTableName}"
    if (wheres.length > 0) {
      val clauses = wheres.map { w =>
        s"${tableClause} WHERE ${w}"
      }
      clauses.mkString(" UNION ALL ")
    } else tableClause
  }
}
HiveBatchDataConnector extends BatchDataConnector, which in turn extends DataConnector. HiveBatchDataConnector implements the data method declared by DataConnector, and this is the important method here:
def data(ms: Long): (Option[DataFrame], TimeRange) = {
  val dfOpt = try {
    val dtSql = dataSql
    info(dtSql)
    val df = sparkSession.sql(dtSql)
    val dfOpt = Some(df)
    val preDfOpt = preProcess(dfOpt, ms)
    preDfOpt
  } catch {
    case e: Throwable =>
      error(s"load hive table ${concreteTableName} fails: ${e.getMessage}", e)
      None
  }
  val tmsts = readTmst(ms)
  (dfOpt, TimeRange(ms, tmsts))
}
First, the SQL statement is built by dataSql; the SparkSession then executes it to obtain the DataFrame, and finally the preProcess method of DataConnector, shown after the sketch below, wraps the resulting DataFrame.
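To see what dataSql actually produces, here is a small, self-contained sketch of the same SQL-building logic, extracted and simplified from the connector above (the object name is made up for illustration):

object DataSqlSketch {
  // simplified version of HiveBatchDataConnector's dataSql()
  def dataSql(database: String, tableName: String, whereString: String): String = {
    val concreteTableName = s"$database.$tableName"
    val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
    val tableClause = s"SELECT * FROM $concreteTableName"
    if (wheres.nonEmpty) wheres.map(w => s"$tableClause WHERE $w").mkString(" UNION ALL ")
    else tableClause
  }

  def main(args: Array[String]): Unit = {
    // prints: SELECT * FROM griffin_demo.demo_src WHERE dt='20190927' AND hour = '09'
    println(dataSql("griffin_demo", "demo_src", "dt='20190927' AND hour = '09'"))
  }
}

With the demo_src configuration used earlier this yields a single SELECT; if the where configuration contained several comma-separated clauses, the individual parts would be combined with UNION ALL.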
def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
  // new context
  val context = createContext(ms)

  val timestamp = context.contextId.timestamp
  val suffix = context.contextId.id
  val dcDfName = dcParam.getDataFrameName("this")

  try {
    saveTmst(timestamp) // save timestamp

    dfOpt.flatMap { df =>
      val (preProcRules, thisTable) =
        PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)

      // init data
      context.compileTableRegister.registerTable(thisTable)
      context.runTimeTableRegister.registerTable(thisTable, df)

      // build job
      val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules)

      // job execute
      preprocJob.execute(context)

      // out data
      val outDf = context.sparkSession.table(s"`${thisTable}`")

      // add tmst column
      val withTmstDf = outDf.withColumn(ConstantColumns.tmst, lit(timestamp))

      // clean context
      context.clean()

      Some(withTmstDf)
    }
  } catch {
    case e: Throwable =>
      error(s"pre-process of data connector [${id}] error: ${e.getMessage}", e)
      None
  }
}
We will see later in the source code where the data method is actually invoked.
At this point the data sources have been built from the configuration. Next,
dataSources.foreach(_.init)
executes the init method of each data source; for the Hive data source, init has no concrete implementation.
Then the code executes
val dqContext: DQContext = DQContext(
contextId, metricName, dataSources, sinkParams, BatchProcessType
)(sparkSession)
which creates the Griffin context object, DQContext:
/**
 * the unique context object of each Spark calculation
 * dq context: the context of each calculation
 * unique context id in each calculation
 * access the same spark session this app created
 */
case class DQContext(contextId: ContextId,
                     name: String,
                     dataSources: Seq[DataSource],
                     sinkParams: Seq[SinkParam],
                     procType: ProcessType
                    )(@transient implicit val sparkSession: SparkSession) {

  val sqlContext: SQLContext = sparkSession.sqlContext

  // compile-time table register
  val compileTableRegister: CompileTableRegister = CompileTableRegister()
  // runtime table register
  val runTimeTableRegister: RunTimeTableRegister = RunTimeTableRegister(sqlContext)

  val dataFrameCache: DataFrameCache = DataFrameCache()

  val metricWrapper: MetricWrapper = MetricWrapper(name)
  val writeMode = WriteMode.defaultMode(procType)

  // data source names
  val dataSourceNames: Seq[String] = {
    // sort data source names, put baseline data source name to the head
    val (blOpt, others) = dataSources.foldLeft((None: Option[String], Nil: Seq[String])) { (ret, ds) =>
      val (opt, seq) = ret
      if (opt.isEmpty && ds.isBaseline) (Some(ds.name), seq) else (opt, seq :+ ds.name)
    }
    blOpt match {
      case Some(bl) => bl +: others
      case _ => others
    }
  }
  dataSourceNames.foreach(name => compileTableRegister.registerTable(name))
  def getDataSourceName(index: Int): String = {
    if (dataSourceNames.size > index) dataSourceNames(index) else ""
  }

  implicit val encoder = Encoders.STRING
  val functionNames: Seq[String] = sparkSession.catalog.listFunctions.map(_.name).collect.toSeq

  // load the data of each data source
  val dataSourceTimeRanges = loadDataSources()
  def loadDataSources(): Map[String, TimeRange] = {
    dataSources.map { ds =>
      (ds.name, ds.loadData(this))
    }.toMap
  }
  printTimeRanges

  private val sinkFactory = SinkFactory(sinkParams, name)
  private val defaultSink: Sink = createSink(contextId.timestamp)

  def getSink(timestamp: Long): Sink = {
    if (timestamp == contextId.timestamp) getSink()
    else createSink(timestamp)
  }

  def getSink(): Sink = defaultSink

  private def createSink(t: Long): Sink = {
    procType match {
      case BatchProcessType => sinkFactory.getSinks(t, true)
      case StreamingProcessType => sinkFactory.getSinks(t, false)
    }
  }

  def cloneDQContext(newContextId: ContextId): DQContext = {
    DQContext(newContextId, name, dataSources, sinkParams, procType)(sparkSession)
  }

  def clean(): Unit = {
    compileTableRegister.unregisterAllTables()
    runTimeTableRegister.unregisterAllTables()

    dataFrameCache.uncacheAllDataFrames()
    dataFrameCache.clearAllTrashDataFrames()
  }

  private def printTimeRanges(): Unit = {
    if (dataSourceTimeRanges.nonEmpty) {
      val timeRangesStr = dataSourceTimeRanges.map { pair =>
        val (name, timeRange) = pair
        s"${name} -> (${timeRange.begin}, ${timeRange.end}]"
      }.mkString(", ")
      println(s"data source timeRanges: ${timeRangesStr}")
    }
  }
}
DQContext is created with five explicit parameters plus the implicit SparkSession:
the first is the context id;
the second is the name of the current job, i.e. the name attribute from dqParam;
the third is the list of data sources built above;
the fourth is the sinks configuration from the env parameters;
the fifth is the process type, here batch;
the SparkSession created in init is passed in the second (implicit) parameter list.
Pay particular attention to the following:
// load the data of each data source
val dataSourceTimeRanges = loadDataSources()
loadDataSources() loads the data of every data source by calling DataSource.loadData(), which in turn invokes the data method implemented by each DataConnector. Since we are using the Hive data source as the example, this takes us into the data method of HiveBatchDataConnector shown above and into its preProcess step, whose Spark SQL execution logic deserves a closer look:
def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
  // new context
  // create a new DQContext
  val context = createContext(ms)

  val timestamp = context.contextId.timestamp
  val suffix = context.contextId.id
  val dcDfName = dcParam.getDataFrameName("this")

  try {
    saveTmst(timestamp) // save timestamp

    dfOpt.flatMap { df =>
      val (preProcRules, thisTable) =
        PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)

      // init data
      context.compileTableRegister.registerTable(thisTable)
      context.runTimeTableRegister.registerTable(thisTable, df)

      // build job
      val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules)

      // job execute
      preprocJob.execute(context)

      // out data
      val outDf = context.sparkSession.table(s"`${thisTable}`")

      // add tmst column
      val withTmstDf = outDf.withColumn(ConstantColumns.tmst, lit(timestamp))

      // clean context
      context.clean()

      Some(withTmstDf)
    }
  } catch {
    case e: Throwable =>
      error(s"pre-process of data connector [${id}] error: ${e.getMessage}", e)
      None
  }
}
Next, look at how the job is built:
// build job
val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules)
There is something puzzling in the source here: when the DQContext is created, loadDataSources() is executed, and inside it buildDQJob() is already called once; why is buildDQJob() then called again outside?
buildDQJob first takes the evaluate.rule configuration from the dq parameters; inside DQJobBuilder.buildDQJob, the steps derived from the data sources are built first:
def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam]): DQJob = {
  // build steps by datasources
  val dsSteps = context.dataSources.flatMap { dataSource =>
    DQStepBuilder.buildStepOptByDataSourceParam(context, dataSource.dsParam)
  }

  // build steps by rules
  /**
   * For the accuracy example, the generated steps look like:
   * SeqDQStep(List(
   *   SparkSqlTransformStep(__missRecords,
   *     SELECT `source`.* FROM `source` LEFT JOIN `target` ON coalesce(`source`.`user_id`, '') = coalesce(`target`.`user_id`, '') AND upper(`source`.`first_name`) = upper(`target`.`first_name`) AND coalesce(`source`.`last_name`, '') = coalesce(`target`.`last_name`, '') AND coalesce(`source`.`address`, '') = coalesce(`target`.`address`, '') AND coalesce(`source`.`email`, '') = coalesce(`target`.`email`, '') AND coalesce(`source`.`phone`, '') = coalesce(`target`.`phone`, '') AND coalesce(`source`.`post_code`, '') = coalesce(`target`.`post_code`, '') WHERE (NOT (`source`.`user_id` IS NULL AND `source`.`first_name` IS NULL AND `source`.`last_name` IS NULL AND `source`.`address` IS NULL AND `source`.`email` IS NULL AND `source`.`phone` IS NULL AND `source`.`post_code` IS NULL)) AND (`target`.`user_id` IS NULL AND `target`.`first_name` IS NULL AND `target`.`last_name` IS NULL AND `target`.`address` IS NULL AND `target`.`email` IS NULL AND `target`.`phone` IS NULL AND `target`.`post_code` IS NULL),
   *     Map(), true),
   *   SparkSqlTransformStep(__missCount, SELECT COUNT(*) AS `miss` FROM `__missRecords`, Map(), false),
   *   SparkSqlTransformStep(__totalCount, SELECT COUNT(*) AS `total` FROM `source`, Map(), false),
   *   SparkSqlTransformStep(accu,
   *     SELECT A.total AS `total`, A.miss AS `miss`, (A.total - A.miss) AS `matched`, coalesce( (A.total - A.miss) / A.total, 1.0) AS `matchedFraction` FROM ( SELECT `__totalCount`.`total` AS total, coalesce(`__missCount`.`miss`, 0) AS miss FROM `__totalCount` LEFT JOIN `__missCount` ) AS A,
   *     Map(), false),
   *   MetricWriteStep(accu,accu,DefaultFlattenType,None),
   *   RecordWriteStep(__missRecords,__missRecords,None,None)))
   */
  val ruleSteps = ruleParams.flatMap { ruleParam =>
    DQStepBuilder.buildStepOptByRuleParam(context, ruleParam)
  }

  // metric flush step
  val metricFlushStep = MetricFlushStep()

  /**
   * ++ concatenates two collections
   * :+ appends an element at the tail of a collection
   * +: prepends an element at the head of a collection
   */
  DQJob(dsSteps ++ ruleSteps :+ metricFlushStep)
}
Then the rule steps are created by calling DQStepBuilder.buildStepOptByRuleParam(context, ruleParam):
def buildStepOptByRuleParam(context: DQContext, ruleParam: RuleParam
): Option[DQStep] = {
val dslType = ruleParam.getDslType
val dsNames = context.dataSourceNames
val funcNames = context.functionNames
val dqStepOpt = getRuleParamStepBuilder(dslType, dsNames, funcNames)
.flatMap(_.buildDQStep(context, ruleParam))
dqStepOpt.toSeq.flatMap(_.getNames).foreach(name =>
context.compileTableRegister.registerTable(name)
)
dqStepOpt
}
This first creates the matching RuleParamStepBuilder and then calls its buildDQStep method:
def buildDQStep(context: DQContext, param: ParamType): Option[DQStep] = {
val steps = buildSteps(context, param) // not fully understood yet
if (steps.size > 1) Some(SeqDQStep(steps))
else if (steps.size == 1) steps.headOption
else None
}
According to the official Griffin documentation, the concrete execution steps are built from the rule by the corresponding DSL parser.
For details of the Apache Griffin DSL, see:
https://github.com/apache/griffin/blob/master/griffin-doc/measure/dsl-guide.md
Spark learning reference: https://www.cnblogs.com/qingyunzong/category/1202252.html
Hadoop
Hive
Livy
Quartz
The official site suggests two possible causes: 1. the ES configuration in service/src/main/resources/env/env_batch.json is incorrect (this has been fixed); 2. the YARN node that runs the Spark job has no hostname mapping for the ES server, so the connection fails (this part is not entirely clear to me).
Deployment references:
https://blog.csdn.net/github_39577257/article/details/90607081
http://griffin.apache.org/docs/quickstart-cn.html