当前位置:   article > 正文

离线电商数仓知识笔记沉淀-流程及用户行为采集平台

离线电商数仓知识笔记沉淀-流程及用户行为采集平台

learn by :尚硅谷数仓6.0

前置储备:

采集项目和数仓项目的区别:二者具有独立性

1功能角度:

采集项目:以数据的采集、传输为主

数仓项目:以数据计算为主、同时可以储存数据

2以功能延伸到技术区别:

采集项目:flume、kafka、datax、maxwell

数仓项目:mysql、hdfs、spark、Flink、MR、hive

数据库和数据仓库区分:

名称区分:

数据库database:基础核心的数据

数据仓库 data Warehouse:warehouse还有大商店之意,所以存数据不是数据仓库的本质目的,是为了把数据加工处理后对外提供服务

数据来源区分:

数据库:企业中基础核心的业务数据

数据仓库:数据库中的数据

数据存储区分:

数据库:核心作用是查找业务数据,行式存储,索引(快速定位),不能存储海量数据

数据仓库:核心作用是统计分析数据,列式存储,存储海量数据(数据越多统计越准确)

数据价值区分:

数据库:保障全企业全业务正常运行

数据仓库:将数据的统计结果为企业的经营决策提供数据支撑,不是数据流转的终点,需要将统计结果将可视化平台呈现给客户

数据流传过程

(不完整,只记录涉及到的)

客户端——业务服务器(业务数据,行为数据)——数据存储(业务数据库,行为日志)——数据的统计分析(数据仓库)—— 数据可视化(可视化平台,图表)

数据仓库:核心功能统计分析(Hive)

Spark, MR, Flink

SQL:两条技术路线  Spark On Hive ; Hive On Spark,区别:谁解析SQL谁放前面

因为Hive基于Hadoop,所以国内开发用的多一点

统计分析的基本步骤:

数据源——对接并加工数据——统计数据(算一下)——分析数据

数据仓库也应遵循这个步骤

数据存储————数据可视化

如果将数据库直接作为数据仓库的数据源会出现的问题:

1.业务数据库为行式存储,而数据仓库是列式存储,数据不能直接对接——需要把行式数据转化为列式数据

2.业务数据库中存储的数据不是海量,但数仓要求海量,所以直接对接数据量不够

3.数据库不是为了数据仓库服务的,访问会对数据库造成性能影响

所以数据仓库应该设计一个自己的数据源,为了代替和补充数据库:数据存储应和数据库同步的(汇总)

数据仓库的开发用SQL语言进行处理,那么数据的处理步骤应该采用什么方法?

需要将数据转化为结构数据——表,且每一步都有表(应对多需求,类似缓存)

数据仓库的数据源数据需要从数据库中周期性同步,一般将这个同步过程称之为采集

若数据采集时,如果想要将数据同步到数据仓库的数据源,那么就必须知道表结构

        那么采集项目和数据仓库项目就会存在耦合性,但二者应有独立性

所以实际开发中,需要将采集项目和数仓项目解耦合

数据存储—— HDFS(解耦合)——数据仓库数据源

        原理:data&file——HDFS(file)—— hive(table)=> HDFS(file)(hive将磁盘文件管     理成表),如果不选择hive,解耦合用hdfs就未必合适

数据存储(MySQL)—— 数据采集(DataX,Maxwell(data—file),Flume(file—file))——HDFS(解耦合)——数据仓库数据源

数据采集部分

数仓部分

数仓尾巴(Hive)— MySQL (解耦合)— 数据可视化

集群资源规划

