赞
踩
工作流(Workflow),指“业务过程的部分或整体在计算机应用环境下的自动化”。是对工作流程及其各操作步骤之间业务规则的抽象、概括描述。工作流解决的主要问题是:为了实现某个业务目标,利用计算机软件在多个参与者之间按某种预定规则自动传递文档、信息或者任务。
一个完整的数据分析系统通常都是由多个前后依赖的模块组合构成的:数据采集、数据预处理、数据分析、数据展示等。各个模块单元之间存在时间先后依赖关系,且存在着周期性重复。
为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行。
简单的任务调度:直接使用linux的crontab来定义,但是缺点也是比较明显,无法设置依赖。
复杂的任务调度:自主开发调度平台,使用开源调度系统,比如azkaban、Apache Oozie、Cascading、Hamake等。
其中知名度比较高的是Apache Oozie,但是其配置工作流的过程是编写大量的XML配置,而且代码复杂度比较高,不易于二次开发。
crontab
使用linux的crontab来定义调度,但是缺点比较明显,无法设置依赖复杂任务调度。且需要编写相关shell脚本。
当下企业两种选择,
知名度比较高的是Apache Oozie,但是其配置工作流的过程是编写大量的XML配置,而且代码复杂度比较高,不易于二次开发。
Azkaban是由linkedin(领英)公司推出的一个批量工作流任务调度器,用于在一个工作流内以一个特定的顺序运行一组工作和流程。Azkaban使用job配置文件建立任务之间的依赖关系,并提供一个易于使用的web用户界面维护和跟踪你的工作流。
Azkaban功能特点:
该模式中webServer和executorServer运行在同一个进程中,进程名是AzkabanSingleServer。使用自带的H2数据库。这种模式包含Azkaban的所有特性,但一般用来学习和测试。
该模式使用MySQL数据库, Web Server和Executor Server运行在不同的进程中。
该模式使用MySQL数据库, Web Server和Executor Server运行在不同的机器中。且有多个Executor Server。该模式适用于大规模应用。
cd /export/softwares/
curl -L -O https://github.com/azkaban/azkaban/archive/3.71.0.tar.gz
tar -zxf azkaban-3.71.0.tar.gz -C /export/services/
ln -s /export/services/azkaban-3.71.0 /export/services/azkaban
yum install -y git
修改Azkaban依赖的node版本
Azkaban默认使用NodeJs-8.10.0版本,但是前面已经安装过最新的NodeJs-12.16.1版本,所以直接使用即可
vim /export/services/azkaban/azkaban-web-server/build.gradle
cd /export/services/azkaban
#跳过测试
./gradlew build installDist -x test
查看编译后的二进制软件包
ll azkaban-solo-server/build/distributions/
解压缩
tar -zxf /export/services/azkaban/azkaban-solo-server/build/distributions/azkaban-solo-server-0.1.0-SNAPSHOT.tar.gz -C /export/services/
## 重命名
mv azkaban-solo-server-0.1.0-SNAPSHOT azkaban-3.71.0-bin
#进入/export/services目录
cd /export/services/
#创建软连接
ln -s /export/services/azkaban-3.71.0-bin /export/services/azkaban
#添加环境变量
vim /etc/profile
export AZKABAN_HOME=/export/services/azkaban
PATH=$AZKABAN_HOME/bin:$PATH
source /etc/profile
#配置azkaban.properties
cd /export/services/azkaban
vim conf/azkaban.properties
配置commonprivate.properties
vim plugins/jobtypes/commonprivate.properties
在mysql中添加azkaban数据库
mysql -uroot -p123456
create database azkaban default character set utf8 collate utf8_general_ci;
quit
启动Azkaban
cd /export/services/azkaban
./bin/start-solo.sh
注意:这个start-solo.sh脚本使用的是相对路径,必须进入到/export/services/azkaban路径下执行./bin/start-solo.sh,否则mysql数据库会初始化失败
打开Azkaban的WebUI
在浏览器中输入http://node2:8081
账户:azkaban
密码:azkaban
登录成功后的样子如图:
访问Web Server=>http://node2:8081/ 默认用户名密码azkaban
http://node2:8081/index登录=>Create Project=>Upload zip包 =>execute flow执行一步步操作即可。
创建两个文件one.job two.job,内容如下,打包成zip包。
cat one.job
type=command
command=echo "this is job one"
cat two.job
type=command
dependencies=one
command=echo "this is job two"
创建工程:
上传zip压缩包:
execute执行:
执行页面:
执行结果查看:
vi command.job
#command.job
type=command
command=echo 'hello'
zip command.job
# foo.job
type=command
command=echo foo
第二个job:bar.job依赖foo.job
# bar.job
type=command
dependencies=foo
command=echo bar
将所有job资源文件打到一个zip包中
在azkaban的web管理界面创建工程并上传zip包
启动工作流flow
# fs.job
type=command
command=hadoop fs -mkdir /azaz
将job资源文件打包成zip文件
通过azkaban的web管理平台创建project并上传job压缩包
启动执行该job
除了手动立即执行工作流任务外,azkaban也支持配置定时任务调度。开启方式如下:
首页选择待处理的project
上述图片中,选择左边schedule表示配置定时调度信息,选择右边execute表示立即执行工作流任务。
shell脚本(scheduler.sh)
#!/bin/sh
cls=$1
flag=0
clsDwd=com.erainm.logistics.offline.dwd.${cls}DWD
clsDws=com.erainm.logistics.offline.dws.${cls}DWS
baseDir=/export/services/logistics/lib/
if [[ $cls = "Customer" || $cls = "ExpressBill" || $cls = "TransportTool" || $cls = "Warehouse" || $cls = "Waybill" ]]; then
echo -e "\e[32m==== MainClass is: "$clsDwd" and "$clsDws"\e[0m"
flag=1
else
echo -e "\e[31mUsage : \n\tExpressBill\n\tCustomer\n\tTransportTool\n\tWarehouse\n\tWaybill\e[0m"
fi
if [[ $flag = 1 ]]; then
echo -e "\e[32m==== builder spark commands ====\e[0m"
cmd1="spark-submit --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.1 --class ${clsDwd} --master yarn --deploy-mode cluster --driver-memory 512m --executor-cores 1 --executor-memory 512m --queue default --verbose ${baseDir}logistics-etl.jar"
cmd2="spark-submit --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.1 --class ${clsDws} --master yarn --deploy-mode cluster --driver-memory 512m --executor-cores 1 --executor-memory 512m --queue default --verbose ${baseDir}logistics-etl.jar"
echo -e "\e[32m==== CMD1 is: $cmd1 ====\e[0m"
echo -e "\e[32m==== CMD2 is: $cmd2 ====\e[0m"
fi
if [[ $flag = 1 && `ls -A $baseDir|wc -w` = 1 ]]; then
echo -e "\e[32m==== start execute ${clsDwd} ====\e[0m"
sh $cmd1
echo -e "\e[32m==== start execute ${clsDws} ====\e[0m"
sh $cmd2
else
echo -e "\e[31m==== The jar package in $baseDir directory does not exist! ====\e[0m"
echo -e "\e[31m==== Plase upload logistics-common.jar,logistics-etl.jar,logistics-generate.jar ====\e[0m"
fi
#command
type=command
command=sh /export/services/logistics/bin/scheduler.sh ExpressBill
#command
type=command
command=sh /export/services/logistics/bin/scheduler.sh Waybill
#command
type=command
command=sh /export/services/logistics/bin/scheduler.sh Warehouse
#command
type=command
command=sh /export/services/logistics/bin/scheduler.sh TransportTool
#command
type=command
command=sh /export/services/logistics/bin/scheduler.sh Customer
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。