当前位置:   article > 正文

DolphinScheduler海豚调度教程_海豚调度使用说明_dolphinscheduler使用教程

dolphinscheduler使用教程

功能介绍

首页

首页包含用户所有项目的任务状态统计、流程状态统计、工作流定义统计。

img

项目管理

创建项目

点击"项目管理"进入项目管理页面,点击“创建项目”按钮,输入项目名称,项目描述,点击“提交”,创建新的项目。

img

项目首页

在项目管理页面点击项目名称链接,进入项目首页,如下图所示,项目首页包含该项目的任务状态统计、流程状态统计、工作流定义统计。这几个指标的说明如下

  • 任务状态统计:在指定时间范围内,统计任务实例中状态为提交成功、正在运行、准备暂停、暂停、准备停止、停止、失败、成功、需要容错、kill、等待线程的个数
  • 流程状态统计:在指定时间范围内,统计工作流实例中状态为提交成功、正在运行、准备暂停、暂停、准备停止、停止、失败、成功、需要容错、kill、等待线程的个数
  • 工作流定义统计:统计用户创建的工作流定义及管理员授予该用户的工作流定义

img

任务类型

Shell节点

shell节点,在worker执行的时候,会生成一个临时shell脚本,使用租户同名的linux用户执行这个脚本。

  • 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。
  • 工具栏中拖动[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-04KjxJw7-1683205953746)(https://dolphinscheduler.apache.org/img/shell.png)]到画板中,如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SAkHpS8X-1683205953749)(https://dolphinscheduler.apache.org/img/shell_dag.png)]

  • 节点名称:一个工作流定义中的节点名称是唯一的。
  • 运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
  • 描述信息:描述该节点的功能。
  • 任务优先级:worker线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
  • Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。
  • 失败重试次数:任务失败重新提交的次数,支持下拉和手填。
  • 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
  • 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
  • 脚本:用户开发的SHELL程序。
  • 资源:是指脚本中需要调用的资源文件列表,资源中心-文件管理上传或创建的文件。
  • 自定义参数:是SHELL局部的用户自定义参数,会替换脚本中以${变量}的内容。
子流程节点
  • 子流程节点,就是把外部的某个工作流定义当做一个任务节点去执行。

拖动工具栏中的PNG任务节点到画板中,如下图所示:

img

  • 节点名称:一个工作流定义中的节点名称是唯一的
  • 运行标志:标识这个节点是否能正常调度
  • 描述信息:描述该节点的功能
  • 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
  • 子节点:是选择子流程的工作流定义,右上角进入该子节点可以跳转到所选子流程的工作流定义
依赖节点
  • 依赖节点,就是依赖检查节点。比如A流程依赖昨天的B流程执行成功,依赖节点会去检查B流程在昨天是否有执行成功的实例。

拖动工具栏中的PNG任务节点到画板中,如下图所示:

img

依赖节点提供了逻辑判断功能,比如检查昨天的B流程是否成功,或者C流程是否执行成功。

img

例如,A流程为周报任务,B、C流程为天任务,A任务需要B、C任务在上周的每一天都执行成功,如图示:

img

假如,周报A同时还需要自身在上周二执行成功:

img

存储过程节点
  • 根据选择的数据源,执行存储过程。

拖动工具栏中的PNG任务节点到画板中,如下图所示:

img

  • 数据源:存储过程的数据源类型支持MySQL和POSTGRESQL两种,选择对应的数据源
  • 方法:是存储过程的方法名称
  • 自定义参数:存储过程的自定义参数类型支持IN、OUT两种,数据类型支持VARCHAR、INTEGER、LONG、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、BOOLEAN九种数据类型
SQL节点
  • 拖动工具栏中的PNG任务节点到画板中
  • 非查询SQL功能:编辑非查询SQL任务信息,sql类型选择非查询,如下图所示:

img

  • 查询SQL功能:编辑查询SQL任务信息,sql类型选择查询,选择表格或附件形式发送邮件到指定的收件人,如下图所示。

img

  • 数据源:选择对应的数据源
  • sql类型:支持查询和非查询两种,查询是select类型的查询,是有结果集返回的,可以指定邮件通知为表格、附件或表格附件三种模板。非查询是没有结果集返回的,是针对update、delete、insert三种类型的操作。
  • sql参数:输入参数格式为key1=value1;key2=value2…
  • sql语句:SQL语句
  • UDF函数:对于HIVE类型的数据源,可以引用资源中心中创建的UDF函数,其他类型的数据源暂不支持UDF函数。
  • 自定义参数:SQL任务类型,而存储过程是自定义参数顺序的给方法设置值自定义参数类型和数据类型同存储过程任务类型一样。区别在于SQL任务类型自定义参数会替换sql语句中${变量}。
  • 前置sql:前置sql在sql语句之前执行。
  • 后置sql:后置sql在sql语句之后执行。
SPARK节点
  • 通过SPARK节点,可以直接直接执行SPARK程序,对于spark节点,worker会使用spark-submit方式提交任务

拖动工具栏中的PNG任务节点到画板中,如下图所示:

img

  • 程序类型:支持JAVA、Scala和Python三种语言
  • 主函数的class:是Spark程序的入口Main Class的全路径
  • 主jar包:是Spark的jar包
  • 部署方式:支持yarn-cluster、yarn-client和local三种模式
  • Driver内核数:可以设置Driver内核数及内存数
  • Executor数量:可以设置Executor数量、Executor内存数和Executor内核数
  • 命令行参数:是设置Spark程序的输入参数,支持自定义参数变量的替换。
  • 其他参数:支持 --jars、–files、–archives、–conf格式
  • 资源:如果其他参数中引用了资源文件,需要在资源中选择指定
  • 自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容

注意:JAVA和Scala只是用来标识,没有区别,如果是Python开发的Spark则没有主函数的class,其他都是一样

MapReduce(MR)节点
  • 使用MR节点,可以直接执行MR程序。对于mr节点,worker会使用hadoop jar方式提交任务

拖动工具栏中的PNG任务节点到画板中,如下图所示:

JAVA程序

img

  • 主函数的class:是MR程序的入口Main Class的全路径
  • 程序类型:选择JAVA语言
  • 主jar包:是MR的jar包
  • 命令行参数:是设置MR程序的输入参数,支持自定义参数变量的替换
  • 其他参数:支持 –D、-files、-libjars、-archives格式
  • 资源: 如果其他参数中引用了资源文件,需要在资源中选择指定
  • 自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容
Python程序

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p1oexZeW-1683205971444)(null)]

  • 程序类型:选择Python语言
  • 主jar包:是运行MR的Python jar包
  • 其他参数:支持 –D、-mapper、-reducer、-input -output格式,这里可以设置用户自定义参数的输入,比如:
  • -mapper “mapper.py 1” -file mapper.py -reducer reducer.py -file reducer.py –input /journey/words.txt -output /journey/out/mr/${currentTimeMillis}
  • 其中 -mapper 后的 mapper.py 1是两个参数,第一个参数是mapper.py,第二个参数是1
  • 资源: 如果其他参数中引用了资源文件,需要在资源中选择指定
  • 自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容
Python节点
  • 使用python节点,可以直接执行python脚本,对于python节点,worker会使用python **方式提交任务。

拖动工具栏中的PNG任务节点到画板中,如下图所示:

img

  • 脚本:用户开发的Python程序
  • 环境名称:执行Python程序的解释器路径,指定运行脚本的解释器。当你需要使用 Python 虚拟环境 时,可以通过创建不同的环境名称来实现。
  • 资源:是指脚本中需要调用的资源文件列表
  • 自定义参数:是Python局部的用户自定义参数,会替换脚本中以${变量}的内容
  • 注意:若引入资源目录树下的python文件,需添加 __init__.py 文件
Flink节点
  • 拖动工具栏中的img任务节点到画板中,如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D8Zut6Pc-1683205976158)(null)]

  • 程序类型:支持JAVA、Scala和Python三种语言
  • 主函数的class:是Flink程序的入口Main Class的全路径
  • 主jar包:是Flink的jar包
  • 部署方式:支持cluster、local三种模式
  • slot数量:可以设置slot数
  • taskManage数量:可以设置taskManage数
  • jobManager内存数:可以设置jobManager内存数
  • taskManager内存数:可以设置taskManager内存数
  • 命令行参数:是设置Spark程序的输入参数,支持自定义参数变量的替换。
  • 其他参数:支持 --jars、–files、–archives、–conf格式
  • 资源:如果其他参数中引用了资源文件,需要在资源中选择指定
  • 自定义参数:是Flink局部的用户自定义参数,会替换脚本中以${变量}的内容

注意:JAVA和Scala只是用来标识,没有区别,如果是Python开发的Flink则没有主函数的class,其他都是一样

http节点
  • 拖动工具栏中的img任务节点到画板中,如下图所示:

img

  • 节点名称:一个工作流定义中的节点名称是唯一的。
  • 运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
  • 描述信息:描述该节点的功能。
  • 任务优先级:worker线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
  • Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。
  • 失败重试次数:任务失败重新提交的次数,支持下拉和手填。
  • 失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
  • 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
  • 请求地址:http请求URL。
  • 请求类型:支持GET、POSt、HEAD、PUT、DELETE。
  • 请求参数:支持Parameter、Body、Headers。
  • 校验条件:支持默认响应码、自定义响应码、内容包含、内容不包含。
  • 校验内容:当校验条件选择自定义响应码、内容包含、内容不包含时,需填写校验内容。
  • 自定义参数:是http局部的用户自定义参数,会替换脚本中以${变量}的内容。
DATAX节点
  • 拖动工具栏中的img任务节点到画板中

