当前位置:   article > 正文

大数据项目之电商数仓(业务数据仓库)_yanagishima为啥不更新了

yanagishima为啥不更新了

 

第1章 电商业务与数据结构简介

1.1 电商业务流程

 

1.2 电商表结构

电商业务流程

1.2.1 电商常识(SKU、SPU)

 SKU=Stock Keeping Unit(库存量单位)。即库存进出计量的基本单元,可以是以件,盒,托盘等为单位。SKU这是对于大型连锁超市DC(配送中心)物流管理的一个必要的方法。现在已经被引申为产品统一编号的简称,每种产品均对应有唯一的SKU号。

 SPU(Standard Product Unit):标准化产品单元。是商品信息聚合的最小单位,是一组可复用、易检索的标准化信息的集合,该集合描述了一个产品的特性。

首先通过检索搜索出来的商品列表中,每个商品都是一个SKU。每个SKU都有自己独立的库存数。也就是说每一个商品详情展示都是一个SKU。

比如,咱们购买一台iPhoneX手机,iPhoneX手机就是一个SPU,但是你购买的时候,不可能是以iPhoneX手机为单位买的,商家也不可能以iPhoneX为单位记录库存。必须要以什么颜色什么版本的iPhoneX为单位。比如,你购买的是一台银色、128G内存的、支持联通网络的iPhoneX ,商家也会以这个单位来记录库存数。那这个更细致的单位就叫库存单元(SKU)。

那spu又是干什么的呢?

如上图,一般的电商系统你点击进去以后,都能看到这个商品关联了其他好几个类似的商品,而且这些商品很多的信息都是共用的,比如商品图片,海报、销售属性等。

1.2.2 订单表(order_info)

标签

含义

id

订单编号

total_amount

订单金额

order_status

订单状态

user_id

用户id

payment_way

支付方式

out_trade_no

支付流水号

create_time

创建时间

operate_time

操作时间

 

1.2.3 订单详情表(order_detail)

标签

含义

id

订单编号

order_id

订单号

user_id

用户id

sku_id

商品id

sku_name

商品名称

order_price

下单价格

sku_num

商品数量

create_time

创建时间

1.2.4 商品表

标签

含义

id

skuId

spu_id

spuid

price

价格

sku_name

商品名称

sku_desc

商品描述

weight

重量

tm_id

品牌id

category3_id

品类id

create_time

创建时间

1.2.5 用户表

标签

含义

id

用户id

name

姓名

birthday

生日

gender

性别

email

邮箱

user_level

用户等级

create_time

创建时间

1.2.6 商品一级分类表

标签

含义

id

id

name

名称

1.2.7 商品二级分类表

标签

含义

id

id

name

名称

category1_id

一级品类id

1.2.8 商品三级分类表

标签

含义

id

id

name

名称

Category2_id

二级品类id

1.2.9 支付流水表

标签

含义

id

编号

out_trade_no

对外业务编号

order_id

订单编号

user_id

用户编号

alipay_trade_no

支付宝交易流水编号

total_amount

支付金额

subject

交易内容

payment_type

支付类型

payment_time

支付时间

第2章 数仓理论

2.1 表的分类

2.1.1 实体表

  • 定义

实体表,一般是指一个现实存在的业务对象,比如用户,商品,商家,销售员等等。

用户表:

用户id

姓名

生日

性别

邮箱

用户等级

创建时间

1

张三

2011-11-11

zs@163.com

2

2018-11-11

2

李四

2011-11-11

ls@163.com

3

2018-11-11

3

王五

2011-11-11

中性

ww@163.com

1

2018-11-11

2.1.2 维度表

  • 定义

维度表,一般是指对应一些业务状态,代码的解释表。也可以称之为码表。

比如地区表,订单状态,支付方式,审批状态,商品分类等等。

订单状态表:

订单状态编号

订单状态名称

1

未支付

2

支付

3

发货中

4

已发货

5

已完成

商品分类表:

商品分类编号

分类名称

1

少儿

2

文艺

3

生活

4

科技

2.1.3 事务型事实表

  • 定义

事务型事实表,一般指随着业务发生不断产生的数据。特点是一旦发生不会再变化。

一般比如,交易流水,操作日志,出库入库记录等等。

交易流水表:

编号

对外业务编号

订单编号

用户编号

支付宝交易流水编号

支付金额

交易内容

支付类型

支付时间

1

7577697945

1

111

QEyF-63000323

223.00

海狗人参丸1

alipay

2019-02-10 00:50:02

2

0170099522

2

222

qdwV-25111279

589.00

海狗人参丸2

wechatpay

2019-02-10 00:50:02

3

1840931679

3

666

hSUS-65716585

485.00

海狗人参丸3

unionpay

2019-02-10 00:50:02

。。。

。。。

。。。

。。。

。。。

。。。

。。。

。。。

。。。

2.1.4 周期型事实表

  • 定义

周期型事实表,一般指随着业务发生不断产生的数据。

与事务型不同的是,数据会随着业务周期性的推进而变化。

 比如订单,其中订单状态会周期性变化。再比如,请假、贷款申请,随着批复状态在周期性变化。

订单表:

订单编号

订单金额

订单状态

用户id

支付方式

支付流水号

创建时间

操作时间

1

223.00

2

111

alipay

QEyF-63000323

2019-02-10 00:01:29

2019-02-10 00:01:29

2

589.00

2

222

wechatpay

qdwV-25111279

2019-02-10 00:05:02

2019-02-10 00:05:02

3

485.00

1

666

unionpay

hSUS-65716585

2019-02-10 00:50:02

2019-02-10 00:50:02

。。。

。。。

。。。

。。。

。。。

。。。

。。。

。。。

2.2 同步策略

数据同步策略的类型包括:全量表、增量表、新增及变化表、拉链表

全量表:存储完整的数据。

增量表:存储新增加的数据。

新增及变化表:存储新增加的数据和变化的数据。

拉链表:对新增及变化表做定期合并。

2.2.1 实体表同步策略

实体表:比如用户,商品,商家,销售员等

实体表数据量比较小:通常可以做每日全量,就是每天存一份完整数据。即每日全量。

2.2.2 维度表同步策略

维度表:比如订单状态,审批状态,商品分类

维度表数据量比较小:通常可以做每日全量,就是每天存一份完整数据。即每日全量。

说明:

1)针对可能会有变化的状态数据可以存储每日全量

2)没变化的客观世界的维度(比如性别,地区,民族,政治成分,鞋子尺码)可以就存一份固定值。

2.2.3 事务型事实表同步策略

事务型事实表:比如,交易流水,操作日志,出库入库记录等。

因为数据不会变化,而且数据量巨大,所以每天只同步新增数据即可,所以可以做成每日增量表,即每日创建一个分区存储。

2.2.4 周期型事实表同步策略

周期型事实表:比如,订单、请假、贷款申请等

这类表从数据量的角度,存每日全量的话,数据量太大,冗余也太大。如果用每日增量的话无法反应数据变化。

 每日新增及变化量可以用,包括了当日的新增和修改。一般来说这个表,足够计算大部分当日数据的。但是这种依然无法解决能够得到某一个历史时间点(时间切片)的切片数据。 

 所以要用利用每日新增和变化表,制作一张拉链表,以方便的取到某个时间切片的快照数据。所以我们需要得到每日新增及变化量。

拉链表:

name姓名

start新名字创建时间

end名字更改时间

张三

1990/1/1

2018/12/31

张三三

2019/1/1

9999-99-99

。。。

。。。

。。。

select * from user where start =<2020-1-1and end>=2020-1-1

2.3 范式理论

2.3.1 范式概念

关系型数据库设计时,遵照一定的规范要求,目的在于降低数据的冗余性,目前业界范式有:第一范式(1NF)、第二范式(2NF)、第三范式(3NF)、巴斯-科德范式(BCNF)、第四范式(4NF)、第五范式(5NF)。

范式的标准定义是:符合某一种级别的关系模式的集合,表示一个关系内部各属性之间的联系的合理化程度。通俗地讲,范式可以理解为一张数据表的表结构,所符合的某种设计标准的级别。

 使用范式的根本目的是:

  1)减少数据冗余,尽量让每个数据只出现一次。

  2)保证数据一致性

 缺点是获取数据时,需要通过join拼接出最后的数据。

2.3.2 函数依赖

2.3.3 三范式区分

第一范式

 

第二范式

 

第三范式

 

2.4 关系建模与维度建模

  • 关系模型

关系模型主要应用与OLTP系统中,为了保证数据的一致性以及避免冗余,所以大部分业务系统的表都是遵循第三范式的。

  • 维度模型

维度模型主要应用于OLAP系统中,因为关系模型虽然冗余少,但是在大规模数据,跨表分析统计查询过程中会造成多表关联,这会大大降低执行效率

所以把相关各种表整理成两种:事实表和维度表两种。所有维度表围绕着事实表进行解释。

2.5 雪花模型、星型模型和星座模型

在维度建模的基础上又分为三种模型:星型模型、雪花模型、星座模型。

第3章 数仓搭建

3.1 业务数据生成

3.1.1 建表语句

1)通过SQLyog创建数据库gmall

2)设置数据库编码

3)导入建表语句(1建表脚本)