1)生产集群(参考腾讯云EMR官方推荐部署

        Master节点:管理节点,保证集群的调度正常运行;主要部署NameNode, ResourceManager, HMaster等进程;非HA模式下数量为1,HA模式数量为2。

注:HA模式-------------------

在数据仓库(Data Warehouse)搭建中,"HA" 模式通常指的是高可用性(High Availability)模式。高可用性是指系统能够在面对硬件故障、软件故障或其他可预见的事件时保持持续运行而不中断服务的能力。

在数据仓库的环境中,高可用性模式可以确保数据仓库系统的稳定性和可靠性,以确保业务的持续运行和数据的安全性。一般来说,实现高可用性模式需要采取多种技术手段,包括但不限于:

  1. 故障转移(Failover):在主节点(Primary Node)出现故障时,系统能够自动切换到备用节点(Secondary Node)以保持服务的连续性。

  2. 负载均衡(Load Balancing):将流量分发到多个节点上,以防止某个节点过载,从而提高整个系统的稳定性和性能。

  3. 数据复制(Data Replication):将数据复制到多个节点上,以确保即使某个节点发生故障,数据仍然可以从其他节点获取,确保数据的可用性和一致性。

  4. 监控和自动恢复(Monitoring and Automatic Recovery):实时监控系统的运行状况,当检测到异常时,自动触发相应的恢复机制,尽快恢复服务。

  5. 灾难恢复(Disaster Recovery):建立备份系统或数据中心,以应对灾难性事件,确保即使整个数据中心或系统发生严重故障,业务也能够在短时间内恢复运行。

        高可用性模式在数据仓库中尤为重要,因为数据仓库通常承载着企业的重要业务数据和决策支 持信息。通过采取高可用性措施,可以最大限度地减少系统停机时间,提高业务连续性和数据安全性。

---------------------

Core节点:为计算及存储节点,在HDFS中的数据全部存储于core节点中,因为为保证数据安全,扩容Core节点后不允许缩容;主要部署DataNode, NodeManager, RegionServer等进程。非HA>=2, HA>=3。

Common节点:为HA集群Master节点提供数据共享同步已经高可用容错服务;主要部署分布式协调器组件,如ZooKeeper,JournalNode等节点,非HA为0,HA>=3。

消耗内存的分开部署

数据传输数据比较紧密的放在一起(Kafka,ClickHouse)

客户端尽量放到一到两台服务器上,方便外部访问

有依赖关系的尽量放到同一台服务器(如:Ds-worker和Hive/Spark)

Master

Master

core

core

core

common

common

common

nn

nn

dn

dn

dn

JournalNode

JournalNode

JournalNode

rm

rm

nm

nm

nm

zk

zk

zk

hive

hive

hive

hive

hive

kafka

kafka

kafka

spark

spark

spark

spark

spark

datax

datax

datax

datax

datax

Ds-master

Ds-master

Ds-worker

Ds-worker

Ds-worker

maxwell

superset

mysql

flume

flume

flink

flink

clickhouse

redis

hbase 

2)测试集群服务器规划

服务名称

服务

服务器

hadoop102

服务器

hadoop103

服务器

hadoop104

HDFS

NameNode

DataNode

SecondaryNameNode

Yarn

NodeManager

Resourcemanager

Zookeeper

Zookeeper Server

