赞
踩
learn by :尚硅谷数仓6.0
前置储备:
采集项目和数仓项目的区别:二者具有独立性
1功能角度:
采集项目:以数据的采集、传输为主
数仓项目:以数据计算为主、同时可以储存数据
2以功能延伸到技术区别:
采集项目:flume、kafka、datax、maxwell
数仓项目:mysql、hdfs、spark、Flink、MR、hive
数据库和数据仓库区分:
名称区分:
数据库database:基础核心的数据
数据仓库 data Warehouse:warehouse还有大商店之意,所以存数据不是数据仓库的本质目的,是为了把数据加工处理后对外提供服务
数据来源区分:
数据库:企业中基础核心的业务数据
数据仓库:数据库中的数据
数据存储区分:
数据库:核心作用是查找业务数据,行式存储,索引(快速定位),不能存储海量数据
数据仓库:核心作用是统计分析数据,列式存储,存储海量数据(数据越多统计越准确)
数据价值区分:
数据库:保障全企业全业务正常运行
数据仓库:将数据的统计结果为企业的经营决策提供数据支撑,不是数据流转的终点,需要将统计结果将可视化平台呈现给客户
数据流传过程
(不完整,只记录涉及到的)
客户端——业务服务器(业务数据,行为数据)——数据存储(业务数据库,行为日志)——数据的统计分析(数据仓库)—— 数据可视化(可视化平台,图表)
↓
数据仓库:核心功能统计分析(Hive)
Spark, MR, Flink
SQL:两条技术路线 Spark On Hive ; Hive On Spark,区别:谁解析SQL谁放前面
因为Hive基于Hadoop,所以国内开发用的多一点
统计分析的基本步骤:
数据源——对接并加工数据——统计数据(算一下)——分析数据
数据仓库也应遵循这个步骤
数据存储————数据可视化
如果将数据库直接作为数据仓库的数据源会出现的问题:
1.业务数据库为行式存储,而数据仓库是列式存储,数据不能直接对接——需要把行式数据转化为列式数据
2.业务数据库中存储的数据不是海量,但数仓要求海量,所以直接对接数据量不够
3.数据库不是为了数据仓库服务的,访问会对数据库造成性能影响
所以数据仓库应该设计一个自己的数据源,为了代替和补充数据库:数据存储应和数据库同步的(汇总)
数据仓库的开发用SQL语言进行处理,那么数据的处理步骤应该采用什么方法?
需要将数据转化为结构数据——表,且每一步都有表(应对多需求,类似缓存)
数据仓库的数据源数据需要从数据库中周期性同步,一般将这个同步过程称之为采集
若数据采集时,如果想要将数据同步到数据仓库的数据源,那么就必须知道表结构
那么采集项目和数据仓库项目就会存在耦合性,但二者应有独立性
所以实际开发中,需要将采集项目和数仓项目解耦合↓
数据存储—— HDFS(解耦合)——数据仓库数据源
原理:data&file——HDFS(file)—— hive(table)=> HDFS(file)(hive将磁盘文件管 理成表),如果不选择hive,解耦合用hdfs就未必合适↓
数据存储(MySQL)—— 数据采集(DataX,Maxwell(data—file),Flume(file—file))——HDFS(解耦合)——数据仓库数据源
数据采集部分
数仓部分
数仓尾巴(Hive)— MySQL (解耦合)— 数据可视化
集群资源规划
1)生产集群(参考腾讯云EMR官方推荐部署
Master节点:管理节点,保证集群的调度正常运行;主要部署NameNode, ResourceManager, HMaster等进程;非HA模式下数量为1,HA模式数量为2。
注:HA模式-------------------
在数据仓库(Data Warehouse)搭建中,"HA" 模式通常指的是高可用性(High Availability)模式。高可用性是指系统能够在面对硬件故障、软件故障或其他可预见的事件时保持持续运行而不中断服务的能力。
在数据仓库的环境中,高可用性模式可以确保数据仓库系统的稳定性和可靠性,以确保业务的持续运行和数据的安全性。一般来说,实现高可用性模式需要采取多种技术手段,包括但不限于:
故障转移(Failover):在主节点(Primary Node)出现故障时,系统能够自动切换到备用节点(Secondary Node)以保持服务的连续性。
负载均衡(Load Balancing):将流量分发到多个节点上,以防止某个节点过载,从而提高整个系统的稳定性和性能。
数据复制(Data Replication):将数据复制到多个节点上,以确保即使某个节点发生故障,数据仍然可以从其他节点获取,确保数据的可用性和一致性。
监控和自动恢复(Monitoring and Automatic Recovery):实时监控系统的运行状况,当检测到异常时,自动触发相应的恢复机制,尽快恢复服务。
灾难恢复(Disaster Recovery):建立备份系统或数据中心,以应对灾难性事件,确保即使整个数据中心或系统发生严重故障,业务也能够在短时间内恢复运行。
高可用性模式在数据仓库中尤为重要,因为数据仓库通常承载着企业的重要业务数据和决策支 持信息。通过采取高可用性措施,可以最大限度地减少系统停机时间,提高业务连续性和数据安全性。
---------------------
Core节点:为计算及存储节点,在HDFS中的数据全部存储于core节点中,因为为保证数据安全,扩容Core节点后不允许缩容;主要部署DataNode, NodeManager, RegionServer等进程。非HA>=2, HA>=3。
Common节点:为HA集群Master节点提供数据共享同步已经高可用容错服务;主要部署分布式协调器组件,如ZooKeeper,JournalNode等节点,非HA为0,HA>=3。
消耗内存的分开部署
数据传输数据比较紧密的放在一起(Kafka,ClickHouse)
客户端尽量放到一到两台服务器上,方便外部访问
有依赖关系的尽量放到同一台服务器(如:Ds-worker和Hive/Spark)
Master | Master | core | core | core | common | common | common |
nn | nn | dn | dn | dn | JournalNode | JournalNode | JournalNode |
rm | rm | nm | nm | nm | |||
zk | zk | zk | |||||
hive | hive | hive | hive | hive | |||
kafka | kafka | kafka | |||||
spark | spark | spark | spark | spark | |||
datax | datax | datax | datax | datax | |||
Ds-master | Ds-master | Ds-worker | Ds-worker | Ds-worker | |||
maxwell | |||||||
superset | |||||||
mysql | |||||||
flume | flume | ||||||
flink | flink | ||||||
clickhouse | |||||||
redis | |||||||
hbase |
2)测试集群服务器规划
服务名称 | 子服务 | 服务器 hadoop102 | 服务器 hadoop103 | 服务器 hadoop104 |
HDFS | NameNode | √ | ||
DataNode | √ | √ | √ | |
SecondaryNameNode | √ | |||
Yarn | NodeManager | √ | √ | √ |
Resourcemanager | √ | |||
Zookeeper | Zookeeper Server | √ | √ | √ |
Flume(采集日志) | Flume | √ | √ | |
Kafka | Kafka | √ | √ | √ |
Flume (消费Kafka日志) | Flume | √ | ||
Flume (消费Kafka业务) | Flume | √ | ||
Hive | √ | √ | √ | |
MySQL | MySQL | √ | ||
DataX | √ | √ | √ | |
Spark | √ | √ | √ | |
DolphinScheduler | ApiApplicationServer | √ | ||
AlertServer | √ | |||
MasterServer | √ | |||
WorkerServer | √ | √ | √ | |
LoggerServer | √ | √ | √ | |
Superset | Superset | √ | ||
Flink | √ | |||
ClickHouse | √ | |||
Redis | √ | |||
Hbase | √ | |||
服务数总计 | 20 | 11 | 12 |
包括用户的各项行为信息以及行为所处的环境信息,收集手段通常为埋点。
主流埋点方式:代码埋点、可视化埋点、全埋点
用户行为日志内容:
本项目收集和分析的用户行为信息主要有页面浏览记录、动作记录、曝光记录、启动记录和错误记录
页面浏览记录:记录的是访客对页面的浏览行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及页面信息等。
动作记录:记录的是用户的业务操作行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息 及动作目标对象信息等。
曝光记录:记录的是曝光行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及曝光对象信息等。
启动记录:记录的是用户启动应用的行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、启动类型及开屏广告信息等。
错误记录:记录的是用户在使用应用过程中的报错行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、以及可能与报错相关的页面信息、动作信息、曝光信息和动作信息。
日志格式:大致分两类:页面日志和启动日志
页面日志:以页面浏览为单位,即一个页面浏览记录,生成一条页面埋点日志。一条完整的页面日志包含,一个页面浏览记录,若干个用户在该页面所做的动作记录,若干个该页面的曝光记录,以及一个在该页面发生的报错记录。除上述行为信息,页面日志还包含了这些行为所处的各种环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。
- {
- "common": { -- 环境信息
- "ar": "15", -- 省份ID
- "ba": "iPhone", -- 手机品牌
- "ch": "Appstore", -- 渠道
- "is_new": "1", -- 是否首日使用,首次使用的当日,该字段值为1,过了24:00,该字段置为0。
- "md": "iPhone 8", -- 手机型号
- "mid": "YXfhjAYH6As2z9Iq", -- 设备id
- "os": "iOS 13.2.9", -- 操作系统
- "sid": "3981c171-558a-437c-be10-da6d2553c517" -- 会话id
- "uid": "485", -- 会员id
- "vc": "v2.1.134" -- app版本号
- },
- "actions": [{ -- 动作(事件)
- "action_id": "favor_add", -- 动作id
- "item": "3", -- 目标id
- "item_type": "sku_id", -- 目标类型
- "ts": 1585744376605 -- 动作时间戳
- }
- ],
- "displays": [{ -- 曝光
- "displayType": "query", -- 曝光类型
- "item": "3", -- 曝光对象id
- "item_type": "sku_id", -- 曝光对象类型
- "order": 1, -- 出现顺序
- "pos_id": 2 -- 曝光位置
- "pos_seq": 1 -- 曝光序列号(同一坑位多个对象的编号)
- },
- {
- "displayType": "promotion",
- "item": "6",
- "item_type": "sku_id",
- "order": 2,
- "pos_id": 1
- "pos_seq": 1
- },
- {
- "displayType": "promotion",
- "item": "9",
- "item_type": "sku_id",
- "order": 3,
- "pos_id": 3
- "pos_seq": 1
- },
- {
- "displayType": "recommend",
- "item": "6",
- "item_type": "sku_id",
- "order": 4,
- "pos_id": 2
- "pos_seq": 1
- },
- {
- "displayType": "query ",
- "item": "6",
- "item_type": "sku_id",
- "order": 5,
- "pos_id": 1
- "pos_seq": 1
- }
- ],
- "page": { -- 页面信息
- "during_time": 7648, -- 持续时间毫秒
- "item": "3", -- 目标id
- "item_type": "sku_id", -- 目标类型
- "last_page_id": "login", -- 上页ID
- "page_id": "good_detail", -- 页面ID
- "from_pos_id":999, -- 来源坑位ID
- "from_pos_seq":999, -- 来源坑位序列号
- "refer_id":"2", -- 外部营销渠道ID
- "sourceType": "promotion" -- 来源类型
- },
- "err": { --错误
- "error_code": "1234", --错误码
- "msg": "***********" --错误信息
- },
- "ts": 1585744374423 --跳入时间戳
- }
启动日志:以启动为单位,及一次启动行为,生成一条启动日志。一条完整的启动日志包括一个启动记录,一个本次启动时的报错记录,以及启动时所处的环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。
- {
- "common": {
- "ar": "370000",
- "ba": "Honor",
- "ch": "wandoujia",
- "is_new": "1",
- "md": "Honor 20s",
- "mid": "eQF5boERMJFOujcp",
- "os": "Android 11.0",
- "sid":"a1068e7a-e25b-45dc-9b9a-5a55ae83fc81"
- "uid": "76",
- "vc": "v2.1.134"
- },
- "start": {
- "entry": "icon", --icon手机图标 notice 通知 install 安装后启动
- "loading_time": 18803, --启动加载时间
- "open_ad_id": 7, --广告页ID
- "open_ad_ms": 3449, -- 广告总共播放时间
- "open_ad_skip_ms": 1989 -- 用户跳过广告时点
- },
- "err":{ --错误
- "error_code": "1234", --错误码
- "msg": "***********" --错误信息
- },
- "ts": 1585744304000
- }
服务器和JDK准备
配置hadoop102、hadoop103、hadoop104三台主机(问题及Hadoop相关另行总结)
编写集群分发脚本xsync
1)xsync集群分发脚本
需求:循环复制文件到所有节点的相同目录下
需求分析:
①rsync命令原始拷贝
rsync -av /opt/module root@hadoop103:/opt/
②期望脚本:xsync要同步的文件名称
③说明:在/home/atguigu/bin这个目录下存放的脚本,atguigu用户可以在系统任何地方直接执行。
- [atguigu@hadoop102 ~]$ echo $PATH
- /usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/home/atguigu/.local/bin:/home/atguigu/bin
脚本实现:
①在用的家目录/home/atguigu下创建bin文件夹
[atguigu@hadoop102 ~]$ mkdir bin
②在/home/atguigu/bin目录下创建xsync文件,以便全局调用
- [atguigu@hadoop102 ~]$ cd /home/atguigu/bin
- [atguigu@hadoop102 ~]$ vim xsync
在该文件中编写如下代码
- #!/bin/bash
-
- #1. 判断参数个数
- if [ $# -lt 1 ]
- then
- echo Not Enough Arguement!
- exit;
- fi
-
- #2. 遍历集群所有机器
- for host in hadoop102 hadoop103 hadoop104
- do
- echo ==================== $host ====================
- #3. 遍历所有目录,挨个发送
- for file in $@
- do
- #4 判断文件是否存在
- if [ -e $file ]
- then
- #5. 获取父目录
- pdir=$(cd -P $(dirname $file); pwd)
- #6. 获取当前文件的名称
- fname=$(basename $file)
- ssh $host "mkdir -p $pdir"
- rsync -av $pdir/$fname $host:$pdir
- else
- echo $file does not exists!
- fi
- done
- done
③修改脚本xsync具有执行权限
[atguigu@hadoop102 bin]$ chmod 777 xsync
④测试脚本
atguigu@hadoop102 bin]$ xsync xsync
SSH无密登录配置
说明:这里面只配置了hadoop102、hadoop103到其他主机的无密登录;因为hadoop102配置的是NameNode,hadoop103配置的是ResourceManager,都要求对其他节点无密访问。
1)hadoop102上生成公钥和私钥:
[atguigu@hadoop102 .ssh]$ ssh-keygen -t rsa
然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)。
2)将hadoop102公钥拷贝到要免密登录的目标机器上
- [atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop102
- [atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop103
- [atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop104
3)hadoop103上生成公钥和私钥:
[atguigu@hadoop103 .ssh]$ ssh-keygen -t rsa
4)拷贝操作亦同hadoop102
JDK准备
1)卸载三台节点上的现有JDK
- [atguigu@hadoop102 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
-
- [atguigu@hadoop103 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
-
- [atguigu@hadoop104 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
操作批注:
(1)rpm -qa:表示查询所有已经安装的软件包
(2)grep -i:表示过滤时不区分大小写
(3)xargs -n1:表示一次获取上次执行结果的一个值
(4)rpm -e --nodeps:表示卸载软件
2)用XShell工具将JDK导入到hadoop102的/opt/software文件夹下面
3)在Linux系统下的opt目录查看是否导入成功(ls)
4)解压JDK到/opt/module目录下(tar)
- [atguigu@hadoop102 software]# tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
-
- [atguigu@hadoop102 module]$ mv jdk1.8.0_212/ jdk-1.8.0
5)配置JDK环境变量
(1)新建/etc/profile.d/my_env.sh文件(在module下sudo vim)
添加如下内容,然后保存(:wq)退出。
- #JAVA_HOME
- export JAVA_HOME=/opt/module/jdk-1.8.0
- export PATH=$PATH:$JAVA_HOME/bin
(2)让环境变量生效
[atguigu@hadoop102 software]$ source /etc/profile.d/my_env.sh
6)测试安装是否成功(java -version)
7)分发JDK(执行刚才的xsync脚本)
[atguigu@hadoop102 module]$ xsync /opt/module/jdk-1.8.0
8)分发环境变量配置文件
[atguigu@hadoop102 module]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
9)在hadoop103,hadoop104上分别执行source
环境变量配置说明
Linux的环境变量可在多个文件中配置,如/etc/profile,/etc/profile.d/*.sh,~/.bashrc,~/.bash_profile等,下面说明上述几个文件之间的关系和区别。
bash的运行模式可以分为 login shell 和 non-login shell
(例如,我们通过终端,输入用户名、密码,登录系统之后,得到就是一个login shell。而当我们执行以下命令ssh hadoop103 command,在hadoop103执行command的就是一个non-login shell。)
这两种shell的主要区别在于,它们启动时会加载不同的配置文件,login shell启动时会加载/etc/profile,~/.bash_profile,~/.bashrc。non-login shell启动时会加载~/.bashrc。
数据模拟
1)将application.yml、gmall-remake-mock-2023-02-17.jar、path.json、logback.xml上传到hadoop102的/opt/module/applog目录下(需要 mkdir创建)
2)配置文件
①application.yml文件:可以根据需求生成对应日期的用户行为日志
vim出文件后修改内容(照搬尚硅谷,太繁琐。。)
- # 外部配置打开
- logging.config: ./logback.xml
-
-
- #http模式下,发送的地址
- mock:
- log:
- type: "file" #"file" "http" "kafka" "none"
- http:
- url: "http://localhost:8090/applog"
- kafka:
- server: "hadoop102:9092,hadoop102:9092,hadoop102:9092"
- topic: "topic_log"
-
- spring:
- datasource:
- type: com.alibaba.druid.pool.DruidDataSource
- druid:
- url: jdbc:mysql://hadoop102:3306/gmall?characterEncoding=utf-8&allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=GMT%2B8
- username: root
- password: "000000"
- driver-class-name: com.mysql.cj.jdbc.Driver
- max-active: 20
- test-on-borrow: true
-
-
- mybatis-plus.global-config.db-config.field-strategy: not_null
- mybatis-plus:
- mapper-locations: classpath:mapper/*.xml
-
- mybatis:
- mapper-locations: classpath:mapper/*.xml
-
- #业务日期, 并非Linux系统时间的日期,而是生成模拟数据的日期
- mock.date: "2022-06-08"
-
- # 日志是否写入数据库一份 写入z_log表中
- mock.log.db.enable: 1
-
- # 清空
- mock.clear.busi: 1
-
- # 清空用户
- mock.clear.user: 0
-
- # 批量生成新用户
- mock.new.user: 0
- #session次数
- mock.user-session.count: 200
- #设备最大值
- mock.max.mid: 1000000
-
- # 是否针对实时生成数据,若启用(置为1)则数据的 yyyy-MM-dd 与 mock.date 一致而 HH:mm:ss 与系统时间一致;若禁用则数据的 yyyy-MM-dd 与 mock.date 一致而 HH:mm:ss 随机分布,此处禁用
- mock.if-realtime: 0
- #访问时间分布权重
- mock.start-time-weight: "10:5:0:0:0:0:5:5:5:10:10:15:20:10:10:10:10:10:20:25:30:35:30:20"
-
- #支付类型占比 支付宝 :微信 :银联
- mock.payment_type_weight: "40:50:10"
-
- #页面平均访问时间
- mock.page.during-time-ms: 20000
- #错误概率 百分比
- mock.error.rate: 3
- #每条日志发送延迟 ms
- mock.log.sleep: 100
- #课程详情来源 用户查询,商品推广,智能推荐, 促销活动
- mock.detail.source-type-rate: "40:25:15:20"
-
- mock.if-cart-rate: 100
-
- mock.if-favor-rate: 70
-
- mock.if-order-rate: 100
-
- mock.if-refund-rate: 50
-
-
-
- #搜索关键词
- mock.search.keyword: "java,python,多线程,前端,数据库,大数据,hadoop,flink"
-
-
- #用户数据变化概率
- mock.user.update-rate: 20
-
-
- # 男女浏览品牌比重(11 品牌)
- mock.tm-weight.male: "3:2:5:5:5:1:1:1:1:1:1"
- mock.tm-weight.female: "1:5:1:1:2:2:2:5:5:5:5"
-
-
- # 外连类型比重(5 种)
- mock.refer-weight: "10:2:3:4:5"
-
- # 线程池相关配置
- mock.pool.core: 20
- mock.pool.max-core: 100
②path.json, 用来配置访问路径,根据需求可以灵活配置用户点击路径
- [
- {"path":["start_app","home", "search", "good_list","good_detail","good_detail" ,"good_detail","cart","order","payment","mine","order_list","end"],"rate":100 },
- {"path":["start_app","home", "good_list","good_detail","good_detail" ,"good_detail","cart","end"],"rate":30 },
- {"path":["start_app","home", "activity1111","good_detail" ,"cart","good_detail","cart","order","payment","end"],"rate":30 },
- {"path":[ "activity1111","good_detail" ,"activity1111" ,"good_detail","order","payment","end"],"rate":200 },
- {"path":[ "start_app","home" ,"activity1111" ,"good_detail","order","payment","end"],"rate":200 },
- {"path":[ "start_app","home" , "good_detail","order","payment","end"],"rate":30 },
- {"path":[ "good_detail","order","payment","end"],"rate":650 },
- {"path":[ "good_detail" ],"rate":30 },
- {"path":[ "start_app","home","mine","good_detail" ],"rate":30 },
- {"path":[ "start_app","home", "good_detail","good_detail","good_detail","cart","order","payment","end" ],"rate":200 },
- {"path":[ "start_app","home", "search","good_list","good_detail","cart","order","payment","end" ],"rate":200 }
- ]
③logback配置文件,可配置日志生成路径
- <?xml version="1.0" encoding="UTF-8"?>
- <configuration>
- <property name="LOG_HOME" value="/opt/module/applog/log" />
- <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
- <target>System.out</target>
- <encoder>
- <pattern>%msg%n</pattern>
- </encoder>
- </appender>
-
- <appender name="console_em" class="ch.qos.logback.core.ConsoleAppender">
- <target>System.err</target>
- <encoder>
- <pattern>%msg%n</pattern>
- </encoder>
- </appender>
-
- <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>${LOG_HOME}/app.log</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
- </rollingPolicy>
- <encoder>
- <pattern>%msg%n</pattern>
- </encoder>
- </appender>
-
- <!-- 将某一个包下日志单独打印日志 -->
- <logger name="com.atguigu.mock.util.LogUtil"
- level="INFO" additivity="false">
- <appender-ref ref="rollingFile" />
- <!-- <appender-ref ref="console" />-->
- </logger>
- <logger name="com.atguigu.gmallre.mock.task.UserMockTask" level="INFO" additivity="false" >
- <appender-ref ref="console_em" />
- </logger>
-
- <!-- <logger name="com.alibaba.druid.pool" level="error" additivity="false" >-->
- <!-- <appender-ref ref="console" />-->
- <!-- </logger>-->
-
- <!-- <logger name="com.atguigu.edu2021.mock.mapper" level="debug">-->
- <!-- <appender-ref ref="console" />-->
- <!-- </logger>-->
-
- <!-- <logger name="com.atguigu.edu2021.mock.service.impl.UserInfoServiceImpl" level="debug">
- <appender-ref ref="console" />
- </logger>-->
-
- <root level="error" >
- <appender-ref ref="console_em" />
- <!-- <appender-ref ref="async-rollingFile" /> -->
- </root>
- </configuration>
3)生成日志
进入到/opt/module/applog路径,执行以下命令
[atguigu@hadoop102 applog]$ java -jar gmall-remake-mock-2023-02-17.jar test 100 2022-06-08
其中:
① 增加test参数为测试模式,只生成用户行为数据不生成业务数据。
② 100 为产生的用户session数一个session默认产生1条启动日志和5条页面方法日志。
③ 第三个参数为日志数据的日期,测试模式下不会加载配置文件,要指定数据日期只能通过命令行传参实现。
④ 三个参数的顺序必须与示例保持一致
⑤ 第二个参数和第三个参数可以省略,如果test后面不填写参数,默认为1000
在/opt/module/applog/log目录下查看生成日志
[atguigu@hadoop102 log]$ ll
集群日志生成脚本
(1)在/home/atguigu/bin目录下vim脚本lg.sh
(2)在脚本中编写如下内容
- #!/bin/bash
- echo "========== hadoop102 =========="
- ssh hadoop102 "cd /opt/module/applog/; nohup java -jar gmall-remake-mock-2023-02-17.jar $1 $2 $3 >/dev/null 2>&1 &"
- done
注:
①/opt/module/applog/为jar包及配置文件所在路径
②/dev/null代表Linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。
标准输入0:从键盘获得输入 /proc/self/fd/0
标准输出1:输出到屏幕(即控制台) /proc/self/fd/1
错误输出2:输出到屏幕(即控制台) /proc/self/fd/2
(3)修改脚本执行权限(chmod 777)
(4)将jar包及配置文件上传至hadoop103的/opt/module/applog/路径
(5)启动脚本
atguigu@hadoop102 module]$ lg.sh test 100
(6)分别在hadoop102、hadoop103的/opt/module/applog/log目录上查看生成的数据
- [atguigu@hadoop102 log]$ ls
- app.log
用户行为日志数据通道:
环境准备
集群命令批量执行脚本xcall,vim后修改然后chmod777, 启动是$ xcall jps
- #! /bin/bash
-
- for i in hadoop102 hadoop103 hadoop104
- do
- echo --------- $i ----------
- ssh $i "$*"
- done
注:
jps
命令是 Java Virtual Machine Process Status Tool 的缩写,用于显示 Java 虚拟机(JVM)中正在运行的 Java 进程的信息。它通常用于识别在当前系统中正在运行的 Java 进程,以及它们的进程 ID(PID)和主类名。
jps
命令可以在命令行中直接运行,不需要任何参数。它会列出当前系统中正在运行的所有 Java 进程的信息,包括它们的 PID 和主类名。具体来说,jps
命令的输出包括以下信息:
main()
方法的类。jps
命令可以用于以下情况:
Hadoop安装
Zookeeper安装
Kafka安装
Flume安装
日志采集Flume
Apache Flume 是一个分布式、可靠且高可用的系统,用于收集、聚合和传输大量日志数据或事件数据到中心化数据存储中,如 Apache Hadoop 的 HDFS、Apache HBase、以及 Apache Kafka 等。它的主要作用是简化大规模数据的采集和传输过程,实现数据流的可靠传输,并提供了一套灵活的配置和可扩展的架构,使其能够适应不同类型和规模的数据采集需求。
按照规划,需要采集的用户行为日志存放在hadoop102,故需要在该节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(json)进行校验,然后将校验通过的日志发送到Kafka。↓
(注:
Flume 如何将采集的日志传输给 Kafka 的基本流程:
定义数据流配置: 首先,需要定义 Flume 的数据流配置,包括数据源、数据通道和数据目的地。数据源可以是各种数据来源,如日志文件、网络端口、消息队列等。数据通道用于在 Flume 内部缓存和传输数据。数据目的地则指定数据传输的最终目的地,如 Kafka、HDFS、HBase 等。
启动 Flume Agent: 根据定义的数据流配置,启动 Flume Agent。Flume Agent 是 Flume 的运行实例,负责接收、处理和传输数据。当 Agent 启动后,它会按照配置从数据源获取数据,并将数据传输到指定的目的地。
(在Flume中使用KafkaChannel可省去这一步)配置 Kafka Sink: 在数据流配置中,需要配置一个 Kafka Sink,用于将数据传输到 Kafka。Kafka Sink 是 Flume 提供的一个插件,用于与 Kafka 交互并将数据写入 Kafka 的 Topic 中。在配置 Kafka Sink 时,需要指定 Kafka 的连接信息、Topic 名称等参数。
数据传输到 Kafka: 当 Flume Agent 运行时,它会从数据源获取数据,并通过配置的数据通道将数据传输到 Kafka Sink。Kafka Sink 将接收到的数据转换为 Kafka 的消息格式,并将消息写入指定的 Kafka Topic 中。
数据消费: 一旦数据写入 Kafka Topic,消费者可以使用 Kafka 提供的 API 或工具来消费这些数据。消费者可以订阅相应的 Topic,并实时获取到 Flume 传输的日志数据,进行后续的处理和分析。
通过以上流程,Flume 可以将采集的日志数据可靠地传输到 Kafka 中,实现数据的收集、传输和分发,为后续的数据处理和分析提供了可靠的数据来源。
)
此处可以选择Flume中TaildirSource和KafkaChannel组件,并配置日志校验拦截器。选择原因:
TaildirSource:
TaildirSource 是 Flume 提供的一个源(Source)组件,用于从文件系统中实时读取日志文件的内容,并将其发送到 Flume 的通道(Channel)中。它可以监视指定目录下的日志文件,不断地读取新增的日志内容,并将其转发给 Flume 的后续组件进行处理。TaildirSource 通常用于实时采集应用程序生成的日志数据。
TailDirSource相比ExecSource、SpoolingDirectorySource的优势。
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
SpoolingDirectorySource监控目录,支持断点续传。
KafkaChannel
KafkaChannel 是 Flume 提供的一个通道(Channel)组件,用于与 Apache Kafka 集成,实现将数据从 Flume 传输到 Kafka 中。KafkaChannel 使用 Kafka 作为底层的数据存储和传输介质,将 Flume 接收到的数据写入 Kafka 的 Topic 中,以便后续的数据消费和处理。KafkaChannel 可以保证数据的可靠性和高吞吐量,适用于大规模的数据传输场景。
采用Kafka Channel,省去了Sink,提高了效率。
日志采集Flume关键配置如下:
日志采集Flume配置实操
1)创建Flume配置文件
在hadoop102节点的Flume的job目录下创建file_to_kafka.conf。(mkdir,vim)
2)配置文件内容如下
- #定义组件
- a1.sources = r1
- a1.channels = c1
-
- #配置source
- a1.sources.r1.type = TAILDIR
- a1.sources.r1.filegroups = f1
- a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
- a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
-
- #配置channel
- a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
- a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
- a1.channels.c1.kafka.topic = topic_log
- a1.channels.c1.parseAsFlumeEvent = false
-
- #组装
- a1.sources.r1.channels = c1
日志采集Flume测试
1)启动Zookeeper、Kafka集群
2)启动hadoop102的日志采集Flume
3)启动一个Kafka的Console-Consumer
Kafka 的 console-consumer
是 Kafka 提供的一个命令行工具,用于从 Kafka Topic 中消费消息并在控制台上显示。它是 Kafka 提供的一个实用工具,用于在开发和调试过程中查看和验证 Kafka 中的消息数据。
以下是 console-consumer
命令的基本用法和一些常用选项:
kafka-console-consumer.sh --bootstrap-server <broker-list> --topic <topic-name> [options]
主要参数说明:
--bootstrap-server <broker-list>
: 指定 Kafka 集群的地址列表,用于连接到 Kafka 集群。例如:localhost:9092
。
--topic <topic-name>
: 指定要消费消息的 Kafka Topic 名称。
一些常用选项包括:
--from-beginning
: 从最早的消息开始消费,即使在之前已经有消费者消费过该 Topic 中的消息,也会重新消费所有消息。
--max-messages <num-messages>
: 指定最大消费消息的数量。
--property <key=value>
: 设置 Kafka Consumer 的配置属性,如 auto.offset.reset
, group.id
等。
此处操作代码如下:
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log
4)生成数据
5)观察Kafka消费者是否能消费到数据
日志采集Flume启停脚本
在hadoop102节点的/home/atguigu/bin目录下创建脚本f1.sh
- #!/bin/bash
-
- case $1 in
- "start"){
- echo " --------启动 hadoop102 采集flume-------"
- ssh hadoop102 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
- };;
- "stop"){
- echo " --------停止 hadoop102 采集flume-------"
- ssh hadoop102 "ps -ef | grep file_to_kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
- };;
- esac
之后chmod777,启动 f1.sh start 停止 f1.sh stop
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。