选择->1建表脚本.sql

  1. CREATE TABLE `order_info` (
  2. `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  3. `consignee` VARCHAR(100) DEFAULT NULL COMMENT '收货人',
  4. `consignee_tel` VARCHAR(20) DEFAULT NULL COMMENT '收件人电话',
  5. `total_amount` DECIMAL(10,2) DEFAULT NULL COMMENT '总金额',
  6. `order_status` VARCHAR(20) DEFAULT NULL COMMENT '订单状态',
  7. `user_id` BIGINT(20) DEFAULT NULL COMMENT '用户id',
  8. `payment_way` VARCHAR(20) DEFAULT NULL COMMENT '付款方式',
  9. `delivery_address` VARCHAR(1000) DEFAULT NULL COMMENT '送货地址',
  10. `order_comment` VARCHAR(200) DEFAULT NULL COMMENT '订单备注',
  11. `out_trade_no` VARCHAR(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
  12. `trade_body` VARCHAR(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',
  13. `create_time` DATETIME DEFAULT NULL COMMENT '创建时间',
  14. `operate_time` DATETIME DEFAULT NULL COMMENT '操作时间',
  15. `expire_time` DATETIME DEFAULT NULL COMMENT '失效时间',
  16. `tracking_no` VARCHAR(100) DEFAULT NULL COMMENT '物流单编号',
  17. `parent_order_id` BIGINT(20) DEFAULT NULL COMMENT '父订单编号',
  18. `img_url` VARCHAR(200) DEFAULT NULL COMMENT '图片路径',
  19. PRIMARY KEY (`id`)
  20. ) ENGINE=INNODB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表 订单表';
  21. DROP TABLE IF EXISTS sku_info;
  22. CREATE TABLE `sku_info` (
  23. `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '库存id(itemID)',
  24. `spu_id` BIGINT(20) DEFAULT NULL COMMENT '商品id',
  25. `price` DECIMAL(10,0) DEFAULT NULL COMMENT '价格',
  26. `sku_name` VARCHAR(200) DEFAULT NULL COMMENT 'sku名称',
  27. `sku_desc` VARCHAR(2000) DEFAULT NULL COMMENT '商品规格描述',
  28. `weight` DECIMAL(10,2) DEFAULT NULL COMMENT '重量',
  29. `tm_id` BIGINT(20) DEFAULT NULL COMMENT '品牌(冗余)',
  30. `category3_id` BIGINT(20) DEFAULT NULL COMMENT '三级分类id(冗余)',
  31. `sku_default_img` VARCHAR(200) DEFAULT NULL COMMENT '默认显示图片(冗余)',
  32. `create_time` DATETIME DEFAULT NULL COMMENT '创建时间',
  33. PRIMARY KEY (`id`)
  34. ) ENGINE=INNODB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8 COMMENT='库存单元表';
  35. CREATE TABLE `user_info` (
  36. `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  37. `login_name` VARCHAR(200) DEFAULT NULL COMMENT '用户名称',
  38. `nick_name` VARCHAR(200) DEFAULT NULL COMMENT '用户昵称',
  39. `passwd` VARCHAR(200) DEFAULT NULL COMMENT '用户密码',
  40. `name` VARCHAR(200) DEFAULT NULL COMMENT '用户姓名',
  41. `phone_num` VARCHAR(200) DEFAULT NULL COMMENT '手机号',
  42. `email` VARCHAR(200) DEFAULT NULL COMMENT '邮箱',
  43. `head_img` VARCHAR(200) DEFAULT NULL COMMENT '头像',
  44. `user_level` VARCHAR(200) DEFAULT NULL COMMENT '用户级别',
  45. `birthday` DATE DEFAULT NULL COMMENT '用户生日',
  46. `gender` VARCHAR(1) DEFAULT NULL COMMENT '性别 M男,F女',
  47. `create_time` DATETIME DEFAULT NULL COMMENT '创建时间',
  48. PRIMARY KEY (`id`)
  49. ) ENGINE=INNODB AUTO_INCREMENT=9501 DEFAULT CHARSET=utf8 COMMENT='用户表';
  50. CREATE TABLE `order_detail` (
  51. `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  52. `order_id` BIGINT(20) DEFAULT NULL COMMENT '订单编号',
  53. `sku_id` BIGINT(20) DEFAULT NULL COMMENT 'sku_id',
  54. `sku_name` VARCHAR(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
  55. `img_url` VARCHAR(200) DEFAULT NULL COMMENT '图片名称(冗余)',
  56. `order_price` DECIMAL(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
  57. `sku_num` VARCHAR(200) DEFAULT NULL COMMENT '购买个数',
  58. PRIMARY KEY (`id`)
  59. ) ENGINE=INNODB AUTO_INCREMENT=55750 DEFAULT CHARSET=utf8 COMMENT='订单明细表';
  60. DROP TABLE IF EXISTS `payment_info`;
  61. CREATE TABLE `payment_info`
  62. (
  63. `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '编号',
  64. `out_trade_no` VARCHAR(20) COMMENT '对外业务编号',
  65. `order_id` VARCHAR(20) COMMENT '订单编号',
  66. `user_id` VARCHAR(20) COMMENT '用户编号',
  67. `alipay_trade_no` VARCHAR(20) COMMENT '支付宝交易流水编号',
  68. `total_amount` DECIMAL(16,2) COMMENT '支付金额',
  69. `subject` VARCHAR(20) COMMENT '交易内容',
  70. `payment_type` VARCHAR(20) COMMENT '支付方式',
  71. `payment_time` VARCHAR(20) COMMENT '支付时间',
  72. PRIMARY KEY (`id`)
  73. ) ENGINE=INNODB AUTO_INCREMENT=55750 DEFAULT CHARSET=utf8 COMMENT='支付流水表';
  74. CREATE TABLE `base_category1`
  75. (
  76. `id` BIGINT AUTO_INCREMENT PRIMARY KEY NOT NULL COMMENT '编号',
  77. `name` VARCHAR(10) NOT NULL COMMENT '分类名称'
  78. );
  79. ALTER TABLE `base_category1` COMMENT= '一级分类表';
  80. CREATE TABLE `base_category2`
  81. (
  82. `id` BIGINT AUTO_INCREMENT PRIMARY KEY NOT NULL COMMENT '编号',
  83. `name` VARCHAR(200) NOT NULL COMMENT '二级分类名称',
  84. `category1_id` BIGINT COMMENT '一级分类编号'
  85. );
  86. ALTER TABLE `base_category2` COMMENT= '二级分类表';
  87. CREATE TABLE `base_category3`
  88. (
  89. `id` BIGINT AUTO_INCREMENT PRIMARY KEY NOT NULL COMMENT '编号',
  90. `name` VARCHAR(200) NOT NULL COMMENT '三级分类名称',
  91. `category2_id` BIGINT COMMENT '二级分类编号'
  92. );
  93. ALTER TABLE `base_category3` COMMENT= '三级分类表';
  94. #####

4)重复步骤3的导入方式,依次导入:2商品分类数据插入脚本、3函数脚本、4存储过程脚本。

3.1.2 生成业务数据

1)生成业务数据函数说明

       init_data ( do_date_string VARCHAR(20) , order_incr_num INT, user_incr_num INT , sku_num INT , if_truncate BOOLEAN  ):

       参数一:do_date_string生成数据日期

       参数二:order_incr_num订单id个数

       参数三:user_incr_num用户id个数

       参数四:sku_num商品sku个数

       参数五:if_truncate是否参数数据

2)案例测试:

(1)需求:生成日期2019年2月10日数据、订单1000个、用户200个、商品sku300个、不删除数据。

  1. DELIMITER $$
  2. DROP PROCEDURE IF EXISTS `insert_sku`$$
  3. ##新增max_num个sku
  4. CREATE DEFINER=`root`@`localhost` PROCEDURE `insert_sku`( create_time_string VARCHAR(200),max_num INT )
  5. BEGIN
  6. DECLARE v_create_time DATETIME DEFAULT NULL;
  7. DECLARE i INT DEFAULT 0;
  8. SET autocommit = 0;
  9. REPEAT
  10. SET i = i + 1;
  11. SET v_create_time=DATE_ADD(DATE_FORMAT(create_time_string,'%Y-%m-%d') ,INTERVAL rand_num(1,3600*24) SECOND);
  12. INSERT INTO sku_info (spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,sku_default_img,create_time )
  13. VALUES (rand_num(1,1000) ,rand_num(10,5000) , rand_string(20), rand_string(30),CAST(rand_num(50,500) AS DECIMAL(10,2))/100.0 ,rand_num(1,100), rand_num(1,5000),CONCAT('http://',rand_string(40)), v_create_time );
  14. UNTIL i = max_num
  15. END REPEAT;
  16. COMMIT;
  17. END$$
  18. DELIMITER ;
  19. DELIMITER $$
  20. DROP PROCEDURE IF EXISTS `insert_user`$$
  21. #随机产生max_num个用户
  22. CREATE DEFINER=`root`@`localhost` PROCEDURE `insert_user`( create_time_string VARCHAR(200),max_num INT )
  23. BEGIN
  24. DECLARE v_create_time DATETIME DEFAULT NULL;
  25. DECLARE i INT DEFAULT 0;
  26. DECLARE v_birthday DATE DEFAULT 0;
  27. DECLARE v_gender VARCHAR(1) DEFAULT NULL;
  28. SET autocommit = 0;
  29. REPEAT
  30. SET i = i + 1;
  31. SET v_create_time=DATE_ADD(DATE_FORMAT(create_time_string,'%Y-%m-%d') ,INTERVAL rand_num(1,3600*24) SECOND);
  32. SET v_birthday=DATE_ADD(DATE_FORMAT('1950-01-01','%Y-%m-%d') ,INTERVAL rand_num(1,365*50) DAY);
  33. SET v_gender=IF(rand_num(0,1)=0,'M','F');
  34. INSERT INTO user_info (login_name,nick_name,passwd,NAME,phone_num,email,head_img,user_level,birthday,gender,create_time )
  35. VALUES (rand_string(20) ,rand_string(20) , CONCAT('pwd',rand_string(20)), rand_string(30), CONCAT('13',rand_nums(0,9,9,'')) ,CONCAT(rand_string(8),'@',rand_string(3),'.com') , CONCAT('http://',rand_string(40)), rand_num(1,5),v_birthday,v_gender,v_create_time );
  36. UNTIL i = max_num
  37. END REPEAT;
  38. COMMIT;
  39. END$$
  40. DELIMITER ;
  41. DELIMITER $$
  42. DROP PROCEDURE IF EXISTS `insert_order`$$
  43. ##生成订单
  44. CREATE DEFINER=`root`@`localhost` PROCEDURE `insert_order`( create_time_string VARCHAR(200),max_num INT,user_num INT ,sku_num INT )
  45. BEGIN
  46. DECLARE v_create_time DATETIME DEFAULT NULL;
  47. DECLARE i INT DEFAULT 0;
  48. DECLARE v_order_status INT DEFAULT 0;
  49. DECLARE v_operate_time DATETIME DEFAULT NULL;
  50. DECLARE v_order_id INT DEFAULT NULL;
  51. DECLARE v_order_detail_num INT DEFAULT NULL;
  52. DECLARE j INT DEFAULT 0;
  53. SET autocommit = 0;
  54. REPEAT
  55. SET i = i + 1;
  56. SET v_create_time=DATE_ADD(DATE_FORMAT(create_time_string,'%Y-%m-%d') ,INTERVAL rand_num(30,3600*23) SECOND);
  57. SET v_order_status=rand_num(1,2); ##
  58. IF v_order_status>1 THEN
  59. SET v_operate_time= DATE_ADD(v_create_time ,INTERVAL rand_num(30,3600) SECOND);
  60. ELSE
  61. SET v_operate_time=NULL ;
  62. END IF ;
  63. INSERT INTO order_info (consignee, consignee_tel,total_amount ,order_status ,user_id,payment_way,delivery_address,order_comment,out_trade_no,trade_body,create_time,operate_time,expire_time, tracking_no,parent_order_id ,img_url)
  64. VALUES (rand_string(6) , CONCAT('13',rand_nums(0,9,9,'')),CAST(rand_num(50,1000) AS DECIMAL(10,2)) ,v_order_status ,rand_num(1,user_num), rand_num(1,2),rand_string(20),rand_string(20),rand_nums(0,9,10,''),'',v_create_time, v_operate_time,NULL,NULL,NULL,NULL );
  65. SELECT LAST_INSERT_ID() INTO v_order_id ;
  66. SET v_order_detail_num=rand_num(1,5);
  67. WHILE j<v_order_detail_num DO
  68. SET j=j+1;
  69. INSERT INTO order_detail (order_id , sku_id,sku_name ,img_url ,order_price,sku_num )
  70. VALUES (v_order_id , rand_num(1,sku_num),rand_string(10),CONCAT('http://',rand_string(40)) ,CAST(rand_num(20,5000) AS DECIMAL(10,2)), rand_num(1,5) );
  71. END WHILE;
  72. SET j=0;
  73. UNTIL i = max_num
  74. END REPEAT;
  75. COMMIT;
  76. END$$
  77. DELIMITER ;
  78. DELIMITER $$
  79. DROP PROCEDURE IF EXISTS `update_order`$$
  80. ## 随机让订单状态小于5的订单 发生状态改变
  81. CREATE DEFINER=`root`@`localhost` PROCEDURE `update_order`(operate_time_string VARCHAR(20))
  82. BEGIN
  83. DECLARE v_operate_time DATETIME DEFAULT NULL;
  84. SET v_operate_time=DATE_FORMAT(operate_time_string,'%Y-%m-%d');
  85. UPDATE order_info o SET o.`order_status`=o.`order_status`+rand_num_seed(0,1,o.id) ,operate_time= IF( rand_num_seed(0,1,o.id) >0 , DATE_ADD(v_operate_time ,INTERVAL rand_num(30,20*3600) SECOND),operate_time)
  86. WHERE o.`order_status`<5;
  87. END$$
  88. DELIMITER ;
  89. DELIMITER$$
  90. DROP PROCEDURE IF EXISTS `insert_payment`$$
  91. ## 只要订单状态更新为2 ,给当天插入一条支付信息
  92. CREATE DEFINER=`root`@`localhost` PROCEDURE `insert_payment`( do_date_str VARCHAR(200) )
  93. BEGIN
  94. INSERT INTO payment_info (out_trade_no,order_id,user_id,alipay_trade_no,total_amount,`subject`,payment_type,payment_time )
  95. SELECT o.out_trade_no,o.id,user_id,
  96. CONCAT( rand_string(4),'-',rand_nums(0,9,8,'')) alipay_trade_no,
  97. o.total_amount,
  98. rand_string(8) `subject`,
  99. ( CASE rand_num(1,3) WHEN 1 THEN 'wechatpay' WHEN 2 THEN 'alipay' WHEN 3 THEN 'unionpay' END) payment_type ,
  100. IF(o.operate_time IS NULL,o.create_time,o.operate_time) payment_time
  101. FROM order_info o
  102. WHERE (DATE_FORMAT(o.create_time,'%Y-%m-%d')= do_date_str OR DATE_FORMAT(o.operate_time,'%Y-%m-%d')= do_date_str ) AND o.order_status='2';
  103. COMMIT;
  104. END$$
  105. DELIMITER ;
  106. DELIMITER $$
  107. DROP PROCEDURE IF EXISTS `init_data`$$
  108. CREATE DEFINER=`root`@`localhost` PROCEDURE `init_data`( do_date_string VARCHAR(20) ,order_incr_num INT,user_incr_num INT ,sku_num INT ,if_truncate BOOLEAN )
  109. BEGIN
  110. DECLARE user_count INT DEFAULT 0;
  111. DECLARE sku_count INT DEFAULT 0;
  112. DECLARE do_date VARCHAR(20) DEFAULT do_date_string;
  113. IF if_truncate THEN
  114. TRUNCATE TABLE order_info ;
  115. TRUNCATE TABLE order_detail ;
  116. TRUNCATE TABLE sku_info ;
  117. TRUNCATE TABLE user_info ;
  118. END IF ;
  119. CALL insert_sku(do_date,sku_num );
  120. SELECT COUNT(*) INTO sku_count FROM sku_info;
  121. CALL insert_user(do_date,user_incr_num );
  122. SELECT COUNT(*) INTO user_count FROM user_info;
  123. CALL update_order(do_date);
  124. CALL insert_order(do_date,order_incr_num,user_count,sku_count);
  125. CALL insert_payment(do_date);
  126. END$$
  127. DELIMITER ;
CALL init_data('2019-02-10',1000,200,300,FALSE);

(2)查询生成数据结果

  1. SELECT * from base_category1;
  2. SELECT * from base_category2;
  3. SELECT * FROM base_category3;
  4. SELECT * FROM order_info;
  5. SELECT * from order_detail;
  6. SELECT * from sku_info;
  7. SELECT * from user_info;
  8. SELECT * FROM payment_info;

(3)生成2019年2月11日数据

CALL init_data('2019-02-11',1000,200,300,FALSE);

3.2 业务数据导入数仓

Flume消费

3.2.1 Sqoop安装

=================================== Begin ===========================

第3章 Sqoop安装 

安装Sqoop的前提是已经具备Java和Hadoop的环境。

3.1 下载并解压

1) 下载地址:http://mirrors.hust.edu.cn/apache/sqoop/1.4.6/

2) 上传安装包sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz到虚拟机中

3) 解压sqoop安装包到指定目录,如:

$ tar -zxf sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz -C /opt/module/

3.2 修改配置文件

Sqoop的配置文件与大多数大数据框架类似,在sqoop根目录下的conf目录中。

1) 重命名配置文件

$ mv sqoop-env-template.sh sqoop-env.sh

2) 修改配置文件

sqoop-env.sh

  1. export HADOOP_COMMON_HOME=/opt/module/hadoop-2.7.2
  2. export HADOOP_MAPRED_HOME=/opt/module/hadoop-2.7.2
  3. export HIVE_HOME=/opt/module/hive
  4. export ZOOKEEPER_HOME=/opt/module/zookeeper-3.4.10
  5. export ZOOCFGDIR=/opt/module/zookeeper-3.4.10
  6. export HBASE_HOME=/opt/module/hbase

3.3 拷贝JDBC驱动

拷贝jdbc驱动到sqoop的lib目录下,如:

$ cp mysql-connector-java-5.1.27-bin.jar /opt/module/sqoop-1.4.6.bin__hadoop-2.0.4-alpha/lib/

3.4 验证Sqoop

我们可以通过某一个command来验证sqoop配置是否正确:

$ bin/sqoop help

出现一些Warning警告(警告信息已省略),并伴随着帮助命令的输出:

  1. Available commands:
  2. codegen Generate code to interact with database records
  3. create-hive-table Import a table definition into Hive
  4. eval Evaluate a SQL statement and display the results
  5. export Export an HDFS directory to a database table
  6. help List available commands
  7. import Import a table from a database to HDFS
  8. import-all-tables Import tables from a database to HDFS
  9. import-mainframe Import datasets from a mainframe server to HDFS
  10. job Work with saved jobs
  11. list-databases List available databases on a server
  12. list-tables List available tables in a database
  13. merge Merge results of incremental imports
  14. metastore Run a standalone Sqoop metastore
  15. version Display version information

3.5 测试Sqoop是否能够成功连接数据库

$ bin/sqoop list-databases --connect jdbc:mysql://hadoop102:3306/ --username root --password 000000

出现如下输出:

  1. information_schema
  2. metastore
  3. mysql
  4. oozie
  5. performance_schema

第4章 Sqoop的简单使用案例

4.1 导入数据

在Sqoop中,“导入”概念指:从非大数据集群(RDBMS)向大数据集群(HDFS,HIVE,HBASE)中传输数据,叫做:导入,即使用import关键字。

4.1.1 RDBMS到HDFS

1) 确定Mysql服务开启正常

2) 在Mysql中新建一张表并插入一些数据

  1. $ mysql -uroot -p000000
  2. mysql> create database company;
  3. mysql> create table company.staff(id int(4) primary key not null auto_increment, name varchar(255), sex varchar(255));
  4. mysql> insert into company.staff(name, sex) values('Thomas', 'Male');
  5. mysql> insert into company.staff(name, sex) values('Catalina', 'FeMale');

3) 导入数据

       (1)全部导入

  1. $ bin/sqoop import \
  2. --connect jdbc:mysql://hadoop102:3306/company \
  3. --username root \
  4. --password 000000 \
  5. --table staff \
  6. --target-dir /user/company \
  7. --delete-target-dir \
  8. --num-mappers 1 \
  9. --fields-terminated-by "\t"

       (2)查询导入

  1. $ bin/sqoop import \
  2. --connect jdbc:mysql://hadoop102:3306/company \
  3. --username root \
  4. --password 000000 \
  5. --target-dir /user/company \
  6. --delete-target-dir \
  7. --num-mappers 1 \
  8. --fields-terminated-by "\t" \
  9. --query 'select name,sex from staff where id <=1 and $CONDITIONS;'

提示:must contain '$CONDITIONS' in WHERE clause.

如果query后使用的是双引号,则$CONDITIONS前必须加转移符,防止shell识别为自己的变量。

       (3)导入指定列

  1. $ bin/sqoop import \
  2. --connect jdbc:mysql://hadoop102:3306/company \
  3. --username root \
  4. --password 000000 \
  5. --target-dir /user/company \
  6. --delete-target-dir \
  7. --num-mappers 1 \
  8. --fields-terminated-by "\t" \
  9. --columns id,sex \
  10. --table staff

提示:columns中如果涉及到多列,用逗号分隔,分隔时不要添加空格

       (4)使用sqoop关键字筛选查询导入数据

  1. $ bin/sqoop import \
  2. --connect jdbc:mysql://hadoop102:3306/company \
  3. --username root \
  4. --password 000000 \
  5. --target-dir /user/company \
  6. --delete-target-dir \
  7. --num-mappers 1 \
  8. --fields-terminated-by "\t" \
  9. --table staff \
  10. --where "id=1"

4.1.2 RDBMS到Hive

  1. $ bin/sqoop import \
  2. --connect jdbc:mysql://hadoop102:3306/company \
  3. --username root \
  4. --password 000000 \
  5. --table staff \
  6. --num-mappers 1 \
  7. --hive-import \
  8. --fields-terminated-by "\t" \
  9. --hive-overwrite \
  10. --hive-table staff_hive

提示:该过程分为两步,第一步将数据导入到HDFS,第二步将导入到HDFS的数据迁移到Hive仓库,第一步默认的临时目录是/user/newbies/表名

4.1.3 RDBMS到Hbase

  1. $ bin/sqoop import \
  2. --connect jdbc:mysql://hadoop102:3306/company \
  3. --username root \
  4. --password 000000 \
  5. --table company \
  6. --columns "id,name,sex" \
  7. --column-family "info" \
  8. --hbase-create-table \
  9. --hbase-row-key "id" \
  10. --hbase-table "hbase_company" \
  11. --num-mappers 1 \
  12. --split-by id

提示:sqoop1.4.6只支持HBase1.0.1之前的版本的自动创建HBase表的功能

解决方案:手动创建HBase表

hbase> create 'hbase_company,'info'

(5) 在HBase中scan这张表得到如下内容

hbase> scan ‘hbase_company’

4.2、导出数据

在Sqoop中,“导出”概念指:从大数据集群(HDFS,HIVE,HBASE)向非大数据集群(RDBMS)中传输数据,叫做:导出,即使用export关键字。

4.2.1 HIVE/HDFS到RDBMS

  1. $ bin/sqoop export \
  2. --connect jdbc:mysql://hadoop102:3306/company \
  3. --username root \
  4. --password 000000 \
  5. --table staff \
  6. --num-mappers 1 \
  7. --export-dir /user/hive/warehouse/staff_hive \
  8. --input-fields-terminated-by "\t"

提示:Mysql中如果表不存在,不会自动创建

4.3 脚本打包

使用opt格式的文件打包sqoop命令,然后执行

1) 创建一个.opt文件

  1. $ mkdir opt
  2. $ touch opt/job_HDFS2RDBMS.opt

2) 编写sqoop脚本

  1. $ vi opt/job_HDFS2RDBMS.opt
  2. export
  3. --connect
  4. jdbc:mysql://hadoop102:3306/company
  5. --username
  6. root
  7. --password
  8. 000000
  9. --table
  10. staff
  11. --num-mappers
  12. 1
  13. --export-dir
  14. /user/hive/warehouse/staff_hive
  15. --input-fields-terminated-by
  16. "\t"

3) 执行该脚本

$ bin/sqoop --options-file opt/job_HDFS2RDBMS.opt

=========================== End ==============================================

3.2.2 Sqoop导入命令

  1. /opt/module/sqoop/bin/sqoop import \
  2. --connect \
  3. --username \
  4. --password \
  5. --target-dir \
  6. --delete-target-dir \
  7. --num-mappers \
  8. --fields-terminated-by \
  9. --query "$2"' and $CONDITIONS;'

3.2.3 分析表

分析表的同步策略

3.2.4 Sqoop定时导入脚本

1)在/home/newbies/bin目录下创建脚本sqoop_import.sh

[newbies@hadoop102 bin]$ vim sqoop_import.sh

       在脚本中填写如下内容

  1. #!/bin/bash
  2. db_date=$2
  3. echo $db_date
  4. db_name=gmall
  5. import_data() {
  6. /opt/module/sqoop/bin/sqoop import \
  7. --connect jdbc:mysql://hadoop102:3306/$db_name \
  8. --username root \
  9. --password 000000 \
  10. --target-dir /origin_data/$db_name/db/$1/$db_date \
  11. --delete-target-dir \
  12. --num-mappers 1 \
  13. --fields-terminated-by "\t" \
  14. --query "$2"' and $CONDITIONS;'
  15. }
  16. import_sku_info(){
  17. import_data "sku_info" "select
  18. id, spu_id, price, sku_name, sku_desc, weight, tm_id,
  19. category3_id, create_time
  20. from sku_info where 1=1"
  21. }
  22. import_user_info(){
  23. import_data "user_info" "select
  24. id, name, birthday, gender, email, user_level,
  25. create_time
  26. from user_info where 1=1"
  27. }
  28. import_base_category1(){
  29. import_data "base_category1" "select
  30. id, name from base_category1 where 1=1"
  31. }
  32. import_base_category2(){
  33. import_data "base_category2" "select
  34. id, name, category1_id from base_category2 where 1=1"
  35. }
  36. import_base_category3(){
  37. import_data "base_category3" "select id, name, category2_id from base_category3 where 1=1"
  38. }
  39. import_order_detail(){
  40. import_data "order_detail" "select
  41. od.id,
  42. order_id,
  43. user_id,
  44. sku_id,
  45. sku_name,
  46. order_price,
  47. sku_num,
  48. o.create_time
  49. from order_info o , order_detail od
  50. where o.id=od.order_id
  51. and DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date'"
  52. }
  53. import_payment_info(){
  54. import_data "payment_info" "select
  55. id,
  56. out_trade_no,
  57. order_id,
  58. user_id,
  59. alipay_trade_no,
  60. total_amount,
  61. subject ,
  62. payment_type,
  63. payment_time
  64. from payment_info
  65. where DATE_FORMAT(payment_time,'%Y-%m-%d')='$db_date'"
  66. }
  67. import_order_info(){
  68. import_data "order_info" "select
  69. id,
  70. total_amount,
  71. order_status,
  72. user_id,
  73. payment_way,
  74. out_trade_no,
  75. create_time,
  76. operate_time
  77. from order_info
  78. where (DATE_FORMAT(create_time,'%Y-%m-%d')='$db_date' or DATE_FORMAT(operate_time,'%Y-%m-%d')='$db_date')"
  79. }
  80. case $1 in
  81. "base_category1")
  82. import_base_category1
  83. ;;
  84. "base_category2")
  85. import_base_category2
  86. ;;
  87. "base_category3")
  88. import_base_category3
  89. ;;
  90. "order_info")
  91. import_order_info
  92. ;;
  93. "order_detail")
  94. import_order_detail
  95. ;;
  96. "sku_info")
  97. import_sku_info
  98. ;;
  99. "user_info")
  100. import_user_info
  101. ;;
  102. "payment_info")
  103. import_payment_info
  104. ;;
  105. "all")
  106. import_base_category1
  107. import_base_category2
  108. import_base_category3
  109. import_order_info
  110. import_order_detail
  111. import_sku_info
  112. import_user_info
  113. import_payment_info
  114. ;;
  115. esac

2)增加脚本执行权限

[newbies@hadoop102 bin]$ chmod 777 sqoop_import.sh

3)执行脚本导入数据

  1. [newbies@hadoop102 bin]$ sqoop_import.sh all 2019-02-10
  2. [newbies@hadoop102 bin]$ sqoop_import.sh all 2019-02-11

3.3 ODS层

完全仿照业务数据库中的表字段,一模一样的创建ODS层对应表。

3.3.1 创建订单表

  1. hive (gmall)>
  2. drop table if exists ods_order_info;
  3. create table ods_order_info (
  4. `id` string COMMENT '订单编号',
  5. `total_amount` decimal(10,2) COMMENT '订单金额',
  6. `order_status` string COMMENT '订单状态',
  7. `user_id` string COMMENT '用户id' ,
  8. `payment_way` string COMMENT '支付方式',
  9. `out_trade_no` string COMMENT '支付流水号',
  10. `create_time` string COMMENT '创建时间',
  11. `operate_time` string COMMENT '操作时间'
  12. ) COMMENT '订单表'
  13. PARTITIONED BY ( `dt` string)
  14. row format delimited fields terminated by '\t'
  15. location '/warehouse/gmall/ods/ods_order_info/'
  16. tblproperties ("parquet.compression"="snappy")
  17. ;

3.3.2 创建订单详情表

  1. hive (gmall)>
  2. drop table if exists ods_order_detail;
  3. create table ods_order_detail(
  4. `id` string COMMENT '订单编号',
  5. `order_id` string COMMENT '订单号',
  6. `user_id` string COMMENT '用户id' ,
  7. `sku_id` string COMMENT '商品id',
  8. `sku_name` string COMMENT '商品名称',
  9. `order_price` string COMMENT '下单价格',
  10. `sku_num` string COMMENT '商品数量',
  11. `create_time` string COMMENT '创建时间'
  12. ) COMMENT '订单明细表'
  13. PARTITIONED BY ( `dt` string)
  14. row format delimited fields terminated by '\t'
  15. location '/warehouse/gmall/ods/ods_order_detail/'
  16. tblproperties ("parquet.compression"="snappy")
  17. ;

 3.3.3 创建商品表

  1. hive (gmall)>
  2. drop table if exists ods_sku_info;
  3. create table ods_sku_info(
  4. `id` string COMMENT 'skuId',
  5. `spu_id` string COMMENT 'spuid',
  6. `price` decimal(10,2) COMMENT '价格' ,
  7. `sku_name` string COMMENT '商品名称',
  8. `sku_desc` string COMMENT '商品描述',
  9. `weight` string COMMENT '重量',
  10. `tm_id` string COMMENT '品牌id',
  11. `category3_id` string COMMENT '品类id',
  12. `create_time` string COMMENT '创建时间'
  13. ) COMMENT '商品表'
  14. PARTITIONED BY ( `dt` string)
  15. row format delimited fields terminated by '\t'
  16. location '/warehouse/gmall/ods/ods_sku_info/'
  17. tblproperties ("parquet.compression"="snappy")
  18. ;

3.3.4 创建用户表

  1. hive (gmall)>
  2. drop table if exists ods_user_info;
  3. create table ods_user_info(
  4. `id` string COMMENT '用户id',
  5. `name` string COMMENT '姓名',
  6. `birthday` string COMMENT '生日' ,
  7. `gender` string COMMENT '性别',
  8. `email` string COMMENT '邮箱',
  9. `user_level` string COMMENT '用户等级',
  10. `create_time` string COMMENT '创建时间'
  11. ) COMMENT '用户信息'
  12. PARTITIONED BY ( `dt` string)
  13. row format delimited fields terminated by '\t'
  14. location '/warehouse/gmall/ods/ods_user_info/'
  15. tblproperties ("parquet.compression"="snappy")
  16. ;

3.3.5 创建商品一级分类表

  1. hive (gmall)>
  2. drop table if exists ods_base_category1;
  3. create table ods_base_category1(
  4. `id` string COMMENT 'id',
  5. `name` string COMMENT '名称'
  6. ) COMMENT '商品一级分类'
  7. PARTITIONED BY ( `dt` string)
  8. row format delimited fields terminated by '\t'
  9. location '/warehouse/gmall/ods/ods_base_category1/'
  10. tblproperties ("parquet.compression"="snappy")
  11. ;

3.3.6 创建商品二级分类表

  1. hive (gmall)>
  2. drop table if exists ods_base_category2;
  3. create external table ods_base_category2(
  4. `id` string COMMENT ' id',
  5. `name` string COMMENT '名称',
  6. category1_id string COMMENT '一级品类id'
  7. ) COMMENT '商品二级分类'
  8. PARTITIONED BY ( `dt` string)
  9. row format delimited fields terminated by '\t'
  10. location '/warehouse/gmall/ods/ods_base_category2/'
  11. tblproperties ("parquet.compression"="snappy")
  12. ;

3.3.7 创建商品三级分类表

  1. hive (gmall)>
  2. drop table if exists ods_base_category3;
  3. create table ods_base_category3(
  4. `id` string COMMENT ' id',
  5. `name` string COMMENT '名称',
  6. category2_id string COMMENT '二级品类id'
  7. ) COMMENT '商品三级分类'
  8. PARTITIONED BY ( `dt` string)
  9. row format delimited fields terminated by '\t'
  10. location '/warehouse/gmall/ods/ods_base_category3/'
  11. tblproperties ("parquet.compression"="snappy")
  12. ;

3.3.8 创建支付流水表

  1. hive (gmall)>
  2. drop table if exists `ods_payment_info`;
  3. create table `ods_payment_info`(
  4. `id` bigint COMMENT '编号',
  5. `out_trade_no` string COMMENT '对外业务编号',
  6. `order_id` string COMMENT '订单编号',
  7. `user_id` string COMMENT '用户编号',
  8. `alipay_trade_no` string COMMENT '支付宝交易流水编号',
  9. `total_amount` decimal(16,2) COMMENT '支付金额',
  10. `subject` string COMMENT '交易内容',
  11. `payment_type` string COMMENT '支付类型',
  12. `payment_time` string COMMENT '支付时间'
  13. ) COMMENT '支付流水表'
  14. PARTITIONED BY ( `dt` string)
  15. row format delimited fields terminated by '\t'
  16. location '/warehouse/gmall/ods/ods_payment_info/'
  17. tblproperties ("parquet.compression"="snappy")
  18. ;

3.3.9 ODS层数据导入脚本

1)在/home/newbies/bin目录下创建脚本ods_db.sh

[newbies@hadoop102 bin]$ vim ods_db.sh

       在脚本中填写如下内容

  1. #!/bin/bash
  2. do_date=$1
  3. APP=gmall
  4. hive=/opt/module/hive/bin/hive
  5. sql="
  6. load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table $APP"".ods_order_info partition(dt='$do_date');
  7. load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table $APP"".ods_order_detail partition(dt='$do_date');
  8. load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table $APP"".ods_sku_info partition(dt='$do_date');
  9. load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table $APP"".ods_user_info partition(dt='$do_date');
  10. load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table $APP"".ods_payment_info partition(dt='$do_date');
  11. load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table $APP"".ods_base_category1 partition(dt='$do_date');
  12. load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table $APP"".ods_base_category2 partition(dt='$do_date');
  13. load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table $APP"".ods_base_category3 partition(dt='$do_date');
  14. "
  15. $hive -e "$sql"

2)增加脚本执行权限

[newbies@hadoop102 bin]$ chmod 777 ods_db.sh

3)执行脚本导入数据

  1. [newbies@hadoop102 bin]$ ods_db.sh 2019-02-10
  2. [newbies@hadoop102 bin]$ ods_db.sh 2019-02-11

4)查询导入数据

  1. hive (gmall)>
  2. select * from ods_order_info where dt='2019-02-10' limit 1;
  3. select * from ods_order_info where dt='2019-02-11' limit 1;

3.4 DWD层

积压ODS层对数据进行判空过滤。对商品分类表进行维度退化(降维)。

3.4.1 创建订单表

  1. hive (gmall)>
  2. drop table if exists dwd_order_info;
  3. create external table dwd_order_info (
  4. `id` string COMMENT '',
  5. `total_amount` decimal(10,2) COMMENT '',
  6. `order_status` string COMMENT ' 1 2 3 4 5',
  7. `user_id` string COMMENT 'id' ,
  8. `payment_way` string COMMENT '',
  9. `out_trade_no` string COMMENT '',
  10. `create_time` string COMMENT '',
  11. `operate_time` string COMMENT ''
  12. ) COMMENT ''
  13. PARTITIONED BY ( `dt` string)
  14. stored as parquet
  15. location '/warehouse/gmall/dwd/dwd_order_info/'
  16. tblproperties ("parquet.compression"="snappy")
  17. ;

3.4.2 创建订单详情表

  1. hive (gmall)>
  2. drop table if exists dwd_order_detail;
  3. create external table dwd_order_detail(
  4. `id` string COMMENT '',
  5. `order_id` decimal(10,2) COMMENT '',
  6. `user_id` string COMMENT 'id' ,
  7. `sku_id` string COMMENT 'id',
  8. `sku_name` string COMMENT '',
  9. `order_price` string COMMENT '',
  10. `sku_num` string COMMENT '',
  11. `create_time` string COMMENT ''
  12. ) COMMENT ''
  13. PARTITIONED BY ( `dt` string)
  14. stored as parquet
  15. location '/warehouse/gmall/dwd/dwd_order_detail/'
  16. tblproperties ("parquet.compression"="snappy")
  17. ;

3.4.3 创建用户表

  1. hive (gmall)>
  2. drop table if exists dwd_user_info;
  3. create external table dwd_user_info(
  4. `id` string COMMENT 'id',
  5. `name` string COMMENT '',
  6. `birthday` string COMMENT '' ,
  7. `gender` string COMMENT '',
  8. `email` string COMMENT '',
  9. `user_level` string COMMENT '',
  10. `create_time` string COMMENT ''
  11. ) COMMENT ''
  12. PARTITIONED BY ( `dt` string)
  13. stored as parquet
  14. location '/warehouse/gmall/dwd/dwd_user_info/'
  15. tblproperties ("parquet.compression"="snappy")
  16. ;

3.4.4 创建支付流水表

  1. hive (gmall)>
  2. drop table if exists `dwd_payment_info`;
  3. create external table `dwd_payment_info`(
  4. `id` bigint COMMENT '',
  5. `out_trade_no` string COMMENT '',
  6. `order_id` string COMMENT '',
  7. `user_id` string COMMENT '',
  8. `alipay_trade_no` string COMMENT '',
  9. `total_amount` decimal(16,2) COMMENT '',
  10. `subject` string COMMENT '',
  11. `payment_type` string COMMENT '',
  12. `payment_time` string COMMENT ''
  13. ) COMMENT ''
  14. PARTITIONED BY ( `dt` string)
  15. stored as parquet
  16. location '/warehouse/gmall/dwd/dwd_payment_info/'
  17. tblproperties ("parquet.compression"="snappy")
  18. ;

3.4.5 创建商品表(增加分类)

创建商品表
  1. hive (gmall)>
  2. drop table if exists dwd_sku_info;
  3. create external table dwd_sku_info(
  4. `id` string COMMENT 'skuId',
  5. `spu_id` string COMMENT 'spuid',
  6. `price` decimal(10,2) COMMENT '' ,
  7. `sku_name` string COMMENT '',
  8. `sku_desc` string COMMENT '',
  9. `weight` string COMMENT '',
  10. `tm_id` string COMMENT 'id',
  11. `category3_id` string COMMENT '1id',
  12. `category2_id` string COMMENT '2id',
  13. `category1_id` string COMMENT '3id',
  14. `category3_name` string COMMENT '3',
  15. `category2_name` string COMMENT '2',
  16. `category1_name` string COMMENT '1',
  17. `create_time` string COMMENT ''
  18. ) COMMENT ''
  19. PARTITIONED BY ( `dt` string)
  20. stored as parquet
  21. location '/warehouse/gmall/dwd/dwd_sku_info/'
  22. tblproperties ("parquet.compression"="snappy")
  23. ;

3.4.6 DWD层数据导入脚本

1)在/home/newbies/bin目录下创建脚本dwd_db.sh

[newbies@hadoop102 bin]$ vim dwd_db.sh

在脚本中填写如下内容

  1. #!/bin/bash
  2. # 定义变量方便修改
  3. APP=gmall
  4. hive=/opt/module/hive/bin/hive
  5. # 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
  6. if [ -n $1 ] ;then
  7. log_date=$1
  8. else
  9. log_date=`date -d "-1 day" +%F`
  10. fi
  11. sql="
  12. set hive.exec.dynamic.partition.mode=nonstrict;
  13. insert overwrite table "$APP".dwd_order_info partition(dt)
  14. select * from "$APP".ods_order_info
  15. where dt='$log_date' and id is not null;
  16. insert overwrite table "$APP".dwd_order_detail partition(dt)
  17. select * from "$APP".ods_order_detail
  18. where dt='$log_date' and id is not null;
  19. insert overwrite table "$APP".dwd_user_info partition(dt)
  20. select * from "$APP".ods_user_info
  21. where dt='$log_date' and id is not null;
  22. insert overwrite table "$APP".dwd_payment_info partition(dt)
  23. select * from "$APP".ods_payment_info
  24. where dt='$log_date' and id is not null;
  25. insert overwrite table "$APP".dwd_sku_info partition(dt)
  26. select
  27. sku.id,
  28. sku.spu_id,
  29. sku.price,
  30. sku.sku_name,
  31. sku.sku_desc,
  32. sku.weight,
  33. sku.tm_id,
  34. sku.category3_id,
  35. c2.id category2_id ,
  36. c1.id category1_id,
  37. c3.name category3_name,
  38. c2.name category2_name,
  39. c1.name category1_name,
  40. sku.create_time,
  41. sku.dt
  42. from
  43. "$APP".ods_sku_info sku
  44. join "$APP".ods_base_category3 c3 on sku.category3_id=c3.id
  45. join "$APP".ods_base_category2 c2 on c3.category2_id=c2.id
  46. join "$APP".ods_base_category1 c1 on c2.category1_id=c1.id
  47. where sku.dt='$log_date' and c2.dt='$log_date'
  48. and c3.dt='$log_date' and c1.dt='$log_date'
  49. and sku.id is not null;
  50. "
  51. $hive -e "$sql"

2)增加脚本执行权限

[newbies@hadoop102 bin]$ chmod 777 dwd_db.sh

3)执行脚本导入数据

  1. [newbies@hadoop102 bin]$ dwd_db.sh 2019-02-10
  2. [newbies@hadoop102 bin]$ dwd_db.sh 2019-02-11

4)查看导入数据

  1. hive (gmall)>
  2. select * from dwd_sku_info where dt='2019-02-10' limit 2;
  3. select * from dwd_sku_info where dt='2019-02-11' limit 2;

3.4.3 小结

思考:

1)维度退化要付出什么代价?或者说会造成什么样的需求处理不了?

      如果被退化的维度,还有其他业务表使用,退化后处理起来就麻烦些。

2)想想在实际业务中还有那些维度表可以退化

3.5 DWS层之用户行为宽表

1)为什么要建宽表

需求目标,把每个用户单日的行为聚合起来组成一张多列宽表,以便之后关联用户维度信息后进行,不同角度的统计分析。

用户行为宽表

 

3.5.1 创建用户行为宽表

  1. hive (gmall)>
  2. drop table if exists dws_user_action;
  3. create external table dws_user_action
  4. (
  5. user_id string comment '用户 id',
  6. order_count bigint comment '下单次数 ',
  7. order_amount decimal(16,2) comment '下单金额 ',
  8. payment_count bigint comment '支付次数',
  9. payment_amount decimal(16,2) comment '支付金额 ',
  10. comment_count bigint comment '评论次数'
  11. ) COMMENT '每日用户行为宽表'
  12. PARTITIONED BY ( `dt` string)
  13. stored as parquet
  14. location '/warehouse/gmall/dws/dws_user_action/'
  15. tblproperties ("parquet.compression"="snappy");

3.5.2 向用户行为宽表导入数据

1)导入数据

  1. hive (gmall)>
  2. with
  3. tmp_order as
  4. (
  5. select
  6. user_id,
  7. sum(oc.total_amount) order_amount,
  8. count(*) order_count
  9. from dwd_order_info oc
  10. where date_format(oc.create_time,'yyyy-MM-dd')='2019-02-10'
  11. group by user_id
  12. ) ,
  13. tmp_payment as
  14. (
  15. select
  16. user_id,
  17. sum(pi.total_amount) payment_amount,
  18. count(*) payment_count
  19. from dwd_payment_info pi
  20. where date_format(pi.payment_time,'yyyy-MM-dd')='2019-02-10'
  21. group by user_id
  22. ),
  23. tmp_comment as
  24. (
  25. select
  26. user_id,
  27. count(*) comment_count
  28. from dwd_comment_log c
  29. where date_format(c.dt,'yyyy-MM-dd')='2019-02-10'
  30. group by user_id
  31. )
  32. insert overwrite table dws_user_action partition(dt='2019-02-10')
  33. select
  34. user_actions.user_id,
  35. sum(user_actions.order_count),
  36. sum(user_actions.order_amount),
  37. sum(user_actions.payment_count),
  38. sum(user_actions.payment_amount),
  39. sum(user_actions.comment_count)
  40. from
  41. (
  42. select
  43. user_id,
  44. order_count,
  45. order_amount ,
  46. 0 payment_count ,
  47. 0 payment_amount,
  48. 0 comment_count
  49. from tmp_order
  50. union all
  51. select
  52. user_id,
  53. 0,
  54. 0,
  55. payment_count,
  56. payment_amount,
  57. 0
  58. from tmp_payment
  59. union all
  60. select
  61. user_id,
  62. 0,
  63. 0,
  64. 0,
  65. 0,
  66. comment_count
  67. from tmp_comment
  68. ) user_actions
  69. group by user_id;

2)查询导入结果

hive (gmall)> select * from dws_user_action ;

3.5.3 DWS层用户行为数据宽边导入脚本

1)在/home/newbies/bin目录下创建脚本dws_db_wide.sh

[newbies@hadoop102 bin]$ vim dws_db_wide.sh

       在脚本中填写如下内容

  1. #!/bin/bash
  2. # 定义变量方便修改
  3. APP=gmall
  4. hive=/opt/module/hive/bin/hive
  5. # 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
  6. if [ -n $1 ] ;then
  7. log_date=$1
  8. else
  9. log_date=`date -d "-1 day" +%F`
  10. fi
  11. sql="
  12. with
  13. tmp_order as
  14. (
  15. select
  16. user_id,
  17. sum(oc.total_amount) order_amount,
  18. count(*) order_count
  19. from "$APP".dwd_order_info oc
  20. where date_format(oc.create_time,'yyyy-MM-dd')='$log_date'
  21. group by user_id
  22. ) ,
  23. tmp_payment as
  24. (
  25. select
  26. user_id,
  27. sum(pi.total_amount) payment_amount,
  28. count(*) payment_count
  29. from "$APP".dwd_payment_info pi
  30. where date_format(pi.payment_time,'yyyy-MM-dd')='$log_date'
  31. group by user_id
  32. ),
  33. tmp_comment as
  34. (
  35. select
  36. user_id,
  37. count(*) comment_count
  38. from "$APP".dwd_comment_log c
  39. where date_format(c.dt,'yyyy-MM-dd')='$log_date'
  40. group by user_id
  41. )
  42. insert overwrite table "$APP".dws_user_action partition(dt='$log_date')
  43. select
  44. user_actions.user_id,
  45. sum(user_actions.order_count),
  46. sum(user_actions.order_amount),
  47. sum(user_actions.payment_count),
  48. sum(user_actions.payment_amount),
  49. sum(user_actions.comment_count)
  50. from
  51. (
  52. select
  53. user_id,
  54. order_count,
  55. order_amount ,
  56. 0 payment_count ,
  57. 0 payment_amount,
  58. 0 comment_count
  59. from tmp_order
  60. union all
  61. select
  62. user_id,
  63. 0,
  64. 0,
  65. payment_count,
  66. payment_amount,
  67. 0
  68. from tmp_payment
  69. union all
  70. select
  71. user_id,
  72. 0,
  73. 0,
  74. 0,
  75. 0,
  76. comment_count
  77. from tmp_comment
  78. ) user_actions
  79. group by user_id;
  80. "
  81. $hive -e "$sql"

2)增加脚本执行权限

[newbies@hadoop102 bin]$ chmod 777 dws_db_wide.sh

3)执行脚本导入数据

[newbies@hadoop102 bin]$ dws_db_wide.sh 2019-02-11

4)查看导入数据

  1. hive (gmall)>
  2. select * from dws_user_action where dt='2019-02-11' limit 2;

 第4章 需求四:GMV成交总额

4.1 ADS层

4.1.1 什么是GMV

4.1.2 建表语句

  1. hive (gmall)>
  2. drop table if exists ads_gmv_sum_day;
  3. create table ads_gmv_sum_day(
  4. `dt` string COMMENT '统计日期',
  5. `gmv_count` bigint COMMENT '当日gmv订单个数',
  6. `gmv_amount` decimal(16,2) COMMENT '当日gmv订单总金额',
  7. `gmv_payment` decimal(16,2) COMMENT '当日支付金额'
  8. ) COMMENT '每日活跃用户数量'
  9. row format delimited fields terminated by '\t'
  10. location '/warehouse/gmall/ads/ads_gmv_sum_day/'
  11. ;
GMV分析

4.1.3 数据导入

1)数据导入

  1. hive (gmall)>
  2. insert into table ads_gmv_sum_day
  3. select
  4. '2019-02-10' dt ,
  5. sum(order_count) gmv_count ,
  6. sum(order_amount) gmv_amount ,
  7. sum(payment_amount) payment_amount
  8. from dws_user_action
  9. where dt ='2019-02-10'
  10. group by dt
  11. ;

2)查询导入数据

hive (gmall)> select * from ads_gmv_sum_day;

 第5章 需求五:转化率

5.1 什么是转化率

5.2 ADS层之新增用户占日活跃用户比率

新增用户占日活跃用户比率

 

5.2.1 建表语句

  1. hive (gmall)>
  2. drop table if exists ads_user_convert_day;
  3. create table ads_user_convert_day(
  4. `dt` string COMMENT '统计日期',
  5. `uv_m_count` bigint COMMENT '当日活跃设备',
  6. `new_m_count` bigint COMMENT '当日新增设备',
  7. `new_m_ratio` decimal(10,2) COMMENT '当日新增占日活的比率'
  8. ) COMMENT '每日活跃用户数量'
  9. row format delimited fields terminated by '\t'
  10. location '/warehouse/gmall/ads/ads_user_convert_day/'
  11. ;

5.2.2 数据导入

1)数据导入

  1. hive (gmall)>
  2. insert into table ads_user_convert_day
  3. select
  4. '2019-02-10',
  5. sum( uc.dc) sum_dc,
  6. sum( uc.nmc) sum_nmc,
  7. cast(sum( uc.nmc)/sum( uc.dc)*100 as decimal(10,2)) new_m_ratio
  8. from
  9. (
  10. select
  11. day_count dc,
  12. 0 nmc
  13. from ads_uv_count
  14. where dt='2019-02-10'
  15. union all
  16. select
  17. 0 dc,
  18. new_mid_count nmc
  19. from ads_new_mid_count
  20. where create_date='2019-02-10'
  21. )uc;

2)查看导入数据

  1. hive (gmall)>
  2. select * from ads_user_convert_day;

5.3 ADS层之用户行为漏斗分析

用户行为漏斗分析

5.3.1 建表语句

  1. hive (gmall)>
  2. drop table if exists ads_user_action_convert_day;
  3. create table ads_user_action_convert_day(
  4. `dt` string COMMENT '统计日期',
  5. `total_visitor_m_count` bigint COMMENT '总访问人数',
  6. `order_u_count` bigint COMMENT '下单人数',
  7. `visitor2order_convert_ratio` decimal(10,2) COMMENT '访问到下单转化率',
  8. `payment_u_count` bigint COMMENT '支付人数',
  9. `order2payment_convert_ratio` decimal(10,2) COMMENT '下单到支付的转化率'
  10. ) COMMENT '每日用户行为转化率统计'
  11. row format delimited fields terminated by '\t'
  12. location '/warehouse/gmall/ads/ads_user_convert_day/'
  13. ;

5.3.2 数据导入

1)数据导入

  1. hive (gmall)>
  2. insert into table ads_user_action_convert_day
  3. select
  4. '2019-02-10',
  5. uv.day_count,
  6. ua.order_count,
  7. cast(ua.order_count/uv.day_count*100 as decimal(10,2)) visitor2order_convert_ratio,
  8. ua.payment_count,
  9. cast(ua.payment_count/ua.order_count*100 as decimal(10,2)) order2payment_convert_ratio
  10. from
  11. (
  12. select
  13. sum(if(order_count>0,1,0)) order_count,
  14. sum(if(payment_count>0,1,0)) payment_count
  15. from dws_user_action
  16. where dt='2019-02-10'
  17. )ua, ads_uv_count uv
  18. where uv.dt='2019-02-10'
  19. ;

2)查询导入数据

hive (gmall)> select * from ads_user_action_convert_day;

第6章 需求六:品牌复购率

需求:以月为单位统计,购买2次以上商品的用户

6.1 复购率计算分析

6.2 DWS层

6.2.1 用户购买商品明细表(宽表)

  1. hive (gmall)>
  2. drop table if exists dws_sale_detail_daycount;
  3. create external table dws_sale_detail_daycount
  4. ( user_id string comment '用户 id',
  5. sku_id string comment '商品 Id',
  6. user_gender string comment '用户性别',
  7. user_age string comment '用户年龄',
  8. user_level string comment '用户等级',
  9. order_price decimal(10,2) comment '订单价格',
  10. sku_name string comment '商品名称',
  11. sku_tm_id string comment '品牌id',
  12. sku_category3_id string comment '商品三级品类id',
  13. sku_category2_id string comment '商品二级品类id',
  14. sku_category1_id string comment '商品一级品类id',
  15. sku_category3_name string comment '商品三级品类名称',
  16. sku_category2_name string comment '商品二级品类名称',
  17. sku_category1_name string comment '商品一级品类名称',
  18. spu_id string comment '商品 spu',
  19. sku_num int comment '购买个数',
  20. order_count string comment '当日下单单数',
  21. order_amount string comment '当日下单金额'
  22. ) COMMENT '用户购买商品明细表'
  23. PARTITIONED BY ( `dt` string)
  24. stored as parquet
  25. location '/warehouse/gmall/dws/dws_user_sale_detail_daycount/'
  26. tblproperties ("parquet.compression"="snappy");

6.2.2 数据导入

  1. hive (gmall)>
  2. with
  3. tmp_detail as
  4. (
  5. select
  6. user_id,
  7. sku_id,
  8. sum(sku_num) sku_num ,
  9. count(*) order_count ,
  10. sum(od.order_price*sku_num) order_amount
  11. from ods_order_detail od
  12. where od.dt='2019-02-10' and user_id is not null
  13. group by user_id, sku_id
  14. )
  15. insert overwrite table dws_sale_detail_daycount partition(dt='2019-02-10')
  16. select
  17. tmp_detail.user_id,
  18. tmp_detail.sku_id,
  19. u.gender,
  20. months_between('2019-02-10', u.birthday)/12 age,
  21. u.user_level,
  22. price,
  23. sku_name,
  24. tm_id,
  25. category3_id ,
  26. category2_id ,
  27. category1_id ,
  28. category3_name ,
  29. category2_name ,
  30. category1_name ,
  31. spu_id,
  32. tmp_detail.sku_num,
  33. tmp_detail.order_count,
  34. tmp_detail.order_amount
  35. from tmp_detail
  36. left join dwd_user_info u on u.id=tmp_detail.user_id and u.dt='2019-02-10'
  37. left join dwd_sku_info s on tmp_detail.sku_id =s.id and s.dt='2019-02-10'
  38. ;

6.2.3 数据导入脚本

1)在/home/newbies/bin目录下创建脚本dws_sale.sh

[newbies@hadoop102 bin]$ vim dws_sale.sh

在脚本中填写如下内容

  1. #!/bin/bash
  2. # 定义变量方便修改
  3. APP=gmall
  4. hive=/opt/module/hive/bin/hive
  5. # 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
  6. if [ -n $1 ] ;then
  7. log_date=$1
  8. else
  9. log_date=`date -d "-1 day" +%F`
  10. fi
  11. sql="
  12. set hive.exec.dynamic.partition.mode=nonstrict;
  13. with
  14. tmp_detail as
  15. (
  16. select
  17. user_id,
  18. sku_id,
  19. sum(sku_num) sku_num ,
  20. count(*) order_count ,
  21. sum(od.order_price*sku_num) order_amount
  22. from "$APP".ods_order_detail od
  23. where od.dt='$log_date' and user_id is not null
  24. group by user_id, sku_id
  25. )
  26. insert overwrite table "$APP".dws_sale_detail_daycount partition(dt='$log_date')
  27. select
  28. tmp_detail.user_id,
  29. tmp_detail.sku_id,
  30. u.gender,
  31. months_between('$log_date', u.birthday)/12 age,
  32. u.user_level,
  33. price,
  34. sku_name,
  35. tm_id,
  36. category3_id ,
  37. category2_id ,
  38. category1_id ,
  39. category3_name ,
  40. category2_name ,
  41. category1_name ,
  42. spu_id,
  43. tmp_detail.sku_num,
  44. tmp_detail.order_count,
  45. tmp_detail.order_amount
  46. from tmp_detail
  47. left join "$APP".dwd_user_info u
  48. on u.id=tmp_detail.user_id and u.dt='$log_date'
  49. left join "$APP".dwd_sku_info s on tmp_detail.sku_id =s.id and s.dt='$log_date';
  50. "
  51. $hive -e "$sql"

2)增加脚本执行权限

[newbies@hadoop102 bin]$ chmod 777 dws_sale.sh

3)执行脚本导入数据

[newbies@hadoop102 bin]$ dws_sale.sh 2019-02-11

4)查看导入数据

  1. hive (gmall)>
  2. select * from dws_sale_detail_daycount where dt='2019-02-11' limit 2;

6.3 ADS层

6.3.1 建表语句

  1. hive (gmall)>
  2. drop table ads_sale_tm_category1_stat_mn;
  3. create table ads_sale_tm_category1_stat_mn
  4. (
  5. tm_id string comment '品牌id ' ,
  6. category1_id string comment '1级品类id ',
  7. category1_name string comment '1级品类名称 ',
  8. buycount bigint comment '购买人数',
  9. buy_twice_last bigint comment '两次以上购买人数',
  10. buy_twice_last_ratio decimal(10,2) comment '单次复购率',
  11. buy_3times_last bigint comment '三次以上购买人数',
  12. buy_3times_last_ratio decimal(10,2) comment '多次复购率' ,
  13. stat_mn string comment '统计月份',
  14. stat_date string comment '统计日期'
  15. ) COMMENT '复购率统计'
  16. row format delimited fields terminated by '\t'
  17. location '/warehouse/gmall/ads/ads_sale_tm_category1_stat_mn/'
  18. ;

6.3.2 数据导入

1)数据导入

  1. hive (gmall)>
  2. insert into table ads_sale_tm_category1_stat_mn
  3. select
  4. mn.sku_tm_id,
  5. mn.sku_category1_id,
  6. mn.sku_category1_name,
  7. sum(if(mn.order_count>=1,1,0)) buycount,
  8. sum(if(mn.order_count>=2,1,0)) buyTwiceLast,
  9. sum(if(mn.order_count>=2,1,0))/sum( if(mn.order_count>=1,1,0)) buyTwiceLastRatio,
  10. sum(if(mn.order_count>3,1,0)) buy3timeLast ,
  11. sum(if(mn.order_count>=3,1,0))/sum( if(mn.order_count>=1,1,0)) buy3timeLastRatio ,
  12. date_format('2019-02-10' ,'yyyy-MM') stat_mn,
  13. '2019-02-10' stat_date
  14. from
  15. (
  16. select od.sku_tm_id,
  17. od.sku_category1_id,
  18. od.sku_category1_name,
  19. user_id ,
  20. sum(order_count) order_count
  21. from dws_sale_detail_daycount od
  22. where
  23. date_format(dt,'yyyy-MM')<=date_format('2019-02-10' ,'yyyy-MM')
  24. group by
  25. od.sku_tm_id, od.sku_category1_id, user_id, od.sku_category1_name
  26. ) mn
  27. group by mn.sku_tm_id, mn.sku_category1_id, mn.sku_category1_name
  28. ;

2)查询导入数据

hive (gmall)> select * from ads_sale_tm_category1_stat_mn;

6.3.3 数据导入脚本

1)在/home/newbies/bin目录下创建脚本ads_sale.sh

[newbies@hadoop102 bin]$ vim ads_sale.sh

在脚本中填写如下内容

  1. #!/bin/bash
  2. # 定义变量方便修改
  3. APP=gmall
  4. hive=/opt/module/hive/bin/hive
  5. # 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
  6. if [ -n $1 ] ;then
  7. log_date=$1
  8. else
  9. log_date=`date -d "-1 day" +%F`
  10. fi
  11. sql="
  12. set hive.exec.dynamic.partition.mode=nonstrict;
  13. insert into table "$APP".ads_sale_tm_category1_stat_mn
  14. select
  15. mn.sku_tm_id,
  16. mn.sku_category1_id,
  17. mn.sku_category1_name,
  18. sum(if(mn.order_count>=1,1,0)) buycount,
  19. sum(if(mn.order_count>=2,1,0)) buyTwiceLast,
  20. sum(if(mn.order_count>=2,1,0))/sum( if(mn.order_count>=1,1,0)) buyTwiceLastRatio,
  21. sum(if(mn.order_count>3,1,0)) buy3timeLast ,
  22. sum(if(mn.order_count>=3,1,0))/sum( if(mn.order_count>=1,1,0)) buy3timeLastRatio ,
  23. date_format('$log_date' ,'yyyy-MM') stat_mn,
  24. '$log_date' stat_date
  25. from
  26. (
  27. select od.sku_tm_id,
  28. od.sku_category1_id,
  29. od.sku_category1_name,
  30. user_id ,
  31. sum(order_count) order_count
  32. from "$APP".dws_sale_detail_daycount od
  33. where date_format(dt,'yyyy-MM')<=date_format('$log_date' ,'yyyy-MM')
  34. group by od.sku_tm_id, od.sku_category1_id, user_id, od.sku_category1_name
  35. ) mn
  36. group by mn.sku_tm_id, mn.sku_category1_id, mn.sku_category1_name;
  37. "
  38. $hive -e "$sql"

2)增加脚本执行权限

[newbies@hadoop102 bin]$ chmod 777 ads_sale.sh

3)执行脚本导入数据

[newbies@hadoop102 bin]$ ads_sale.sh 2019-02-11

4)查看导入数据

  1. hive (gmall)>
  2. select * from ads_sale_tm_category1_stat_mn limit 2;

6.4 品牌复购率结果输出到MySQL

1)在MySQL中创建ads_sale_tm_category1_stat_mn

  1. create table ads_sale_tm_category1_stat_mn
  2. (
  3. tm_id varchar(200) comment '品牌id ' ,
  4. category1_id varchar(200) comment '1级品类id ',
  5. category1_name varchar(200) comment '1级品类名称 ',
  6. buycount varchar(200) comment '购买人数',
  7. buy_twice_last varchar(200) comment '两次以上购买人数',
  8. buy_twice_last_ratio varchar(200) comment '单次复购率',
  9. buy_3times_last varchar(200) comment '三次以上购买人数',
  10. buy_3times_last_ratio varchar(200) comment '多次复购率' ,
  11. stat_mn varchar(200) comment '统计月份',
  12. stat_date varchar(200) comment '统计日期'
  13. )

2)编写Sqoop导出脚本

在/home/newbies/bin目录下创建脚本sqoop_export.sh

[newbies@hadoop102 bin]$ vim sqoop_export.sh

       在脚本中填写如下内容

  1. #!/bin/bash
  2. db_name=gmall
  3. export_data() {
  4. /opt/module/sqoop/bin/sqoop export \
  5. --connect "jdbc:mysql://hadoop102:3306/${db_name}?useUnicode=true&characterEncoding=utf-8" \
  6. --username root \
  7. --password 000000 \
  8. --table $1 \
  9. --num-mappers 1 \
  10. --export-dir /warehouse/$db_name/ads/$1 \
  11. --input-fields-terminated-by "\t" \
  12. --update-key "tm_id,category1_id,stat_mn,stat_date" \
  13. --update-mode allowinsert \
  14. --input-null-string '\\N' \
  15. --input-null-non-string '\\N'
  16. }
  17. case $1 in
  18. "ads_sale_tm_category1_stat_mn")
  19. export_data "ads_sale_tm_category1_stat_mn"
  20. ;;
  21. "all")
  22. export_data "ads_sale_tm_category1_stat_mn"
  23. ;;
  24. esac

关于导出update还是insert的问题

--update-mode

参数 :

updateonly   只更新,无法插入新数据

        allowinsert   允许新增 

--update-key  

允许更新的情况下,指定哪些字段匹配视为同一条数据,进行更新而不增加。多个字段用逗号分隔。

3)执行Sqoop导出脚本

  1. [newbies@hadoop102 bin]$ chmod 777 sqoop_export.sh
  2. [newbies@hadoop102 bin]$ sqoop_export.sh all

4)在MySQL中查看结果

SELECT * FROM ads_sale_tm_category1_stat_mn;

6.5 练习:求每个等级的用户对应的复购率前十的商品排行

1)每个等级,每种商品,买一次的用户数,买两次的用户数=》得出复购率

2)利用开窗函数,取每个等级的前十

3)形成脚本

第7章 Azkaban调度器

7.1 Azkaban安装

================================ Begin =========================

二 Azkaban安装部署

2.1 安装前准备
1)    将Azkaban Web服务器、Azkaban执行服务器、Azkaban的sql执行脚本及MySQL安装包拷贝到hadoop102虚拟机/opt/software目录下
a)    azkaban-web-server-2.5.0.tar.gz
b)    azkaban-executor-server-2.5.0.tar.gz
c)    azkaban-sql-script-2.5.0.tar.gz
d)    mysql-libs.zip

2)    选择Mysql作为Azkaban数据库,因为Azkaban建立了一些Mysql连接增强功能,以方便Azkaban设置,并增强服务可靠性。(参见hive文档2.4)
2.2 安装Azkaban
1)    在/opt/module/目录下创建azkaban目录
 

[newbies@hadoop102 module]$ mkdir azkaban

2)    解压azkaban-web-server-2.5.0.tar.gz、azkaban-executor-server-2.5.0.tar.gz、azkaban-sql-script-2.5.0.tar.gz到/opt/module/azkaban目录下

  1. [newbies@hadoop102 software]$ tar -zxvf azkaban-web-server-2.5.0.tar.gz -C /opt/module/azkaban/
  2. [newbies@hadoop102 software]$ tar -zxvf azkaban-executor-server-2.5.0.tar.gz -C /opt/module/azkaban/
  3. [newbies@hadoop102 software]$ tar -zxvf azkaban-sql-script-2.5.0.tar.gz -C /opt/module/azkaban/

3)    对解压后的文件重新命名

  1. [newbies@hadoop102 azkaban]$ mv azkaban-web-2.5.0/ server
  2. [newbies@hadoop102 azkaban]$ mv azkaban-executor-2.5.0/ executor

4)    azkaban脚本导入
    进入mysql,创建azkaban数据库,并将解压的脚本导入到azkaban数据库。
 

  1. [newbies@hadoop102 azkaban]$ mysql -uroot -p000000
  2. mysql> create database azkaban;
  3. mysql> use azkaban;
  4. mysql> source /opt/module/azkaban/azkaban-2.5.0/create-all-sql-2.5.0.sql

注:source后跟.sql文件,用于批量处理.sql文件中的sql语句。

2.3 生成密钥库

Keytool是java数据证书的管理工具,使用户能够管理自己的公/私钥对及相关证书。

-keystore    指定密钥库的名称及位置(产生的各类信息将不在.keystore文件中)

-genkey      在用户主目录中创建一个默认文件".keystore"

-alias  对我们生成的.keystore 进行指认别名;如果没有默认是mykey

-keyalg  指定密钥的算法 RSA/DSA 默认是DSA

1)生成 keystore的密码及相应信息的密钥库

  1. [newbies@hadoop102 azkaban]$ keytool -keystore keystore -alias jetty -genkey -keyalg RSA
  2. 输入密钥库口令:
  3. 再次输入新口令:
  4. 您的名字与姓氏是什么?
  5. [Unknown]:
  6. 您的组织单位名称是什么?
  7. [Unknown]:
  8. 您的组织名称是什么?
  9. [Unknown]:
  10. 您所在的城市或区域名称是什么?
  11. [Unknown]:
  12. 您所在的省//自治区名称是什么?
  13. [Unknown]:
  14. 该单位的双字母国家/地区代码是什么?
  15. [Unknown]:
  16. CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown是否正确?
  17. [否]: y
  18. 输入 <jetty> 的密钥口令
  19. (如果和密钥库口令相同, 按回车):
  20. 再次输入新口令:

注意:

密钥库的密码至少必须6个字符,可以是纯数字或者字母或者数字和字母的组合等等

密钥库的密码最好和<jetty> 的密钥相同,方便记忆

2)将keystore 拷贝到 azkaban web服务器根目录中

[newbies@hadoop102 azkaban]$ mv keystore /opt/module/azkaban/server/

2.4 时间同步配置

先配置好服务器节点上的时区

  1. 如果在/usr/share/zoneinfo/这个目录下不存在时区配置文件Asia/Shanghai,就要用 tzselect 生成。
  1. [newbies@hadoop102 azkaban]$ tzselect
  2. Please identify a location so that time zone rules can be set correctly.
  3. Please select a continent or ocean.
  4. 1) Africa
  5. 2) Americas
  6. 3) Antarctica
  7. 4) Arctic Ocean
  8. 5) Asia
  9. 6) Atlantic Ocean
  10. 7) Australia
  11. 8) Europe
  12. 9) Indian Ocean
  13. 10) Pacific Ocean
  14. 11) none - I want to specify the time zone using the Posix TZ format.
  15. #? 5
  16. Please select a country.
  17. 1) Afghanistan 18) Israel 35) Palestine
  18. 2) Armenia 19) Japan 36) Philippines
  19. 3) Azerbaijan 20) Jordan 37) Qatar
  20. 4) Bahrain 21) Kazakhstan 38) Russia
  21. 5) Bangladesh 22) Korea (North) 39) Saudi Arabia
  22. 6) Bhutan 23) Korea (South) 40) Singapore
  23. 7) Brunei 24) Kuwait 41) Sri Lanka
  24. 8) Cambodia 25) Kyrgyzstan 42) Syria
  25. 9) China 26) Laos 43) Taiwan
  26. 10) Cyprus 27) Lebanon 44) Tajikistan
  27. 11) East Timor 28) Macau 45) Thailand
  28. 12) Georgia 29) Malaysia 46) Turkmenistan
  29. 13) Hong Kong 30) Mongolia 47) United Arab Emirates
  30. 14) India 31) Myanmar (Burma) 48) Uzbekistan
  31. 15) Indonesia 32) Nepal 49) Vietnam
  32. 16) Iran 33) Oman 50) Yemen
  33. 17) Iraq 34) Pakistan
  34. #? 9
  35. Please select one of the following time zone regions.
  36. 1) Beijing Time
  37. 2) Xinjiang Time
  38. #? 1
  39. The following information has been given:
  40. China
  41. Beijing Time
  42. Therefore TZ='Asia/Shanghai' will be used.
  43. Local time is now: Thu Oct 18 16:24:23 CST 2018.
  44. Universal Time is now: Thu Oct 18 08:24:23 UTC 2018.
  45. Is the above information OK?
  46. 1) Yes
  47. 2) No
  48. #? 1
  49. You can make this change permanent for yourself by appending the line
  50. TZ='Asia/Shanghai'; export TZ
  51. to the file '.profile' in your home directory; then log out and log in again.
  52. Here is that TZ value again, this time on standard output so that you
  53. can use the /usr/bin/tzselect command in shell scripts:
  54. Asia/Shanghai

2)拷贝该时区文件,覆盖系统本地时区配置

[newbies@hadoop102 azkaban]$ cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime  

3)集群时间同步(同时发给三个窗口)

[newbies@hadoop102 azkaban]$ sudo date -s '2018-10-18 16:39:30'

2.5 配置文件

2.5.1 Web服务器配置

1)进入azkaban web服务器安装目录 conf目录,打开azkaban.properties文件

  1. [newbies@hadoop102 conf]$ pwd
  2. /opt/module/azkaban/server/conf
  3. [newbies@hadoop102 conf]$ vim azkaban.properties

2)按照如下配置修改azkaban.properties文件。

  1. #Azkaban Personalization Settings
  2. #服务器UI名称,用于服务器上方显示的名字
  3. azkaban.name=Test
  4. #描述
  5. azkaban.label=My Local Azkaban
  6. #UI颜色
  7. azkaban.color=#FF3601
  8. azkaban.default.servlet.path=/index
  9. #默认web server存放web文件的目录
  10. web.resource.dir=/opt/module/azkaban/server/web/
  11. #默认时区,已改为亚洲/上海 默认为美国
  12. default.timezone.id=Asia/Shanghai
  13. #Azkaban UserManager class
  14. user.manager.class=azkaban.user.XmlUserManager
  15. #用户权限管理默认类(绝对路径)
  16. user.manager.xml.file=/opt/module/azkaban/server/conf/azkaban-users.xml
  17. #Loader for projects
  18. #global配置文件所在位置(绝对路径)
  19. executor.global.properties=/opt/module/azkaban/executor/conf/global.properties
  20. azkaban.project.dir=projects
  21. #数据库类型
  22. database.type=mysql
  23. #端口号
  24. mysql.port=3306
  25. #数据库连接IP
  26. mysql.host=hadoop102
  27. #数据库实例名
  28. mysql.database=azkaban
  29. #数据库用户名
  30. mysql.user=root
  31. #数据库密码
  32. mysql.password=000000
  33. #最大连接数
  34. mysql.numconnections=100
  35. # Velocity dev mode
  36. velocity.dev.mode=false
  37. # Azkaban Jetty server properties.
  38. # Jetty服务器属性.
  39. #最大线程数
  40. jetty.maxThreads=25
  41. #Jetty SSL端口
  42. jetty.ssl.port=8443
  43. #Jetty端口
  44. jetty.port=8081
  45. #SSL文件名(绝对路径)
  46. jetty.keystore=/opt/module/azkaban/server/keystore
  47. #SSL文件密码
  48. jetty.password=000000
  49. #Jetty主密码与keystore文件相同
  50. jetty.keypassword=000000
  51. #SSL文件名(绝对路径)
  52. jetty.truststore=/opt/module/azkaban/server/keystore
  53. #SSL文件密码
  54. jetty.trustpassword=000000
  55. # Azkaban Executor settings
  56. executor.port=12321
  57. # mail settings
  58. mail.sender=
  59. mail.host=
  60. job.failure.email=
  61. job.success.email=
  62. lockdown.create.projects=false
  63. cache.directory=cache

 3)web服务器用户配置

在azkaban web服务器安装目录 conf目录,按照如下配置修改azkaban-users.xml 文件,增加管理员用户。

  1. [newbies@hadoop102 conf]$ vim azkaban-users.xml
  2. <azkaban-users>
  3. <user username="azkaban" password="azkaban" roles="admin" groups="azkaban" />
  4. <user username="metrics" password="metrics" roles="metrics"/>
  5. <user username="admin" password="admin" roles="admin,metrics" />
  6. <role name="admin" permissions="ADMIN" />
  7. <role name="metrics" permissions="METRICS"/>
  8. </azkaban-users>

2.5.2 执行服务器配置

1)进入执行服务器安装目录conf,打开azkaban.properties

  1. [newbies@hadoop102 conf]$ pwd
  2. /opt/module/azkaban/executor/conf
  3. [newbies@hadoop102 conf]$ vim azkaban.properties

 2)按照如下配置修改azkaban.properties文件。

  1. #Azkaban
  2. #时区
  3. default.timezone.id=Asia/Shanghai
  4. # Azkaban JobTypes Plugins
  5. #jobtype 插件所在位置
  6. azkaban.jobtype.plugin.dir=plugins/jobtypes
  7. #Loader for projects
  8. executor.global.properties=/opt/module/azkaban/executor/conf/global.properties
  9. azkaban.project.dir=projects
  10. database.type=mysql
  11. mysql.port=3306
  12. mysql.host=hadoop102
  13. mysql.database=azkaban
  14. mysql.user=root
  15. mysql.password=000000
  16. mysql.numconnections=100
  17. # Azkaban Executor settings
  18. #最大线程数
  19. executor.maxThreads=50
  20. #端口号(如修改,请与web服务中一致)
  21. executor.port=12321
  22. #线程数
  23. executor.flow.threads=30

2.6 启动executor服务器

executor服务器目录下执行启动命令

  1. [newbies@hadoop102 executor]$ pwd
  2. /opt/module/azkaban/executor
  3. [newbies@hadoop102 executor]$ bin/azkaban-executor-start.sh

2.7 启动web服务器

在azkaban web服务器目录下执行启动命令

  1. [newbies@hadoop102 server]$ pwd
  2. /opt/module/azkaban/server
  3. [newbies@hadoop102 server]$ bin/azkaban-web-start.sh

注意:

先执行executor,再执行web,避免Web Server会因为找不到执行器启动失败。

jps查看进程

  1. [newbies@hadoop102 server]$ jps
  2. 3601 AzkabanExecutorServer
  3. 5880 Jps
  4. 3661 AzkabanWebServer

启动完成后,在浏览器(建议使用谷歌浏览器)中输入https://服务器IP地址:8443,即可访问azkaban服务了。

在登录中输入刚才在azkaban-users.xml文件中新添加的户用名及密码,点击 login。

三 Azkaban实战

Azkaba内置的任务类型支持command、java

3.1单一job案例

1)创建job描述文件

  1. [newbies@hadoop102 jobs]$ vim first.job
  2. #first.job
  3. type=command
  4. command=echo 'this is my first job'

2) 将job资源文件打包成zip文件

  1. [newbies@hadoop102 jobs]$ zip first.zip first.job
  2. adding: first.job (deflated 15%)
  3. [newbies@hadoop102 jobs]$ ll
  4. 总用量 8
  5. -rw-rw-r--. 1 newbies newbies 60 1018 17:42 first.job
  6. -rw-rw-r--. 1 newbies newbies 219 1018 17:43 first.zip

注意:

目前,Azkaban上传的工作流文件只支持xxx.zip文件。zip应包含xxx.job运行作业所需的文件和任何文件(文件名后缀必须以.job结尾,否则无法识别)。作业名称在项目中必须是唯一的。

3)通过azkaban的web管理平台创建project并上传job的zip包

首先创建project

上传zip包

4)启动执行该job

点击执行工作流

点击继续

5)Job执行成功

6)点击查看job日志

3.2多job工作流案例

1)创建有依赖关系的多个job描述

第一个job:start.job

  1. [newbies@hadoop102 jobs]$ vim start.job
  2. #start.job
  3. type=command
  4. command=touch /opt/module/kangkang.txt

第二个job:step1.job依赖start.job

  1. [newbies@hadoop102 jobs]$ vim step1.job
  2. #step1.job
  3. type=command
  4. dependencies=start
  5. command=echo "this is step1 job"

第三个job:step2.job依赖start.job

  1. [newbies@hadoop102 jobs]$ vim step2.job
  2. #step2.job
  3. type=command
  4. dependencies=start
  5. command=echo "this is step2 job"

第四个job:finish.job依赖step1.job和step2.job

  1. [newbies@hadoop102 jobs]$ vim finish.job
  2. #finish.job
  3. type=command
  4. dependencies=step1,step2
  5. command=echo "this is finish job"

2)将所有job资源文件打到一个zip包中

  1. [newbies@hadoop102 jobs]$ zip jobs.zip start.job step1.job step2.job finish.job
  2. updating: start.job (deflated 16%)
  3. adding: step1.job (deflated 12%)
  4. adding: step2.job (deflated 12%)
  5. adding: finish.job (deflated 14%)

3)在azkaban的web管理界面创建工程并上传zip包

4)启动工作流flow

6)查看结果

思考:

将student.txt文件上传到hdfs,根据所传文件创建外部表,再将表中查询到的结果写入到本地文件

3.3 java操作任务

使用Azkaban调度java程序

1)编写java程序

  1. import java.io.IOException;
  2. public class AzkabanTest {
  3. public void run() throws IOException {
  4. // 根据需求编写具体代码
  5. FileOutputStream fos = new FileOutputStream("/opt/module/azkaban/output.txt");
  6. fos.write("this is a java progress".getBytes());
  7. fos.close();
  8. }
  9. public static void main(String[] args) throws IOException {
  10. AzkabanTest azkabanTest = new AzkabanTest();
  11. azkabanTest.run();
  12. }
  13. }

2)将java程序打成jar包,创建lib目录,将jar放入lib内

  1. [newbies@hadoop102 azkaban]$ mkdir lib
  2. [newbies@hadoop102 azkaban]$ cd lib/
  3. [newbies@hadoop102 lib]$ ll
  4. 总用量 4
  5. -rw-rw-r--. 1 newbies newbies 3355 1018 20:55 azkaban-0.0.1-SNAPSHOT.jar

3)编写job文件

  1. [newbies@hadoop102 jobs]$ vim azkabanJava.job
  2. #azkabanJava.job
  3. type=javaprocess
  4. java.class=com.newbies.azkaban.AzkabanTest
  5. classpath=/opt/module/azkaban/lib/*

4)将job文件打成zip包

  1. [newbies@hadoop102 jobs]$ zip azkabanJava.zip azkabanJava.job
  2. adding: azkabanJava.job (deflated 19%)

5)通过azkaban的web管理平台创建project并上传job压缩包,启动执行该job

  1. [newbies@hadoop102 azkaban]$ pwd
  2. /opt/module/azkaban
  3. [newbies@hadoop102 azkaban]$ ll
  4. 总用量 24
  5. drwxrwxr-x. 2 newbies newbies 4096 1017 17:14 azkaban-2.5.0
  6. drwxrwxr-x. 10 newbies newbies 4096 1018 17:17 executor
  7. drwxrwxr-x. 2 newbies newbies 4096 1018 20:35 jobs
  8. drwxrwxr-x. 2 newbies newbies 4096 1018 20:54 lib
  9. -rw-rw-r--. 1 newbies newbies 23 1018 20:55 output
  10. drwxrwxr-x. 9 newbies newbies 4096 1018 17:17 server
  11. [newbies@hadoop102 azkaban]$ cat output
  12. this is a java progress

3.3 HDFS操作任务

1)创建job描述文件

  1. [newbies@hadoop102 jobs]$ vim fs.job
  2. #hdfs job
  3. type=command
  4. command=/opt/module/hadoop-2.7.2/bin/hadoop fs -mkdir /azkaban

2)将job资源文件打包成zip文件

  1. [newbies@hadoop102 jobs]$ zip fs.zip fs.job
  2. adding: fs.job (deflated 12%)

3)通过azkaban的web管理平台创建project并上传job压缩包

4)启动执行该job

5)查看结果

3.4 mapreduce任务

mapreduce任务依然可以使用azkaban进行调度

 1)创建job描述文件,及mr程序jar包

  1. [newbies@hadoop102 jobs]$ vim mapreduce.job
  2. #mapreduce job
  3. type=command
  4. command=/opt/module/hadoop-2.7.2/bin/hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /wordcount/input /wordcount/output

2)    将所有job资源文件打到一个zip包中

  1. [newbies@hadoop102 jobs]$ zip mapreduce.zip mapreduce.job
  2. adding: mapreduce.job (deflated 43%)

3)在azkaban的web管理界面创建工程并上传zip包
4)启动job
5)查看结果

3.5 Hive脚本任务

1)创建job描述文件和hive脚本

(1)Hive脚本:student.sql

  1. [newbies@hadoop102 jobs]$ vim student.sql
  2. use default;
  3. drop table student;
  4. create table student(id int, name string)
  5. row format delimited fields terminated by '\t';
  6. load data local inpath '/opt/module/datas/student.txt' into table student;
  7. insert overwrite local directory '/opt/module/datas/student'
  8. row format delimited fields terminated by '\t'
  9. select * from student;

(2)Job描述文件:hive.job

  1. [newbies@hadoop102 jobs]$ vim hive.job
  2. #hive job
  3. type=command
  4. command=/opt/module/hive/bin/hive -f /opt/module/azkaban/jobs/student.sql

2)    将所有job资源文件打到一个zip包中

  1. [newbies@hadoop102 jobs]$ zip hive.zip hive.job
  2. adding: hive.job (deflated 21%)

3)在azkaban的web管理界面创建工程并上传zip包
4)启动job
5)查看结果

  1. [newbies@hadoop102 student]$ cat /opt/module/datas/student/000000_0
  2. 1001 yangyang
  3. 1002 bobo
  4. 1003 banzhang
  5. 1004 pengpeng

7.2 复购率指标的产生的全调度流程

1)生成数据

CALL init_data('2019-02-12',300,200,300,FALSE);

2)编写Azkaban程序运行job.

       (1)import文件

  1. type=command
  2. do_date=${dt}
  3. command=/home/newbies/bin/sqoop_import.sh all ${do_date}

(2)ods文件

  1. type=command
  2. do_date=${dt}
  3. dependencies=import
  4. command=/home/newbies/bin/ods_db.sh ${do_date}

(3)dwd文件

  1. type=command
  2. do_date=${dt}
  3. dependencies=ods
  4. command=/home/newbies/bin/dwd_db.sh ${do_date}

       (4)dws文件

  1. type=command
  2. do_date=${dt}
  3. dependencies=dwd
  4. command=/home/newbies/bin/dws_sale.sh ${do_date}

       (5)ads文件

  1. type=command
  2. do_date=${dt}
  3. dependencies=dws
  4. command=/home/newbies/bin/ads_sale.sh ${do_date}

       (6)export文件

  1. type=command
  2. do_date=${dt}
  3. dependencies=ads
  4. command=/home/newbies/bin/sqoop_export.sh all ${do_date}

       (7)将以上6个文件压缩成gmall-job.zip文件

3)创建Azkaban工程,并上传gmall-job.zip文件。

 第8章 订单表拉链表

8.1 什么是拉链表

8.2 为什么要做拉链表

8.3 拉链表形成过程

如何使用拉链表

 

8.4 拉链表制作过程图

8.5 拉链表制作过程

8.5.1 步骤0:初始化拉链表,首次独立执行

初始化拉链表

 

1)生成10条原始订单数据

  1. CALL init_data('2019-02-13',10,5,10,TRUE);
  2. [newbies@hadoop102 bin]$ sqoop_import.sh all 2019-02-13
  3. [newbies@hadoop102 bin]$ ods_db.sh 2019-02-13
  4. [newbies@hadoop102 bin]$ dwd_db.sh 2019-02-13

2)建立拉链表

  1. hive (gmall)>
  2. drop table if exists dwd_order_info_his;
  3. create table dwd_order_info_his(
  4. `id` string COMMENT '订单编号',
  5. `total_amount` decimal(10,2) COMMENT '订单金额',
  6. `order_status` string COMMENT '订单状态',
  7. `user_id` string COMMENT '用户id' ,
  8. `payment_way` string COMMENT '支付方式',
  9. `out_trade_no` string COMMENT '支付流水号',
  10. `create_time` string COMMENT '创建时间',
  11. `operate_time` string COMMENT '操作时间' ,
  12. `start_date` string COMMENT '有效开始日期',
  13. `end_date` string COMMENT '有效结束日期'
  14. ) COMMENT '订单拉链表'
  15. stored as parquet
  16. location '/warehouse/gmall/dwd/dwd_order_info_his/'
  17. tblproperties ("parquet.compression"="snappy");

3)初始化拉链表

  1. hive (gmall)>
  2. insert overwrite table dwd_order_info_his
  3. select
  4. id,
  5. total_amount,
  6. order_status,
  7. user_id,
  8. payment_way,
  9. out_trade_no,
  10. create_time,
  11. operate_time,
  12. '2019-02-13',
  13. '9999-99-99'
  14. from ods_order_info oi
  15. where oi.dt='2019-02-13';

4)查询拉链表中数据

hive (gmall)> select * from dwd_order_info_his limit 2;

8.5.2 步骤1:先制作当日变动(包括新增,修改)每日执行

1)如何获得每日变动表

1)最好表内有创建时间和变动时间(Lucky!

(2)如果没有,可以利用第三方工具监控比如canal,监控MySQL的实时变化进行记录(麻烦)。

(3)逐行对比前后两天的数据, 检查md5(concat(全部有可能变化的字段))是否相同(low)

(4)要求业务数据库提供变动流水(人品,颜值)

2)因为dwd_order_info本身导入过来就是新增变动明细的表,所以不用处理

(1)2019-02-14日新增2条订单数据

CALL init_data('2019-02-14',2,5,10,TRUE);

(2)通过Sqoop把2019-02-14日所有数据导入

sqoop_import.sh all 2019-02-14

(3)ODS层数据导入

ods_db.sh 2019-02-14

(4)DWD层数据导入

dwd_db.sh 2019-02-14

8.5.3 步骤2:先合并变动信息,再追加新增信息 插入到临时表中

1)建立临时表

  1. hive (gmall)>
  2. drop table if exists dwd_order_info_his_tmp;
  3. create external table dwd_order_info_his_tmp(
  4. `id` string COMMENT '订单编号',
  5. `total_amount` decimal(10,2) COMMENT '订单金额',
  6. `order_status` string COMMENT '订单状态',
  7. `user_id` string COMMENT '用户id' ,
  8. `payment_way` string COMMENT '支付方式',
  9. `out_trade_no` string COMMENT '支付流水号',
  10. `create_time` string COMMENT '创建时间',
  11. `operate_time` string COMMENT '操作时间',
  12. `start_date` string COMMENT '有效开始日期',
  13. `end_date` string COMMENT '有效结束日期'
  14. ) COMMENT '订单拉链临时表'
  15. stored as parquet
  16. location '/warehouse/gmall/dwd/dwd_order_info_his_tmp/'
  17. tblproperties ("parquet.compression"="snappy");

2)导入脚本

拉链表
  1. hive (gmall)>
  2. insert overwrite table dwd_order_info_his_tmp
  3. select * from
  4. (
  5. select
  6. id,
  7. total_amount ,
  8. order_status ,
  9. user_id ,
  10. payment_way ,
  11. out_trade_no,
  12. create_time ,
  13. operate_time ,
  14. '2019-02-14' start_date,
  15. '9999-99-99' end_date
  16. from dwd_order_info where dt='2019-02-14'
  17. union all
  18. select oh.id,
  19. oh.total_amount ,
  20. oh.order_status ,
  21. oh.user_id ,
  22. oh.payment_way ,
  23. oh.out_trade_no,
  24. oh.create_time ,
  25. oh.operate_time ,
  26. oh.start_date,
  27. if(oi.id is null ,oh.end_date, date_add(oi.dt,-1)) end_date
  28. from dwd_order_info_his oh left join
  29. (
  30. select
  31. *
  32. from dwd_order_info
  33. where dt='2019-02-14'
  34. ) oi
  35. on oh.id=oi.id and oh.end_date='9999-99-99'
  36. )his
  37. order by his.id, start_date;

8.5.4 步骤3:把临时表覆盖给拉链表

1)导入数据

  1. hive (gmall)>
  2. insert overwrite table dwd_order_info_his
  3. select * from dwd_order_info_his_tmp;

2)查询导入数据

hive (gmall)> select * from dwd_order_info_his ;

8.5.5 整理为每日脚本

 

第9章 OLAP分析工具之Presto

9.1 Presto Server安装

1)下载地址

https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.196/presto-server-0.196.tar.gz

2)将presto-server-0.196.tar.gz导入hadoop102的/opt/software目录下,并解压到/opt/module目录

[newbies@hadoop102 software]$ tar -zxvf presto-server-0.196.tar.gz -C /opt/module/

3)修改名称为presto

[newbies@hadoop102 module]$ mv presto-server-0.196/ presto

4)进入到/opt/module/presto目录,并创建存储数据文件夹

[newbies@hadoop102 presto]$ mkdir data

5)进入到/opt/module/presto目录,并创建存储配置文件文件夹

[newbies@hadoop102 presto]$ mkdir etc

6)配置在/opt/module/presto/etc目录下添加jvm.config配置文件

[newbies@hadoop102 etc]$ vim jvm.config

添加如下内容

  1. -server
  2. -Xmx16G
  3. -XX:+UseG1GC
  4. -XX:G1HeapRegionSize=32M
  5. -XX:+UseGCOverheadLimit
  6. -XX:+ExplicitGCInvokesConcurrent
  7. -XX:+HeapDumpOnOutOfMemoryError
  8. -XX:+ExitOnOutOfMemoryError

7)Presto可以支持多个数据源,在Presto里面叫catalog,这里我们配置支持Hive的数据源,配置一个Hive的catalog

  1. [newbies@hadoop102 etc]$ mkdir catalog
  2. [newbies@hadoop102 catalog]$ vim hive.properties

添加如下内容

  1. connector.name=hive-hadoop2
  2. hive.metastore.uri=thrift://hadoop102:9083

8)将hadoop102上的presto分发到hadoop103、hadoop104

[newbies@hadoop102 module]$ xsync presto

9)解压之后,分别进入hadoop102、hadoop103、hadoop104三台主机的/opt/module/presto/etc的路径。配置node属性,node id每个节点都不一样。

  1. [newbies@hadoop102 etc]$vim node.properties
  2. node.environment=production
  3. node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
  4. node.data-dir=/opt/module/presto/data
  5. [newbies@hadoop103 etc]$vim node.properties
  6. node.environment=production
  7. node.id=ffffffff-ffff-ffff-ffff-fffffffffffe
  8. node.data-dir=/opt/module/presto/data
  9. [newbies@hadoop104 etc]$vim node.properties
  10. node.environment=production
  11. node.id=ffffffff-ffff-ffff-ffff-fffffffffffd
  12. node.data-dir=/opt/module/presto/data

10)Presto是由一个coordinator节点和多个worker节点组成。在hadoop102上配置成coordinator,在hadoop103、hadoop104上配置为worker。

(1)hadoop102上配置coordinator节点

[newbies@hadoop102 etc]$ vim config.properties

添加内容如下

  1. coordinator=true
  2. node-scheduler.include-coordinator=false
  3. http-server.http.port=8881
  4. query.max-memory=50GB
  5. discovery-server.enabled=true
  6. discovery.uri=http://hadoop102:8881

(2)hadoop103、hadoop104上配置worker节点

[newbies@hadoop103 etc]$ vim config.properties

添加内容如下

  1. coordinator=false
  2. http-server.http.port=8881
  3. query.max-memory=50GB
  4. discovery.uri=http://hadoop102:8881
  5. [newbies@hadoop104 etc]$ vim config.properties

添加内容如下

  1. coordinator=false
  2. http-server.http.port=8881
  3. query.max-memory=50GB
  4. discovery.uri=http://hadoop102:8881

11)在/opt/module/hive目录下,启动Hive Metastore,用newbies角色

nohup bin/hive --service metastore >/dev/null 2>&1 &

12)分别在hadoop102、hadoop103、hadoop104上启动presto server

(1)前台启动presto,控制台显示日志

  1. [newbies@hadoop102 presto]$ bin/launcher run
  2. [newbies@hadoop103 presto]$ bin/launcher run
  3. [newbies@hadoop104 presto]$ bin/launcher run

(2)后台启动presto

  1. [newbies@hadoop102 presto]$ bin/launcher start
  2. [newbies@hadoop103 presto]$ bin/launcher start
  3. [newbies@hadoop104 presto]$ bin/launcher start

13)日志查看路径/opt/module/presto/data/var/log

9.2 Presto命令行Client安装

1)下载Presto的客户端

https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.196/presto-cli-0.196-executable.jar

2)将presto-cli-0.196-executable.jar上传到hadoop102的/opt/module/presto文件夹下

3)修改文件名称

[newbies@hadoop102 presto]$ mv presto-cli-0.196-executable.jar  prestocli

4)增加执行权限

[newbies@hadoop102 presto]$ chmod +x prestocli

5)启动prestocli

[newbies@hadoop102 presto]$ ./prestocli --server hadoop102:8881 --catalog hive --schema default

6)Presto命令行操作

Presto的命令行操作,相当于hive命令行操作,不过没有use命令。每个表必须要加上schema。

例如:

select * from schema.table limit 100

9.3 Presto可视化Client安装

1)将yanagishima-18.0.zip上传到hadoop102的/opt/module目录

2)解压缩yanagishima

  1. [newbies@hadoop102 module]$ unzip yanagishima-18.0.zip
  2. cd yanagishima-18.0

3)进入到/opt/module/yanagishima-18.0/conf文件夹,编写yanagishima.properties配置

[newbies@hadoop102 conf]$ vim yanagishima.properties

       添加如下内容

  1. jetty.port=7080
  2. presto.datasources=atiguigu-presto
  3. presto.coordinator.server.atiguigu-presto=http://hadoop102:8881
  4. catalog.atiguigu-presto=hive
  5. schema.atiguigu-presto=default
  6. sql.query.engines=presto

4)在/opt/module/yanagishima-18.0路径下启动yanagishima

  1. [newbies@hadoop102 yanagishima-18.0]$
  2. nohup bin/yanagishima-start.sh >y.log 2>&1 &

5)启动web页面

http://hadoop102:7080

看到界面,进行查询了。

9.4 Presto可视化操作

查看表结构

这里有个tree view,可以查看所有表的结构,包括schema、表、字段等。

比如执行 select * from hive.dw_weather.tmp_news_click limit 10,这个句子里hive这个词可以删掉,是上面配置的catalog

 

每个表后面都有个复制键,点一下会复制完整的表名,然后再上面框里面输入sql语句,ctrl+enter键执行显示结果

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/380347
推荐阅读
相关标签
  

闽ICP备14008679号