img

  • 自定义模板:打开自定义模板开关时,可以自定义datax节点的json配置文件内容(适用于控件配置不满足需求时)
  • 数据源:选择抽取数据的数据源
  • sql语句:目标库抽取数据的sql语句,节点执行时自动解析sql查询列名,映射为目标表同步列名,源表和目标表列名不一致时,可以通过列别名(as)转换
  • 目标库:选择数据同步的目标库
  • 目标表:数据同步的目标表名
  • 前置sql:前置sql在sql语句之前执行(目标库执行)。
  • 后置sql:后置sql在sql语句之后执行(目标库执行)。
  • json:datax同步的json配置文件
  • 自定义参数:SQL任务类型,而存储过程是自定义参数顺序的给方法设置值自定义参数类型和数据类型同存储过程任务类型一样。区别在于SQL任务类型自定义参数会替换sql语句中${变量}。

参数

内置参数
基础内置参数
变量名声明方式含义
system.biz.date${system.biz.date}日常调度实例定时的定时时间前一天,格式为 yyyyMMdd,补数据时,该日期 +1
system.biz.curdate${system.biz.curdate}日常调度实例定时的定时时间,格式为 yyyyMMdd,补数据时,该日期 +1
system.datetime${system.datetime}日常调度实例定时的定时时间,格式为 yyyyMMddHHmmss,补数据时,该日期 +1
衍生内置参数
  • 支持代码中自定义变量名,声明方式:${变量名}。可以是引用 “系统参数”
  • 我们定义这种基准变量为

[

.

.

.

]

格式的,

[…] 格式的,

[…]格式的,[yyyyMMddHHmmss] 是可以任意分解组合的,比如:$[yyyyMMdd], $[HHmmss], $[yyyy-MM-dd] 等

  • 也可以通过以下两种方式:

1.使用add_months()函数,该函数用于加减月份, 第一个入口参数为[yyyyMMdd],表示返回时间的格式 第二个入口参数为月份偏移量,表示加减多少个月

+ 后 N 年:$[add\_months(yyyyMMdd,12\*N)]
+ 前 N 年:$[add\_months(yyyyMMdd,-12\*N)]
+ 后 N 月:$[add\_months(yyyyMMdd,N)]
+ 前 N 月:$[add\_months(yyyyMMdd,-N)]
  • 1
  • 2
  • 3
  • 4

2.直接加减数字 在自定义格式后直接“+/-”数字

+ 后 N 周:$[yyyyMMdd+7\*N]
+ 前 N 周:$[yyyyMMdd-7\*N]
+ 后 N 天:$[yyyyMMdd+N]
+ 前 N 天:$[yyyyMMdd-N]
+ 后 N 小时:$[HHmmss+N/24]
+ 前 N 小时:$[HHmmss-N/24]
+ 后 N 分钟:$[HHmmss+N/24/60]
+ 前 N 分钟:$[HHmmss-N/24/60]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
全局参数
作用域

在工作流定义页面配置的参数,作用于该工作流中全部的任务

使用方式

全局参数配置方式如下:在工作流定义页面,点击“设置全局”右边的加号,填写对应的变量名称和对应的值,保存即可

img

img

这里定义的global_bizdate参数可以被其它任一节点的局部参数引用,并设置global_bizdate的value为通过引用系统参数system.biz.date获得的值

本地参数

作用域

在任务定义页面配置的参数,默认作用域仅限该任务,如果配置了参数传递则可将该参数作用到下游任务中。

使用方式

本地参数配置方式如下:在任务定义页面,点击“自定义参数”右边的加号,填写对应的变量名称和对应的值,保存即可

img

img

如果想要在本地参数中调用系统内置参数,将内置参数对应的值填到value中,如上图中的${biz_date}以及${curdate}

参数的引用

DolphinScheduler 提供参数间相互引用的能力,包括:本地参数引用全局参数、上下游参数传递。因为有引用的存在,就涉及当参数名相同时,参数的优先级问题,详见参数优先级

本地任务引用全局参数

本地任务引用全局参数的前提是,你已经定义了全局参数,使用方式和本地参数中的使用方式类似,但是参数的值需要配置成全局参数中的key

parameter-call-global-in-local

如上图中的${biz_date}以及${curdate},就是本地参数引用全局参数的例子。观察上图的最后一行,local_param_bizdate通过

g

l

o

b

a

l

b

i

z

d

a

t

e

来引用全局参数,在

s

h

e

l

l

脚本中可以通过

{global_bizdate}来引用全局参数,在shell脚本中可以通过

globalb​izdate来引用全局参数,在shell脚本中可以通过{local_param_bizdate}来引全局变量 global_bizdate的值,或通过JDBC直接将local_param_bizdate的值set进去。同理,local_param通过{local_param}引用上一节中定义的全局参数。biz_date、biz_curdate、system.datetime都是用户自定义的参数,通过{全局参数}进行赋值。

上游任务传递给下游任务

DolphinScheduler 允许在任务间进行参数传递,目前传递方向仅支持上游单向传递给下游。目前支持这个特性的任务类型有:

当定义上游节点时,如果有需要将该节点的结果传递给有依赖关系的下游节点,需要在【当前节点设置】的【自定义参数】设置一个方向是 OUT 的变量。目前我们主要针对 SQL 和 SHELL 节点做了可以向下传递参数的功能。

SQL

prop 为用户指定;方向选择为 OUT,只有当方向为 OUT 时才会被定义为变量输出;数据类型可以根据需要选择不同数据结构;value 部分不需要填写。

如果 SQL 节点的结果只有一行,一个或多个字段,prop 的名字需要和字段名称一致。数据类型可选择为除 LIST 以外的其他类型。变量会选择 SQL 查询结果中的列名中与该变量名称相同的列对应的值。

如果 SQL 节点的结果为多行,一个或多个字段,prop 的名字需要和字段名称一致。数据类型选择为LIST。获取到 SQL 查询结果后会将对应列转化为 LIST,并将该结果转化为 JSON 后作为对应变量的值。

我们再以上图中包含 SQL 节点的流程举例说明:

上图中节点【createParam1】的定义如下:

image-20210723104957031

节点【createParam2】的定义如下:

image-20210723105026924

您可以在【工作流实例】页面,找到对应的节点实例,便可以查看该变量的值。

节点实例【createParam1】如下:

image-20210723105131381

这里当然 “id” 的值会等于 12.

我们再来看节点实例【createParam2】的情况。

image-20210723105255850

这里只有 “id” 的值。尽管用户定义的 sql 查到的是 “id” 和 “database_name” 两个字段,但是由于只定义了一个为 out 的变量 “id”,所以只会设置一个变量。由于显示的原因,这里已经替您查好了该 list 的长度为 10。

Shell

prop 为用户指定;方向选择为 OUT,只有当方向为 OUT 时才会被定义为变量输出;数据类型可以根据需要选择不同数据结构;value 部分不需要填写。

用户需要传递参数,在定义 shell 脚本时,需要输出格式为 ${setValue(key=value)} 的语句,key 为对应参数的 prop,value 为该参数的值。

例如下图中, 通过 echo '${setValue(trans=hello trans)}', 将’trans’设置为"hello trans", 在下游任务中就可以使用trans这个变量了:

trans-shell

shell 节点定义时当日志检测到 ${setValue(key=value1)} 的格式时,会将 value1 赋值给 key,下游节点便可以直接使用变量 key 的值。同样,您可以在【工作流实例】页面,找到对应的节点实例,便可以查看该变量的值。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6pUPPoeR-1683205971021)(null)]

参数优先级

DolphinScheduler 中所涉及的参数值的定义可能来自三种类型:

  • 全局参数:在工作流保存页面定义时定义的变量
  • 上游任务传递的参数:上游任务传递过来的参数
  • 本地参数:节点的自有变量,用户在“自定义参数”定义的变量,并且用户可以在工作流定义时定义该部分变量的值

因为参数的值存在多个来源,当参数名相同时,就需要会存在参数优先级的问题。DolphinScheduler 参数的优先级从高到低为:全局参数 > 上游任务传递的参数 > 本地参数

在上游任务传递的参数的情况下,由于上游可能存在多个任务向下游传递参数。当上游传递的参数名称相同时:

  • 下游节点会优先使用值为非空的参数
  • 如果存在多个值为非空的参数,则按照上游任务的完成时间排序,选择完成时间最早的上游任务对应的参数
例子

下面例子向你展示如何使用任务参数传递的优先级问题

1:先以 shell 节点解释第一种情况

image-20210723102938239

节点 【useParam】可以使用到节点【createParam】中设置的变量。而节点 【useParam】与节点【noUseParam】中并没有依赖关系,所以并不会获取到节点【noUseParam】的变量。上图中只是以 shell 节点作为例子,其他类型节点具有相同的使用规则。

image-20210723103316896

其中节点【createParam】在使用变量时直接使用即可。另外该节点设置了 “key” 和 “key1” 两个变量,这里用户用定义了一个与上游节点传递的变量名相同的变量 key1,并且复制了值为 “12”,但是由于我们设置的优先级的关系,这里的值 “12” 会被抛弃,最终使用上游节点设置的变量值。

2:我们再以 sql 节点来解释另外一种情况

image-20210723103937052

节点【use_create】的定义如下:

image-20210723104411489

“status” 是当前节点设置的节点的自有变量。但是用户在保存时也同样设置了 “status” 变量,并且赋值为 -1。那在该 SQL 执行时,status 的值为优先级更高的 -1。抛弃了节点的自有变量的值。

这里的 “id” 是上游节点设置的变量,用户在节点【createParam1】、节点【createParam2】中设置了相同参数名 “id” 的参数。而节点【use_create】中使用了最先结束的【createParam1】的值。

