当前位置:   article > 正文

数仓(八)从0到1简单搭建加载数仓DWD层(用户行为日志数据解析)

事件日志数据和启动日志数据属于用户行为日志的( )。

数仓(一)简介数仓,OLTP和OLAP

数仓(二)关系建模和维度建模

数仓(三)简析阿里、美团、网易、恒丰银行、马蜂窝5家数仓分层架构

数仓(四)数据仓库分层

数仓(五)元数据管理系统解析

数仓(六)从0到1简单搭建数仓ODS层(埋点日志 + 业务数据)

数仓(七)从0到1简单搭建加载数仓DIM层以及拉链表处理

上一节我们讲解了数仓DIM维度层的搭建和使用,并且讲解了拉链表的概念和使用。这节我们讲解DWD层关于用户行为日志数据的搭建和使用。下一次分享DWD层关于业务数据的搭建和使用。

一、DWD层用户行为日志解析结构

DWD层是对用户的日志行为进行解析,以及对业务数据采用维度模型的方式重新建模(维度退化)。本节我们先来回顾一下用户行为日志的结构。

1、前端埋点日志信息

前端埋点日志信息都是JSON格式形式,主要包括两方面:

(1)启动日志;(2)事件日志;

我们之前已经把前端埋点的日志信息,写到ODS层ods_log表了,传入的参数是一个String类型字符串即一条日志信息一个String类型字符串。

4c468e9f18734d2a4c32ad1eed856765.png

2、日志解析思路

我们根据ODS层日志数据内容来解析到DWD层分为5个表,也可以根据启动日志和事件日志来解析到DWD层分为2个表。前者是根据内容抽象来解析,颗粒度更细,一般大公司使用。后者比较简单就是根据数据的类型来解析。我们这里采用第一种方式,把页面日志和启动日志信息的数据需要装载到DWD层里面的五张表。所以DWD层就是解析这五张表。

30aae8ca9a3219d034fcc186fef87dac.png

我们下面来依次解析每一张表。

二、DWD层-启动日志表

启动日志表中每一行数据对应一个启动记录,一个启动记录应该包含日志中的公共信息和启动信息。

常规解析思路是:可以先将所有包含start字段的日志过滤出来,然后使用get_json_object函数解析每个字段。

1、启动日志表结构

  • 分区:

        dt = 2020-06-14

  • 过滤条件:

        利用get_json_object函数,解析start内容不为空说明是启动日志信息;

  • 范围

        包括:公共信息common、启动信息start、启动app时间ts;

2fddc793023d64b1b46d984c5e907980.png

