赞
踩
首页包含用户所有项目的任务状态统计、流程状态统计、工作流定义统计。
点击"项目管理"进入项目管理页面,点击“创建项目”按钮,输入项目名称,项目描述,点击“提交”,创建新的项目。
在项目管理页面点击项目名称链接,进入项目首页,如下图所示,项目首页包含该项目的任务状态统计、流程状态统计、工作流定义统计。这几个指标的说明如下
shell节点,在worker执行的时候,会生成一个临时shell脚本,使用租户同名的linux用户执行这个脚本。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SAkHpS8X-1683205953749)(https://dolphinscheduler.apache.org/img/shell_dag.png)]
拖动工具栏中的任务节点到画板中,如下图所示:
拖动工具栏中的任务节点到画板中,如下图所示:
依赖节点提供了逻辑判断功能,比如检查昨天的B流程是否成功,或者C流程是否执行成功。
例如,A流程为周报任务,B、C流程为天任务,A任务需要B、C任务在上周的每一天都执行成功,如图示:
假如,周报A同时还需要自身在上周二执行成功:
拖动工具栏中的任务节点到画板中,如下图所示:
spark-submit
方式提交任务拖动工具栏中的任务节点到画板中,如下图所示:
注意:JAVA和Scala只是用来标识,没有区别,如果是Python开发的Spark则没有主函数的class,其他都是一样
hadoop jar
方式提交任务拖动工具栏中的任务节点到画板中,如下图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p1oexZeW-1683205971444)(null)]
python **
方式提交任务。拖动工具栏中的任务节点到画板中,如下图所示:
__init__.py
文件[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D8Zut6Pc-1683205976158)(null)]
注意:JAVA和Scala只是用来标识,没有区别,如果是Python开发的Flink则没有主函数的class,其他都是一样
变量名 | 声明方式 | 含义 |
---|---|---|
system.biz.date | ${system.biz.date} | 日常调度实例定时的定时时间前一天,格式为 yyyyMMdd,补数据时,该日期 +1 |
system.biz.curdate | ${system.biz.curdate} | 日常调度实例定时的定时时间,格式为 yyyyMMdd,补数据时,该日期 +1 |
system.datetime | ${system.datetime} | 日常调度实例定时的定时时间,格式为 yyyyMMddHHmmss,补数据时,该日期 +1 |
[
.
.
.
]
格式的,
[…] 格式的,
[…]格式的,[yyyyMMddHHmmss] 是可以任意分解组合的,比如:$[yyyyMMdd], $[HHmmss], $[yyyy-MM-dd] 等
1.使用add_months()函数,该函数用于加减月份, 第一个入口参数为[yyyyMMdd],表示返回时间的格式 第二个入口参数为月份偏移量,表示加减多少个月
+ 后 N 年:$[add\_months(yyyyMMdd,12\*N)]
+ 前 N 年:$[add\_months(yyyyMMdd,-12\*N)]
+ 后 N 月:$[add\_months(yyyyMMdd,N)]
+ 前 N 月:$[add\_months(yyyyMMdd,-N)]
2.直接加减数字 在自定义格式后直接“+/-”数字
+ 后 N 周:$[yyyyMMdd+7\*N]
+ 前 N 周:$[yyyyMMdd-7\*N]
+ 后 N 天:$[yyyyMMdd+N]
+ 前 N 天:$[yyyyMMdd-N]
+ 后 N 小时:$[HHmmss+N/24]
+ 前 N 小时:$[HHmmss-N/24]
+ 后 N 分钟:$[HHmmss+N/24/60]
+ 前 N 分钟:$[HHmmss-N/24/60]
在工作流定义页面配置的参数,作用于该工作流中全部的任务
全局参数配置方式如下:在工作流定义页面,点击“设置全局”右边的加号,填写对应的变量名称和对应的值,保存即可
这里定义的global_bizdate参数可以被其它任一节点的局部参数引用,并设置global_bizdate的value为通过引用系统参数system.biz.date获得的值
在任务定义页面配置的参数,默认作用域仅限该任务,如果配置了参数传递则可将该参数作用到下游任务中。
本地参数配置方式如下:在任务定义页面,点击“自定义参数”右边的加号,填写对应的变量名称和对应的值,保存即可
如果想要在本地参数中调用系统内置参数,将内置参数对应的值填到value
中,如上图中的${biz_date}
以及${curdate}
DolphinScheduler 提供参数间相互引用的能力,包括:本地参数引用全局参数、上下游参数传递。因为有引用的存在,就涉及当参数名相同时,参数的优先级问题,详见参数优先级
本地任务引用全局参数的前提是,你已经定义了全局参数,使用方式和本地参数中的使用方式类似,但是参数的值需要配置成全局参数中的key
如上图中的${biz_date}
以及${curdate}
,就是本地参数引用全局参数的例子。观察上图的最后一行,local_param_bizdate通过
g
l
o
b
a
l
b
i
z
d
a
t
e
来引用全局参数,在
s
h
e
l
l
脚本中可以通过
{global_bizdate}来引用全局参数,在shell脚本中可以通过
globalbizdate来引用全局参数,在shell脚本中可以通过{local_param_bizdate}来引全局变量 global_bizdate的值,或通过JDBC直接将local_param_bizdate的值set进去。同理,local_param通过{local_param}引用上一节中定义的全局参数。biz_date、biz_curdate、system.datetime都是用户自定义的参数,通过{全局参数}进行赋值。
DolphinScheduler 允许在任务间进行参数传递,目前传递方向仅支持上游单向传递给下游。目前支持这个特性的任务类型有:
当定义上游节点时,如果有需要将该节点的结果传递给有依赖关系的下游节点,需要在【当前节点设置】的【自定义参数】设置一个方向是 OUT 的变量。目前我们主要针对 SQL 和 SHELL 节点做了可以向下传递参数的功能。
prop 为用户指定;方向选择为 OUT,只有当方向为 OUT 时才会被定义为变量输出;数据类型可以根据需要选择不同数据结构;value 部分不需要填写。
如果 SQL 节点的结果只有一行,一个或多个字段,prop 的名字需要和字段名称一致。数据类型可选择为除 LIST 以外的其他类型。变量会选择 SQL 查询结果中的列名中与该变量名称相同的列对应的值。
如果 SQL 节点的结果为多行,一个或多个字段,prop 的名字需要和字段名称一致。数据类型选择为LIST。获取到 SQL 查询结果后会将对应列转化为 LIST,并将该结果转化为 JSON 后作为对应变量的值。
我们再以上图中包含 SQL 节点的流程举例说明:
上图中节点【createParam1】的定义如下:
节点【createParam2】的定义如下:
您可以在【工作流实例】页面,找到对应的节点实例,便可以查看该变量的值。
节点实例【createParam1】如下:
这里当然 “id” 的值会等于 12.
我们再来看节点实例【createParam2】的情况。
这里只有 “id” 的值。尽管用户定义的 sql 查到的是 “id” 和 “database_name” 两个字段,但是由于只定义了一个为 out 的变量 “id”,所以只会设置一个变量。由于显示的原因,这里已经替您查好了该 list 的长度为 10。
prop 为用户指定;方向选择为 OUT,只有当方向为 OUT 时才会被定义为变量输出;数据类型可以根据需要选择不同数据结构;value 部分不需要填写。
用户需要传递参数,在定义 shell 脚本时,需要输出格式为 ${setValue(key=value)} 的语句,key 为对应参数的 prop,value 为该参数的值。
例如下图中, 通过 echo '${setValue(trans=hello trans)}'
, 将’trans’设置为"hello trans", 在下游任务中就可以使用trans这个变量了:
shell 节点定义时当日志检测到 ${setValue(key=value1)} 的格式时,会将 value1 赋值给 key,下游节点便可以直接使用变量 key 的值。同样,您可以在【工作流实例】页面,找到对应的节点实例,便可以查看该变量的值。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6pUPPoeR-1683205971021)(null)]
DolphinScheduler 中所涉及的参数值的定义可能来自三种类型:
因为参数的值存在多个来源,当参数名相同时,就需要会存在参数优先级的问题。DolphinScheduler 参数的优先级从高到低为:全局参数 > 上游任务传递的参数 > 本地参数
在上游任务传递的参数的情况下,由于上游可能存在多个任务向下游传递参数。当上游传递的参数名称相同时:
下面例子向你展示如何使用任务参数传递的优先级问题
1:先以 shell 节点解释第一种情况
节点 【useParam】可以使用到节点【createParam】中设置的变量。而节点 【useParam】与节点【noUseParam】中并没有依赖关系,所以并不会获取到节点【noUseParam】的变量。上图中只是以 shell 节点作为例子,其他类型节点具有相同的使用规则。
其中节点【createParam】在使用变量时直接使用即可。另外该节点设置了 “key” 和 “key1” 两个变量,这里用户用定义了一个与上游节点传递的变量名相同的变量 key1,并且复制了值为 “12”,但是由于我们设置的优先级的关系,这里的值 “12” 会被抛弃,最终使用上游节点设置的变量值。
2:我们再以 sql 节点来解释另外一种情况
节点【use_create】的定义如下:
“status” 是当前节点设置的节点的自有变量。但是用户在保存时也同样设置了 “status” 变量,并且赋值为 -1。那在该 SQL 执行时,status 的值为优先级更高的 -1。抛弃了节点的自有变量的值。
这里的 “id” 是上游节点设置的变量,用户在节点【createParam1】、节点【createParam2】中设置了相同参数名 “id” 的参数。而节点【use_create】中使用了最先结束的【createParam1】的值。
数据源中心支持MySQL、POSTGRESQL、HIVE/IMPALA、SPARK、CLICKHOUSE、ORACLE、SQLSERVER等数据源。
注意:如果开启了kerberos,则需要填写 Principal
注意:如果开启了kerberos,则需要填写 Principal
在2.0.0版本中,用户需要创建告警实例,然后同告警组进行关联,一个告警组可以使用多个告警实例,我们会逐一进行进行告警通知。
首先需要进入到安全中心,选择告警组管理,然后点击左侧的告警实例管理,然后创建一个告警实例,然后选择对应的告警插件,填写相关告警参数。
然后选择告警组管理,创建告警组,选择相应的告警实例即可。
如果您需要使用到企业微信进行告警,请在安装完成后,修改 alert.properties
文件,然后重启 alert 服务即可。企业微信的配置样例如下
# 设置企业微信告警功能是否开启:开启为 true,否则为 false enterprise.wechat.enable="true" # 设置 corpid,每个企业都拥有唯一的 corpid,获取此信息可在管理后台 “我的企业” - “企业信息” 下查看 “企业 ID”(需要有管理员权限) enterprise.wechat.corp.id="xxx" # 设置 secret,secret 是企业应用里面用于保障数据安全的 “钥匙”,每一个应用都有一个独立的访问密钥 enterprise.wechat.secret="xxx" # 设置 agentid,每个应用都有唯一的 agentid。在管理后台 -> “应用与小程序” -> “应用”,点进某个应用,即可看到 agentid enterprise.wechat.agent.id="xxxx" # 设置 userid,多个用逗号分隔。每个成员都有唯一的 userid,即所谓 “帐号”。在管理后台 -> “通讯录” -> 点进某个成员的详情页,可以看到 enterprise.wechat.users=zhangsan,lisi # 获取 access\_token 的地址,使用如下例子无需修改 enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpId}&corpsecret={secret} # 发送应用消息地址,使用如下例子无需改动 enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token} # 发送消息格式,无需改动 enterprise.wechat.user.send.msg={\"touser\":\"{toUser}\",\"agentid\":\"{agentId}\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"{msg}\"}}
如果需要用到资源上传功能,针对单机可以选择本地文件目录作为上传文件夹(此操作不需要部署 Hadoop)。当然也可以选择上传到 Hadoop or MinIO 集群上,此时则需要有Hadoop (2.6+) 或者 MinIO 等相关环境
*注意:*
- 如果用到资源上传的功能,那么 安装部署中,部署用户需要有这部分的操作权限
- 如果 Hadoop 集群的 NameNode 配置了 HA 的话,需要开启 HDFS 类型的资源上传,同时需要将 Hadoop 集群下的
core-site.xml
和hdfs-site.xml
复制到/opt/dolphinscheduler/conf
,非 NameNode HA 跳过次步骤
conf/common.properties # Users who have permission to create directories under the HDFS root path hdfs.root.user=hdfs # data base dir, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended resource.upload.path=/dolphinscheduler # resource storage type : HDFS,S3,NONE resource.storage.type=HDFS # whether kerberos starts hadoop.security.authentication.startup.state=false # java.security.krb5.conf path java.security.krb5.conf.path=/opt/krb5.conf # loginUserFromKeytab user login.user.keytab.username=hdfs-mycluster@ESZ.COM # loginUserFromKeytab path login.user.keytab.path=/opt/hdfs.headless.keytab # if resource.storage.type is HDFS,and your Hadoop Cluster NameNode has HA enabled, you need to put core-site.xml and hdfs-site.xml in the installPath/conf directory. In this example, it is placed under /opt/soft/dolphinscheduler/conf, and configure the namenode cluster name; if the NameNode is not HA, modify it to a specific IP or host name. # if resource.storage.type is S3,write S3 address,HA,for example :s3a://dolphinscheduler, # Note,s3 be sure to create the root directory /dolphinscheduler fs.defaultFS=hdfs://mycluster:8020 #resourcemanager ha note this need ips , this empty if single yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx # If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine yarn.application.status.address=http://xxxx:8088/ws/v1/cluster/apps/%s
是对各种资源文件的管理,包括创建基本的txt/log/sh/conf/py/java等文件、上传jar包等各种类型文件,可进行编辑、重命名、下载、删除等操作。
文件格式支持以下几种类型:txt、log、sh、conf、cfg、py、java、sql、xml、hql、properties
上传文件:点击"上传文件"按钮进行上传,将文件拖拽到上传区域,文件名会自动以上传的文件名称补全
对可查看的文件类型,点击文件名称,可查看文件详情
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rUFXelKK-1683205976507)(null)]
点击文件列表的"下载"按钮下载文件或者在文件详情中点击右上角"下载"按钮下载文件
文件列表->点击"删除"按钮,删除指定文件
资源管理和文件管理功能类似,不同之处是资源管理是上传的UDF函数,文件管理上传的是用户程序,脚本及配置文件 操作功能:重命名、下载、删除。
和上传文件相同。
点击“创建UDF函数”,输入udf函数参数,选择udf资源,点击“提交”,创建udf函数。 目前只支持HIVE的临时UDF函数
worker.properties
配置文件中参数 worker.tenant.auto.create=true
实现当 linux 用户不存在时自动创建该用户。worker.tenant.auto.create=true
参数会要求 worker 可以免密运行 sudo
命令。由于后端接口有登录检查,令牌管理提供了一种可以通过调用接口的方式对系统进行各种操作。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MQj8OzXt-1683205975817)(null)]
/\*\* \* test token \*/ public void doPOSTParam()throws Exception{ // create HttpClient CloseableHttpClient httpclient = HttpClients.createDefault(); // create http post request HttpPost httpPost = new HttpPost("http://127.0.0.1:12345/escheduler/projects/create"); httpPost.setHeader("token", "123"); // set parameters List<NameValuePair> parameters = new ArrayList<NameValuePair>(); parameters.add(new BasicNameValuePair("projectName", "qzw")); parameters.add(new BasicNameValuePair("desc", "qzw")); UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(parameters); httpPost.setEntity(formEntity); CloseableHttpResponse response = null; try { // execute response = httpclient.execute(httpPost); // response status code 200 if (response.getStatusLine().getStatusCode() == 200) { String content = EntityUtils.toString(response.getEntity(), "UTF-8"); System.out.println(content); } } finally { if (response != null) { response.close(); } httpclient.close(); } }
每个worker节点都会归属于自己的Worker分组,默认分组为default.
在任务执行时,可以将任务分配给指定worker分组,最终由该组中的worker节点执行该任务.
新增/更新 worker分组
示例:
worker.groups=default,test
worker.properties
中的配置。修改步骤为"安全中心 -> worker分组管理 -> 点击 ‘新建worker分组’ -> 输入’组名称’ -> 选择已有worker -> 点击’提交’"创建/更新 环境
使用 环境
一般都是通过页面来创建项目、流程等,但是与第三方系统集成就需要通过调用 API 来管理项目、流程
地址:http://{api server ip}:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn
projects/query-project-list
token:刚刚生成的Token
这里以创建名为 “wudl-flink-test” 的项目为例
返回 msg 信息为 “success”,说明我们已经成功通过 API 的方式创建了项目。
如果您对创建项目的源码感兴趣,欢迎继续阅读下面内容
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QOXUFbby-1683205974909)(null)]
1.租户对应的是 linux 用户, 用户 worker 提交作业所使用的的用户, 如果 linux 没有这个用户, worker 会在执行脚本的时候创建这个用户
2.租户和租户编码都是唯一不能重复,好比一个人有名字有身份证号。
3.创建完租户会在 hdfs 对应的目录上有相关的文件夹。
地址:http://{api server ip}:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn
projects/query-project-list
token: 刚刚生成的 Token
http://192.168.1.163:12345/dolphinscheduler/ui/#/monitor/servers/master
本章节介绍Apache DolphinScheduler调度系统架构
系统架构图
启动流程活动图
MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。 MasterServer基于netty提供监听服务。
+ **Distributed Quartz**分布式调度组件,主要负责定时任务的启停操作,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操作
+ **MasterSchedulerService**是一个扫描线程,定时扫描数据库中的 **command** 表,生成工作流实例,根据不同的**命令类型**进行不同的业务操作
+ **WorkflowExecuteThread**主要是负责DAG任务切分、任务提交、各种不同命令类型的逻辑处理,处理任务状态和工作流状态事件
+ **EventExecuteService**处理master负责的工作流实例所有的状态变化事件,使用线程池处理工作流的状态事件
+ **StateWheelExecuteThread**处理依赖任务和超时任务的定时状态更新
WorkerServer也采用分布式无中心设计理念,支持自定义任务插件,主要负责任务的执行和提供日志服务。 WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。
+ **WorkerManagerThread**主要通过netty领取master发送过来的任务,并根据不同任务类型调用**TaskExecuteThread**对应执行器。
+ **RetryReportTaskStatusThread**主要通过netty向master汇报任务状态,如果汇报失败,会一直重试汇报
+ **LoggerServer**是一个日志服务,提供日志分片查看、刷新和下载等功能
注册中心,使用插件化实现,默认支持Zookeeper, 系统中的MasterServer和WorkerServer节点通过注册中心来进行集群管理和容错。另外系统还基于注册中心进行事件监听和分布式锁。
提供告警相关功能,仅支持单机服务。支持自定义告警插件。
API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。 接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等。
系统的前端页面,提供系统的各种可视化操作界面,详见系统使用手册部分。
中心化的设计理念比较简单,分布式集群中的节点按照角色分工,大体上分为两种角色:
中心化思想设计存在的问题:
容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况
服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:
其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。
ZooKeeper Master容错完成之后则重新由DolphinScheduler中Scheduler线程调度,遍历 DAG 找到”正在运行”和“提交成功”的任务,对”正在运行”的任务监控其任务实例的状态,对”提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。
Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。
注意:由于” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。
这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:
接下来说正题,我们将工作流中的任务节点分了两种类型。
所有任务都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。
如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作
在早期调度设计中,如果没有优先级设计,采用公平调度设计的话,会遇到先行提交的任务可能会和后继提交的任务同时完成的情况,而不能做到设置流程或者任务的优先级,因此我们对此进行了重新设计,目前我们设计如下:
不同流程实例优先级
优先于
同一个流程实例优先级
优先于
同一流程内任务优先级
优先于
同一流程内任务
提交顺序依次从高到低进行任务处理。
+ 具体实现是根据任务实例的json解析优先级,然后把**流程实例优先级\_流程实例id\_任务优先级\_任务id**信息保存在ZooKeeper任务队列中,当从任务队列获取的时候,通过字符串比较即可得出最需要优先执行的任务
- 其中流程定义的优先级是考虑到有些流程需要先于其他流程进行处理,这个可以在流程启动或者定时启动时配置,共有5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图
![流程优先级配置](https://img-blog.csdnimg.cn/img_convert/ce19d6317201e28d868832b5151b6697.png)
- 任务的优先级也分为5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图
![任务优先级配置](https://img-blog.csdnimg.cn/img_convert/8329e7570c0f8e4d88bc1706ed81cbb8.png)
/\*\* \* task log appender \*/ public class TaskLogAppender extends FileAppender<ILoggingEvent> { ... @Override protected void append(ILoggingEvent event) { if (currentlyActiveFile == null){ currentlyActiveFile = getFile(); } String activeFile = currentlyActiveFile; // thread name: taskThreadName-processDefineId\_processInstanceId\_taskInstanceId String threadName = event.getThreadName(); String[] threadNameArr = threadName.split("-"); // logId = processDefineId\_processInstanceId\_taskInstanceId String logId = threadNameArr[1]; ... super.subAppend(event); } }
以/流程定义id/流程实例id/任务实例id.log的形式生成日志
/\*\*
\* task log filter
\*/
public class TaskLogFilter extends Filter<ILoggingEvent> {
@Override
public FilterReply decide(ILoggingEvent event) {
if (event.getThreadName().startsWith("TaskLogInfo-")){
return FilterReply.ACCEPT;
}
return FilterReply.DENY;
}
}
表名 | 表信息 |
---|---|
t_ds_access_token | 访问ds后端的token |
t_ds_alert | 告警信息 |
t_ds_alertgroup | 告警组 |
t_ds_command | 执行命令 |
t_ds_datasource | 数据源 |
t_ds_error_command | 错误命令 |
t_ds_process_definition | 流程定义 |
t_ds_process_instance | 流程实例 |
t_ds_project | 项目 |
t_ds_queue | 队列 |
t_ds_relation_datasource_user | 用户关联数据源 |
t_ds_relation_process_instance | 子流程 |
t_ds_relation_project_user | 用户关联项目 |
t_ds_relation_resources_user | 用户关联资源 |
t_ds_relation_udfs_user | 用户关联UDF函数 |
t_ds_relation_user_alertgroup | 用户关联告警组 |
t_ds_resources | 资源文件 |
t_ds_schedules | 流程定时调度 |
t_ds_session | 用户登录的session |
t_ds_task_instance | 任务实例 |
t_ds_tenant | 租户 |
t_ds_udfs | UDF资源 |
t_ds_user | 用户 |
t_ds_version | ds版本信息 |
字段 | 类型 | 注释 |
---|---|---|
id | int | 主键 |
name | varchar | 流程定义名称 |
version | int | 流程定义版本 |
release_state | tinyint | 流程定义的发布状态:0 未上线 1已上线 |
project_id | int | 项目id |
user_id | int | 流程定义所属用户id |
process_definition_json | longtext | 流程定义json串 |
description | text | 流程定义描述 |
global_params | text | 全局参数 |
flag | tinyint | 流程是否可用:0 不可用,1 可用 |
locations | text | 节点坐标信息 |
connects | text | 节点连线信息 |
receivers | text | 收件人 |
receivers_cc | text | 抄送人 |
create_time | datetime | 创建时间 |
timeout | int | 超时时间 |
tenant_id | int | 租户id |
update_time | datetime | 更新时间 |
modify_by | varchar | 修改用户 |
resource_ids | varchar | 资源id集 |
字段 | 类型 | 注释 |
---|---|---|
id | int | 主键 |
name | varchar | 流程实例名称 |
process_definition_id | int | 流程定义id |
state | tinyint | 流程实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成 |
recovery | tinyint | 流程实例容错标识:0 正常,1 需要被容错重启 |
start_time | datetime | 流程实例开始时间 |
end_time | datetime | 流程实例结束时间 |
run_times | int | 流程实例运行次数 |
host | varchar | 流程实例所在的机器 |
command_type | tinyint | 命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程 |
command_param | text | 命令的参数(json格式) |
task_depend_type | tinyint | 节点依赖类型:0 当前节点,1 向前执行,2 向后执行 |
max_try_times | tinyint | 最大重试次数 |
failure_strategy | tinyint | 失败策略 0 失败后结束,1 失败后继续 |
warning_type | tinyint | 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发 |
warning_group_id | int | 告警组id |
schedule_time | datetime | 预期运行时间 |
command_start_time | datetime | 开始命令时间 |
global_params | text | 全局参数(固化流程定义的参数) |
process_instance_json | longtext | 流程实例json(copy的流程定义的json) |
flag | tinyint | 是否可用,1 可用,0不可用 |
update_time | timestamp | 更新时间 |
is_sub_process | int | 是否是子工作流 1 是,0 不是 |
executor_id | int | 命令执行用户 |
locations | text | 节点坐标信息 |
connects | text | 节点连线信息 |
history_cmd | text | 历史命令,记录所有对流程实例的操作 |
dependence_schedule_times | text | 依赖节点的预估时间 |
process_instance_priority | int | 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest |
worker_group | varchar | 任务指定运行的worker分组 |
timeout | int | 超时时间 |
tenant_id | int | 租户id |
字段 | 类型 | 注释 |
---|---|---|
id | int | 主键 |
name | varchar | 任务名称 |
task_type | varchar | 任务类型 |
process_definition_id | int | 流程定义id |
process_instance_id | int | 流程实例id |
task_json | longtext | 任务节点json |
state | tinyint | 任务实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成 |
submit_time | datetime | 任务提交时间 |
start_time | datetime | 任务开始时间 |
end_time | datetime | 任务结束时间 |
host | varchar | 执行任务的机器 |
execute_path | varchar | 任务执行路径 |
log_path | varchar | 任务日志路径 |
alert_flag | tinyint | 是否告警 |
retry_times | int | 重试次数 |
pid | int | 进程pid |
app_link | varchar | yarn app id |
flag | tinyint | 是否可用:0 不可用,1 可用 |
retry_interval | int | 重试间隔 |
max_retry_times | int | 最大重试次数 |
task_instance_priority | int | 任务实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest |
worker_group | varchar | 任务指定运行的worker分组 |
字段 | 类型 | 注释 |
---|---|---|
id | int | 主键 |
process_definition_id | int | 流程定义id |
start_time | datetime | 调度开始时间 |
end_time | datetime | 调度结束时间 |
crontab | varchar | crontab 表达式 |
failure_strategy | tinyint | 失败策略: 0 结束,1 继续 |
user_id | int | 用户id |
release_state | tinyint | 状态:0 未上线,1 上线 |
warning_type | tinyint | 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发 |
warning_group_id | int | 告警组id |
process_instance_priority | int | 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest |
worker_group | varchar | 任务指定运行的worker分组 |
create_time | datetime | 创建时间 |
update_time | datetime | 更新时间 |
字段 | 类型 | 注释 |
---|---|---|
id | int | 主键 |
command_type | tinyint | 命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程 |
process_definition_id | int | 流程定义id |
command_param | text | 命令的参数(json格式) |
task_depend_type | tinyint | 节点依赖类型:0 当前节点,1 向前执行,2 向后执行 |
failure_strategy | tinyint | 失败策略:0结束,1继续 |
warning_type | tinyint | 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发 |
warning_group_id | int | 告警组 |
schedule_time | datetime | 预期运行时间 |
start_time | datetime | 开始时间 |
executor_id | int | 执行用户id |
dependence | varchar | 依赖字段 |
update_time | datetime | 更新时间 |
process_instance_priority | int | 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest |
worker_group | varchar | 任务指定运行的worker分组 |
本文档为dolphinscheduler配置文件说明文档,针对版本为 dolphinscheduler-1.3.x 版本.
目前dolphinscheduler 所有的配置文件都在 [conf ] 目录中. 为了更直观的了解[conf]目录所在的位置以及包含的配置文件,请查看下面dolphinscheduler安装目录的简化说明. 本文主要讲述dolphinscheduler的配置文件.其他部分先不做赘述.
[注:以下 dolphinscheduler 简称为DS.]
├─bin DS命令存放目录 │ ├─dolphinscheduler-daemon.sh 启动/关闭DS服务脚本 │ ├─start-all.sh 根据配置文件启动所有DS服务 │ ├─stop-all.sh 根据配置文件关闭所有DS服务 ├─conf 配置文件目录 │ ├─application-api.properties api服务配置文件 │ ├─datasource.properties 数据库配置文件 │ ├─zookeeper.properties zookeeper配置文件 │ ├─master.properties master服务配置文件 │ ├─worker.properties worker服务配置文件 │ ├─quartz.properties quartz服务配置文件 │ ├─common.properties 公共服务[存储]配置文件 │ ├─alert.properties alert服务配置文件 │ ├─config 环境变量配置文件夹 │ ├─install_config.conf DS环境变量配置脚本[用于DS安装/启动] │ ├─env 运行脚本环境变量配置目录 │ ├─dolphinscheduler_env.sh 运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...] │ ├─org mybatis mapper文件目录 │ ├─i18n i18n配置文件目录 │ ├─logback-api.xml api服务日志配置文件 │ ├─logback-master.xml master服务日志配置文件 │ ├─logback-worker.xml worker服务日志配置文件 │ ├─logback-alert.xml alert服务日志配置文件 ├─sql DS的元数据创建升级sql文件 │ ├─create 创建SQL脚本目录 │ ├─upgrade 升级SQL脚本目录 │ ├─dolphinscheduler_postgre.sql postgre数据库初始化脚本 │ ├─dolphinscheduler_mysql.sql mysql数据库初始化脚本 │ ├─soft_version 当前DS版本标识文件 ├─script DS服务部署,数据库创建/升级脚本目录 │ ├─create-dolphinscheduler.sh DS数据库初始化脚本 │ ├─upgrade-dolphinscheduler.sh DS数据库升级脚本 │ ├─monitor-server.sh DS服务监控启动脚本 │ ├─scp-hosts.sh 安装文件传输脚本 │ ├─remove-zk-node.sh 清理zookeeper缓存文件脚本 ├─ui 前端WEB资源目录 ├─lib DS依赖的jar存放目录 ├─install.sh 自动安装DS服务脚本
序号 | 服务分类 | 配置文件 |
---|---|---|
1 | 启动/关闭DS服务脚本 | dolphinscheduler-daemon.sh |
2 | 数据库连接配置 | datasource.properties |
3 | zookeeper连接配置 | zookeeper.properties |
4 | 公共[存储]配置 | common.properties |
5 | API服务配置 | application-api.properties |
6 | Master服务配置 | master.properties |
7 | Worker服务配置 | worker.properties |
8 | Alert 服务配置 | alert.properties |
9 | Quartz配置 | quartz.properties |
10 | DS环境变量配置脚本[用于DS安装/启动] | install_config.conf |
11 | 运行脚本加载环境变量配置文件 [如: JAVA_HOME,HADOOP_HOME, HIVE_HOME …] | dolphinscheduler_env.sh |
12 | 各服务日志配置文件 | api服务日志配置文件 : logback-api.xml master服务日志配置文件 : logback-master.xml worker服务日志配置文件 : logback-worker.xml alert服务日志配置文件 : logback-alert.xml |
dolphinscheduler-daemon.sh脚本负责DS的启动&关闭. start-all.sh/stop-all.sh最终也是通过dolphinscheduler-daemon.sh对集群进行启动/关闭操作. 目前DS只是做了一个基本的设置,JVM参数请根据各自资源的实际情况自行设置.
默认简化参数如下:
export DOLPHINSCHEDULER\_OPTS="
-server
-Xmx16g
-Xms1g
-Xss512k
-XX:+UseConcMarkSweepGC
-XX:+CMSParallelRemarkEnabled
-XX:+UseFastAccessorMethods
-XX:+UseCMSInitiatingOccupancyOnly
-XX:CMSInitiatingOccupancyFraction=70
"
不建议设置"-XX:DisableExplicitGC" , DS使用Netty进行通讯,设置该参数,可能会导致内存泄漏.
在DS中使用Druid对数据库连接进行管理,默认简化配置如下.
参数 | 默认值 | 描述 |
---|---|---|
spring.datasource.driver-class-name | 数据库驱动 | |
spring.datasource.url | 数据库连接地址 | |
spring.datasource.username | 数据库用户名 | |
spring.datasource.password | 数据库密码 | |
spring.datasource.initialSize | 5 | 初始连接池数量 |
spring.datasource.minIdle | 5 | 最小连接池数量 |
spring.datasource.maxActive | 5 | 最大连接池数量 |
spring.datasource.maxWait | 60000 | 最大等待时长 |
spring.datasource.timeBetweenEvictionRunsMillis | 60000 | 连接检测周期 |
spring.datasource.timeBetweenConnectErrorMillis | 60000 | 重试间隔 |
spring.datasource.minEvictableIdleTimeMillis | 300000 | 连接保持空闲而不被驱逐的最小时间 |
spring.datasource.validationQuery | SELECT 1 | 检测连接是否有效的sql |
spring.datasource.validationQueryTimeout | 3 | 检测连接是否有效的超时时间[seconds] |
spring.datasource.testWhileIdle | true | 申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。 |
spring.datasource.testOnBorrow | true | 申请连接时执行validationQuery检测连接是否有效 |
spring.datasource.testOnReturn | false | 归还连接时执行validationQuery检测连接是否有效 |
spring.datasource.defaultAutoCommit | true | 是否开启自动提交 |
spring.datasource.keepAlive | true | 连接池中的minIdle数量以内的连接,空闲时间超过minEvictableIdleTimeMillis,则会执行keepAlive操作。 |
spring.datasource.poolPreparedStatements | true | 开启PSCache |
spring.datasource.maxPoolPreparedStatementPerConnectionSize | 20 | 要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。 |
参数 | 默认值 | 描述 |
---|---|---|
zookeeper.quorum | localhost:2181 | zk集群连接信息 |
zookeeper.dolphinscheduler.root | /dolphinscheduler | DS在zookeeper存储根目录 |
zookeeper.session.timeout | 60000 | session 超时 |
zookeeper.connection.timeout | 30000 | 连接超时 |
zookeeper.retry.base.sleep | 100 | 基本重试时间差 |
zookeeper.retry.max.sleep | 30000 | 最大重试时间 |
zookeeper.retry.maxtime | 10 | 最大重试次数 |
common.properties配置文件目前主要是配置hadoop/s3a相关的配置.
参数 | 默认值 | 描述 |
---|---|---|
data.basedir.path | /tmp/dolphinscheduler | 本地工作目录,用于存放临时文件 |
resource.storage.type | NONE | 资源文件存储类型: HDFS,S3,NONE |
resource.upload.path | /dolphinscheduler | 资源文件存储路径 |
hadoop.security.authentication.startup.state | false | hadoop是否开启kerberos权限 |
java.security.krb5.conf.path | /opt/krb5.conf | kerberos配置目录 |
login.user.keytab.username | hdfs-mycluster@ESZ.COM | kerberos登录用户 |
login.user.keytab.path | /opt/hdfs.headless.keytab | kerberos登录用户keytab |
kerberos.expire.time | 2 | kerberos过期时间,整数,单位为小时 |
resource.view.suffixs | txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties | 资源中心支持的文件格式 |
hdfs.root.user | hdfs | 如果存储类型为HDFS,需要配置拥有对应操作权限的用户 |
fs.defaultFS | hdfs://mycluster:8020 | 请求地址如果resource.storage.type=S3,该值类似为: s3a://dolphinscheduler. 如果resource.storage.type=HDFS, 如果 hadoop 配置了 HA,需要复制core-site.xml 和 hdfs-site.xml 文件到conf目录 |
fs.s3a.endpoint | s3 endpoint地址 | |
fs.s3a.access.key | s3 access key | |
fs.s3a.secret.key | s3 secret key | |
yarn.resourcemanager.ha.rm.ids | yarn resourcemanager 地址, 如果resourcemanager开启了HA, 输入HA的IP地址(以逗号分隔),如果resourcemanager为单节点, 该值为空即可 | |
yarn.application.status.address | http://ds1:8088/ws/v1/cluster/apps/%s | 如果resourcemanager开启了HA或者没有使用resourcemanager,保持默认值即可. 如果resourcemanager为单节点,你需要将ds1 配置为resourcemanager对应的hostname |
dolphinscheduler.env.path | env/dolphinscheduler_env.sh | 运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME …] |
development.state | false | 是否处于开发模式 |
参数 | 默认值 | 描述 |
---|---|---|
server.port | 12345 | api服务通讯端口 |
server.servlet.session.timeout | 7200 | session超时时间 |
server.servlet.context-path | /dolphinscheduler | 请求路径 |
spring.servlet.multipart.max-file-size | 1024MB | 最大上传文件大小 |
spring.servlet.multipart.max-request-size | 1024MB | 最大请求大小 |
server.jetty.max-http-post-size | 5000000 | jetty服务最大发送请求大小 |
spring.messages.encoding | UTF-8 | 请求编码 |
spring.jackson.time-zone | GMT+8 | 设置时区 |
spring.messages.basename | i18n/messages | i18n配置 |
security.authentication.type | PASSWORD | 权限校验类型 |
参数 | 默认值 | 描述 |
---|---|---|
master.listen.port | 5678 | master监听端口 |
master.exec.threads | 100 | master工作线程数量,用于限制并行的流程实例数量 |
master.exec.task.num | 20 | master每个流程实例的并行任务数量 |
master.dispatch.task.num | 3 | master每个批次的派发任务数量 |
master.host.selector | LowerWeight | master host选择器,用于选择合适的worker执行任务,可选值: Random, RoundRobin, LowerWeight |
master.heartbeat.interval | 10 | master心跳间隔,单位为秒 |
master.task.commit.retryTimes | 5 | 任务重试次数 |
master.task.commit.interval | 1000 | 任务提交间隔,单位为毫秒 |
master.max.cpuload.avg | -1 | master最大cpuload均值,只有高于系统cpuload均值时,master服务才能调度任务. 默认值为-1: cpu cores * 2 |
master.reserved.memory | 0.3 | master预留内存,只有低于系统可用内存时,master服务才能调度任务,单位为G |
参数 | 默认值 | 描述 |
---|---|---|
worker.listen.port | 1234 | worker监听端口 |
worker.exec.threads | 100 | worker工作线程数量,用于限制并行的任务实例数量 |
worker.heartbeat.interval | 10 | worker心跳间隔,单位为秒 |
worker.max.cpuload.avg | -1 | worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为-1: cpu cores * 2 |
worker.reserved.memory | 0.3 | worker预留内存,只有低于系统可用内存时,worker服务才能被派发任务,单位为G |
worker.groups | default | worker分组配置,逗号分隔,例如’worker.groups=default,test’ worker启动时会根据该配置自动加入对应的分组 |
参数 | 默认值 | 描述 |
---|---|---|
alert.type | 告警类型 | |
mail.protocol | SMTP | 邮件服务器协议 |
mail.server.host | xxx.xxx.com | 邮件服务器地址 |
mail.server.port | 25 | 邮件服务器端口 |
mail.sender | xxx@xxx.com | 发送人邮箱 |
mail.user | xxx@xxx.com | 发送人邮箱名称 |
mail.passwd | 111111 | 发送人邮箱密码 |
mail.smtp.starttls.enable | true | 邮箱是否开启tls |
mail.smtp.ssl.enable | false | 邮箱是否开启ssl |
mail.smtp.ssl.trust | xxx.xxx.com | 邮箱ssl白名单 |
xls.file.path | /tmp/xls | 邮箱附件临时工作目录 |
以下为企业微信配置[选填] | ||
enterprise.wechat.enable | false | 企业微信是否启用 |
enterprise.wechat.corp.id | xxxxxxx | |
enterprise.wechat.secret | xxxxxxx | |
enterprise.wechat.agent.id | xxxxxxx | |
enterprise.wechat.users | xxxxxxx | |
enterprise.wechat.token.url | https://qyapi.weixin.qq.com/cgi-bin/gettoken? corpid=corpId&corpsecret=secret | |
enterprise.wechat.push.url | https://qyapi.weixin.qq.com/cgi-bin/message/send? access_token=$token | |
enterprise.wechat.user.send.msg | 发送消息格式 | |
enterprise.wechat.team.send.msg | 群发消息格式 | |
plugin.dir | /Users/xx/your/path/to/plugin/dir | 插件目录 |
这里面主要是quartz配置,请结合实际业务场景&资源进行配置,本文暂时不做展开.
参数 | 默认值 | 描述 |
---|---|---|
org.quartz.jobStore.driverDelegateClass | org.quartz.impl.jdbcjobstore.StdJDBCDelegate | |
org.quartz.jobStore.driverDelegateClass | org.quartz.impl.jdbcjobstore.PostgreSQLDelegate | |
org.quartz.scheduler.instanceName | DolphinScheduler | |
org.quartz.scheduler.instanceId | AUTO | |
org.quartz.scheduler.makeSchedulerThreadDaemon | true | |
org.quartz.jobStore.useProperties | false | |
org.quartz.threadPool.class | org.quartz.simpl.SimpleThreadPool | |
org.quartz.threadPool.makeThreadsDaemons | true | |
org.quartz.threadPool.threadCount | 25 | |
org.quartz.threadPool.threadPriority | 5 | |
org.quartz.jobStore.class | org.quartz.impl.jdbcjobstore.JobStoreTX | |
org.quartz.jobStore.tablePrefix | QRTZ_ | |
org.quartz.jobStore.isClustered | true | |
org.quartz.jobStore.misfireThreshold | 60000 | |
org.quartz.jobStore.clusterCheckinInterval | 5000 | |
org.quartz.jobStore.acquireTriggersWithinLock | true | |
org.quartz.jobStore.dataSource | myDs | |
org.quartz.dataSource.myDs.connectionProvider.class | org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider |
install_config.conf这个配置文件比较繁琐,这个文件主要有两个地方会用到.
调用install.sh脚本会自动加载该文件中的配置.并根据该文件中的内容自动配置上述的配置文件中的内容. 比如:dolphinscheduler-daemon.sh、datasource.properties、zookeeper.properties、common.properties、application-api.properties、master.properties、worker.properties、alert.properties、quartz.properties 等文件.
DS集群在启动&关闭的时候,会加载该配置文件中的masters,workers,alertServer,apiServers等参数,启动/关闭DS集群.
文件内容如下:
# 注意: 该配置文件中如果包含特殊字符,如: `.\*[]^${}\+?|()@#&`, 请转义, # 示例: `[` 转义为 `\[` # 数据库类型, 目前仅支持 postgresql 或者 mysql dbtype="mysql" # 数据库 地址 & 端口 dbhost="192.168.xx.xx:3306" # 数据库 名称 dbname="dolphinscheduler" # 数据库 用户名 username="xx" # 数据库 密码 password="xx" # Zookeeper地址 zkQuorum="192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181" # 将DS安装到哪个目录,如: /data1\_1T/dolphinscheduler, installPath="/data1\_1T/dolphinscheduler" # 使用哪个用户部署 # 注意: 部署用户需要sudo 权限, 并且可以操作 hdfs . # 如果使用hdfs的话,根目录必须使用该用户进行创建.否则会有权限相关的问题. deployUser="dolphinscheduler" # 以下为告警服务配置 # 邮件服务器地址 mailServerHost="smtp.exmail.qq.com" # 邮件服务器 端口 mailServerPort="25" # 发送者 mailSender="xxxxxxxxxx" # 发送用户 mailUser="xxxxxxxxxx" # 邮箱密码 mailPassword="xxxxxxxxxx" # TLS协议的邮箱设置为true,否则设置为false starttlsEnable="true" # 开启SSL协议的邮箱配置为true,否则为false。注意: starttlsEnable和sslEnable不能同时为true sslEnable="false" # 邮件服务地址值,同 mailServerHost sslTrust="smtp.exmail.qq.com" #业务用到的比如sql等资源文件上传到哪里,可以设置:HDFS,S3,NONE。如果想上传到HDFS,请配置为HDFS;如果不需要资源上传功能请选择NONE。 resourceStorageType="NONE" # if S3,write S3 address,HA,for example :s3a://dolphinscheduler, # Note,s3 be sure to create the root directory /dolphinscheduler defaultFS="hdfs://mycluster:8020" # 如果resourceStorageType 为S3 需要配置的参数如下: s3Endpoint="http://192.168.xx.xx:9010" s3AccessKey="xxxxxxxxxx" s3SecretKey="xxxxxxxxxx" # 如果ResourceManager是HA,则配置为ResourceManager节点的主备ip或者hostname,比如"192.168.xx.xx,192.168.xx.xx",否则如果是单ResourceManager或者根本没用到yarn,请配置yarnHaIps=""即可,如果没用到yarn,配置为"" yarnHaIps="192.168.xx.xx,192.168.xx.xx" # 如果是单ResourceManager,则配置为ResourceManager节点ip或主机名,否则保持默认值即可。 singleYarnIp="yarnIp1" # 资源文件在 HDFS/S3 存储路径 resourceUploadPath="/dolphinscheduler" # HDFS/S3 操作用户 hdfsRootUser="hdfs" # 以下为 kerberos 配置 # kerberos是否开启 kerberosStartUp="false" # kdc krb5 config file path krb5ConfPath="$installPath/conf/krb5.conf" # keytab username keytabUserName="hdfs-mycluster@ESZ.COM" # username keytab path keytabPath="$installPath/conf/hdfs.headless.keytab" # api 服务端口 apiServerPort="12345" # 部署DS的所有主机hostname ips="ds1,ds2,ds3,ds4,ds5" # ssh 端口 , 默认 22 sshPort="22" # 部署master服务主机 masters="ds1,ds2" # 部署 worker服务的主机 # 注意: 每一个worker都需要设置一个worker 分组的名称,默认值为 "default" workers="ds1:default,ds2:default,ds3:default,ds4:default,ds5:default" # 部署alert服务主机 alertServer="ds3" # 部署api服务主机 apiServers="ds1"
通过类似shell方式提交任务的的时候,会加载该配置文件中的环境变量到主机中. 涉及到的任务类型有: Shell任务、Python任务、Spark任务、Flink任务、Datax任务等等
export HADOOP\_HOME=/opt/soft/hadoop
export HADOOP\_CONF\_DIR=/opt/soft/hadoop/etc/hadoop
export SPARK\_HOME1=/opt/soft/spark1
export SPARK\_HOME2=/opt/soft/spark2
export PYTHON\_HOME=/opt/soft/python
export JAVA\_HOME=/opt/soft/java
export HIVE\_HOME=/opt/soft/hive
export FLINK\_HOME=/opt/soft/flink
export DATAX\_HOME=/opt/soft/datax/bin/datax.py
export PATH=$HADOOP\_HOME/bin:$SPARK\_HOME1/bin:$SPARK\_HOME2/bin:$PYTHON\_HOME:$JAVA\_HOME/bin:$HIVE\_HOME/bin:$PATH:$FLINK\_HOME/bin:$DATAX\_HOME:$PATH
对应服务服务名称 | 日志文件名 |
---|---|
api服务日志配置文件 | logback-api.xml |
master服务日志配置文件 | logback-master.xml |
worker服务日志配置文件 | logback-worker.xml |
alert服务日志配置文件 | logback-alert.xml |
在dolphinscheduler中创建的所有任务都保存在t_ds_process_definition 表中.
该数据库表结构如下表所示:
序号 | 字段 | 类型 | 描述 |
---|---|---|---|
1 | id | int(11) | 主键 |
2 | name | varchar(255) | 流程定义名称 |
3 | version | int(11) | 流程定义版本 |
4 | release_state | tinyint(4) | 流程定义的发布状态:0 未上线 , 1已上线 |
5 | project_id | int(11) | 项目id |
6 | user_id | int(11) | 流程定义所属用户id |
7 | process_definition_json | longtext | 流程定义JSON |
8 | description | text | 流程定义描述 |
9 | global_params | text | 全局参数 |
10 | flag | tinyint(4) | 流程是否可用:0 不可用,1 可用 |
11 | locations | text | 节点坐标信息 |
12 | connects | text | 节点连线信息 |
13 | receivers | text | 收件人 |
14 | receivers_cc | text | 抄送人 |
15 | create_time | datetime | 创建时间 |
16 | timeout | int(11) | 超时时间 |
17 | tenant_id | int(11) | 租户id |
18 | update_time | datetime | 更新时间 |
19 | modify_by | varchar(36) | 修改用户 |
20 | resource_ids | varchar(255) | 资源ids |
其中process_definition_json 字段为核心字段, 定义了 DAG 图中的任务信息.该数据以JSON 的方式进行存储.
公共的数据结构如下表.
序号 | 字段 | 类型 | 描述 |
---|---|---|---|
1 | globalParams | Array | 全局参数 |
2 | tasks | Array | 流程中的任务集合 [ 各个类型的结构请参考如下章节] |
3 | tenantId | int | 租户id |
4 | timeout | int | 超时时间 |
数据示例:
{
"globalParams":[
{
"prop":"golbal\_bizdate",
"direct":"IN",
"type":"VARCHAR",
"value":"${system.biz.date}"
}
],
"tasks":Array[1],
"tenantId":0,
"timeout":0
}
各任务类型存储结构详解
节点数据结构如下:
序号 | 参数名 | 类型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任务编码 | ||
2 | type | String | 类型 | SHELL | |
3 | name | String | 名称 | ||
4 | params | Object | 自定义参数 | Json 格式 | |
5 | rawScript | String | Shell脚本 | ||
6 | localParams | Array | 自定义参数 | ||
7 | resourceList | Array | 资源文件 | ||
8 | description | String | 描述 | ||
9 | runFlag | String | 运行标识 | ||
10 | conditionResult | Object | 条件分支 | ||
11 | successNode | Array | 成功跳转节点 | ||
12 | failedNode | Array | 失败跳转节点 | ||
13 | dependence | Object | 任务依赖 | 与params互斥 | |
14 | maxRetryTimes | String | 最大重试次数 | ||
15 | retryInterval | String | 重试间隔 | ||
16 | timeout | Object | 超时控制 | ||
17 | taskInstancePriority | String | 任务优先级 | ||
18 | workerGroup | String | Worker 分组 | ||
19 | preTasks | Array | 前置任务 |
节点数据样例:
{ "type":"SHELL", "id":"tasks-80760", "name":"Shell Task", "params":{ "resourceList":[ { "id":3, "name":"run.sh", "res":"run.sh" } ], "localParams":[ ], "rawScript":"echo "This is a shell script"" }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
通过 SQL对指定的数据源进行数据查询、更新操作.
节点数据结构如下:
序号 | 参数名 | 类型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任务编码 |
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
Array | 流程中的任务集合 [ 各个类型的结构请参考如下章节] |
| 3 | tenantId | int | 租户id |
| 4 | timeout | int | 超时时间 |
数据示例:
{
"globalParams":[
{
"prop":"golbal\_bizdate",
"direct":"IN",
"type":"VARCHAR",
"value":"${system.biz.date}"
}
],
"tasks":Array[1],
"tenantId":0,
"timeout":0
}
各任务类型存储结构详解
节点数据结构如下:
序号 | 参数名 | 类型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任务编码 | ||
2 | type | String | 类型 | SHELL | |
3 | name | String | 名称 | ||
4 | params | Object | 自定义参数 | Json 格式 | |
5 | rawScript | String | Shell脚本 | ||
6 | localParams | Array | 自定义参数 | ||
7 | resourceList | Array | 资源文件 | ||
8 | description | String | 描述 | ||
9 | runFlag | String | 运行标识 | ||
10 | conditionResult | Object | 条件分支 | ||
11 | successNode | Array | 成功跳转节点 | ||
12 | failedNode | Array | 失败跳转节点 | ||
13 | dependence | Object | 任务依赖 | 与params互斥 | |
14 | maxRetryTimes | String | 最大重试次数 | ||
15 | retryInterval | String | 重试间隔 | ||
16 | timeout | Object | 超时控制 | ||
17 | taskInstancePriority | String | 任务优先级 | ||
18 | workerGroup | String | Worker 分组 | ||
19 | preTasks | Array | 前置任务 |
节点数据样例:
{ "type":"SHELL", "id":"tasks-80760", "name":"Shell Task", "params":{ "resourceList":[ { "id":3, "name":"run.sh", "res":"run.sh" } ], "localParams":[ ], "rawScript":"echo "This is a shell script"" }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
通过 SQL对指定的数据源进行数据查询、更新操作.
节点数据结构如下:
序号 | 参数名 | 类型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任务编码 |
[外链图片转存中…(img-pxbvrjrV-1714401712376)]
[外链图片转存中…(img-VROvaK4s-1714401712377)]
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。