赞
踩
整体设计
日志划分
业务日志批量更新,用户行为日志实时更新
虚拟机
虚拟机登录 root root sudo systemctl start/status mysqld mysql -uroot -p '000000' gmail bin/sqoop list-databases --connect jdbc:mysql://warehouse:3306/ --username root --password 000000 hadoop通过yarn控制datanode的资源配置,通过zookeeper控制namenode的高可用(数据同步,主从选举),zk和yarn无关 kafka通过zk记录注册的broker信息,选举leader,消费组reblance /opt/module/hadoop-3.1.3 启动hadoop sbin/start-dfs.sh 启动yarn sbin/start-yarn.sh 启动zk zookeeper-3.5.7/bin/zkServer.sh start /status看状态 群启/停hadoop/yarn /home/atguigu/bin sh hdp.sh start/stop 启停zookeeper sh zk.sh/stop 启停kafka sh kf.sh start/stop 启停flume采集,channel存kafka上 f1.sh 启停flume消费上个flume的kafka channel,通过file channel f2.sh (flume默认按照机器时间来判定数据落盘位置,会造成数据0点飘逸 所以flume需要代码自定义拦截器设置按照日志产生的时间戳来判定数据落盘位置, 通过读取数据body解析日志内提前设定好的时间戳字段,然后赋值给flume header时间戳) cluster.sh 包含上面所有 熟悉业务表含义,表结构 结合业务过程分析数据变化
维度设计过程
选择业务过程:确认有哪些事实表
声明粒度:每张事实表的每行数据是什么,粒度尽可能小
确认维度:确认如相关事实表的用户,地区等维度。事实表确认维度外键
确认事实:确认事实表度量值字段
事实表三个分类
事务型事实表:适合不发生变化的事实表,通常增量同步
周期性快照事实表:适合不保留所有数据,只保留固定时间间隔数据。通常全量同步
累计型快照事实表:用于跟踪业务事实的变化
维度表设计
新增和变化表和增量表都可以用拉链表来记录(9999-99-99和正常日期分区,新旧full join实现)
事实表设计
有时候如评价字段理论上是维度内容,不属于维度外键和度量值
但评价维度字段很少,为了提高性能可以维度退化,将评价字段直接写入评价事实表
维度和事实关联
1 设计业务过程的时候,需要考虑订单/订单详情的类似情况。比如订单详情数据量更大,后续做分析很多时候并不需要详情的信息,所以添加一个订单事实表可以提高分析效率
2 DWT对DWS层对汇聚的时候,很多情况DWS和DWD是完全相同的计算,仅仅是如时间长短的汇聚区别,为了避免重复计算。可以一次计算保存在一个宽表,来记录不同时间粒度的汇聚。后续直接读取宽表即可
一 sqoop 装载数据
二 全量同步表入hive
如业务维度表,全量表分区规划
三 新增和变化同步表入hive
新增和变化的用户表可以用拉链表记录,并动态分区写入
如用户维度表
1 动态分区和静态分区区别
表象:静态分区与动态分区的主要区别在于静态分区是手动指定,而动态分区是通过数据来进行判断。详细来说,静态分区的列实在编译时期,通过用户传递来决定的;动态分区只有在SQL执行时才能决定
深层次:动态分区与静态分区还有一个细微的差别是,静态分区一 定会创建分区,不管SELECT语句的结果有没有数据。而动态分区,只有在SELECT结果的记录数>0的时候,才会创建分区。因此在不同的业务场景下,可能会选择不同的方案。
另外使用动态分区时需要注意的比较重要的一点是,动态分区会为每一个分区分配reduce数。比如说你在脚本上面写了:set mapred.reduce.tasks=100;
并且有两个分区:pt, if_online。如果结果集中pt=20121023,if_online=0/1,那么它就会为pt=20121023/if_online=0,pt=20121023/if_online=1各分配100个reduce。也就是说,namenode会同时处理200个文件的写操作。这在分区值很多的情况下,会成为一个灾难,容易直接把namenode给搞挂掉,是非常危险的。因此使用动态分区时,一定要清楚地知道产生的动态分区值,并且合理地设置reduce数量
2 Why 拉链表
3 用户拉链表分区规划走向
4 SQL实现思路
with tmp as ( select old.id old_id, old.login_name old_login_name, old.nick_name old_nick_name, old.name old_name, old.phone_num old_phone_num, old.email old_email, old.user_level old_user_level, old.birthday old_birthday, old.gender old_gender, old.create_time old_create_time, old.operate_time old_operate_time, old.start_date old_start_date, old.end_date old_end_date, new.id new_id, new.login_name new_login_name, new.nick_name new_nick_name, new.name new_name, new.phone_num new_phone_num, new.email new_email, new.user_level new_user_level, new.birthday new_birthday, new.gender new_gender, new.create_time new_create_time, new.operate_time new_operate_time, new.start_date new_start_date, new.end_date new_end_date from ( select id, login_name, nick_name, name, phone_num, email, user_level, birthday, gender, create_time, operate_time, start_date, end_date from dim_user_info where dt='9999-99-99' ) old full outer join ( select id, login_name, nick_name, md5(name) name, md5(phone_num) phone_num, md5(email) email, user_level, birthday, gender, create_time, operate_time, '2020-06-15' start_date, '9999-99-99' end_date from ods_user_info where dt='2020-06-15' ) new on old.id=new.id ) insert overwrite table dim_user_info partition(dt) select nvl(new_id,old_id), nvl(new_login_name,old_login_name), nvl(new_nick_name,old_nick_name), nvl(new_name,old_name), nvl(new_phone_num,old_phone_num), nvl(new_email,old_email), nvl(new_user_level,old_user_level), nvl(new_birthday,old_birthday), nvl(new_gender,old_gender), nvl(new_create_time,old_create_time), nvl(new_operate_time,old_operate_time), nvl(new_start_date,old_start_date), nvl(new_end_date,old_end_date), nvl(new_end_date,old_end_date) dt from tmp union all select old_id, old_login_name, old_nick_name, old_name, old_phone_num, old_email, old_user_level, old_birthday, old_gender, old_create_time, old_operate_time, old_start_date, cast(date_add('2020-06-15',-1) as string), cast(date_add('2020-06-15',-1) as string) dt from tmp where new_id is not null and old_id is not null;
注:事实表实现方式和维度表一样
页面埋点日志和启动日志
get_json_object函数解析json日志数据,写入hive mysql表中(启动日志,页面日志,动作日志,曝光日志,错误日志表)
分区走向
动作日志表Hive UDTF设计
java代码自定义UDTF实现一列转一列多行
jar包导入hdfs user/hive/jars路径,创建永久函数和jar包class关联,hive内调用函数即可
public class ExplodeJSONArray extends GenericUDTF { @Override public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { // 1 参数合法性检查 if (argOIs.length != 1) { throw new UDFArgumentException("explode_json_array 只需要一个参数"); } // 2 第一个参数必须为string //判断参数是否为基础数据类型 if (argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException("explode_json_array 只接受基础类型参数"); } //将参数对象检查器强转为基础类型对象检查器 PrimitiveObjectInspector argumentOI = (PrimitiveObjectInspector) argOIs[0]; //判断参数是否为String类型 if (argumentOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) { throw new UDFArgumentException("explode_json_array 只接受string类型的参数"); } // 3 定义返回值名称和类型 List<String> fieldNames = new ArrayList<String>();//装列名 List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();//装列对应的类型 fieldNames.add("items"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } public void process(Object[] objects) throws HiveException { // 1 获取传入的数据,第一列的数据 String jsonArray = objects[0].toString(); // 2 将string转换为json数组 JSONArray actions = new JSONArray(jsonArray); // 3 循环一次,取出数组中的一个json,并写出 for (int i = 0; i < actions.length(); i++) { String[] result = new String[1]; result[0] = actions.getString(i); forward(result); } } public void close() throws HiveException { } }
1 动态分区默认最后一个字段为分区字段 自定义udtf组合可一行转多列 contat拼接 2 NVL NVL(表达式1,表达式2) 如果表达式1为空值,NVL返回值为表达式2的值,否则返回表达式1的值。 该函数的目的是把一个空值(null)转换成一个实际的值。其表达式的值可以是数字型、字符型和日期型。但是表达式1 和表达式2的数据类型必须为同一个类型。 hive (gmall)> select nvl(1,0); 输出1 3 日期函数 date_format函数(根据格式整理日期) hive (gmall)> select date_format('2020-06-14','yyyy-MM'); date_add函数(加减日期) hive (gmall)> select date_add('2020-06-14',-1); next_day函数 (取当前天的下一个周一) hive (gmall)> select next_day('2020-06-14','MO'); 4 复杂数据函数 map结构数据定义 map<string,string> array结构数据定义 array<string> struct结构数据定义 struct<id:int,name:string,age:int> struct和array嵌套定义 array<struct<id:int,name:string,age:int>> 例子 字段转map: str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') 转struct后转array collect_set(named_struct('page_id',page_id,'page_count',page_count,'during_time',during_time)) 判断arry是否包含 if(array_contains(collect_set(is_new),'0'),'0','1') 5 if sum(if(dt=‘2020-06-14‘,count,0)) 如果dt符合要求则sum count字段值,否则sum 0 6 获取下一行数据 (lead获取下n行/lag获取上n行) lead(page_id,1,null)over (partition by session_id order by ts)获取page_id下一行,如果没有取null 7 case when case when 条件1 then 结果1 when 条件2 then 结果2 end 7 一条sql同时计算最近1天,7天,30天的数据 将表数据通过explode三倍展开,对三个每份数据通过recent_days分别做计算,如下图
8 排序序号
维度为基础的宽表汇总
如地区主题,每行记录一天的一个地区的活跃用户的汇总记录总宽表
通过宽表,对应一个维度id,一天粒度的度量值汇总
另外可以不用full join方式,对子查询每个字段补0+union all子查询+group by的形式代替实现
和DWS一样,维度为基础的宽表汇总
由一天转多个向前n天的度量值汇总,分区规则依然为1天一个文件
通过DWS表,昨天+今天数据减去7天/30天/n天等来实现DWT层的表,如下图
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。