数据源中心

数据源

数据源中心支持MySQL、POSTGRESQL、HIVE/IMPALA、SPARK、CLICKHOUSE、ORACLE、SQLSERVER等数据源。

  • 点击“数据源中心->创建数据源”,根据需求创建不同类型的数据源。
  • 点击“测试连接”,测试数据源是否可以连接成功。
MySQL数据源
  • 数据源:选择MYSQL
  • 数据源名称:输入数据源的名称
  • 描述:输入数据源的描述
  • IP主机名:输入连接MySQL的IP
  • 端口:输入连接MySQL的端口
  • 用户名:设置连接MySQL的用户名
  • 密码:设置连接MySQL的密码
  • 数据库名:输入连接MySQL的数据库名称
  • Jdbc连接参数:用于MySQL连接的参数设置,以JSON形式填写

img

POSTGRESQL数据源
  • 数据源:选择POSTGRESQL
  • 数据源名称:输入数据源的名称
  • 描述:输入数据源的描述
  • IP/主机名:输入连接POSTGRESQL的IP
  • 端口:输入连接POSTGRESQL的端口
  • 用户名:设置连接POSTGRESQL的用户名
  • 密码:设置连接POSTGRESQL的密码
  • 数据库名:输入连接POSTGRESQL的数据库名称
  • Jdbc连接参数:用于POSTGRESQL连接的参数设置,以JSON形式填写

img

HIVE数据源
使用HiveServer2

img

  • 数据源:选择HIVE
  • 数据源名称:输入数据源的名称
  • 描述:输入数据源的描述
  • IP/主机名:输入连接HIVE的IP
  • 端口:输入连接HIVE的端口
  • 用户名:设置连接HIVE的用户名
  • 密码:设置连接HIVE的密码
  • 数据库名:输入连接HIVE的数据库名称
  • Jdbc连接参数:用于HIVE连接的参数设置,以JSON形式填写
使用HiveServer2 HA Zookeeper

img

注意:如果开启了kerberos,则需要填写 Principal

img

Spark数据源

img

  • 数据源:选择Spark
  • 数据源名称:输入数据源的名称
  • 描述:输入数据源的描述
  • IP/主机名:输入连接Spark的IP
  • 端口:输入连接Spark的端口
  • 用户名:设置连接Spark的用户名
  • 密码:设置连接Spark的密码
  • 数据库名:输入连接Spark的数据库名称
  • Jdbc连接参数:用于Spark连接的参数设置,以JSON形式填写

注意:如果开启了kerberos,则需要填写 Principal

img

告警

如何创建告警插件以及告警组

在2.0.0版本中,用户需要创建告警实例,然后同告警组进行关联,一个告警组可以使用多个告警实例,我们会逐一进行进行告警通知。

首先需要进入到安全中心,选择告警组管理,然后点击左侧的告警实例管理,然后创建一个告警实例,然后选择对应的告警插件,填写相关告警参数。

然后选择告警组管理,创建告警组,选择相应的告警实例即可。

img img img img

企业微信

如果您需要使用到企业微信进行告警,请在安装完成后,修改 alert.properties 文件,然后重启 alert 服务即可。企业微信的配置样例如下

# 设置企业微信告警功能是否开启:开启为 true,否则为 false
enterprise.wechat.enable="true"

# 设置 corpid,每个企业都拥有唯一的 corpid,获取此信息可在管理后台 “我的企业” - “企业信息” 下查看 “企业 ID”(需要有管理员权限)
enterprise.wechat.corp.id="xxx"

# 设置 secret,secret 是企业应用里面用于保障数据安全的 “钥匙”,每一个应用都有一个独立的访问密钥
enterprise.wechat.secret="xxx"

# 设置 agentid,每个应用都有唯一的 agentid。在管理后台 -> “应用与小程序” -> “应用”,点进某个应用,即可看到 agentid
enterprise.wechat.agent.id="xxxx"

# 设置 userid,多个用逗号分隔。每个成员都有唯一的 userid,即所谓 “帐号”。在管理后台 -> “通讯录” -> 点进某个成员的详情页,可以看到
enterprise.wechat.users=zhangsan,lisi

# 获取 access\_token 的地址,使用如下例子无需修改
enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpId}&corpsecret={secret}

# 发送应用消息地址,使用如下例子无需改动
enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}

# 发送消息格式,无需改动
enterprise.wechat.user.send.msg={\"touser\":\"{toUser}\",\"agentid\":\"{agentId}\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"{msg}\"}}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

资源中心

如果需要用到资源上传功能,针对单机可以选择本地文件目录作为上传文件夹(此操作不需要部署 Hadoop)。当然也可以选择上传到 Hadoop or MinIO 集群上,此时则需要有Hadoop (2.6+) 或者 MinIO 等相关环境

*注意:*

  • 如果用到资源上传的功能,那么 安装部署中,部署用户需要有这部分的操作权限
  • 如果 Hadoop 集群的 NameNode 配置了 HA 的话,需要开启 HDFS 类型的资源上传,同时需要将 Hadoop 集群下的 core-site.xmlhdfs-site.xml 复制到 /opt/dolphinscheduler/conf,非 NameNode HA 跳过次步骤

hdfs资源配置

  • 上传资源文件和udf函数,所有上传的文件和资源都会被存储到hdfs上,所以需要以下配置项:
conf/common.properties  
    # Users who have permission to create directories under the HDFS root path
    hdfs.root.user=hdfs
    # data base dir, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended
    resource.upload.path=/dolphinscheduler
    # resource storage type : HDFS,S3,NONE
    resource.storage.type=HDFS
    # whether kerberos starts
    hadoop.security.authentication.startup.state=false
    # java.security.krb5.conf path
    java.security.krb5.conf.path=/opt/krb5.conf
    # loginUserFromKeytab user
    login.user.keytab.username=hdfs-mycluster@ESZ.COM
    # loginUserFromKeytab path
    login.user.keytab.path=/opt/hdfs.headless.keytab    
    # if resource.storage.type is HDFS,and your Hadoop Cluster NameNode has HA enabled, you need to put core-site.xml and hdfs-site.xml in the installPath/conf directory. In this example, it is placed under /opt/soft/dolphinscheduler/conf, and configure the namenode cluster name; if the NameNode is not HA, modify it to a specific IP or host name.
    # if resource.storage.type is S3,write S3 address,HA,for example :s3a://dolphinscheduler,
    # Note,s3 be sure to create the root directory /dolphinscheduler
    fs.defaultFS=hdfs://mycluster:8020    
    #resourcemanager ha note this need ips , this empty if single
    yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx    
    # If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine
    yarn.application.status.address=http://xxxx:8088/ws/v1/cluster/apps/%s

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

文件管理

是对各种资源文件的管理,包括创建基本的txt/log/sh/conf/py/java等文件、上传jar包等各种类型文件,可进行编辑、重命名、下载、删除等操作。

img

  • 创建文件

文件格式支持以下几种类型:txt、log、sh、conf、cfg、py、java、sql、xml、hql、properties

img

  • 上传文件

上传文件:点击"上传文件"按钮进行上传,将文件拖拽到上传区域,文件名会自动以上传的文件名称补全

img

  • 文件查看

对可查看的文件类型,点击文件名称,可查看文件详情

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rUFXelKK-1683205976507)(null)]

  • 下载文件

点击文件列表的"下载"按钮下载文件或者在文件详情中点击右上角"下载"按钮下载文件

  • 文件重命名

img

  • 删除

文件列表->点击"删除"按钮,删除指定文件

UDF管理

资源管理

资源管理和文件管理功能类似,不同之处是资源管理是上传的UDF函数,文件管理上传的是用户程序,脚本及配置文件 操作功能:重命名、下载、删除。

  • 上传udf资源

和上传文件相同。

函数管理
  • 创建udf函数

点击“创建UDF函数”,输入udf函数参数,选择udf资源,点击“提交”,创建udf函数。 目前只支持HIVE的临时UDF函数

  • UDF函数名称:输入UDF函数时的名称
  • 包名类名:输入UDF函数的全路径
  • UDF资源:设置创建的UDF对应的资源文件

img

监控中心

服务管理

  • 服务管理主要是对系统中的各个服务的健康状况和基本信息的监控和显示
master监控
  • 主要是master的相关信息。

img

worker监控
  • 主要是worker的相关信息。

img

Zookeeper监控
  • 主要是zookpeeper中各个worker和master的相关配置信息。

img

DB监控
  • 主要是DB的健康状况

img

统计管理

img

  • 待执行命令数:统计t_ds_command表的数据
  • 执行失败的命令数:统计t_ds_error_command表的数据
  • 待运行任务数:统计Zookeeper中task_queue的数据
  • 待杀死任务数:统计Zookeeper中task_kill的数据

安全中心(权限系统)

  • 安全中心只有管理员账户才有权限操作,分别有队列管理、租户管理、用户管理、告警组管理、worker分组管理、令牌管理等功能,在用户管理模块可以对资源、数据源、项目等授权
  • 管理员登录,默认用户名密码:admin/dolphinscheduler123

创建队列

  • 队列是在执行spark、mapreduce等程序,需要用到“队列”参数时使用的。
  • 管理员进入安全中心->队列管理页面,点击“创建队列”按钮,创建队列。

img

添加租户

  • 租户对应的是Linux的用户,用于worker提交作业所使用的用户。如果linux没有这个用户,则会导致任务运行失败。你可以通过修改 worker.properties 配置文件中参数 worker.tenant.auto.create=true 实现当 linux 用户不存在时自动创建该用户。worker.tenant.auto.create=true 参数会要求 worker 可以免密运行 sudo 命令。
  • 租户编码:租户编码是Linux上的用户,唯一,不能重复
  • 管理员进入安全中心->租户管理页面,点击“创建租户”按钮,创建租户。

