赞
踩
掌握flume的使用,将模拟生成的数据通过日志拦截器的选择,上传到kafka,再由kafka上传到hdfs中存储
在产品的开发和设计,尤其针对互联网的产品,对用户行为的数据进行筛选、统计、分析,从而发现用户的一些使用习惯,操作规律,并将这些规律反馈到产品的设计、开发、运维、市场以及营销上,进而对一些预先的方案进行调整和修正从而达到更好的效果。所以进行用户行为数据分析是非常必要的,而用户行为数据分析的前置要求就是,需要一个用户行为数据采集的平台。
1.目标数据:我们要收集和分析的数据主要包括页面数据、事件数据、曝光数据、启动数据和错误数据。
①页面数据
页面数据主要记录一个页面的用户访问情况,包括访问时间、停留书简、页面路径等信息。
1)所有页面id如下
- home("首页"),
-
- category("分类页"),
-
- discovery("发现页"),
-
- top_n("热门排行"),
-
- favor("收藏页"),
-
- search("搜索页"),
-
- good_list("商品列表页"),
-
- good_detail("商品详情"),
-
- good_spec("商品规格"),
-
- comment("评价"),
-
- comment_done("评价完成"),
-
- comment_list("评价列表"),
-
- cart("购物车"),
-
- trade("下单结算"),
-
- payment("支付页面"),
-
- payment_done("支付完成"),
-
- orders_all("全部订单"),
-
- orders_unpaid("订单待支付"),
-
- orders_undelivered("订单待发货"),
-
- orders_unreceipted("订单待收货"),
-
- orders_wait_comment("订单待评价"),
-
- mine("我的"),
-
- activity("活动"),
-
- login("登录"),
-
- register("注册");
2)所有页面对象类型如下:
- sku_id("商品skuId"),
-
- keyword("搜索关键词"),
-
- sku_ids("多个商品skuId"),
-
- activity_id("活动id"),
-
- coupon_id("购物券id");
3)所有来源类型如下:
- promotion("商品推广"),
-
- recommend("算法推荐商品"),
-
- query("查询结果商品"),
-
- activity("促销活动");
②事件数据
事件数据主要记录应用内一个具体操作行为,包括操作类型、操作对象、操作对象描述等信息。
1)所有动作类型如下:
- favor_add("添加收藏"),
-
- favor_canel("取消收藏"),
-
- cart_add("添加购物车"),
-
- cart_remove("删除购物车"),
-
- cart_add_num("增加购物车商品数量"),
-
- cart_minus_num("减少购物车商品数量"),
-
- trade_add_address("增加收货地址"),
-
- get_coupon("领取优惠券");
注:对于下单、支付等业务数据,可从业务数据库获取。
2)所有动作目标类型如下:
- sku_id("商品"),
-
- coupon_id("购物券");
③曝光数据
曝光数据主要记录页面所曝光的内容,包括曝光对象,曝光类型等信息。
1)所有曝光类型如下:
- promotion("商品推广"),
- recommend("算法推荐商品"),
- query("查询结果商品"),
- activity("促销活动");
2)所有曝光对象类型如下:
- sku_id("商品skuId"),
-
- activity_id("活动id");
④启动数据:启动数据记录应用的启动信息。
1)所有启动入口类型如下:
- icon("图标"),
- notification("通知"),
- install("安装后启动");
⑤错误数据:错误数据记录应用使用过程中的错误信息,包括错误编号及错误信息。
6.数据埋点
①主流埋点方式
目前主流的埋点方式,有代码埋点(前端/后端)、可视化埋点、全埋点三种。
代码埋点是通过调用埋点SDK函数,在需要埋点的业务逻辑功能位置调用接口,上报埋点数据。例如,我们对页面中的某个按钮埋点后,当这个按钮被点击时,可以在这个按钮对应的 OnClick 函数里面调用SDK提供的数据发送接口,来发送数据。
可视化埋点只需要研发人员集成采集 SDK,不需要写埋点代码,业务人员就可以通过访问分析平台的“圈选”功能,来“圈”出需要对用户行为进行捕捉的控件,并对该事件进行命名。圈选完毕后,这些配置会同步到各个用户的终端上,由采集 SDK 按照圈选的配置自动进行用户行为数据的采集和发送。
全埋点是通过在产品中嵌入SDK,前端自动采集页面上的全部用户行为事件,上报埋点数据,相当于做了一个统一的埋点。然后再通过界面配置哪些数据需要在系统里面进行分析。
②埋点数据日志结构
我们的日志结构大致可分为两类,一是普通页面埋点日志,二是启动日志。
普通页面日志结构如下,每条日志包含了,当前页面的页面信息,所有事件(动作)、所有曝光信息以及错误信息。除此之外,还包含了一系列公共信息,包括设备信息,地理位置,应用信息等,即下边的common字段。
1)普通页面埋点日志格式
- {
- "common": { -- 公共信息
- "ar": "230000", -- 地区编码
- "ba": "iPhone", -- 手机品牌
- "ch": "Appstore", -- 渠道
- "md": "iPhone 8", -- 手机型号
- "mid": "YXfhjAYH6As2z9Iq", -- 设备id
- "os": "iOS 13.2.9", -- 操作系统
- "uid": "485", -- 会员id
- "vc": "v2.1.134" -- app版本号
- },
- "actions": [ --动作(事件)
- {
- "action_id": "favor_add", --动作id
- "item": "3", --目标id
- "item_type": "sku_id", --目标类型
- "ts": 1585744376605 --动作时间戳
- }
- ],
- "displays": [
- {
- "displayType": "query", -- 曝光类型
- "item": "3", -- 曝光对象id
- "item_type": "sku_id", -- 曝光对象类型
- "order": 1 --出现顺序
- },
- {
- "displayType": "promotion",
- "item": "6",
- "item_type": "sku_id",
- "order": 2
- },
- {
- "displayType": "promotion",
- "item": "9",
- "item_type": "sku_id",
- "order": 3
- },
- {
- "displayType": "recommend",
- "item": "6",
- "item_type": "sku_id",
- "order": 4
- },
- {
- "displayType": "query ",
- "item": "6",
- "item_type": "sku_id",
- "order": 5
- }
- ],
- "page": { --页面信息
- "during_time": 7648, -- 持续时间毫秒
- "item": "3", -- 目标id
- "item_type": "sku_id", -- 目标类型
- "last_page_id": "login", -- 上页类型
- "page_id": "good_detail", -- 页面ID
- "sourceType": "promotion" -- 来源类型
- },
- "err":{ --错误
- "error_code": "1234", --错误码
- "msg": "***********" --错误信息
- },
- "ts": 1585744374423 --跳入时间戳
- }
2)启动日志格式
启动日志结构相对简单,主要包含公共信息,启动信息和错误信息。
- {
- "common": {
- "ar": "370000",
- "ba": "Honor",
- "ch": "wandoujia",
- "md": "Honor 20s",
- "mid": "eQF5boERMJFOujcp",
- "os": "Android 11.0",
- "uid": "76",
- "vc": "v2.1.134"
- },
- "start": {
- "entry": "icon", --icon手机图标 notice 通知 install 安装后启动
- "loading_time": 18803, --启动加载时间
- "open_ad_id": 7, --广告页ID
- "open_ad_ms": 3449, -- 广告总共播放时间
- "open_ad_skip_ms": 1989 -- 用户跳过广告时点
- },
- "err":{ --错误
- "error_code": "1234", --错误码
- "msg": "***********" --错误信息
- },
- "ts": 1585744304000
- }
③埋点数据上报时机
埋点数据上报时机包括两种方式。
方式一,在离开该页面时,上传在这个页面产生的所有数据(页面、事件、曝光、错误等)。优点,批处理,减少了服务器接收数据压力。缺点,不是特别及时。
方式二,每个事件、动作、错误等,产生后,立即发送。优点,响应及时。缺点,对服务器接收数据压力比较大。
ubuntu 16.04
openssh 8.4
hadoop 3.1.3
flume 1.9.0
kafka 2.41
zookeeper 3.5.7
1.启动服务
想要进行数据仓库的搭建,首先要做的就是把搭建在本地的一个hadoop集群给启动起来,这里准备好了启动集群所需的命令文件,执行即可
首先是Hadoop服务
初始化namenode并启动Hadoop服务
- hadoop namenode -format
- start-all.sh
然后是zookeeper服务
zkServer.sh start
最后是kafka服务
-
- /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
-
-
复制到实验机
可使用jps命令查看是否服务全部启动
2.模拟数据
在下载application.properties、gmall-mock-log.jar、path.json、logback.xml,四个文件,这个是生成数据的java文件,其中application.properties可以配置我们需要生成日志文件的内容
- mkdir /opt/module
- cd /opt/module
- wget http://10.90.3.2/LMS/Project/shucang/applog.zip
- unzip applog.zip
- cd applog
- vim application.properties
复制到实验机
- #外部配置打开
- logging.config=./logback.xml
- #业务日期
- mock.date=2021-01-25
- #模拟数据发送模式
- mock.type=log
- #mock.type=http
- #http 模式下,发送的地址
- mock.url=http://localhost:8080/applog
- #启动次数
- mock.startup.count=100
- #设备最大值
- mock.max.mid=50
- #会员最大值
- mock.max.uid=500
- #商品最大值
- mock.max.sku-id=10
- #页面平均访问时间
- mock.page.during-time-ms=20000
- #错误概率 百分比
- mock.error.rate=3
- #每条日志发送延迟 ms
- mock.log.sleep=10
- #商品详情来源 用户查询,商品推广,智能推荐, 促销活动
- mock.detail.source-type-rate=40:25:15:20
复制到实验机
业务日期处可以更改为自选日期
其余文件不需更改,其中path.json用来配置访问路径,也可以根据需求灵活配置用户点击路径
logback配置文件可以修改日志生成路径,修改内容在第三行处的property name
返回/opt/module/applog路径执行命令
java -jar gmall-mock-log.jar
复制到实验机
运行完成后日志生成在/opt/module/applog/log中,日志上的日期是你执行文件的日期,但是内容中的日期是你设置的日期,这一点要分开
2.日志采集flume配置
下载日志拦截器到flume下的lib
wget -P /opt/apache-flume-1.9.0-bin/lib http://10.90.3.2/LMS/Project/shucang/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
打开到对应目录
cd /opt/apache-flume-1.9.0-bin/conf
复制到实验机
创建编辑配置文件
vim file-flume-kafka.conf
复制到实验机
- #为各组件命名
- a1.sources = r1
- a1.channels = c1
-
- #描述source
- a1.sources.r1.type = TAILDIR
- a1.sources.r1.filegroups = f1
- a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
- a1.sources.r1.positionFile = /opt/apache-flume-1.9.0-bin/taildir_position.json
- a1.sources.r1.interceptors = i1
- #这里用到了一个日志拦截器,放在了flume的lib中,用途是把集群运行日志,和用户行为日志分开
- a1.sources.r1.interceptors.i1.type = com.sugonedu.flume.interceptor.LogInterceptor$Builder
-
- #描述channel
- a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
- a1.channels.c1.kafka.bootstrap.servers = localhost:9092
- a1.channels.c1.kafka.topic = topic_log
- a1.channels.c1.parseAsFlumeEvent = false
-
- #绑定source和channel以及sink和channel的关系
- a1.sources.r1.channels = c1
该配置文件就是扫描/opt/module/applog/log下的log文件,通过日志拦截器的选择,将有用的用户行为部分,传入kafka的topic_log中
我们可以写一个运行这个配置的flume
nohup /opt/apache-flume-1.9.0-bin/bin/flume-ng agent --conf-file /opt/apache-flume-1.9.0-bin/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/apache-flume-1.9.0-bin/log1.txt 2>&1 &
在这里我们用了nohub,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。
flume-ng是运行flume的可执行文件,使用--conf-file指定通过哪儿个配置文件启动,--name中指定的a1就是我们在配置文件中书写的a1
3.然后是日志消费flume的配置
vim /opt/apache-flume-1.9.0-bin/conf/kafka-flume-hdfs.conf
复制到实验机
- ## 组件
- a2.sources=r1
- a2.channels=c1
- a2.sinks=k1
-
- ## source1
- a2.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
- a2.sources.r1.batchSize = 5000
- a2.sources.r1.batchDurationMillis = 2000
- a2.sources.r1.kafka.bootstrap.servers = localhost:9092
- a2.sources.r1.kafka.topics=topic_log
- a2.sources.r1.interceptors = i1
- a2.sources.r1.interceptors.i1.type = com.sugonedu.flume.interceptor.TimeStampInterceptor$Builder
-
- ## channel1
- a2.channels.c1.type = file
- a2.channels.c1.checkpointDir = /opt/apache-flume-1.9.0-bin/checkpoint/behavior1
- a2.channels.c1.dataDirs = /opt/apache-flume-1.9.0-bin/data/behavior1/
- a2.channels.c1.maxFileSize = 2146435071
- a2.channels.c1.capacity = 1000000
- a2.channels.c1.keep-alive = 6
-
-
- ## sink1
- a2.sinks.k1.type = hdfs
- a2.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
- a2.sinks.k1.hdfs.filePrefix = log-
- a2.sinks.k1.hdfs.round = false
-
-
- a2.sinks.k1.hdfs.rollInterval = 10
- a2.sinks.k1.hdfs.rollSize = 134217728
- a2.sinks.k1.hdfs.rollCount = 0
-
- ## 控制输出文件是原生文件。
- a2.sinks.k1.hdfs.fileType = DataStream
- ## 拼装
- a2.sources.r1.channels = c1
- a2.sinks.k1.channel= c1
复制到实验机
这个配置文件就是通过flume将topic_log中的信息生成文件传入hdfs中的/origin_data/gmall/log/topic_log/目录下
同样可以来启动这一flume
nohup /opt/apache-flume-1.9.0-bin/bin/flume-ng agent --conf-file /opt/apache-flume-1.9.0-bin/conf/kafka-flume-hdfs.conf --name a2 -Dflume.root.logger=INFO,LOGFILE >/opt/apache-flume-1.9.0-bin/log2.txt 2>&1 &
复制到实验机
这样我们就启动成功两个flume,现在我们的flume就在扫描/opt/module/applog/log下的新生成的文件,和文件中新增的数据,并把它传入kafka的topic中,然后再传到hdfs中
我们再生成一次数据
- cd /opt/module/applog
- java -jar gmall-mock-log.jar
查看创建的topic
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list
查看topic中的信息
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topic_log
就会输出topic中的内容,而且会一直监听这个topic如果我们再生成一次数据,会看到这里的内容开始刷新
最后我们看一下输出在hdfs中的文件,打开桌面上的浏览器
http://localhost:50070
复制到实验机
输入网址可看到
这个界面,然后我们
选择浏览文件系统,就可以看到生成的文件夹
同样,也能看到生成的数据
通过这次实验,我们学了flume的使用,了解了在数仓项目中,是如何将日志文件中的用户行为的数据提取出来,以kafka为中间节点,传入hdfs中进行存储。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。