Flume(采集日志

Flume

Kafka

Kafka

Flume

消费Kafka日志

Flume

Flume

消费Kafka业务

Flume

Hive

MySQL

MySQL

DataX

Spark

DolphinScheduler

ApiApplicationServer

AlertServer

MasterServer

WorkerServer

LoggerServer

Superset

Superset

Flink

ClickHouse

Redis

Hbase

服务数总计

20

11

12

用户行为日志

包括用户的各项行为信息以及行为所处的环境信息,收集手段通常为埋点。

主流埋点方式:代码埋点、可视化埋点、全埋点

用户行为日志内容:

        本项目收集和分析的用户行为信息主要有页面浏览记录、动作记录、曝光记录、启动记录和错误记录

页面浏览记录:记录的是访客对页面的浏览行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及页面信息等。

动作记录:记录的是用户的业务操作行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息 及动作目标对象信息等。

曝光记录:记录的是曝光行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及曝光对象信息等。

启动记录:记录的是用户启动应用的行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、启动类型及开屏广告信息等。

错误记录:记录的是用户在使用应用过程中的报错行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、以及可能与报错相关的页面信息、动作信息、曝光信息和动作信息。

日志格式:大致分两类:页面日志和启动日志

页面日志:以页面浏览为单位,即一个页面浏览记录,生成一条页面埋点日志。一条完整的页面日志包含,一个页面浏览记录,若干个用户在该页面所做的动作记录,若干个该页面的曝光记录,以及一个在该页面发生的报错记录。除上述行为信息,页面日志还包含了这些行为所处的各种环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

  1. {
  2. "common": { -- 环境信息
  3. "ar": "15", -- 省份ID
  4. "ba": "iPhone", -- 手机品牌
  5. "ch": "Appstore", -- 渠道
  6. "is_new": "1", -- 是否首日使用,首次使用的当日,该字段值为1,过了24:00,该字段置为0。
  7. "md": "iPhone 8", -- 手机型号
  8. "mid": "YXfhjAYH6As2z9Iq", -- 设备id
  9. "os": "iOS 13.2.9", -- 操作系统
  10. "sid": "3981c171-558a-437c-be10-da6d2553c517" -- 会话id
  11. "uid": "485", -- 会员id
  12. "vc": "v2.1.134" -- app版本号
  13. },
  14. "actions": [{ -- 动作(事件)
  15. "action_id": "favor_add", -- 动作id
  16. "item": "3", -- 目标id
  17. "item_type": "sku_id", -- 目标类型
  18. "ts": 1585744376605 -- 动作时间戳
  19. }
  20. ],
  21. "displays": [{ -- 曝光
  22. "displayType": "query", -- 曝光类型
  23. "item": "3", -- 曝光对象id
  24. "item_type": "sku_id", -- 曝光对象类型
  25. "order": 1, -- 出现顺序
  26. "pos_id": 2 -- 曝光位置
  27. "pos_seq": 1 -- 曝光序列号(同一坑位多个对象的编号)
  28. },
  29. {
  30. "displayType": "promotion",
  31. "item": "6",
  32. "item_type": "sku_id",
  33. "order": 2,
  34. "pos_id": 1
  35. "pos_seq": 1
  36. },
  37. {
  38. "displayType": "promotion",
  39. "item": "9",
  40. "item_type": "sku_id",
  41. "order": 3,
  42. "pos_id": 3
  43. "pos_seq": 1
  44. },
  45. {
  46. "displayType": "recommend",
  47. "item": "6",
  48. "item_type": "sku_id",
  49. "order": 4,
  50. "pos_id": 2
  51. "pos_seq": 1
  52. },
  53. {
  54. "displayType": "query ",
  55. "item": "6",
  56. "item_type": "sku_id",
  57. "order": 5,
  58. "pos_id": 1
  59. "pos_seq": 1
  60. }
  61. ],
  62. "page": { -- 页面信息
  63. "during_time": 7648, -- 持续时间毫秒
  64. "item": "3", -- 目标id
  65. "item_type": "sku_id", -- 目标类型
  66. "last_page_id": "login", -- 上页ID
  67. "page_id": "good_detail", -- 页面ID
  68. "from_pos_id":999, -- 来源坑位ID
  69. "from_pos_seq":999, -- 来源坑位序列号
  70. "refer_id":"2", -- 外部营销渠道ID
  71. "sourceType": "promotion" -- 来源类型
  72. },
  73. "err": { --错误
  74. "error_code": "1234", --错误码
  75. "msg": "***********" --错误信息
  76. },
  77. "ts": 1585744374423 --跳入时间戳
  78. }

启动日志:以启动为单位,及一次启动行为,生成一条启动日志。一条完整的启动日志包括一个启动记录,一个本次启动时的报错记录,以及启动时所处的环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

  1. {
  2. "common": {
  3. "ar": "370000",
  4. "ba": "Honor",
  5. "ch": "wandoujia",
  6. "is_new": "1",
  7. "md": "Honor 20s",
  8. "mid": "eQF5boERMJFOujcp",
  9. "os": "Android 11.0",
  10. "sid":"a1068e7a-e25b-45dc-9b9a-5a55ae83fc81"
  11. "uid": "76",
  12. "vc": "v2.1.134"
  13. },
  14. "start": {
  15. "entry": "icon", --icon手机图标 notice 通知 install 安装后启动
  16. "loading_time": 18803, --启动加载时间
  17. "open_ad_id": 7, --广告页ID
  18. "open_ad_ms": 3449, -- 广告总共播放时间
  19. "open_ad_skip_ms": 1989 -- 用户跳过广告时点
  20. },
  21. "err":{ --错误
  22. "error_code": "1234", --错误码
  23. "msg": "***********" --错误信息
  24. },
  25. "ts": 1585744304000
  26. }

服务器和JDK准备

配置hadoop102、hadoop103、hadoop104三台主机(问题及Hadoop相关另行总结)

编写集群分发脚本xsync

1)xsync集群分发脚本

        需求:循环复制文件到所有节点的相同目录下

        需求分析:

        ①rsync命令原始拷贝