img

创建普通用户

  • 用户分为管理员用户普通用户
    • 管理员有授权和用户管理等权限,没有创建项目和工作流定义的操作的权限。
    • 普通用户可以创建项目和对工作流定义的创建,编辑,执行等操作。
    • 注意:如果该用户切换了租户,则该用户所在租户下所有资源将复制到切换的新租户下。
  • 进入安全中心->用户管理页面,点击“创建用户”按钮,创建用户。

img

编辑用户信息
  • 管理员进入安全中心->用户管理页面,点击"编辑"按钮,编辑用户信息。
  • 普通用户登录后,点击用户名下拉框中的用户信息,进入用户信息页面,点击"编辑"按钮,编辑用户信息。
修改用户密码
  • 管理员进入安全中心->用户管理页面,点击"编辑"按钮,编辑用户信息时,输入新密码修改用户密码。
  • 普通用户登录后,点击用户名下拉框中的用户信息,进入修改密码页面,输入密码并确认密码后点击"编辑"按钮,则修改密码成功。

创建告警组

  • 告警组是在启动时设置的参数,在流程结束以后会将流程的状态和其他信息以邮件形式发送给告警组。
  • 管理员进入安全中心->告警组管理页面,点击“创建告警组”按钮,创建告警组。

img

令牌管理

由于后端接口有登录检查,令牌管理提供了一种可以通过调用接口的方式对系统进行各种操作。

  • 管理员进入安全中心->令牌管理页面,点击“创建令牌”按钮,选择失效时间与用户,点击"生成令牌"按钮,点击"提交"按钮,则选择用户的token创建成功。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MQj8OzXt-1683205975817)(null)]

  • 普通用户登录后,点击用户名下拉框中的用户信息,进入令牌管理页面,选择失效时间,点击"生成令牌"按钮,点击"提交"按钮,则该用户创建token成功。
  • 调用示例:
    /\*\*
 \* test token
 \*/
    public  void doPOSTParam()throws Exception{
        // create HttpClient
        CloseableHttpClient httpclient = HttpClients.createDefault();

        // create http post request
        HttpPost httpPost = new HttpPost("http://127.0.0.1:12345/escheduler/projects/create");
        httpPost.setHeader("token", "123");
        // set parameters
        List<NameValuePair> parameters = new ArrayList<NameValuePair>();
        parameters.add(new BasicNameValuePair("projectName", "qzw"));
        parameters.add(new BasicNameValuePair("desc", "qzw"));
        UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(parameters);
        httpPost.setEntity(formEntity);
        CloseableHttpResponse response = null;
        try {
            // execute
            response = httpclient.execute(httpPost);
            // response status code 200
            if (response.getStatusLine().getStatusCode() == 200) {
                String content = EntityUtils.toString(response.getEntity(), "UTF-8");
                System.out.println(content);
            }
        } finally {
            if (response != null) {
                response.close();
            }
            httpclient.close();
        }
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

授予权限

  • 授予权限包括项目权限,资源权限,数据源权限,UDF函数权限。
  • 管理员可以对普通用户进行非其创建的项目、资源、数据源和UDF函数进行授权。因为项目、资源、数据源和UDF函数授权方式都是一样的,所以以项目授权为例介绍。
  • 注意:对于用户自己创建的项目,该用户拥有所有的权限。则项目列表和已选项目列表中不会显示。
  • 管理员进入安全中心->用户管理页面,点击需授权用户的“授权”按钮,如下图所示:

img

  • 选择项目,进行项目授权。

img

  • 资源、数据源、UDF函数授权同项目授权。

Worker分组

每个worker节点都会归属于自己的Worker分组,默认分组为default.

在任务执行时,可以将任务分配给指定worker分组,最终由该组中的worker节点执行该任务.

新增/更新 worker分组

  • 打开要设置分组的worker节点上的"conf/worker.properties"配置文件. 修改worker.groups参数.
  • worker.groups参数后面对应的为该worker节点对应的分组名称,默认为default.
  • 如果该worker节点对应多个分组,则以逗号隔开.
示例: 
worker.groups=default,test

  • 1
  • 2
  • 3
  • 也可以在运行中修改worker所属的worker分组,如果修改成功,worker就会使用这个新建的分组,忽略worker.properties中的配置。修改步骤为"安全中心 -> worker分组管理 -> 点击 ‘新建worker分组’ -> 输入’组名称’ -> 选择已有worker -> 点击’提交’"

环境管理

  • 在线配置Worker运行环境,一个Worker可以指定多个环境,每个环境等价于dolphinscheduler_env.sh文件.
  • 默认环境为dolphinscheduler_env.sh文件.
  • 在任务执行时,可以将任务分配给指定worker分组,根据worker分组选择对应的环境,最终由该组中的worker节点执行环境后执行该任务.

创建/更新 环境

  • 环境配置等价于dolphinscheduler_env.sh文件内配置

img

使用 环境

  • 在工作流定义中创建任务节点选择Worker分组和Worker分组对应的环境,任务执行时Worker会先执行环境在执行任务.

img

API 调用

背景

一般都是通过页面来创建项目、流程等,但是与第三方系统集成就需要通过调用 API 来管理项目、流程

操作步骤

创建 token
  1. 登录调度系统,点击 “安全中心”,再点击左侧的 “令牌管理”,点击 “令牌管理” 创建令牌

img

  1. 选择 “失效时间” (Token有效期),选择 “用户” (以指定的用户执行接口操作),点击 “生成令牌” ,拷贝 Token 字符串,然后点击 “提交”

img

使用 Token
  1. 打开 API文档页面

地址:http://{api server ip}:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn

img

  1. 选一个测试的接口,本次测试选取的接口是:查询所有项目

projects/query-project-list

  1. 打开 Postman,填写接口地址,并在 Headers 中填写 Token,发送请求后即可查看结果
token:刚刚生成的Token

  • 1
  • 2

img

创建项目

这里以创建名为 “wudl-flink-test” 的项目为例

img

img

img

返回 msg 信息为 “success”,说明我们已经成功通过 API 的方式创建了项目。

如果您对创建项目的源码感兴趣,欢迎继续阅读下面内容

附:创建项目源码

img

img

Flink调用

调用 flink 操作步骤

创建队列
  1. 登录调度系统,点击 “安全中心”,再点击左侧的 “队列管理”,点击 “队列管理” 创建队列
  2. 填写队列名称和队列值,然后点击 “提交”

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QOXUFbby-1683205974909)(null)]

创建租户
1.租户对应的是 linux 用户, 用户 worker 提交作业所使用的的用户, 如果 linux 没有这个用户, worker 会在执行脚本的时候创建这个用户
2.租户和租户编码都是唯一不能重复,好比一个人有名字有身份证号。
3.创建完租户会在 hdfs 对应的目录上有相关的文件夹。

  • 1
  • 2
  • 3
  • 4

img

创建用户

img

创建 Token
  1. 登录调度系统,点击 “安全中心”,再点击左侧的 “令牌管理”,点击 “令牌管理” 创建令牌

img

  1. 选择 “失效时间” (Token有效期),选择 “用户” (以指定的用户执行接口操作),点击 “生成令牌” ,拷贝 Token 字符串,然后点击 “提交”

img

使用 Token
  1. 打开 API文档页面

地址:http://{api server ip}:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn

img

  1. 选一个测试的接口,本次测试选取的接口是:查询所有项目

projects/query-project-list

  1. 打开 Postman,填写接口地址,并在 Headers 中填写 Token,发送请求后即可查看结果
token: 刚刚生成的 Token

  • 1
  • 2

img

用户授权

img

用户登录
http://192.168.1.163:12345/dolphinscheduler/ui/#/monitor/servers/master

  • 1
  • 2

img

资源上传

img

创建工作流

img

img

img

img

查看执行结果

img

查看日志结果

img

(二)高级指南

系统架构设计

本章节介绍Apache DolphinScheduler调度系统架构

1.系统架构
1.1 系统架构图

系统架构图

系统架构图

1.2 启动流程活动图

Start process activity diagram

启动流程活动图

1.3 架构说明
  • MasterServer

MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。 MasterServer基于netty提供监听服务。

该服务内主要包含:
+ **Distributed Quartz**分布式调度组件,主要负责定时任务的启停操作,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操作
+ **MasterSchedulerService**是一个扫描线程,定时扫描数据库中的 **command** 表,生成工作流实例,根据不同的**命令类型**进行不同的业务操作
+ **WorkflowExecuteThread**主要是负责DAG任务切分、任务提交、各种不同命令类型的逻辑处理,处理任务状态和工作流状态事件
+ **EventExecuteService**处理master负责的工作流实例所有的状态变化事件,使用线程池处理工作流的状态事件
+ **StateWheelExecuteThread**处理依赖任务和超时任务的定时状态更新
  • 1
  • 2
  • 3
  • 4
  • 5
  • WorkerServer

WorkerServer也采用分布式无中心设计理念,支持自定义任务插件,主要负责任务的执行和提供日志服务。 WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。

该服务包含:
+ **WorkerManagerThread**主要通过netty领取master发送过来的任务,并根据不同任务类型调用**TaskExecuteThread**对应执行器。
+ **RetryReportTaskStatusThread**主要通过netty向master汇报任务状态,如果汇报失败,会一直重试汇报
+ **LoggerServer**是一个日志服务,提供日志分片查看、刷新和下载等功能
  • 1
  • 2
  • 3
  • Registry

