当前位置:   article > 正文

【DataHub】 现代数据栈的元数据平台--如何与spark集成,自动产生spark作业的数据血缘关系?

【DataHub】 现代数据栈的元数据平台--如何与spark集成,自动产生spark作业的数据血缘关系?

DataHub如何集成spark?

DataHub通过如下方式集成spark:

  • 提供了一个轻量级的Java代理,它侦听Spark应用程序和作业事件,并实时将数据血缘元数据推送到DataHub
  • Java代理侦听应用程序的开始、结束事件、SQLExecution的开始、结束事件,以在Datahub中创建pipelines (即DataFlow)和任务(即DataJob),以及对正在读写的数据集间的数据血缘关系。

如下面的代码(源码参见:TestSparkJobsLineage.java

 spark = SparkSession.builder()
        .appName(APP_NAME)
        .config("spark.master", MASTER)
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.lineage.consumerTypes", "accumulator")
        .config("spark.datahub.rest.server", "http://172.25.21.188:" + "8080")//mockServer.getPort())
        .config("spark.sql.warehouse.dir", new File(WAREHOUSE_LOC).getAbsolutePath())
        .enableHiveSupport()
        .getOrCreate();

    .......
 Dataset<Row> df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv");
Dataset<Row> df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv");
    df1.createOrReplaceTempView("v1");
    df2.createOrReplaceTempView("v2");

    Dataset<Row> df =
        spark.sql("select v1.c1 as a, v1.c2 as b, v2.c1 as c, v2.c2 as d from v1 join v2 on v1.id = v2.id");

    // InsertIntoHadoopFsRelationCommand
    df.write().mode(SaveMode.Overwrite).csv(DATA_DIR + "/out.csv");
    Thread.sleep(5000);
    check(dsl(hdfsDs("out.csv"), hdfsDs("in1.csv"), hdfsDs("in2.csv")), acc.getLineages().get(0));
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

集成spark产生的元数据

执行上面的测试用例,将产生如下元数据:

  • 整个spark application是一个dataFlow,在UI展示上,是归到Pipelines/spark 下面 【每个Spark <master, appName>创建一个pipeline。】
  • 每个SQL的执行动作如df.write().mode(SaveMode.Overwrite).csv(DATA_DIR + "/out.csv");,产生一个任务 dataJob【每执行一次Spark查询都会创建一个任务】
  • 因为每个dataJob都有输入、输出数据集,这样就产生了数据集间的数据血缘关系。
    在这里插入图片描述

集成spark产生的数据集间的数据血缘关系

执行上面的测试用例,将同时产生数据集间的数据血缘关系,如下:
在这里插入图片描述
在这里插入图片描述

集成spark产生的元数据的属性

以下管道pipelines和任务tasks中的自定义属性与Spark UI相关:

  • 管道pipeline中的appName和appId可以用来确定Spark应用程序
  • 任务中的descriptionSQLQueryId可以用来在Spark UI的SQL Tab上确定应用程序中的Query Execution

管道pipeline和任务task的其他自定义属性:捕获执行的开始和结束时间、查询计划在任务的queryPlan属性中捕获。
在这里插入图片描述

针对spark不同版本的支持

  • 测试的主要版本是Spark/Scala版本2.48 /2_11。
  • datahub-spark-lineage库已经在Spark版本(2.2.0 - 2.4.8)和Scala版本(2.10 - 2.12)上进行了测试。
  • 针对Spark 3,目前仅仅在Spark 3.1.2和Scala 2.12下测试过。
  • 针对其它spark版本的支持,datahub正在计划做

针对spark api的支持

支持的API列表

下面是当前被解析的Spark api列表:

  • InsertIntoHadoopFsRelationCommand
  • SaveIntoDataSourceCommand (jdbc)
  • CreateHiveTableAsSelectCommand
  • InsertIntoHiveTable
  • Effectively, these support data sources/sinks corresponding to Hive, HDFS and JDBC.

不支持的API列表

  • View related commands
  • Cache commands and implications on lineage
  • RDD jobs

如何在spark作业中集成datahub-spark-lineage?

在spark-submit时集成

针对提交的spark作业有效,示例如下

spark-submit --class org.apache.spark.examples.SparkSQLTest \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g --executor-memory 1g  --executor-cores 2 \
--jars "/opt/sparkjob/jars/datahub-spark-lineage-0.8.26.jar" \
--conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
--conf "spark.datahub.rest.server=http://localhost:8080" \
spark-sql-test-1.1.2.jar
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

修改spark-defaults.conf

修改spark默认的配置文件spark-defaults.conf,针对所有spark作业有效

#Configuring datahub spark agent jar
spark.extraListeners                         datahub.spark.DatahubSparkListener
spark.datahub.rest.server                    http://localhost:8080
#如果开启https和token认证,将如下配置打开
#spark.datahub.rest.token                    <token>
  • 1
  • 2
  • 3
  • 4
  • 5

把datahub-spark-lineage-0.8.26.jar放在spark.yarn.archive指定的压缩包 或 spark.yarn.jars指定的hdfs目录下【参见spark执行优化–禁止将依赖的Jar包传到HDFS(spark.yarn.jar和spark.yarn.archive的使用)

在spark程序代码中指定

spark的参数配置参见spark参数配置

spark = SparkSession.builder \
          .master("spark://spark-master:7077") \
          .appName("spark-sql-lineage-test") \
          .config("spark.jars","/opt/sparkjob/jars/datahub-spark-lineage-0.8.26.jar") \
          .config("spark.extraListeners","datahub.spark.DatahubSparkListener") \
          .config("spark.datahub.rest.server", "http://localhost:8080") \
          .enableHiveSupport() \
          .getOrCreate()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

参考

metadata-integration spark-lineage
spark-lineage源码

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号