赞
踩
业务域的数据来自业务系统的数据库
通过sqoop(或datax)抽取到数仓的ods层
在ods层对有需要的表进行增量合并,字段选择,反范式话,形成dwd明细层表
在明细层基础上,进行各类主题的数据统计、分析
课程中,主要分析的主题有:
什么是业务系统:
公司向用户提供业务功能的系统,比如
京东:京东商城!
头条:头条网站
这一类系统,通常都是web系统,简单来说,分为:
界面(浏览器网页)+后端(web服务)+存储(mysql数据库)
sqoop 是 apache 旗下一款“Hadoop中的各种存储系统(HDFS、HIVE、HBASE) 和关系数据库(mysql、oracle、sqlserver等)服务器之间传送数据”的工具。
核心的功能有两个:
导入(迁入)
导出(迁出)
导入数据:MySQL,Oracle 导入数据到 Hadoop 的 HDFS、HIVE、HBASE 等数据存储系统
导出数据:从 Hadoop 的文件系统中导出数据到关系数据库 mysql 等 Sqoop 的本质还是一个命令行工具,和 HDFS,Hive 相比,并没有什么高深的理论。
底层工作机制
将导入或导出命令翻译成 MapReduce 程序来实现
在翻译出的 MapReduce 中主要是对InputFormat 和 OutputFormat 进行定制
sqoop import \ --connect jdbc:mysql://h3:3306/ry \ --username root \ --password haitao.211123 \ --hive-import \ --hive-table yiee_dw.doit_jw_stu_base4 \ --as-textfile \ --fields-terminated-by ',' \ --compress \ --compression-codec gzip \ --split-by stu_id \ --null-string '\\N' \ --null-non-string '\\N' \ --hive-overwrite \ --query 'select stu_id,stu_name,stu_age,stu_term from doit_jw_stu_base where stu_createtime>"2019-09-24 23:59:59" and stu_sex="1" and $CONDITIONS' \ --target-dir '/user/root/tmp' \ -m 2
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
官网地址:https://github.com/alibaba/DataX
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题
核心模块介绍:
DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。DataX的调度决策思路是:
DataXJob根据分库分表切分成了100个Task。
根据20个并发,DataX计算共需要分配4个TaskGroup。
4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:
https://github.com/alibaba/DataX/blob/master/introduction.md
1,官网下载datax包,之后解压;
2,编写json配置文件,配置文件结构如下:
{ "job": { "content": [ "reader": { }, "writer": { } ], "setting": { "speed": { "channel": "1" } } } }
具体的reader,writer参数官网有说明。
3,执行 python datax.py xx.json
4,调优,主要是调整channel,byte,record参数,不过具体性能还是取决于源端数据库的表是否适合切分,是否有合适的切分字段,切分字段最好为数字。
从mysql抽取数据到hdfs
{ "setting": {}, "job": { "setting": { "speed": { "channel": 2 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "ABC123abc.123", "column": [ "id", "name", "gender", "addr" ], "splitPk": "id", "connection": [ { "table": [ "demo1" ], "jdbcUrl": [ "jdbc:mysql://doitedu01:3306/dataxtest?useUnicode=true&characterEncoding=utf8" ] } ] } }, "writer": { "name": "hdfswriter", "parameter": { "defaultFS": "hdfs://doitedu01:8020", "fileType": "orc", "path": "/user/hive/warehouse/test.db/stu/", "fileName": "stu", "column": [ { "name": "id", "type": "INT" }, { "name": "name", "type": "STRING" }, { "name": "age", "type": "INT" }, { "name": "gender", "type": "STRING" }, { "name": "addr", "type": "STRING" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress":"NONE" } } } ] } }
本层直接对接DATAX/SQOOP从业务库抽取过来的各类数据表
实体表
实体表小表(品类信息表,活动信息表,优惠券信息表等),每天抽取过来一份全量(或者一周、一月)
实体表大表(商品信息表),每天抽取过来一份增量数据
事实表
订单相关表
优惠券领取使用记录表
秒杀订阅记录表
每天都会抽取一份增量数据
总原则
小表——全量抽取
大表——增量抽取
商品信息(主要信息、详情信息、类目信息、属性信息、商品相册信息)
用户信息(主要信息、附加信息、会员等级信息)
订单信息及购物车相关(主要信息、详情信息、物流信息、评论信息)
内容管理(话题,文章,评论)
营销管理(优惠券、代金券、活动规则、主题推荐)
策略
为了便于后续的统计分析方便,用增量抽取策略抽取过来的增量数据,都要每天进行滚动合并
合并的技术手段:
1)方便起见,可以使用 sqoop merge
命令进行
2)如果有特别情况,可以自己写spark程序
来实现
3)直接用hive的sql来实现(分组top1模式 或者 JOIN模式)
增量合并实战
bin/sqoop codegen \ --connect jdbc:mysql://impala01:3306/sqooptest \ --username root \ --password ABC123abc.123 \ --table stu \ --bindir /opt/apps/code/stu \ --class-name Stu \ --fields-terminated-by "," bin/sqoop merge \ --new-data /sqoopdata/stu1 \ --onto /sqoopdata/stu0 \ --target-dir /sqoopdata/stu_all \ --jar-file /opt/apps/code/stu/Stu.jar \ --class-name Stu \ --merge-key id
5.3.1概念介绍
以订单表为例,表中90%的数据基本不会随着时间而变化,只有最近一段时间内的数据会有变化
对于这种类型的表,我们往往需要保存好每一条数据的每一天的状态
方案1:
可以每天保存一份全量表,并长期存储,这样可以实现每天状态的保存,也方便查询任何一天中数据的状态;
弊端:由于表中90%的数据都不会变化,因此,各天的全量表,其实大量数据都是相同的,存储冗余度太高
方案2:
使用拉链表模型,来实现每条数据每天状态的变化情况
优点:既能保留每天状态,又比较节省存储空间
弊端:使用、查询的时候,略增加了一点复杂性
拉链表查询举例:比如查询2020-07-05日的所有订单数据
select * from zipper where and start_dt<='2020-07-05' and end_dt>='2020-07-05'
5.3.2拉链表开发
整体流程
核心逻辑
T-1日拉链表 LEFT JOIN T日增量 (对能关联上的数据进行拉链表的end_dt做封闭)
UNION ALL
T日增量 (生成start_dt=T , end_dt = 9999-12-31)
本层主要表类型:
核心事实表:
oms_order_info
oms_order_item
关联维度表:
pms_product
pms_product_category
ums_member
本层主要处理:
按照维度建模的思想,按各主题,将核心事实表关联需要的维度表,得到宽表
假如有如下报表统计需求:
核心度量:
主要维度:
我们可以做一张dws层的宽表,来支撑这个统计需求
dws.oms_order_and_return
核心度量:
主要维度:
设计一张DWS层的服务表:订单表 + 退货申请记录表
核心度量:
主要维度:
使用到的表: sms_coupon 实体描述表 (优惠券信息表)
sms_coupon_history 操作事务表(优惠券领取使用记录表)
核心度量:
主要维度:
使用到的表: sms_flash_promotion_log 操作事务表 (秒杀订阅通知记录)
oms_order_item 操作事务表(订单商品详情记录)
ums_member 实体表(用户信息表)
sms_flash_promotion_session 实体表(秒杀场次信息)
需求说明
给用户打上一些消费相关(下单、退货、金额、客单价)的统计数据标签
drop table if exists ads_user_order_tag; create table ads_user_order_tag( user_id bigint ,--用户 first_order_time string ,--首单日期 last_order_time string ,--末单日期 first_order_ago bigint ,--首单距今时间 last_order_ago bigint ,--末单距今时间 month1_order_cnt bigint ,--近30天下单次数 month1_order_amt double ,--近30天购买金额(总金额) month2_order_cnt bigint ,--近60天购买次数 month2_order_amt double ,--近60天购买金额 month3_order_cnt bigint ,--近90天购买次数 month3_order_amt double ,--近90天购买金额 max_order_amt double ,--最大订单金额 min_order_amt double ,--最小订单金额 total_order_cnt bigint ,--累计消费次数(不含退拒) total_order_amt double ,--累计消费金额(不含退拒) total_coupon_amt double ,--累计使用代金券金额 user_avg_order_amt double ,--平均订单金额(含退拒) month3_user_avg_amt double ,--近90天平均订单金额(含退拒) common_address string ,--常用收货地址 common_paytype string ,--常用支付方式 month1_cart_cnt_30 bigint ,--最近30天加购次数 month1_cart_goods_cnt_30 bigint ,--最近30天加购商品件数 month1_cart_cancel_cnt bigint ,--最近30天取消商品件数 dw_date string ,计算日期 ) partitioned by (dt string) ;
该表的计算,需要用到3张源表: 订单表,退拒货申请记录表,购物车表
《详见项目代码》
计算方案
代码实现
需求说明
drop table if exists ads_user_profile_reject_tag; create table ads_user_profile_reject_tag( user_id bigint ,-- 用户 p_sales_cnt bigint ,-- 不含退拒商品购买数量 p_sales_amt double ,-- 不含退拒商品购买的商品总价 p_sales_cut_amt double ,-- 不含退拒实付金额(扣促销减免) h_sales_cnt bigint ,-- 含退拒购买数量 h_sales_amt double ,-- 含退拒购买金额 h_sales_cut_amt double ,-- 含退拒购买金额(扣促销减免) return_cnt bigint ,-- 退货商品数量 return_amt double ,-- 退货商品金额 dw_date bigint ) partitioned by (dt string) stored as parquet ;
计算方案
从 oms_order_item 关联 oms_order_return_apply
得到 如下数据:
订单,商品,商品价格,购买数量,实付金额,是否退货,退货件数,退货的金额
代码实现
需求说明
drop table if exists ads_user_profile_favor_tag;
create table ads_user_profile_favor_tag(
user_id bigint ,-- 用户
common_first_cat bigint ,-- 最常购买一级类目名称
common_second_cat bigint ,-- 最常购买二级类目名称
common_third_cat bigint ,--最常购买三级类目名称
most_brand_id bigint ,--最常购买的品牌
second_brand_id bigint ,--第二多购买的品牌
third_brand_id bigint ,--第三多购买的品牌
third_brand_id bigint ,--最喜欢的颜色
dw_date bigint
) partitioned by (dt string)
stored as parquet
;
最核心的要点是,收集到用户的每一次购买的行为记录
计算方案
代码实现
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。