注册中心,使用插件化实现,默认支持Zookeeper, 系统中的MasterServer和WorkerServer节点通过注册中心来进行集群管理和容错。另外系统还基于注册中心进行事件监听和分布式锁。

  • Alert

提供告警相关功能,仅支持单机服务。支持自定义告警插件。

  • API

API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。 接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等。

  • UI

系统的前端页面,提供系统的各种可视化操作界面,详见系统使用手册部分。

1.4 架构设计思想
一、去中心化vs中心化
中心化思想

中心化的设计理念比较简单,分布式集群中的节点按照角色分工,大体上分为两种角色:

master-slave角色

  • Master的角色主要负责任务分发并监督Slave的健康状态,可以动态的将任务均衡到Slave上,以致Slave节点不至于“忙死”或”闲死”的状态。
  • Worker的角色主要负责任务的执行工作并维护和Master的心跳,以便Master可以分配任务给Slave。

中心化思想设计存在的问题:

  • 一旦Master出现了问题,则群龙无首,整个集群就会崩溃。为了解决这个问题,大多数Master/Slave架构模式都采用了主备Master的设计方案,可以是热备或者冷备,也可以是自动切换或手动切换,而且越来越多的新系统都开始具备自动选举切换Master的能力,以提升系统的可用性。
  • 另外一个问题是如果Scheduler在Master上,虽然可以支持一个DAG中不同的任务运行在不同的机器上,但是会产生Master的过负载。如果Scheduler在Slave上,则一个DAG中所有的任务都只能在某一台机器上进行作业提交,则并行任务比较多的时候,Slave的压力可能会比较大。
去中心化

去中心化

  • 在去中心化设计里,通常没有Master/Slave的概念,所有的角色都是一样的,地位是平等的,全球互联网就是一个典型的去中心化的分布式系统,联网的任意节点设备down机,都只会影响很小范围的功能。
  • 去中心化设计的核心设计在于整个分布式系统中不存在一个区别于其他节点的”管理者”,因此不存在单点故障问题。但由于不存在” 管理者”节点所以每个节点都需要跟其他节点通信才得到必须要的机器信息,而分布式系统通信的不可靠性,则大大增加了上述功能的实现难度。
  • 实际上,真正去中心化的分布式系统并不多见。反而动态中心化分布式系统正在不断涌出。在这种架构下,集群中的管理者是被动态选择出来的,而不是预置的,并且集群在发生故障的时候,集群的节点会自发的举行"会议"来选举新的"管理者"去主持工作。最典型的案例就是ZooKeeper及Go语言实现的Etcd。
  • DolphinScheduler的去中心化是Master/Worker注册到Zookeeper中,实现Master集群和Worker集群无中心,使用分片机制,公平分配工作流在master上执行,并通过不同的发送策略将任务发送给worker执行具体的任务
二、Master执行流程
  1. DolphinScheduler使用分片算法将command取模,根据master的排序id分配,master将拿到的command转换成工作流实例,使用线程池处理工作流实例
  2. DolphinScheduler对工作流的处理流程:
  • 通过UI或者API调用,启动工作流,持久化一条command到数据库中
  • Master通过分片算法,扫描Command表,生成工作流实例ProcessInstance,同时删除Command数据
  • Master使用线程池运行WorkflowExecuteThread,执行工作流实例的流程,包括构建DAG,创建任务实例TaskInstance,将TaskInstance通过netty发送给worker
  • Worker收到任务以后,修改任务状态,并将执行信息返回Master
  • Master收到任务信息,持久化到数据库,并且将状态变化事件存入EventExecuteService事件队列
  • EventExecuteService根据事件队列调用WorkflowExecuteThread进行后续任务的提交和工作流状态的修改
三、容错设计

容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况

1. 宕机容错

服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:

DolphinScheduler容错设计

其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。

  • Master容错流程图:

Master容错流程图

ZooKeeper Master容错完成之后则重新由DolphinScheduler中Scheduler线程调度,遍历 DAG 找到”正在运行”和“提交成功”的任务,对”正在运行”的任务监控其任务实例的状态,对”提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。

  • Worker容错流程图:

Worker容错流程图

Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。

注意:由于” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。

2.任务失败重试

这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:

  • 任务失败重试是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自己再最多尝试运行3次
  • 流程失败恢复是流程级别的,是手动进行的,恢复是从只能从失败的节点开始执行从当前节点开始执行
  • 流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点进行

接下来说正题,我们将工作流中的任务节点分了两种类型。

  • 一种是业务节点,这种节点都对应一个实际的脚本或者处理语句,比如Shell节点,MR节点、Spark节点、依赖节点等。
  • 还有一种是逻辑节点,这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。

所有任务都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。

如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作

四、任务优先级设计

在早期调度设计中,如果没有优先级设计,采用公平调度设计的话,会遇到先行提交的任务可能会和后继提交的任务同时完成的情况,而不能做到设置流程或者任务的优先级,因此我们对此进行了重新设计,目前我们设计如下:

  • 按照

不同流程实例优先级

优先于

同一个流程实例优先级

优先于

同一流程内任务优先级

优先于

同一流程内任务

提交顺序依次从高到低进行任务处理。

