赞
踩
大数据:任务调度https://blog.csdn.net/qq_43713049/article/details/116985497
整体需求
基于时间的任务运行
基于运行依赖关系的任务运行
调度类型
Linux Crontab
Linux中自带的一个工具
优点
缺点
语法
* * * * * command
分钟 小时 日 月 周几
Oozie
Cloudera公司研发的Hadoop生态圈的调度工具
Zeus
阿里巴巴最早基于Hadoop1研发的一个调度系统,目前市场上的Zeus一般都是携程版本的Zeus
Azkaban
LinkedIn公司研发的分布式调度工具
功能
特点
应用
原理
原生方式
这种方式,是通过自己写代码的方式来实现工作流的开发,效率低,容易出问题,不用
实现一个效果:4个程序
先要开发一个XML文件
<start to="first">
<action name="first">
<shell>
<path>xxx.sh</path>
<args></args>
</shell>
<ok to="second"> </ok>
<error to="kill"></error>
</action>
<action name ="second">
<spark>
<jar></jar>
<class></class>
……
</spark>
<ok to="third"> </ok>
<error to="kill"></error>
<action>
<action name ="forth">
<hive>
<scprit></scprit>
<path></path>
……
</hive>
<ok to="end"> </ok>
<error to="kill"></error>
<action>
<kill name="kill">
kill
</kill>
<end name="end">
end
</end>
集成Hue
由于Oozie原生的方式交互性非常差,导致用户上手非常困难
Cloudera基于可视化需求,在Hue中集成Oozie开发和监控
创建测试脚本
测试
mkdir /export/data/flow
#!/bin/bash
echo "this is test01"
#!/bin/bash
echo "this is test02"
#!/bin/bash
echo "this is test03"
#!/bin/bash
echo "this is test04"
hdfs dfs -put /export/data/flow /user/oozie/
单job工作流
需求1:构建一个工作流,执行test01
多job工作流
需求2:构建一个工作流,先执行test01,再执行test02,最后执行test03
分支工作流
需求3
test01先执行
test01执行完成,test02和test03并行执行
test02和test03都执行完成,执行test04
需求:在调度运行一个工作流的实现,需要嵌套调用另外一个工作流
目标:实现自动化脚本调度的开发
路径
实施
增量采集脚本job1
vim /export/data/shell/01.collect.sh
#!/bin/bash
#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then
#参数个数不为0
if [ $# -ne 1 ]
then
echo “参数至多只能有一个,为处理的日期,请重新运行!”
exit 100
else
#参数个数只有1个,就用第一个参数作为处理的日期
yesterday=KaTeX parse error: Expected 'EOF', got '#' at position 13: 1 fi else #̲参数个数为0,默认处理昨天的日…{yesterday}"
echo “step2:开始运行采集的程序”
#step2:运行增量采集
SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
S
Q
O
O
P
H
O
M
E
/
b
i
n
/
s
q
o
o
p
i
m
p
o
r
t
−
−
c
o
n
n
e
c
t
j
d
b
c
:
m
y
s
q
l
:
/
/
n
o
d
e
3
:
3306
/
d
b
o
r
d
e
r
−
−
u
s
e
r
n
a
m
e
r
o
o
t
−
−
p
a
s
s
w
o
r
d
−
f
i
l
e
h
d
f
s
:
/
/
n
o
d
e
1
:
8020
/
u
s
e
r
/
o
o
z
i
e
/
s
h
e
l
l
/
s
q
o
o
p
.
p
a
s
s
w
d
−
−
q
u
e
r
y
"
s
e
l
e
c
t
∗
f
r
o
m
t
b
o
r
d
e
r
w
h
e
r
e
s
u
b
s
t
r
i
n
g
(
c
r
e
a
t
e
t
i
m
e
,
1
,
10
)
=
′
SQOOP_HOME/bin/sqoop import \ --connect jdbc:mysql://node3:3306/db_order \ --username root \ --password-file hdfs://node1:8020/user/oozie/shell/sqoop.passwd \ --query "select * from tb_order where substring(create_time,1,10) = '
SQOOPHOME/bin/sqoopimport −−connectjdbc:mysql://node3:3306/dborder −−usernameroot −−password−filehdfs://node1:8020/user/oozie/shell/sqoop.passwd −−query"select∗fromtborderwheresubstring(createtime,1,10)=′{yesterday}’ and $CONDITIONS "
–delete-target-dir
–target-dir /nginx/logs/tb_order/daystr=${yesterday}
–fields-terminated-by ‘\t’
-m 1
echo “step2:采集的程序运行结束”
echo “step3:开始运行ETL”
#模拟ETL的过程,将采集的新增的数据移动到表的目录下
HADOOP_HOME=/export/server/hadoop-2.6.0-cdh5.14.0
#先判断结果是否存在,如果已经存在,先删除再移动
H
A
D
O
O
P
H
O
M
E
/
b
i
n
/
h
d
f
s
d
f
s
−
t
e
s
t
−
e
/
u
s
e
r
/
h
i
v
e
/
w
a
r
e
h
o
u
s
e
/
t
b
o
r
d
e
r
/
d
a
y
s
t
r
=
HADOOP_HOME/bin/hdfs dfs -test -e /user/hive/warehouse/tb_order/daystr=
HADOOPHOME/bin/hdfsdfs−test−e/user/hive/warehouse/tborder/daystr={yesterday}
if [ $? -eq 0 ]
then
#存在
H
A
D
O
O
P
H
O
M
E
/
b
i
n
/
h
d
f
s
d
f
s
−
r
m
−
r
/
u
s
e
r
/
h
i
v
e
/
w
a
r
e
h
o
u
s
e
/
t
b
o
r
d
e
r
/
d
a
y
s
t
r
=
HADOOP_HOME/bin/hdfs dfs -rm -r /user/hive/warehouse/tb_order/daystr=
HADOOPHOME/bin/hdfsdfs−rm−r/user/hive/warehouse/tborder/daystr={yesterday}
H
A
D
O
O
P
H
O
M
E
/
b
i
n
/
h
d
f
s
d
f
s
−
c
p
/
n
g
i
n
x
/
l
o
g
s
/
t
b
o
r
d
e
r
/
d
a
y
s
t
r
=
HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=
HADOOPHOME/bin/hdfsdfs−cp/nginx/logs/tborder/daystr={yesterday} /user/hive/warehouse/tb_order/
else
#不存在
H
A
D
O
O
P
H
O
M
E
/
b
i
n
/
h
d
f
s
d
f
s
−
c
p
/
n
g
i
n
x
/
l
o
g
s
/
t
b
o
r
d
e
r
/
d
a
y
s
t
r
=
HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=
HADOOPHOME/bin/hdfsdfs−cp/nginx/logs/tborder/daystr={yesterday} /user/hive/warehouse/tb_order/
fi
echo “step3:ETL结束”
增量统计个数脚本job2
vim /export/data/shell/02.analysis.sh
vim /export/data/shell/02.analysis.sql
#!/bin/bash #step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期 if [ $# -ne 0 ] then #参数个数不为0 if [ $# -ne 1 ] then echo "参数至多只能有一个,为处理的日期,请重新运行!" exit 100 else #参数个数只有1个,就用第一个参数作为处理的日期 yesterday=$1 fi else #参数个数为0,默认处理昨天的日期 yesterday=`date -d '-1 day' +%Y-%m-%d` fi echo "step1:要处理的日期是:${yesterday}" echo "step2:开始运行分析" #step2:运行分析程序 HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0 $HIVE_HOME/bin/hive --hiveconf yest=${yesterday} -f hdfs://node1:8020/user/oozie/shell/02.analysis.sql echo "step2:分析的程序运行结束"
create table if not exists default.tb_order( id string , pid string, userid string, price double , create_time string ) partitioned by (daystr string) row format delimited fields terminated by '\t' location '/user/hive/warehouse/tb_order'; alter table default.tb_order add if not exists partition (daystr='${hiveconf:yest}'); create table if not exists default.tb_order_num_rs( daystr string, order_number int ) row format delimited fields terminated by '\t'; insert into table default.tb_order_num_rs select daystr, count(id) as order_number from default.tb_order where daystr='${hiveconf:yest}' group by daystr
增量统计金额脚本job3
vim /export/data/shell/03.analysis.sh
vim /export/data/shell/03.analysis.sql
#!/bin/bash #step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期 if [ $# -ne 0 ] then #参数个数不为0 if [ $# -ne 1 ] then echo "参数至多只能有一个,为处理的日期,请重新运行!" exit 100 else #参数个数只有1个,就用第一个参数作为处理的日期 yesterday=$1 fi else #参数个数为0,默认处理昨天的日期 yesterday=`date -d '-1 day' +%Y-%m-%d` fi echo "step1:要处理的日期是:${yesterday}" echo "step2:开始运行分析" #step2:运行分析程序 HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0 $HIVE_HOME/bin/hive --hiveconf yest=${yesterday} -f hdfs://node1:8020/user/oozie/shell/03.analysis.sql echo "step2:分析的程序运行结束"
create table if not exists default.tb_order( id string , pid string, userid string, price double , create_time string ) partitioned by (daystr string) row format delimited fields terminated by '\t' location '/user/hive/warehouse/tb_order'; alter table default.tb_order add if not exists partition (daystr='${hiveconf:yest}'); create table if not exists default.tb_order_price_rs( daystr string, order_price double ) row format delimited fields terminated by '\t'; insert into table default.tb_order_price_rs select daystr, sum(price) as order_price from default.tb_order where daystr='${hiveconf:yest}' group by daystr;
增量合并导出脚本job4
vim /export/data/shell/04.export.sh
vim /export/data/shell/04.export.sql
#!/bin/bash #step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期 if [ $# -ne 0 ] then #参数个数不为0 if [ $# -ne 1 ] then echo "参数至多只能有一个,为处理的日期,请重新运行!" exit 100 else #参数个数只有1个,就用第一个参数作为处理的日期 yesterday=$1 fi else #参数个数为0,默认处理昨天的日期 yesterday=`date -d '-1 day' +%Y-%m-%d` fi echo "step1:要处理的日期是:${yesterday}" echo "step2:开始运行分析" #step2:运行分析程序 HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0 $HIVE_HOME/bin/hive --hiveconf yest=${yesterday} -f hdfs://node1:8020/user/oozie/shell/04.export.sql echo "step2:分析的程序运行结束" echo "step3:开始运行导出的程序" #step2:运行增量采集 SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0 $SQOOP_HOME/bin/sqoop export \ --connect jdbc:mysql://node3:3306/db_order \ --username root \ --password-file hdfs://node1:8020/user/oozie/shell/sqoop.passwd \ --table tb_order_rs \ --hcatalog-database default \ --hcatalog-table tb_order_rs \ --input-fields-terminated-by '\t' \ --update-key daystr \ --update-mode allowinsert \ -m 1 echo "step3:导出的程序运行结束"
create table if not exists default.tb_order_rs(
daystr string,
order_number int,
order_price double
)
row format delimited fields terminated by '\t';
insert into table default.tb_order_rs
select
a.daystr,
a.order_number,
b.order_price
from default.tb_order_num_rs a join default.tb_order_price_rs b on a.daystr = b.daystr
where a.daystr='${hiveconf:yest}';
上传
cp /export/data/sqoop.passwd /export/data/shell/
hdfs dfs -put /export/data/shell /user/oozie/
在MySQL中导入最新数据
use db_order;
insert into tb_order values('o00013','p00009','u00001',121,'2021-05-17 00:01:01');
insert into tb_order values('o00014','p00010','u00002',122,'2021-05-17 10:01:02');
insert into tb_order values('o00015','p00011','u00003',123,'2021-05-17 11:01:03');
insert into tb_order values('o00016','p00012','u00004',124,'2021-05-17 23:01:04');
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。