rsync  -av     /opt/module  		 root@hadoop103:/opt/

        ②期望脚本:xsync要同步的文件名称

        ③说明:在/home/atguigu/bin这个目录下存放的脚本,atguigu用户可以在系统任何地方直接执行。

  1. [atguigu@hadoop102 ~]$ echo $PATH
  2. /usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/home/atguigu/.local/bin:/home/atguigu/bin

脚本实现:

①在用的家目录/home/atguigu下创建bin文件夹

[atguigu@hadoop102 ~]$ mkdir bin

②在/home/atguigu/bin目录下创建xsync文件,以便全局调用

  1. [atguigu@hadoop102 ~]$ cd /home/atguigu/bin
  2. [atguigu@hadoop102 ~]$ vim xsync

        在该文件中编写如下代码

  1. #!/bin/bash
  2. #1. 判断参数个数
  3. if [ $# -lt 1 ]
  4. then
  5. echo Not Enough Arguement!
  6. exit;
  7. fi
  8. #2. 遍历集群所有机器
  9. for host in hadoop102 hadoop103 hadoop104
  10. do
  11. echo ==================== $host ====================
  12. #3. 遍历所有目录,挨个发送
  13. for file in $@
  14. do
  15. #4 判断文件是否存在
  16. if [ -e $file ]
  17. then
  18. #5. 获取父目录
  19. pdir=$(cd -P $(dirname $file); pwd)
  20. #6. 获取当前文件的名称
  21. fname=$(basename $file)
  22. ssh $host "mkdir -p $pdir"
  23. rsync -av $pdir/$fname $host:$pdir
  24. else
  25. echo $file does not exists!
  26. fi
  27. done
  28. done

③修改脚本xsync具有执行权限

[atguigu@hadoop102 bin]$ chmod 777 xsync

④测试脚本

atguigu@hadoop102 bin]$ xsync xsync

SSH无密登录配置

说明:这里面只配置了hadoop102、hadoop103到其他主机的无密登录;因为hadoop102配置的是NameNode,hadoop103配置的是ResourceManager,都要求对其他节点无密访问。

1)hadoop102上生成公钥和私钥:

[atguigu@hadoop102 .ssh]$ ssh-keygen -t rsa

然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)。

2)将hadoop102公钥拷贝到要免密登录的目标机器上

  1. [atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop102
  2. [atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop103
  3. [atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop104

3)hadoop103上生成公钥和私钥:

[atguigu@hadoop103 .ssh]$ ssh-keygen -t rsa

4)拷贝操作亦同hadoop102

JDK准备

1)卸载三台节点上的现有JDK

  1. [atguigu@hadoop102 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
  2. [atguigu@hadoop103 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
  3. [atguigu@hadoop104 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps

操作批注:

(1)rpm -qa:表示查询所有已经安装的软件包

(2)grep -i:表示过滤时不区分大小写

(3)xargs -n1:表示一次获取上次执行结果的一个值

(4)rpm -e --nodeps:表示卸载软件

2)用XShell工具将JDK导入到hadoop102的/opt/software文件夹下面

3)在Linux系统下的opt目录查看是否导入成功(ls)

4)解压JDK到/opt/module目录下(tar)

  1. [atguigu@hadoop102 software]# tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
  2. [atguigu@hadoop102 module]$ mv jdk1.8.0_212/ jdk-1.8.0

5)配置JDK环境变量

(1)新建/etc/profile.d/my_env.sh文件(在module下sudo vim)

        添加如下内容,然后保存(:wq)退出。

  1. #JAVA_HOME
  2. export JAVA_HOME=/opt/module/jdk-1.8.0
  3. export PATH=$PATH:$JAVA_HOME/bin

(2)让环境变量生效

[atguigu@hadoop102 software]$ source /etc/profile.d/my_env.sh

6)测试安装是否成功(java -version)

7)分发JDK(执行刚才的xsync脚本)

[atguigu@hadoop102 module]$ xsync /opt/module/jdk-1.8.0

8)分发环境变量配置文件

[atguigu@hadoop102 module]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh

9)在hadoop103,hadoop104上分别执行source