+ 具体实现是根据任务实例的json解析优先级,然后把**流程实例优先级\_流程实例id\_任务优先级\_任务id**信息保存在ZooKeeper任务队列中,当从任务队列获取的时候,通过字符串比较即可得出最需要优先执行的任务


	- 其中流程定义的优先级是考虑到有些流程需要先于其他流程进行处理,这个可以在流程启动或者定时启动时配置,共有5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图
	
	 ![流程优先级配置](https://img-blog.csdnimg.cn/img_convert/ce19d6317201e28d868832b5151b6697.png)
	- 任务的优先级也分为5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图
	
	 ![任务优先级配置](https://img-blog.csdnimg.cn/img_convert/8329e7570c0f8e4d88bc1706ed81cbb8.png)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
五、Logback和netty实现日志访问
  • 由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查询本地文件那样。有两种方案:
  • 将日志放到ES搜索引擎上
  • 通过netty通信获取远程日志信息
  • 介于考虑到尽可能的DolphinScheduler的轻量级性,所以选择了gRPC实现远程访问日志信息。

grpc远程访问

  • 我们使用自定义Logback的FileAppender和Filter功能,实现每个任务实例生成一个日志文件。
  • FileAppender主要实现如下:
/\*\*
 \* task log appender
 \*/
public class TaskLogAppender extends FileAppender<ILoggingEvent> {

    ...

   @Override
   protected void append(ILoggingEvent event) {

       if (currentlyActiveFile == null){
           currentlyActiveFile = getFile();
       }
       String activeFile = currentlyActiveFile;
       // thread name: taskThreadName-processDefineId\_processInstanceId\_taskInstanceId
       String threadName = event.getThreadName();
       String[] threadNameArr = threadName.split("-");
       // logId = processDefineId\_processInstanceId\_taskInstanceId
       String logId = threadNameArr[1];
       ...
       super.subAppend(event);
   }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

以/流程定义id/流程实例id/任务实例id.log的形式生成日志

  • 过滤匹配以TaskLogInfo开始的线程名称:
  • TaskLogFilter实现如下:
/\*\*
\* task log filter
\*/
public class TaskLogFilter extends Filter<ILoggingEvent> {

   @Override
   public FilterReply decide(ILoggingEvent event) {
       if (event.getThreadName().startsWith("TaskLogInfo-")){
           return FilterReply.ACCEPT;
       }
       return FilterReply.DENY;
   }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

Dolphin Scheduler 2.0元数据文档

表概览
表名表信息
t_ds_access_token访问ds后端的token
t_ds_alert告警信息
t_ds_alertgroup告警组
t_ds_command执行命令
t_ds_datasource数据源
t_ds_error_command错误命令
t_ds_process_definition流程定义
t_ds_process_instance流程实例
t_ds_project项目
t_ds_queue队列
t_ds_relation_datasource_user用户关联数据源
t_ds_relation_process_instance子流程
t_ds_relation_project_user用户关联项目
t_ds_relation_resources_user用户关联资源
t_ds_relation_udfs_user用户关联UDF函数
t_ds_relation_user_alertgroup用户关联告警组
t_ds_resources资源文件
t_ds_schedules流程定时调度
t_ds_session用户登录的session
t_ds_task_instance任务实例
t_ds_tenant租户
t_ds_udfsUDF资源
t_ds_user用户
t_ds_versionds版本信息
用户 队列 数据源

image.png

  • 一个租户下可以有多个用户
  • t_ds_user中的queue字段存储的是队列表中的queue_name信息,t_ds_tenant下存的是queue_id,在流程定义执行过程中,用户队列优先级最高,用户队列为空则采用租户队列
  • t_ds_datasource表中的user_id字段表示创建该数据源的用户,t_ds_relation_datasource_user中的user_id表示,对数据源有权限的用户
项目 资源 告警

image.png

  • 一个用户可以有多个项目,用户项目授权通过t_ds_relation_project_user表完成project_id和user_id的关系绑定
  • t_ds_projcet表中的user_id表示创建该项目的用户,t_ds_relation_project_user表中的user_id表示对项目有权限的用户
  • t_ds_resources表中的user_id表示创建该资源的用户,t_ds_relation_resources_user中的user_id表示对资源有权限的用户
  • t_ds_udfs表中的user_id表示创建该UDF的用户,t_ds_relation_udfs_user表中的user_id表示对UDF有权限的用户
命令 流程 任务

image.png
image.png

  • 一个项目有多个流程定义,一个流程定义可以生成多个流程实例,一个流程实例可以生成多个任务实例
  • t_ds_schedulers表存放流程定义的定时调度信息
  • t_ds_relation_process_instance表存放的数据用于处理流程定义中含有子流程的情况,parent_process_instance_id表示含有子流程的主流程实例id,process_instance_id表示子流程实例的id,parent_task_instance_id表示子流程节点的任务实例id,流程实例表和任务实例表分别对应t_ds_process_instance表和t_ds_task_instance表
核心表Schema
t_ds_process_definition
字段类型注释
idint主键
namevarchar流程定义名称
versionint流程定义版本
release_statetinyint流程定义的发布状态:0 未上线 1已上线
project_idint项目id
user_idint流程定义所属用户id
process_definition_jsonlongtext流程定义json串
descriptiontext流程定义描述
global_paramstext全局参数
flagtinyint流程是否可用:0 不可用,1 可用
locationstext节点坐标信息
connectstext节点连线信息
receiverstext收件人
receivers_cctext抄送人
create_timedatetime创建时间
timeoutint超时时间
tenant_idint租户id
update_timedatetime更新时间
modify_byvarchar修改用户
resource_idsvarchar资源id集
t_ds_process_instance
字段类型注释
idint主键
namevarchar流程实例名称
process_definition_idint流程定义id
statetinyint流程实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成
recoverytinyint流程实例容错标识:0 正常,1 需要被容错重启
start_timedatetime流程实例开始时间
end_timedatetime流程实例结束时间
run_timesint流程实例运行次数
hostvarchar流程实例所在的机器
command_typetinyint命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程
command_paramtext命令的参数(json格式)
task_depend_typetinyint节点依赖类型:0 当前节点,1 向前执行,2 向后执行
max_try_timestinyint最大重试次数
failure_strategytinyint失败策略 0 失败后结束,1 失败后继续
warning_typetinyint告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
warning_group_idint告警组id
schedule_timedatetime预期运行时间
command_start_timedatetime开始命令时间
global_paramstext全局参数(固化流程定义的参数)
process_instance_jsonlongtext流程实例json(copy的流程定义的json)
flagtinyint是否可用,1 可用,0不可用
update_timetimestamp更新时间
is_sub_processint是否是子工作流 1 是,0 不是
executor_idint命令执行用户
locationstext节点坐标信息
connectstext节点连线信息
history_cmdtext历史命令,记录所有对流程实例的操作
dependence_schedule_timestext依赖节点的预估时间
process_instance_priorityint流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_groupvarchar任务指定运行的worker分组
timeoutint超时时间
tenant_idint租户id
t_ds_task_instance
字段类型注释
idint主键
namevarchar任务名称
task_typevarchar任务类型
process_definition_idint流程定义id
process_instance_idint流程实例id
task_jsonlongtext任务节点json
statetinyint任务实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成
submit_timedatetime任务提交时间
start_timedatetime任务开始时间
end_timedatetime任务结束时间
hostvarchar执行任务的机器
execute_pathvarchar任务执行路径
log_pathvarchar任务日志路径
alert_flagtinyint是否告警
retry_timesint重试次数
pidint进程pid
app_linkvarcharyarn app id
flagtinyint是否可用:0 不可用,1 可用
retry_intervalint重试间隔
max_retry_timesint最大重试次数
task_instance_priorityint任务实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_groupvarchar任务指定运行的worker分组
t_ds_schedules
字段类型注释
idint主键
process_definition_idint流程定义id
start_timedatetime调度开始时间
end_timedatetime调度结束时间
crontabvarcharcrontab 表达式
failure_strategytinyint失败策略: 0 结束,1 继续
user_idint用户id
release_statetinyint状态:0 未上线,1 上线
warning_typetinyint告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
warning_group_idint告警组id
process_instance_priorityint流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_groupvarchar任务指定运行的worker分组
create_timedatetime创建时间
update_timedatetime更新时间
t_ds_command
字段类型注释
idint主键
command_typetinyint命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程
process_definition_idint流程定义id
command_paramtext命令的参数(json格式)
task_depend_typetinyint节点依赖类型:0 当前节点,1 向前执行,2 向后执行
failure_strategytinyint失败策略:0结束,1继续
warning_typetinyint告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发
warning_group_idint告警组
schedule_timedatetime预期运行时间
start_timedatetime开始时间
executor_idint执行用户id
dependencevarchar依赖字段
update_timedatetime更新时间
process_instance_priorityint流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest
worker_groupvarchar任务指定运行的worker分组

配置文件

前言

本文档为dolphinscheduler配置文件说明文档,针对版本为 dolphinscheduler-1.3.x 版本.

目录结构

目前dolphinscheduler 所有的配置文件都在 [conf ] 目录中. 为了更直观的了解[conf]目录所在的位置以及包含的配置文件,请查看下面dolphinscheduler安装目录的简化说明. 本文主要讲述dolphinscheduler的配置文件.其他部分先不做赘述.

[注:以下 dolphinscheduler 简称为DS.]

├─bin                               DS命令存放目录
│  ├─dolphinscheduler-daemon.sh         启动/关闭DS服务脚本
│  ├─start-all.sh                       根据配置文件启动所有DS服务
│  ├─stop-all.sh                        根据配置文件关闭所有DS服务
├─conf                              配置文件目录
│  ├─application-api.properties         api服务配置文件
│  ├─datasource.properties              数据库配置文件
│  ├─zookeeper.properties               zookeeper配置文件
│  ├─master.properties                  master服务配置文件
│  ├─worker.properties                  worker服务配置文件
│  ├─quartz.properties                  quartz服务配置文件
│  ├─common.properties                  公共服务[存储]配置文件
│  ├─alert.properties                   alert服务配置文件
│  ├─config                             环境变量配置文件夹
│      ├─install_config.conf                DS环境变量配置脚本[用于DS安装/启动]
│  ├─env                                运行脚本环境变量配置目录
│      ├─dolphinscheduler_env.sh            运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...]
│  ├─org                                mybatis mapper文件目录
│  ├─i18n                               i18n配置文件目录
│  ├─logback-api.xml                    api服务日志配置文件
│  ├─logback-master.xml                 master服务日志配置文件
│  ├─logback-worker.xml                 worker服务日志配置文件
│  ├─logback-alert.xml                  alert服务日志配置文件
├─sql                               DS的元数据创建升级sql文件
│  ├─create                             创建SQL脚本目录
│  ├─upgrade                            升级SQL脚本目录
│  ├─dolphinscheduler_postgre.sql       postgre数据库初始化脚本
│  ├─dolphinscheduler_mysql.sql         mysql数据库初始化脚本
│  ├─soft_version                       当前DS版本标识文件
├─script                            DS服务部署,数据库创建/升级脚本目录
│  ├─create-dolphinscheduler.sh         DS数据库初始化脚本      
│  ├─upgrade-dolphinscheduler.sh        DS数据库升级脚本                
│  ├─monitor-server.sh                  DS服务监控启动脚本               
│  ├─scp-hosts.sh                       安装文件传输脚本                                                    
│  ├─remove-zk-node.sh                  清理zookeeper缓存文件脚本       
├─ui                                前端WEB资源目录
├─lib                               DS依赖的jar存放目录
├─install.sh                        自动安装DS服务脚本

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
配置文件详解
序号服务分类配置文件
1启动/关闭DS服务脚本dolphinscheduler-daemon.sh
2数据库连接配置datasource.properties
3zookeeper连接配置zookeeper.properties
4公共[存储]配置common.properties
5API服务配置application-api.properties
6Master服务配置master.properties
7Worker服务配置worker.properties
8Alert 服务配置alert.properties
9Quartz配置quartz.properties
10DS环境变量配置脚本[用于DS安装/启动]install_config.conf
11运行脚本加载环境变量配置文件 [如: JAVA_HOME,HADOOP_HOME, HIVE_HOME …]dolphinscheduler_env.sh
12各服务日志配置文件api服务日志配置文件 : logback-api.xml master服务日志配置文件 : logback-master.xml worker服务日志配置文件 : logback-worker.xml alert服务日志配置文件 : logback-alert.xml
1.dolphinscheduler-daemon.sh [启动/关闭DS服务脚本]

dolphinscheduler-daemon.sh脚本负责DS的启动&关闭. start-all.sh/stop-all.sh最终也是通过dolphinscheduler-daemon.sh对集群进行启动/关闭操作. 目前DS只是做了一个基本的设置,JVM参数请根据各自资源的实际情况自行设置.

默认简化参数如下:

export DOLPHINSCHEDULER\_OPTS="
-server 
-Xmx16g 
-Xms1g 
-Xss512k 
-XX:+UseConcMarkSweepGC 
-XX:+CMSParallelRemarkEnabled 
-XX:+UseFastAccessorMethods 
-XX:+UseCMSInitiatingOccupancyOnly 
-XX:CMSInitiatingOccupancyFraction=70
"

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

不建议设置"-XX:DisableExplicitGC" , DS使用Netty进行通讯,设置该参数,可能会导致内存泄漏.

2.datasource.properties [数据库连接]

在DS中使用Druid对数据库连接进行管理,默认简化配置如下.

参数默认值描述
spring.datasource.driver-class-name数据库驱动
spring.datasource.url数据库连接地址
spring.datasource.username数据库用户名
spring.datasource.password数据库密码
spring.datasource.initialSize5初始连接池数量
spring.datasource.minIdle5最小连接池数量
spring.datasource.maxActive5最大连接池数量
spring.datasource.maxWait60000最大等待时长
spring.datasource.timeBetweenEvictionRunsMillis60000连接检测周期
spring.datasource.timeBetweenConnectErrorMillis60000重试间隔
spring.datasource.minEvictableIdleTimeMillis300000连接保持空闲而不被驱逐的最小时间
spring.datasource.validationQuerySELECT 1检测连接是否有效的sql
spring.datasource.validationQueryTimeout3检测连接是否有效的超时时间[seconds]
spring.datasource.testWhileIdletrue申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。
spring.datasource.testOnBorrowtrue申请连接时执行validationQuery检测连接是否有效
spring.datasource.testOnReturnfalse归还连接时执行validationQuery检测连接是否有效
spring.datasource.defaultAutoCommittrue是否开启自动提交
spring.datasource.keepAlivetrue连接池中的minIdle数量以内的连接,空闲时间超过minEvictableIdleTimeMillis,则会执行keepAlive操作。
spring.datasource.poolPreparedStatementstrue开启PSCache
spring.datasource.maxPoolPreparedStatementPerConnectionSize20要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。
3.zookeeper.properties [zookeeper连接配置]
参数默认值描述
zookeeper.quorumlocalhost:2181zk集群连接信息
zookeeper.dolphinscheduler.root/dolphinschedulerDS在zookeeper存储根目录
zookeeper.session.timeout60000session 超时
zookeeper.connection.timeout30000连接超时
zookeeper.retry.base.sleep100基本重试时间差
zookeeper.retry.max.sleep30000最大重试时间
zookeeper.retry.maxtime10最大重试次数
4.common.properties [hadoop、s3、yarn配置]

common.properties配置文件目前主要是配置hadoop/s3a相关的配置.

参数默认值描述
data.basedir.path/tmp/dolphinscheduler本地工作目录,用于存放临时文件
resource.storage.typeNONE资源文件存储类型: HDFS,S3,NONE
resource.upload.path/dolphinscheduler资源文件存储路径
hadoop.security.authentication.startup.statefalsehadoop是否开启kerberos权限
java.security.krb5.conf.path/opt/krb5.confkerberos配置目录
login.user.keytab.usernamehdfs-mycluster@ESZ.COMkerberos登录用户
login.user.keytab.path/opt/hdfs.headless.keytabkerberos登录用户keytab
kerberos.expire.time2kerberos过期时间,整数,单位为小时
resource.view.suffixstxt,log,sh,conf,cfg,py,java,sql,hql,xml,properties资源中心支持的文件格式
hdfs.root.userhdfs如果存储类型为HDFS,需要配置拥有对应操作权限的用户
fs.defaultFShdfs://mycluster:8020请求地址如果resource.storage.type=S3,该值类似为: s3a://dolphinscheduler. 如果resource.storage.type=HDFS, 如果 hadoop 配置了 HA,需要复制core-site.xml 和 hdfs-site.xml 文件到conf目录
fs.s3a.endpoints3 endpoint地址
fs.s3a.access.keys3 access key
fs.s3a.secret.keys3 secret key
yarn.resourcemanager.ha.rm.idsyarn resourcemanager 地址, 如果resourcemanager开启了HA, 输入HA的IP地址(以逗号分隔),如果resourcemanager为单节点, 该值为空即可
yarn.application.status.addresshttp://ds1:8088/ws/v1/cluster/apps/%s如果resourcemanager开启了HA或者没有使用resourcemanager,保持默认值即可. 如果resourcemanager为单节点,你需要将ds1 配置为resourcemanager对应的hostname
dolphinscheduler.env.pathenv/dolphinscheduler_env.sh运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME …]
development.statefalse是否处于开发模式
5.application-api.properties [API服务配置]
参数默认值描述
server.port12345api服务通讯端口
server.servlet.session.timeout7200session超时时间
server.servlet.context-path/dolphinscheduler请求路径
spring.servlet.multipart.max-file-size1024MB最大上传文件大小
spring.servlet.multipart.max-request-size1024MB最大请求大小
server.jetty.max-http-post-size5000000jetty服务最大发送请求大小
spring.messages.encodingUTF-8请求编码
spring.jackson.time-zoneGMT+8设置时区
spring.messages.basenamei18n/messagesi18n配置
security.authentication.typePASSWORD权限校验类型
6.master.properties [Master服务配置]
参数默认值描述
master.listen.port5678master监听端口
master.exec.threads100master工作线程数量,用于限制并行的流程实例数量
master.exec.task.num20master每个流程实例的并行任务数量
master.dispatch.task.num3master每个批次的派发任务数量
master.host.selectorLowerWeightmaster host选择器,用于选择合适的worker执行任务,可选值: Random, RoundRobin, LowerWeight
master.heartbeat.interval10master心跳间隔,单位为秒
master.task.commit.retryTimes5任务重试次数
master.task.commit.interval1000任务提交间隔,单位为毫秒
master.max.cpuload.avg-1master最大cpuload均值,只有高于系统cpuload均值时,master服务才能调度任务. 默认值为-1: cpu cores * 2
master.reserved.memory0.3master预留内存,只有低于系统可用内存时,master服务才能调度任务,单位为G
7.worker.properties [Worker服务配置]
参数默认值描述
worker.listen.port1234worker监听端口
worker.exec.threads100worker工作线程数量,用于限制并行的任务实例数量
worker.heartbeat.interval10worker心跳间隔,单位为秒
worker.max.cpuload.avg-1worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为-1: cpu cores * 2
worker.reserved.memory0.3worker预留内存,只有低于系统可用内存时,worker服务才能被派发任务,单位为G
worker.groupsdefaultworker分组配置,逗号分隔,例如’worker.groups=default,test’ worker启动时会根据该配置自动加入对应的分组
8.alert.properties [Alert 告警服务配置]
参数默认值描述
alert.typeEMAIL告警类型
mail.protocolSMTP邮件服务器协议
mail.server.hostxxx.xxx.com邮件服务器地址
mail.server.port25邮件服务器端口
mail.senderxxx@xxx.com发送人邮箱
mail.userxxx@xxx.com发送人邮箱名称
mail.passwd111111发送人邮箱密码
mail.smtp.starttls.enabletrue邮箱是否开启tls
mail.smtp.ssl.enablefalse邮箱是否开启ssl
mail.smtp.ssl.trustxxx.xxx.com邮箱ssl白名单
xls.file.path/tmp/xls邮箱附件临时工作目录
以下为企业微信配置[选填]
enterprise.wechat.enablefalse企业微信是否启用
enterprise.wechat.corp.idxxxxxxx
enterprise.wechat.secretxxxxxxx
enterprise.wechat.agent.idxxxxxxx
enterprise.wechat.usersxxxxxxx
enterprise.wechat.token.urlhttps://qyapi.weixin.qq.com/cgi-bin/gettoken? corpid=corpId&corpsecret=secret
enterprise.wechat.push.urlhttps://qyapi.weixin.qq.com/cgi-bin/message/send? access_token=$token
enterprise.wechat.user.send.msg发送消息格式
enterprise.wechat.team.send.msg群发消息格式
plugin.dir/Users/xx/your/path/to/plugin/dir插件目录
9.quartz.properties [Quartz配置]

这里面主要是quartz配置,请结合实际业务场景&资源进行配置,本文暂时不做展开.

参数默认值描述
org.quartz.jobStore.driverDelegateClassorg.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.driverDelegateClassorg.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.scheduler.instanceNameDolphinScheduler
org.quartz.scheduler.instanceIdAUTO
org.quartz.scheduler.makeSchedulerThreadDaemontrue
org.quartz.jobStore.usePropertiesfalse
org.quartz.threadPool.classorg.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.makeThreadsDaemonstrue
org.quartz.threadPool.threadCount25
org.quartz.threadPool.threadPriority5
org.quartz.jobStore.classorg.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.tablePrefixQRTZ_
org.quartz.jobStore.isClusteredtrue
org.quartz.jobStore.misfireThreshold60000
org.quartz.jobStore.clusterCheckinInterval5000
org.quartz.jobStore.acquireTriggersWithinLocktrue
org.quartz.jobStore.dataSourcemyDs
org.quartz.dataSource.myDs.connectionProvider.classorg.apache.dolphinscheduler.service.quartz.DruidConnectionProvider
10.install_config.conf [DS环境变量配置脚本[用于DS安装/启动]]

install_config.conf这个配置文件比较繁琐,这个文件主要有两个地方会用到.

  • 1.DS集群的自动安装.

调用install.sh脚本会自动加载该文件中的配置.并根据该文件中的内容自动配置上述的配置文件中的内容. 比如:dolphinscheduler-daemon.sh、datasource.properties、zookeeper.properties、common.properties、application-api.properties、master.properties、worker.properties、alert.properties、quartz.properties 等文件.

  • 2.DS集群的启动&关闭.

DS集群在启动&关闭的时候,会加载该配置文件中的masters,workers,alertServer,apiServers等参数,启动/关闭DS集群.

文件内容如下:

# 注意: 该配置文件中如果包含特殊字符,如: `.\*[]^${}\+?|()@#&`, 请转义,
# 示例: `[` 转义为 `\[`

# 数据库类型, 目前仅支持 postgresql 或者 mysql
dbtype="mysql"

# 数据库 地址 & 端口
dbhost="192.168.xx.xx:3306"

# 数据库 名称
dbname="dolphinscheduler"


# 数据库 用户名
username="xx"

# 数据库 密码
password="xx"

# Zookeeper地址
zkQuorum="192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181"

# 将DS安装到哪个目录,如: /data1\_1T/dolphinscheduler,
installPath="/data1\_1T/dolphinscheduler"

# 使用哪个用户部署
# 注意: 部署用户需要sudo 权限, 并且可以操作 hdfs .
# 如果使用hdfs的话,根目录必须使用该用户进行创建.否则会有权限相关的问题.
deployUser="dolphinscheduler"


# 以下为告警服务配置
# 邮件服务器地址
mailServerHost="smtp.exmail.qq.com"

# 邮件服务器 端口
mailServerPort="25"

# 发送者
mailSender="xxxxxxxxxx"

# 发送用户
mailUser="xxxxxxxxxx"

# 邮箱密码
mailPassword="xxxxxxxxxx"

# TLS协议的邮箱设置为true,否则设置为false
starttlsEnable="true"

# 开启SSL协议的邮箱配置为true,否则为false。注意: starttlsEnable和sslEnable不能同时为true
sslEnable="false"

# 邮件服务地址值,同 mailServerHost
sslTrust="smtp.exmail.qq.com"

#业务用到的比如sql等资源文件上传到哪里,可以设置:HDFS,S3,NONE。如果想上传到HDFS,请配置为HDFS;如果不需要资源上传功能请选择NONE。
resourceStorageType="NONE"

# if S3,write S3 address,HA,for example :s3a://dolphinscheduler,
# Note,s3 be sure to create the root directory /dolphinscheduler
defaultFS="hdfs://mycluster:8020"

# 如果resourceStorageType 为S3 需要配置的参数如下:
s3Endpoint="http://192.168.xx.xx:9010"
s3AccessKey="xxxxxxxxxx"
s3SecretKey="xxxxxxxxxx"

# 如果ResourceManager是HA,则配置为ResourceManager节点的主备ip或者hostname,比如"192.168.xx.xx,192.168.xx.xx",否则如果是单ResourceManager或者根本没用到yarn,请配置yarnHaIps=""即可,如果没用到yarn,配置为""
yarnHaIps="192.168.xx.xx,192.168.xx.xx"

# 如果是单ResourceManager,则配置为ResourceManager节点ip或主机名,否则保持默认值即可。
singleYarnIp="yarnIp1"

# 资源文件在 HDFS/S3 存储路径
resourceUploadPath="/dolphinscheduler"


# HDFS/S3 操作用户
hdfsRootUser="hdfs"

# 以下为 kerberos 配置

# kerberos是否开启
kerberosStartUp="false"
# kdc krb5 config file path
krb5ConfPath="$installPath/conf/krb5.conf"
# keytab username
keytabUserName="hdfs-mycluster@ESZ.COM"
# username keytab path
keytabPath="$installPath/conf/hdfs.headless.keytab"


# api 服务端口
apiServerPort="12345"


# 部署DS的所有主机hostname
ips="ds1,ds2,ds3,ds4,ds5"

# ssh 端口 , 默认 22
sshPort="22"

# 部署master服务主机
masters="ds1,ds2"

# 部署 worker服务的主机
# 注意: 每一个worker都需要设置一个worker 分组的名称,默认值为 "default"
workers="ds1:default,ds2:default,ds3:default,ds4:default,ds5:default"

# 部署alert服务主机
alertServer="ds3"

# 部署api服务主机 
apiServers="ds1"

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
11.dolphinscheduler_env.sh [环境变量配置]

通过类似shell方式提交任务的的时候,会加载该配置文件中的环境变量到主机中. 涉及到的任务类型有: Shell任务、Python任务、Spark任务、Flink任务、Datax任务等等

export HADOOP\_HOME=/opt/soft/hadoop
export HADOOP\_CONF\_DIR=/opt/soft/hadoop/etc/hadoop
export SPARK\_HOME1=/opt/soft/spark1
export SPARK\_HOME2=/opt/soft/spark2
export PYTHON\_HOME=/opt/soft/python
export JAVA\_HOME=/opt/soft/java
export HIVE\_HOME=/opt/soft/hive
export FLINK\_HOME=/opt/soft/flink
export DATAX\_HOME=/opt/soft/datax/bin/datax.py

export PATH=$HADOOP\_HOME/bin:$SPARK\_HOME1/bin:$SPARK\_HOME2/bin:$PYTHON\_HOME:$JAVA\_HOME/bin:$HIVE\_HOME/bin:$PATH:$FLINK\_HOME/bin:$DATAX\_HOME:$PATH

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
12.各服务日志配置文件
对应服务服务名称日志文件名
api服务日志配置文件logback-api.xml
master服务日志配置文件logback-master.xml
worker服务日志配置文件logback-worker.xml
alert服务日志配置文件logback-alert.xml

任务总体存储结构

在dolphinscheduler中创建的所有任务都保存在t_ds_process_definition 表中.

该数据库表结构如下表所示:

序号字段类型描述
1idint(11)主键
2namevarchar(255)流程定义名称
3versionint(11)流程定义版本
4release_statetinyint(4)流程定义的发布状态:0 未上线 , 1已上线
5project_idint(11)项目id
6user_idint(11)流程定义所属用户id
7process_definition_jsonlongtext流程定义JSON
8descriptiontext流程定义描述
9global_paramstext全局参数
10flagtinyint(4)流程是否可用:0 不可用,1 可用
11locationstext节点坐标信息
12connectstext节点连线信息
13receiverstext收件人
14receivers_cctext抄送人
15create_timedatetime创建时间
16timeoutint(11)超时时间
17tenant_idint(11)租户id
18update_timedatetime更新时间
19modify_byvarchar(36)修改用户
20resource_idsvarchar(255)资源ids

其中process_definition_json 字段为核心字段, 定义了 DAG 图中的任务信息.该数据以JSON 的方式进行存储.

公共的数据结构如下表.

序号字段类型描述
1globalParamsArray全局参数
2tasksArray流程中的任务集合 [ 各个类型的结构请参考如下章节]
3tenantIdint租户id
4timeoutint超时时间

数据示例:

{
    "globalParams":[
        {
            "prop":"golbal\_bizdate",
            "direct":"IN",
            "type":"VARCHAR",
            "value":"${system.biz.date}"
        }
    ],
    "tasks":Array[1],
    "tenantId":0,
    "timeout":0
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

任务结构

各任务类型存储结构详解

Shell节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型SHELL
3nameString名称
4paramsObject自定义参数Json 格式
5rawScriptStringShell脚本
6localParamsArray自定义参数
7resourceListArray资源文件
8descriptionString描述
9runFlagString运行标识
10conditionResultObject条件分支
11successNodeArray成功跳转节点
12failedNodeArray失败跳转节点
13dependenceObject任务依赖与params互斥
14maxRetryTimesString最大重试次数
15retryIntervalString重试间隔
16timeoutObject超时控制
17taskInstancePriorityString任务优先级
18workerGroupStringWorker 分组
19preTasksArray前置任务

节点数据样例:

{
    "type":"SHELL",
    "id":"tasks-80760",
    "name":"Shell Task",
    "params":{
        "resourceList":[
            {
                "id":3,
                "name":"run.sh",
                "res":"run.sh"
            }
        ],
        "localParams":[

        ],
        "rawScript":"echo "This is a shell script""
    },
    "description":"",
    "runFlag":"NORMAL",
    "conditionResult":{
        "successNode":[
            ""
        ],
        "failedNode":[
            ""
        ]
    },
    "dependence":{

    },
    "maxRetryTimes":"0",
    "retryInterval":"1",
    "timeout":{
        "strategy":"",
        "interval":null,
        "enable":false
    },
    "taskInstancePriority":"MEDIUM",
    "workerGroup":"default",
    "preTasks":[

    ]
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
SQL节点

通过 SQL对指定的数据源进行数据查询、更新操作.

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码

img
img

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化资料的朋友,可以戳这里获取

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

Array | 流程中的任务集合 [ 各个类型的结构请参考如下章节] |
| 3 | tenantId | int | 租户id |
| 4 | timeout | int | 超时时间 |

数据示例:

{
    "globalParams":[
        {
            "prop":"golbal\_bizdate",
            "direct":"IN",
            "type":"VARCHAR",
            "value":"${system.biz.date}"
        }
    ],
    "tasks":Array[1],
    "tenantId":0,
    "timeout":0
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

任务结构

各任务类型存储结构详解

Shell节点

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码
2typeString类型SHELL
3nameString名称
4paramsObject自定义参数Json 格式
5rawScriptStringShell脚本
6localParamsArray自定义参数
7resourceListArray资源文件
8descriptionString描述
9runFlagString运行标识
10conditionResultObject条件分支
11successNodeArray成功跳转节点
12failedNodeArray失败跳转节点
13dependenceObject任务依赖与params互斥
14maxRetryTimesString最大重试次数
15retryIntervalString重试间隔
16timeoutObject超时控制
17taskInstancePriorityString任务优先级
18workerGroupStringWorker 分组
19preTasksArray前置任务

节点数据样例:

{
    "type":"SHELL",
    "id":"tasks-80760",
    "name":"Shell Task",
    "params":{
        "resourceList":[
            {
                "id":3,
                "name":"run.sh",
                "res":"run.sh"
            }
        ],
        "localParams":[

        ],
        "rawScript":"echo "This is a shell script""
    },
    "description":"",
    "runFlag":"NORMAL",
    "conditionResult":{
        "successNode":[
            ""
        ],
        "failedNode":[
            ""
        ]
    },
    "dependence":{

    },
    "maxRetryTimes":"0",
    "retryInterval":"1",
    "timeout":{
        "strategy":"",
        "interval":null,
        "enable":false
    },
    "taskInstancePriority":"MEDIUM",
    "workerGroup":"default",
    "preTasks":[

    ]
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
SQL节点

通过 SQL对指定的数据源进行数据查询、更新操作.

节点数据结构如下:

序号参数名类型描述描述
1idString任务编码

[外链图片转存中…(img-pxbvrjrV-1714401712376)]
[外链图片转存中…(img-VROvaK4s-1714401712377)]

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化资料的朋友,可以戳这里获取

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/944970
推荐阅读
相关标签
  

闽ICP备14008679号