赞
踩
Apache DolphinScheduler 是一个分布式去中心化,易扩展的可视化 DAG 工作流任务调度系统。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。
近日,伯毅同学给社区贡献了工作流核心表结构的剖析文章,非常细致,喜欢的伙伴请转走
在 dolphinscheduler 库中创建的所有工作流定义(模板)都保存在 t_ds_process_definition 表中.
该数据库表结构如下表所示:
序号 | 字段 | 类型 | 描述 |
---|---|---|---|
1 | id | int(11) | 主键 |
2 | name | varchar(255) | 流程定义名称 |
3 | version | int(11) | 流程定义版本 |
4 | release_state | tinyint(4) | 流程定义的发布状态:0 未上线 , 1已上线 |
5 | project_id | int(11) | 项目id |
6 | user_id | int(11) | 流程定义所属用户id |
7 | process_definition_json | longtext | 流程定义JSON |
8 | description | text | 流程定义描述 |
9 | global_params | text | 全局参数 |
10 | flag | tinyint(4) | 流程是否可用:0 不可用,1 可用 |
11 | locations | text | 节点坐标信息 |
12 | connects | text | 节点连线信息 |
13 | receivers | text | 收件人 |
14 | receivers_cc | text | 抄送人 |
15 | create_time | datetime | 创建时间 |
16 | timeout | int(11) | 超时时间 |
17 | tenant_id | int(11) | 租户id |
18 | update_time | datetime | 更新时间 |
19 | modify_by | varchar(36) | 修改用户 |
20 | resource_ids | varchar(255) | 资源ids |
其中 process_definition_json 字段为核心字段, 定义了 DAG 图中的任务信息.该数据以JSON 的方式进行存储.
公共的数据结构如下表:
序号 | 字段 | 类型 | 描述 |
---|---|---|---|
1 | globalParams | Array | 全局参数 |
2 | tasks | Array | 流程中的任务集合 [ 各个类型的结构请参考如下章节] |
3 | tenantId | int | 租户id |
4 | timeout | int | 超时时间 |
数据示例:
{
"globalParams":[
{
"prop":"golbal_bizdate",
"direct":"IN",
"type":"VARCHAR",
"value":"${system.biz.date}"
}
],
"tasks":Array[1],
"tenantId":0,
"timeout":0
}
** Shell 节点数据结构如下:**
序号 | 参数名 | 类型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任务编码 | ||
2 | type | String | 类型 | SHELL | |
3 | name | String | 名称 | ||
4 | params | Object | 自定义参数 | Json 格式 | |
5 | rawScript | String | Shell脚本 | ||
6 | localParams | Array | 自定义参数 | ||
7 | resourceList | Array | 资源文件 | ||
8 | description | String | 描述 | ||
9 | runFlag | String | 运行标识 | ||
10 | conditionResult | Object | 条件分支 | ||
11 | successNode | Array | 成功跳转节点 | ||
12 | failedNode | Array | 失败跳转节点 | ||
13 | dependence | Object | 任务依赖 | 与params互斥 | |
14 | maxRetryTimes | String | 最大重试次数 | ||
15 | retryInterval | String | 重试间隔 | ||
16 | timeout | Object | 超时控制 | ||
17 | taskInstancePriority | String | 任务优先级 | ||
18 | workerGroup | String | Worker 分组 | ||
19 | preTasks | Array | 前置任务 |
Shell 节点数据样例:
{ "type":"SHELL", "id":"tasks-80760", "name":"Shell Task", "params":{ "resourceList":[ { "id":3, "name":"run.sh", "res":"run.sh" } ], "localParams":[ ], "rawScript":"echo "This is a shell script"" }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
通过 SQL 对指定的数据源进行数据查询、更新操作.
** SQL 节点数据结构如下:**
序号 | 参数名 | 类型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任务编码 | ||
2 | type | String | 类型 | SQL | |
3 | name | String | 名称 | ||
4 | params | Object | 自定义参数 | Json 格式 | |
5 | type | String | 数据库类型 | ||
6 | datasource | Int | 数据源id | ||
7 | sql | String | 查询SQL语句 | ||
8 | udfs | String | udf函数 | UDF函数id,以逗号分隔. | |
9 | sqlType | String | SQL节点类型 | 0 查询 , 1 非查询 | |
10 | title | String | 邮件标题 | ||
11 | receivers | String | 收件人 | ||
12 | receiversCc | String | 抄送人 | ||
13 | showType | String | 邮件显示类型 | TABLE 表格 , ATTACHMENT附件 | |
14 | connParams | String | 连接参数 | ||
15 | preStatements | Array | 前置SQL | ||
16 | postStatements | Array | 后置SQL | ||
17 | localParams | Array | 自定义参数 | ||
18 | description | String | 描述 | ||
19 | runFlag | String | 运行标识 | ||
20 | conditionResult | Object | 条件分支 | ||
21 | successNode | Array | 成功跳转节点 | ||
22 | failedNode | Array | 失败跳转节点 | ||
23 | dependence | Object | 任务依赖 | 与params互斥 | |
24 | maxRetryTimes | String | 最大重试次数 | ||
25 | retryInterval | String | 重试间隔 | ||
26 | timeout | Object | 超时控制 | ||
27 | taskInstancePriority | String | 任务优先级 | ||
28 | workerGroup | String | Worker 分组 | ||
29 | preTasks | Array | 前置任务 |
** SQL 节点数据样例:**
{ "type":"SQL", "id":"tasks-95648", "name":"SqlTask-Query", "params":{ "type":"MYSQL", "datasource":1, "sql":"select id , namge , age from emp where id = ${id}", "udfs":"", "sqlType":"0", "title":"xxxx@xxx.com", "receivers":"xxxx@xxx.com", "receiversCc":"", "showType":"TABLE", "localParams":[ { "prop":"id", "direct":"IN", "type":"INTEGER", "value":"1" } ], "connParams":"", "preStatements":[ "insert into emp ( id,name ) value (1,'Li' )" ], "postStatements":[ ] }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
** Spark 节点数据结构如下:**
<序号 | 参数名 | 类型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任务编码 | ||
2 | type | String | 类型 | SPARK | |
3 | name | String | 名称 | ||
4 | params | Object | 自定义参数 | Json 格式 | |
5 | mainClass | String | 运行主类 | ||
6 | mainArgs | String | 运行参数 | ||
7 | others | String | 其他参数 | ||
8 | mainJar | Object | 程序 jar 包 | ||
9 | deployMode | String | 部署模式 | local,client,cluster | |
10 | driverCores | String | driver核数 | ||
11 | driverMemory | String | driver 内存数 | ||
12 | numExecutors | String | executor数量 | ||
13 | executorMemory | String | executor内存 | ||
14 | executorCores | String | executor核数 | ||
15 | programType | String | 程序类型 | JAVA,SCALA,PYTHON | |
16 | sparkVersion | String | Spark 版本 | SPARK1 , SPARK2 | |
17 | localParams | Array |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。