赞
踩
DataHub通过如下方式集成spark:
如下面的代码(源码参见:TestSparkJobsLineage.java
spark = SparkSession.builder() .appName(APP_NAME) .config("spark.master", MASTER) .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") .config("spark.datahub.lineage.consumerTypes", "accumulator") .config("spark.datahub.rest.server", "http://172.25.21.188:" + "8080")//mockServer.getPort()) .config("spark.sql.warehouse.dir", new File(WAREHOUSE_LOC).getAbsolutePath()) .enableHiveSupport() .getOrCreate(); ....... Dataset<Row> df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv"); Dataset<Row> df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv"); df1.createOrReplaceTempView("v1"); df2.createOrReplaceTempView("v2"); Dataset<Row> df = spark.sql("select v1.c1 as a, v1.c2 as b, v2.c1 as c, v2.c2 as d from v1 join v2 on v1.id = v2.id"); // InsertIntoHadoopFsRelationCommand df.write().mode(SaveMode.Overwrite).csv(DATA_DIR + "/out.csv"); Thread.sleep(5000); check(dsl(hdfsDs("out.csv"), hdfsDs("in1.csv"), hdfsDs("in2.csv")), acc.getLineages().get(0));
执行上面的测试用例,将产生如下元数据:
Pipelines/spark
下面 【每个Spark <master, appName>创建一个pipeline。】df.write().mode(SaveMode.Overwrite).csv(DATA_DIR + "/out.csv");
,产生一个任务 dataJob【每执行一次Spark查询都会创建一个任务】执行上面的测试用例,将同时产生数据集间的数据血缘关系,如下:
以下管道pipelines和任务tasks中的自定义属性与Spark UI相关:
description
和SQLQueryId
可以用来在Spark UI的SQL Tab上确定应用程序中的Query Execution管道pipeline和任务task的其他自定义属性:捕获执行的开始和结束时间、查询计划在任务的queryPlan属性中捕获。
下面是当前被解析的Spark api列表:
针对提交的spark作业有效,示例如下
spark-submit --class org.apache.spark.examples.SparkSQLTest \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g --executor-memory 1g --executor-cores 2 \
--jars "/opt/sparkjob/jars/datahub-spark-lineage-0.8.26.jar" \
--conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
--conf "spark.datahub.rest.server=http://localhost:8080" \
spark-sql-test-1.1.2.jar
修改spark默认的配置文件spark-defaults.conf,针对所有spark作业有效
#Configuring datahub spark agent jar
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
#如果开启https和token认证,将如下配置打开
#spark.datahub.rest.token <token>
把datahub-spark-lineage-0.8.26.jar放在spark.yarn.archive指定的压缩包 或 spark.yarn.jars指定的hdfs目录下【参见spark执行优化–禁止将依赖的Jar包传到HDFS(spark.yarn.jar和spark.yarn.archive的使用)】
spark的参数配置参见spark参数配置
spark = SparkSession.builder \
.master("spark://spark-master:7077") \
.appName("spark-sql-lineage-test") \
.config("spark.jars","/opt/sparkjob/jars/datahub-spark-lineage-0.8.26.jar") \
.config("spark.extraListeners","datahub.spark.DatahubSparkListener") \
.config("spark.datahub.rest.server", "http://localhost:8080") \
.enableHiveSupport() \
.getOrCreate()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。