2、创建表结构

  1. DROP TABLE IF EXISTS dwd_start_log;
  2. CREATE EXTERNAL TABLE dwd_start_log(
  3. `area_code` STRING COMMENT '地区编码',
  4. `brand` STRING COMMENT '手机品牌',
  5. `channel` STRING COMMENT '渠道',
  6. `is_new` STRING COMMENT '是否首次启动',
  7. `model` STRING COMMENT '手机型号',
  8. `mid_id` STRING COMMENT '设备id',
  9. `os` STRING COMMENT '操作系统',
  10. `user_id` STRING COMMENT '会员id',
  11. `version_code` STRING COMMENT 'app版本号',
  12. `entry` STRING COMMENT 'icon手机图标 notice 通知 install 安装后启动',
  13. `loading_time` BIGINT COMMENT '启动加载时间',
  14. `open_ad_id` STRING COMMENT '广告页ID ',
  15. `open_ad_ms` BIGINT COMMENT '广告总共播放时间',
  16. `open_ad_skip_ms` BIGINT COMMENT '用户跳过广告时点',
  17. `ts` BIGINT COMMENT '时间'
  18. ) COMMENT '启动日志表'
  19. PARTITIONED BY (`dt` STRING) -- 按照时间创建分区
  20. STORED AS PARQUET -- 采用parquet列式存储
  21. LOCATION '/warehouse/gmall/dwd/dwd_start_log' -- 指定在HDFS上存储位置
  22. TBLPROPERTIES('parquet.compression'='lzo'-- 采用LZO压缩;

3、装载数据

首日和每日加载数据分区都是一样的策略,每天DWD层从ODS层获取数据后加载。

9c999e92bc9dbea3949d5e0c27f70733.png

SQL实现

  1. insert overwrite table dwd_start_log partition(dt='2020-06-14')
  2. select
  3. get_json_object(line,'$.common.ar'),
  4. get_json_object(line,'$.common.ba'),
  5. get_json_object(line,'$.common.ch'),
  6. get_json_object(line,'$.common.is_new'),
  7. get_json_object(line,'$.common.md'),
  8. get_json_object(line,'$.common.mid'),
  9. get_json_object(line,'$.common.os'),
  10. get_json_object(line,'$.common.uid'),
  11. get_json_object(line,'$.common.vc'),
  12. get_json_object(line,'$.start.entry'),
  13. get_json_object(line,'$.start.loading_time'),
  14. get_json_object(line,'$.start.open_ad_id'),
  15. get_json_object(line,'$.start.open_ad_ms'),
  16. get_json_object(line,'$.start.open_ad_skip_ms'),
  17. get_json_object(line,'$.ts')
  18. from ods_log
  19. where dt='2020-06-14'
  20. and get_json_object(line,'$.start'is not null;

注意:这里hive需要使用HiveInputFormat,而不是CombineHiveInputFormat

SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

后者不能识别lzo.index的索引文件,会把索引文件当做普通文件来处理,并且导致lzo文件无法切片。而ODS层数据我们处理的时候是带lzo索引文件的。

三、DWD层-页面日志表

页面日志表和启动日志表的处理逻辑一样。页面日志表中每一行数据对应一个页面的访问记录,一个页面访问记录应该包含日志中的公共信息和页面信息。

常规解析思路是:也是先将所有包含page字段的日志过滤出来,然后使用get_json_object函数解析每个字段。

1、页面日志表结构

  • 分区:

        dt = 2020-06-14

  • 过滤条件:

        利用get_json_object函数,解析page内容不为空说明是页面日志信息;

  • 范围

        包括:公共信息common、页面信息page、启动app时间ts;

da5644a1e0635d547edbc8b9e386d865.png

3、创建表结构

  1. drop table if exists dwd_page_log;
  2. CREATE EXTERNAL TABLE dwd_page_log(
  3. `area_code` string COMMENT '地区编码',
  4. `brand` string COMMENT '手机品牌',
  5. `channel` string COMMENT '渠道',
  6. `model` string COMMENT '手机型号',
  7. `mid_id` string COMMENT '设备id',
  8. `os` string COMMENT '操作系统',
  9. `user_id` string COMMENT '会员id',
  10. `version_code` string COMMENT 'app版本号',
  11. `during_time` bigint COMMENT '持续时间毫秒',
  12. `page_item` string COMMENT '目标id ',
  13. `page_item_type` string COMMENT '目标类型',
  14. `last_page_id` string COMMENT '上页类型',
  15. `page_id` string COMMENT '页面ID ',
  16. `source_type` string COMMENT '来源类型',
  17. `ts` bigint
  18. ) COMMENT '页面日志表'
  19. PARTITIONED BY (dt string)
  20. stored as parquet
  21. LOCATION '/warehouse/gmall/dwd/dwd_page_log'
  22. TBLPROPERTIES('parquet.compression'='lzo');

4、装载数据

和启动日志表一样,首日和每日加载数据分区都是一样的策略,每天DWD层从ODS层获取数据后加载。

  1. insert overwrite table dwd_page_log partition(dt='2020-06-14')
  2. select
  3. get_json_object(line,'$.common.ar'),
  4. get_json_object(line,'$.common.ba'),
  5. get_json_object(line,'$.common.ch'),
  6. get_json_object(line,'$.common.md'),
  7. get_json_object(line,'$.common.mid'),
  8. get_json_object(line,'$.common.os'),
  9. get_json_object(line,'$.common.uid'),
  10. get_json_object(line,'$.common.vc'),
  11. get_json_object(line,'$.page.during_time'),
  12. get_json_object(line,'$.page.item'),
  13. get_json_object(line,'$.page.item_type'),
  14. get_json_object(line,'$.page.last_page_id'),
  15. get_json_object(line,'$.page.page_id'),
  16. get_json_object(line,'$.page.sourceType'),
  17. get_json_object(line,'$.ts')
  18. from ods_log
  19. where dt='2020-06-14'
  20. and get_json_object(line,'$.page'is not null;

四、DWD层-动作日志表

动作日志表,比前面启动日志和页面信息表要复杂的多。动作日志表中每行数据对应的是用户的一个动作记录,一个动作记录应当包含公共信息、页面信息以及动作信息。

常规解析思路是:先将包含action字段的日志过滤出来,然后通过UDF、UDTF函数(用户定义表生成函数user-defined table-generating function)将action数组“炸裂”(类似于explode函数的效果),然后使用get_json_object函数解析每个字段。

1、页面日志表结构

6c29f02a5cc209faeb813066caaf869e.png

根据图示我们可以看到:

(1)需要自定义创建UDTF函数

来完成对actions动作数组的“炸裂”,实现“一行输入,多行输出”的需求。即输入JSON数组字符串actions,输出每一个JSON数组元素action。

(2)然后通过get_json_object(action,$action元素字段)获取信息;

  • 分区:

        dt = 2020-06-14

  • 过滤条件:

      利用get_json_object函数,解析actions内容不为空;

  • 范围

        包括:公共信息common、页面信息page、动作信息、启动app时间ts;

2、创建表结构

  1. drop table if exists dwd_action_log;
  2. CREATE EXTERNAL TABLE dwd_action_log(
  3. `area_code` string COMMENT '地区编码',
  4. `brand` string COMMENT '手机品牌',
  5. `channel` string COMMENT '渠道',
  6. `model` string COMMENT '手机型号',
  7. `mid_id` string COMMENT '设备id',
  8. `os` string COMMENT '操作系统',
  9. `user_id` string COMMENT '会员id',
  10. `version_code` string COMMENT 'app版本号',
  11. `during_time` bigint COMMENT '持续时间毫秒',
  12. `page_item` string COMMENT '目标id ',
  13. `page_item_type` string COMMENT '目标类型',
  14. `last_page_id` string COMMENT '上页类型',
  15. `page_id` string COMMENT '页面id ',
  16. `source_type` string COMMENT '来源类型',
  17. `action_id` string COMMENT '动作id',
  18. `item` string COMMENT '目标id ',
  19. `item_type` string COMMENT '目标类型',
  20. `ts` bigint COMMENT '时间'
  21. ) COMMENT '动作日志表'
  22. PARTITIONED BY (dt string)
  23. stored as parquet
  24. LOCATION '/warehouse/gmall/dwd/dwd_action_log'
  25. TBLPROPERTIES('parquet.compression'='lzo');

3、创建UDTF函数

我们通过java代码来实现功能,并且通过MAVEN来打成jar。然后上传到HDFS集群,给hive做关联调用。当然也可以使用hive自带的UDTF函数含来完成解析。但是后者必须是在hive的客户端完成,必须带hive库名,不够灵活。

3.1、编写java代码实现UDTF功能

1、pom.xml文件中hive版本依赖

我这里hive的版本是3.1.2

  1. <dependencies>
  2. <!--添加hive依赖-->
  3. <dependency>
  4. <groupId>org.apache.hive</groupId>
  5. <artifactId>hive-exec</artifactId>
  6. <version>3.1.2</version>
  7. </dependency>
  8. </dependencies>

2、项目工程结构

这个比较简单。

e39493dd8fe71d22ae33a42df1850fd4.png

3、ExplodeJSONArray类编写

思路是:

(1)ExplodeJSONArray继承hive的GenericUDTF类;

(2)实现initialize、process、close三个方法;

initialize方法:主要对解析的变量名、变量类型做校验

process方法:完成对“JSON数组炸裂成单一元素”的功能。这里是一行in,多行out,并且forward到元素里面

  1. package com.qiusheng.hive.udtf;
  2. import org.apache.hadoop.hive.ql.exec.MapredContext;
  3. import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  4. import org.apache.hadoop.hive.ql.metadata.HiveException;
  5. import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
  6. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  7. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
  8. import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
  9. import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
  10. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  11. import org.json.JSONArray;
  12. import java.util.ArrayList;
  13. import java.util.List;
  14. /**
  15. * @author qiusheng
  16.  * @date 2021年11月04日
  17. *
  18. */
  19. public class ExplodeJSONArray extends GenericUDTF
  20. {
  21. /**
  22. * 1、第一步需要自定义的类继承GenericUDTF类
  23. * 并且重写initialize方法;
  24. * 返回StructObjectInspector对象
  25. *
  26. * @qiusheng
  27. * @param argOIs
  28. * @return
  29. * @throws UDFArgumentException
  30. */
  31. @Override
  32. public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException
  33. {
  34. //1、判断参数的合法性argOI实际就是actions
  35. //(1)判断参数的个数(需要传递一个参数)
  36. //如果传递的参数个数不是1个,则抛异常;
  37. if(argOIs.getAllStructFieldRefs().size() != 1)
  38. {
  39. throw new UDFArgumentException("参数个数错误!只需要1个参数......");
  40. }
  41. //(2)判断传递的参数类型,必须是String类型
  42. //如果不是string类型,则抛异常;
  43. if(!"String".equals(argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector().getTypeName()))
  44. {
  45. throw new UDFArgumentException("参数类型不对!应该是String类型......");
  46. }
  47. //2、返回StructObjectInspector对象
  48. //第一个参数:变量名,类型是List<String>数组;
  49. //第二个参数:检验变量,类型是List<ObjectInspector>;
  50. //定义返回值名称
  51. List<String> fieldNames = new ArrayList<String>();
  52. //定义返回值类型
  53. List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
  54. //把items添加到返回值名称
  55. fieldNames.add("items");
  56. //调用检验方法对items
  57. fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
  58. //3、返回类型
  59. return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
  60.     }
  61. /**
  62. * 2、第二步重写process方法
  63. *
  64. * function:炸裂功能
  65. * @author qiusheng
  66. * @param objects
  67. * @throws HiveException
  68. */
  69. @Override
  70. public void process(Object[] objects) throws HiveException
  71. {
  72. //1、获取传入的数据就是actions
  73. String jsonArray = objects.toString();
  74. //2、把传入的数据(string)类型转化为JSON数组JSONArray类型
  75. JSONArray actions = new JSONArray(jsonArray);
  76. //3、循环取出JSONArray对象中的JSON,并且写出来
  77. for (int i = 0;i < actions.length();i++)
  78. {
  79. //定义一个字符串数组,长度就是1
  80. String[] result = new String[1];
  81. //getString(i)把元素取出来,添加到这个数组中
  82. result[0] = actions.getString(i);
  83. //写到String数组里面
  84. forward(result);
  85. }
  86. }
  87. /**
  88. * @author qiusheng
  89. * @throws HiveException
  90. */
  91. @Override
  92. public void close() throws HiveException
  93. {
  94. }
  95. }

3.2、通过maven打成jar包

4e07df9704c476725704b3d4b700fa5c.png

3.3、上传jar包到hadoop集群路径下

(1)先上传jar包到CentOS集群的node1节点

f712a2b43c01d34272aa600af7394768.png

(2)上传到HDFS集群系统

先创建目录functions/hive/jars

[qiusheng@node01 module]$ hadoop fs -mkdir -p /functions/hive/jars

检查查看目录是否已经创建;

0d770bc9dbff347c1f79e08f524e9721.png

上传jar包到这个目录/functions/hive/jars

ea8c928e70b76da5d5aea03a987f4490.png

3.4、创建UDTF和jar包关联

在hive客户端,创建function关联jar包

  1. hive (gmall)>
  2. create function explode_json_array as 'com.qiusheng.hive.udtf.ExplodeJSONArray' using jar 'hdfs://mode01:8020/functions/hive/jars/HiveDWDActionLog-1.0-SNAPSHOT.jar';

4、装载数据

所需要的字段都拼接齐全了,我们来写装载语句。

  1. insert overwrite table dwd_action_log partition(dt='2020-06-14')
  2. select
  3. get_json_object(line,'$.common.ar'),
  4. get_json_object(line,'$.common.ba'),
  5. get_json_object(line,'$.common.ch'),
  6. get_json_object(line,'$.common.md'),
  7. get_json_object(line,'$.common.mid'),
  8. get_json_object(line,'$.common.os'),
  9. get_json_object(line,'$.common.uid'),
  10. get_json_object(line,'$.common.vc'),
  11. get_json_object(line,'$.page.during_time'),
  12. get_json_object(line,'$.page.item'),
  13. get_json_object(line,'$.page.item_type'),
  14. get_json_object(line,'$.page.last_page_id'),
  15. get_json_object(line,'$.page.page_id'),
  16. get_json_object(line,'$.page.sourceType'),
  17. get_json_object(action,'$.action_id'),
  18. get_json_object(action,'$.item'),
  19. get_json_object(action,'$.item_type'),
  20. get_json_object(action,'$.ts')
  21. from ods_log lateral view explode_json_array(get_json_object(line,'$.actions')) tmp as action
  22. where dt='2020-06-14'
  23. and get_json_object(line,'$.actions'is not null;

这里lateral view意思是:为原始表的每行调用UDTF,UTDF会把一行拆分成一或者多行,lateral view再把结果组合,产生一个支持别名表的虚拟表action。

from ods_log lateral view explode_json_array(get_json_object(line,'$.actions')) tmp as action

这样我们动作日志表就完成了。

五、DWD层-曝光日志表

曝光日志表和动作日志表,解析一样。曝光日志表中每行数据对应一个曝光记录,一个曝光记录应当包含公共信息、页面信息以及曝光信息。

常规思路:先将包含display字段的日志过滤出来,然后通过UDTF函数,将display数组“炸开”(类似于explode函数的效果),然后使用get_json_object函数解析每个字段。

1、页面日志表结构

15df573d3e13e8a823a556f8d0989048.png

后面建表、做UDTF、一系列操作、装载数据和动作日志表一样。留给读者自行实践一下。

六、DWD层-错误日志表

动作日志表,比前面启动日志和页面信息表要复杂一些。错误日志表中每行数据对应一个错误记录,为方便定位错误,一个错误记录应当包含与之对应的公共信息、页面信息、曝光信息、动作信息、启动信息以及错误信息。先将包含err字段的日志过滤出来,然后使用get_json_object函数解析所有字段。

1、页面日志表结构

d76f4c2e576dfc4798957578d4a69edd.png

含有2个actions 、displays数组的UDTF炸裂。

2、创建表结构

  1. drop table if exists dwd_error_log;
  2. CREATE EXTERNAL TABLE dwd_error_log(
  3. `area_code` string COMMENT '地区编码',
  4. `brand` string COMMENT '手机品牌',
  5. `channel` string COMMENT '渠道',
  6. `model` string COMMENT '手机型号',
  7. `mid_id` string COMMENT '设备id',
  8. `os` string COMMENT '操作系统',
  9. `user_id` string COMMENT '会员id',
  10. `version_code` string COMMENT 'app版本号',
  11. `page_item` string COMMENT '目标id ',
  12. `page_item_type` string COMMENT '目标类型',
  13. `last_page_id` string COMMENT '上页类型',
  14. `page_id` string COMMENT '页面ID ',
  15. `source_type` string COMMENT '来源类型',
  16. `entry` string COMMENT ' icon手机图标 notice 通知 install 安装后启动',
  17. `loading_time` string COMMENT '启动加载时间',
  18. `open_ad_id` string COMMENT '广告页ID ',
  19. `open_ad_ms` string COMMENT '广告总共播放时间',
  20. `open_ad_skip_ms` string COMMENT '用户跳过广告时点',
  21. `actions` string COMMENT '动作',
  22. `displays` string COMMENT '曝光',
  23. `ts` string COMMENT '时间',
  24. `error_code` string COMMENT '错误码',
  25. `msg` string COMMENT '错误信息'
  26. ) COMMENT '错误日志表'
  27. PARTITIONED BY (dt string)
  28. stored as parquet
  29. LOCATION '/warehouse/gmall/dwd/dwd_error_log'
  30. TBLPROPERTIES('parquet.compression'='lzo');

注意:

对动作数组和曝光数组做处理,如需分析错误与单个动作或曝光的关联,可先使用explode_json_array函数将数组“炸开”,再使用get_json_object函数获取具体字

3、装载表数据

  1. insert overwrite table dwd_error_log partition(dt='2020-06-14')
  2. select
  3. get_json_object(line,'$.common.ar'),
  4. get_json_object(line,'$.common.ba'),
  5. get_json_object(line,'$.common.ch'),
  6. get_json_object(line,'$.common.md'),
  7. get_json_object(line,'$.common.mid'),
  8. get_json_object(line,'$.common.os'),
  9. get_json_object(line,'$.common.uid'),
  10. get_json_object(line,'$.common.vc'),
  11. get_json_object(line,'$.page.item'),
  12. get_json_object(line,'$.page.item_type'),
  13. get_json_object(line,'$.page.last_page_id'),
  14. get_json_object(line,'$.page.page_id'),
  15. get_json_object(line,'$.page.sourceType'),
  16. get_json_object(line,'$.start.entry'),
  17. get_json_object(line,'$.start.loading_time'),
  18. get_json_object(line,'$.start.open_ad_id'),
  19. get_json_object(line,'$.start.open_ad_ms'),
  20. get_json_object(line,'$.start.open_ad_skip_ms'),
  21. get_json_object(line,'$.actions'),
  22. get_json_object(line,'$.displays'),
  23. get_json_object(line,'$.ts'),
  24. get_json_object(line,'$.err.error_code'),
  25. get_json_object(line,'$.err.msg')
  26. from ods_log
  27. where dt='2020-06-14'
  28. and get_json_object(line,'$.err'is not null;

这样我们就完成了最后一张表从ODS层到DWD层的解析。

3e1afc32a230ad5c97d6e2c3cbd24204.png

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/592062
推荐阅读
相关标签
  

闽ICP备14008679号