环境变量配置说明

        Linux的环境变量可在多个文件中配置,如/etc/profile,/etc/profile.d/*.sh,~/.bashrc,~/.bash_profile等,下面说明上述几个文件之间的关系和区别。

        bash的运行模式可以分为 login shell 和 non-login shell

(例如,我们通过终端,输入用户名、密码,登录系统之后,得到就是一个login shell。而当我们执行以下命令ssh hadoop103 command,在hadoop103执行command的就是一个non-login shell。)

这两种shell的主要区别在于,它们启动时会加载不同的配置文件,login shell启动时会加载/etc/profile,~/.bash_profile~/.bashrcnon-login shell启动时会加载~/.bashrc

数据模拟

1)将application.ymlgmall-remake-mock-2023-02-17.jarpath.jsonlogback.xml上传到hadoop102的/opt/module/applog目录下(需要 mkdir创建)

2)配置文件

①application.yml文件:可以根据需求生成对应日期的用户行为日志

vim出文件后修改内容(照搬尚硅谷,太繁琐。。)

  1. # 外部配置打开
  2. logging.config: ./logback.xml
  3. #http模式下,发送的地址
  4. mock:
  5. log:
  6. type: "file" #"file" "http" "kafka" "none"
  7. http:
  8. url: "http://localhost:8090/applog"
  9. kafka:
  10. server: "hadoop102:9092,hadoop102:9092,hadoop102:9092"
  11. topic: "topic_log"
  12. spring:
  13. datasource:
  14. type: com.alibaba.druid.pool.DruidDataSource
  15. druid:
  16. url: jdbc:mysql://hadoop102:3306/gmall?characterEncoding=utf-8&allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=GMT%2B8
  17. username: root
  18. password: "000000"
  19. driver-class-name: com.mysql.cj.jdbc.Driver
  20. max-active: 20
  21. test-on-borrow: true
  22. mybatis-plus.global-config.db-config.field-strategy: not_null
  23. mybatis-plus:
  24. mapper-locations: classpath:mapper/*.xml
  25. mybatis:
  26. mapper-locations: classpath:mapper/*.xml
  27. #业务日期, 并非Linux系统时间的日期,而是生成模拟数据的日期
  28. mock.date: "2022-06-08"
  29. # 日志是否写入数据库一份 写入z_log表中
  30. mock.log.db.enable: 1
  31. # 清空
  32. mock.clear.busi: 1
  33. # 清空用户
  34. mock.clear.user: 0
  35. # 批量生成新用户
  36. mock.new.user: 0
  37. #session次数
  38. mock.user-session.count: 200
  39. #设备最大值
  40. mock.max.mid: 1000000
  41. # 是否针对实时生成数据,若启用(置为1)则数据的 yyyy-MM-dd 与 mock.date 一致而 HH:mm:ss 与系统时间一致;若禁用则数据的 yyyy-MM-dd 与 mock.date 一致而 HH:mm:ss 随机分布,此处禁用
  42. mock.if-realtime: 0
  43. #访问时间分布权重
  44. mock.start-time-weight: "10:5:0:0:0:0:5:5:5:10:10:15:20:10:10:10:10:10:20:25:30:35:30:20"
  45. #支付类型占比 支付宝 :微信 :银联
  46. mock.payment_type_weight: "40:50:10"
  47. #页面平均访问时间
  48. mock.page.during-time-ms: 20000
  49. #错误概率 百分比
  50. mock.error.rate: 3
  51. #每条日志发送延迟 ms
  52. mock.log.sleep: 100
  53. #课程详情来源 用户查询,商品推广,智能推荐, 促销活动
  54. mock.detail.source-type-rate: "40:25:15:20"
  55. mock.if-cart-rate: 100
  56. mock.if-favor-rate: 70
  57. mock.if-order-rate: 100
  58. mock.if-refund-rate: 50
  59. #搜索关键词
  60. mock.search.keyword: "java,python,多线程,前端,数据库,大数据,hadoop,flink"
  61. #用户数据变化概率
  62. mock.user.update-rate: 20
  63. # 男女浏览品牌比重(11 品牌)
  64. mock.tm-weight.male: "3:2:5:5:5:1:1:1:1:1:1"
  65. mock.tm-weight.female: "1:5:1:1:2:2:2:5:5:5:5"
  66. # 外连类型比重(5 种)
  67. mock.refer-weight: "10:2:3:4:5"
  68. # 线程池相关配置
  69. mock.pool.core: 20
  70. mock.pool.max-core: 100

②path.json, 用来配置访问路径,根据需求可以灵活配置用户点击路径

  1. [
  2. {"path":["start_app","home", "search", "good_list","good_detail","good_detail" ,"good_detail","cart","order","payment","mine","order_list","end"],"rate":100 },
  3. {"path":["start_app","home", "good_list","good_detail","good_detail" ,"good_detail","cart","end"],"rate":30 },
  4. {"path":["start_app","home", "activity1111","good_detail" ,"cart","good_detail","cart","order","payment","end"],"rate":30 },
  5. {"path":[ "activity1111","good_detail" ,"activity1111" ,"good_detail","order","payment","end"],"rate":200 },
  6. {"path":[ "start_app","home" ,"activity1111" ,"good_detail","order","payment","end"],"rate":200 },
  7. {"path":[ "start_app","home" , "good_detail","order","payment","end"],"rate":30 },
  8. {"path":[ "good_detail","order","payment","end"],"rate":650 },
  9. {"path":[ "good_detail" ],"rate":30 },
  10. {"path":[ "start_app","home","mine","good_detail" ],"rate":30 },
  11. {"path":[ "start_app","home", "good_detail","good_detail","good_detail","cart","order","payment","end" ],"rate":200 },
  12. {"path":[ "start_app","home", "search","good_list","good_detail","cart","order","payment","end" ],"rate":200 }
  13. ]

③logback配置文件,可配置日志生成路径

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <configuration>
  3. <property name="LOG_HOME" value="/opt/module/applog/log" />
  4. <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
  5. <target>System.out</target>
  6. <encoder>
  7. <pattern>%msg%n</pattern>
  8. </encoder>
  9. </appender>
  10. <appender name="console_em" class="ch.qos.logback.core.ConsoleAppender">
  11. <target>System.err</target>
  12. <encoder>
  13. <pattern>%msg%n</pattern>
  14. </encoder>
  15. </appender>
  16. <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
  17. <file>${LOG_HOME}/app.log</file>
  18. <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  19. <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
  20. </rollingPolicy>
  21. <encoder>
  22. <pattern>%msg%n</pattern>
  23. </encoder>
  24. </appender>
  25. <!-- 将某一个包下日志单独打印日志 -->
  26. <logger name="com.atguigu.mock.util.LogUtil"
  27. level="INFO" additivity="false">
  28. <appender-ref ref="rollingFile" />
  29. <!-- <appender-ref ref="console" />-->
  30. </logger>
  31. <logger name="com.atguigu.gmallre.mock.task.UserMockTask" level="INFO" additivity="false" >
  32. <appender-ref ref="console_em" />
  33. </logger>
  34. <!-- <logger name="com.alibaba.druid.pool" level="error" additivity="false" >-->
  35. <!-- <appender-ref ref="console" />-->
  36. <!-- </logger>-->
  37. <!-- <logger name="com.atguigu.edu2021.mock.mapper" level="debug">-->
  38. <!-- <appender-ref ref="console" />-->
  39. <!-- </logger>-->
  40. <!-- <logger name="com.atguigu.edu2021.mock.service.impl.UserInfoServiceImpl" level="debug">
  41. <appender-ref ref="console" />
  42. </logger>-->
  43. <root level="error" >
  44. <appender-ref ref="console_em" />
  45. <!-- <appender-ref ref="async-rollingFile" /> -->
  46. </root>
  47. </configuration>

3)生成日志

        进入到/opt/module/applog路径,执行以下命令

[atguigu@hadoop102 applog]$ java -jar gmall-remake-mock-2023-02-17.jar test 100 2022-06-08

其中:

 增加test参数为测试模式,只生成用户行为数据不生成业务数据。

 100 为产生的用户session数一个session默认产生1条启动日志和5条页面方法日志。

 第三个参数为日志数据的日期,测试模式下不会加载配置文件,要指定数据日期只能通过命令行传参实现。

 三个参数的顺序必须与示例保持一致

 第二个参数和第三个参数可以省略,如果test后面不填写参数,默认为1000

在/opt/module/applog/log目录下查看生成日志

[atguigu@hadoop102 log]$ ll

集群日志生成脚本

(1)在/home/atguigu/bin目录下vim脚本lg.sh

(2)在脚本中编写如下内容

  1. #!/bin/bash
  2. echo "========== hadoop102 =========="
  3. ssh hadoop102 "cd /opt/module/applog/; nohup java -jar gmall-remake-mock-2023-02-17.jar $1 $2 $3 >/dev/null 2>&1 &"
  4. done

注:

/opt/module/applog/为jar包及配置文件所在路径

②/dev/null代表Linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。

标准输入0:从键盘获得输入 /proc/self/fd/0

标准输出1:输出到屏幕(即控制台) /proc/self/fd/1

错误输出2:输出到屏幕(即控制台) /proc/self/fd/2

(3)修改脚本执行权限(chmod 777)

(4)将jar包及配置文件上传至hadoop103/opt/module/applog/路径

(5)启动脚本

atguigu@hadoop102 module]$ lg.sh test 100

(6)分别在hadoop102、hadoop103的/opt/module/applog/log目录上查看生成的数据

  1. [atguigu@hadoop102 log]$ ls
  2. app.log

用户行为数据采集模块

用户行为日志数据通道:

环境准备

集群命令批量执行脚本xcall,vim后修改然后chmod777, 启动是$ xcall jps

  1. #! /bin/bash
  2. for i in hadoop102 hadoop103 hadoop104
  3. do
  4. echo --------- $i ----------
  5. ssh $i "$*"
  6. done

注:

jps 命令是 Java Virtual Machine Process Status Tool 的缩写,用于显示 Java 虚拟机(JVM)中正在运行的 Java 进程的信息。它通常用于识别在当前系统中正在运行的 Java 进程,以及它们的进程 ID(PID)和主类名。

jps 命令可以在命令行中直接运行,不需要任何参数。它会列出当前系统中正在运行的所有 Java 进程的信息,包括它们的 PID 和主类名。具体来说,jps 命令的输出包括以下信息:

  1. 进程 ID(PID):Java 进程的唯一标识符。
  2. 主类名(Main Class Name):启动 Java 进程时指定的主类名,即包含 main() 方法的类。

jps 命令可以用于以下情况:

  • 识别在系统中运行的 Java 进程,以便进行监控、调试或管理。
  • 确定正在运行的 Java 进程的主类名,以便进一步分析或调试。

Hadoop安装

Zookeeper安装

Kafka安装

Flume安装

日志采集Flume

        Apache Flume 是一个分布式、可靠且高可用的系统,用于收集、聚合和传输大量日志数据或事件数据到中心化数据存储中,如 Apache Hadoop 的 HDFS、Apache HBase、以及 Apache Kafka 等。它的主要作用是简化大规模数据的采集和传输过程,实现数据流的可靠传输,并提供了一套灵活的配置和可扩展的架构,使其能够适应不同类型和规模的数据采集需求。

        按照规划,需要采集的用户行为日志存放在hadoop102,故需要在该节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(json)进行校验,然后将校验通过的日志发送到Kafka。↓

(注:

Flume 如何将采集的日志传输给 Kafka 的基本流程:

  1. 定义数据流配置: 首先,需要定义 Flume 的数据流配置,包括数据源、数据通道和数据目的地。数据源可以是各种数据来源,如日志文件、网络端口、消息队列等。数据通道用于在 Flume 内部缓存和传输数据。数据目的地则指定数据传输的最终目的地,如 Kafka、HDFS、HBase 等。

  2. 启动 Flume Agent: 根据定义的数据流配置,启动 Flume Agent。Flume Agent 是 Flume 的运行实例,负责接收、处理和传输数据。当 Agent 启动后,它会按照配置从数据源获取数据,并将数据传输到指定的目的地。

  3. (在Flume中使用KafkaChannel可省去这一步)配置 Kafka Sink: 在数据流配置中,需要配置一个 Kafka Sink,用于将数据传输到 Kafka。Kafka Sink 是 Flume 提供的一个插件,用于与 Kafka 交互并将数据写入 Kafka 的 Topic 中。在配置 Kafka Sink 时,需要指定 Kafka 的连接信息、Topic 名称等参数。

  4. 数据传输到 Kafka: 当 Flume Agent 运行时,它会从数据源获取数据,并通过配置的数据通道将数据传输到 Kafka Sink。Kafka Sink 将接收到的数据转换为 Kafka 的消息格式,并将消息写入指定的 Kafka Topic 中。

  5. 数据消费: 一旦数据写入 Kafka Topic,消费者可以使用 Kafka 提供的 API 或工具来消费这些数据。消费者可以订阅相应的 Topic,并实时获取到 Flume 传输的日志数据,进行后续的处理和分析。

通过以上流程,Flume 可以将采集的日志数据可靠地传输到 Kafka 中,实现数据的收集、传输和分发,为后续的数据处理和分析提供了可靠的数据来源。

此处可以选择Flume中TaildirSource和KafkaChannel组件,并配置日志校验拦截器。选择原因:

TaildirSource:

TaildirSource 是 Flume 提供的一个源(Source)组件,用于从文件系统中实时读取日志文件的内容,并将其发送到 Flume 的通道(Channel)中。它可以监视指定目录下的日志文件,不断地读取新增的日志内容,并将其转发给 Flume 的后续组件进行处理。TaildirSource 通常用于实时采集应用程序生成的日志数据。

TailDirSource相比ExecSource、SpoolingDirectorySource的优势。

TailDirSource:断点续传、多目录Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。

ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

SpoolingDirectorySource监控目录,支持断点续传。

KafkaChannel

KafkaChannel 是 Flume 提供的一个通道(Channel)组件,用于与 Apache Kafka 集成,实现将数据从 Flume 传输到 Kafka 中。KafkaChannel 使用 Kafka 作为底层的数据存储和传输介质,将 Flume 接收到的数据写入 Kafka 的 Topic 中,以便后续的数据消费和处理。KafkaChannel 可以保证数据的可靠性和高吞吐量,适用于大规模的数据传输场景。

采用Kafka Channel,省去了Sink,提高了效率。

日志采集Flume关键配置如下:

日志采集Flume配置实操

1)创建Flume配置文件

        在hadoop102节点的Flume的job目录下创建file_to_kafka.conf。(mkdir,vim)

2)配置文件内容如下

  1. #定义组件
  2. a1.sources = r1
  3. a1.channels = c1
  4. #配置source
  5. a1.sources.r1.type = TAILDIR
  6. a1.sources.r1.filegroups = f1
  7. a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
  8. a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
  9. #配置channel
  10. a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
  11. a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
  12. a1.channels.c1.kafka.topic = topic_log
  13. a1.channels.c1.parseAsFlumeEvent = false
  14. #组装
  15. a1.sources.r1.channels = c1

日志采集Flume测试

1)启动Zookeeper、Kafka集群

2)启动hadoop102的日志采集Flume

3)启动一个Kafka的Console-Consumer

Kafka 的 console-consumer 是 Kafka 提供的一个命令行工具,用于从 Kafka Topic 中消费消息并在控制台上显示。它是 Kafka 提供的一个实用工具,用于在开发和调试过程中查看和验证 Kafka 中的消息数据。

以下是 console-consumer 命令的基本用法和一些常用选项:

kafka-console-consumer.sh --bootstrap-server <broker-list> --topic <topic-name> [options] 

主要参数说明:

  • --bootstrap-server <broker-list>: 指定 Kafka 集群的地址列表,用于连接到 Kafka 集群。例如:localhost:9092

  • --topic <topic-name>: 指定要消费消息的 Kafka Topic 名称。

一些常用选项包括:

  • --from-beginning: 从最早的消息开始消费,即使在之前已经有消费者消费过该 Topic 中的消息,也会重新消费所有消息。

  • --max-messages <num-messages>: 指定最大消费消息的数量。

  • --property <key=value>: 设置 Kafka Consumer 的配置属性,如 auto.offset.reset, group.id 等。

  • 此处操作代码如下:

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log

4)生成数据

5)观察Kafka消费者是否能消费到数据

日志采集Flume启停脚本

在hadoop102节点的/home/atguigu/bin目录下创建脚本f1.sh

  1. #!/bin/bash
  2. case $1 in
  3. "start"){
  4. echo " --------启动 hadoop102 采集flume-------"
  5. ssh hadoop102 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
  6. };;
  7. "stop"){
  8. echo " --------停止 hadoop102 采集flume-------"
  9. ssh hadoop102 "ps -ef | grep file_to_kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
  10. };;
  11. esac

之后chmod777,启动 f1.sh start 停止 f1.sh stop

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/586155
推荐阅读
相关标签
  

闽ICP备14008679号