赞
踩
目录
1.8.1 Maxwell与Canal、FlinkCDC的对比
1.10.9 reduceByKey与groupByKey的区别
1.10.12 SparkSQL中RDD、DataFrame、DataSet三者的转换及区别
1.10.13 Hive on Spark和Spark on Hive区别
1.10.17 Spark Shuffle和Hadoop Shuffle区别?
1.10.19 Spark任务使用什么进行提交,JavaEE界面还是脚本
1.10.20 请列举会引起Shuffle过程的Spark算子,并简述功能。
1.10.21 Spark操作数据库时,如何减少Spark运行中的数据库连接数?
1.12.2 Flink和Spark Streaming的区别?
1.12.5 Flink任务的并行度优先级设置?资源一般如何配置?
1.12.10 说说Flink中的窗口(分类、生命周期、触发、划分)
1.12.11 Flink的keyby怎么实现的分区?分区、分组的区别是什么?
1.12.12 Flink的Interval Join的实现原理?Join不上的怎么办?
1.14.3 Flink写入Clickhouse怎么保证一致性?
1.14.9 Clickhouse的新特性Projection
1.19.4 ThreadPoolExecutor构造函数参数解析
1.19.6 StringBuffer和StringBuilder的区别
1.19.7 ArrayList和LinkedList的区别
1.19.10 HashMap里面放100条数据,初始化应该是多少
1.21.6 Redis存储的是k-v类型,为什么还会有Hash?
2.15.8 数据仓库每天跑多少张表,大概什么时候运行,运行多久?
2.15.15 Mysql业务库中某张表发生变化,数仓中表需要做什么改变
2.15.23 业务场景:时间跨度比较大,数据模型的数据怎么更新的,例如:借款,使用一年,再还款,这个数据时间跨度大,在处理的时候怎么处理
2.15.24 map任务从0%到1%很慢,但是从1%到100%很快,期间出了什么问题?
2.15.25 数据倾斜场景除了group by 和join外,还有哪些场景
2.15.27 你的公司方向是电子商务,自营的还是提货平台?你们会有自己的商品吗?
2.15.28 ods事实表中订单状态会发生变化,你们是通过什么方式去监测数据变化的
2.15.30 你们维度数据要做ETL吗?除了用户信息脱敏?没有做其他ETL吗
2.15.31 怎么做加密,加密数据要用怎么办,我讲的md5,他问我md5怎么做恢复
3.8.2 项目中哪里用到状态编程,状态是如何存储的,怎么解决大状态问题
3.8.3 项目中哪里遇到了反压,造成的危害,定位解决(*重点*)
3.8.7 Kafka分区动态增加,Flink监控不到新分区数据导致数据丢失
3.8.9 Kafka某个分区没有数据,导致下游水位线无法抬升,窗口无法关闭计算
3.8.10 Hbase的rowkey设计不合理导致的数据热点问题
3.9.1 Flink任务提交使用那种模式,为何选用这种模式
3.9.2 Flink任务提交参数,JobManager和TaskManager分别给多少
3.9.4 项目中Flink作业Checkpoint参数如何设置
3.9.9 如何处理动态分流冷启动问题(主流数据先到,丢失数据怎么处理)
3.9.11 如果现在做了5个Checkpoint,Flink Job挂掉之后想恢复到第三次Checkpoint保存的状态上,如何操作
3.9.12 需要使用flink记录一群人,从北京出发到上海,记录出发时间和到达时间,同时要显示每个人用时多久,需要实时显示,如果让你来做,你怎么设计?
3.9.13 flink内部的数据质量和数据的时效怎么把控的
3.9.16 Prometheus+Grafana是自己搭的吗,监控哪些指标
3.9.18 hbase中有表,里面的1月份到3月份的数据我不要了,我需要删除它(彻底删除),要怎么做
3.9.19 如果flink程序的数据倾斜是偶然出现的,可能白天可能晚上突然出现,然后几个月都没有出现,没办法复现,怎么解决?
3.9.20 维度数据改变之后,如何保证新join的维度数据是正确的数据
10.4 如何快速从40亿条数据中快速判断,数据123是否存在
10.7 匹马赛跑,1个赛道,每次5匹进行比赛,无法对每次比赛计时,但知道每次比赛结果的先后顺序,最少赛多少次可以找出前三名?
10.8 给定一个点、一条线、一个三角形、一个有向无环图,请用java面向对象的思想进行建模
10.9 现场出了一道sql题,让说出sql的优化,优化后效率提升了多少
序号 | 命令 | 命令解释 |
1 | top | 实时显示系统中各个进程的资源占用状况(CPU、内存和执行时间) |
2 | jmap -heap 进程号 | 查看某个进程内存 |
3 | free -m | 查看系统内存使用情况 |
4 | ps -ef | 查看进程 |
5 | netstat -tunlp | grep 端口号 | 查看端口占用情况 |
6 | du -sh 路径* | 查看路径下的磁盘使用情况 例如:$ du -sh /opt/* |
7 | df -h | 查看磁盘存储情况 |
1)awk、sed、cut、sort
2)用Shell写过哪些脚本
(1)集群启动,分发脚本
#!/bin/bash
case $1 in
"start")
for i in hadoop102 hadoop103 hadoop104
do
ssh $i "绝对路径"
done
;;
"stop")
;;
esac
(2)数仓层级内部的导入:ods->dwd->dws ->ads
①#!/bin/bash
②定义变量 APP=gmall
③获取时间
传入 按照传入时间
不传 T+1
④sql="
先按照当前天 写sql => 遇到时间 $do_date 遇到表 {$APP}.
自定义函数 UDF UDTF {$APP}.
"
⑤执行sql
1)在/home/atguigu/bin创建一个test.sh文件
[atguigu@hadoop102 bin]$ vim test.sh
在文件中添加如下内容
#!/bin/bash
do_date=$1
echo '$do_date'
echo "$do_date"
echo "'$do_date'"
echo '"$do_date"'
echo `date`
2)查看执行结果
[atguigu@hadoop102 bin]$ test.sh 2022-02-10
$do_date
2022-02-10
'2022-02-10'
"$do_date"
2022年 05月 02日 星期四 21:02:08 CST
3)总结:
(1)单引号不取变量值
(2)双引号取变量值
(3)反引号`,执行引号中命令
(4)双引号内部嵌套单引号,取出变量值
(5)单引号内部嵌套双引号,不取出变量值
hadoop2.x | hadoop3.x | |
访问HDFS端口 | 50070 | 9870 |
访问MR执行情况端口 | 8088 | 8088 |
历史服务器 | 19888 | 19888 |
客户端访问集群端口 | 9000 | 8020 |
注意:HDFS写入流程时候,某台dataNode挂掉如何运行?
当DataNode突然挂掉了,客户端接收不到这个DataNode发送的ack确认,客户端会通知NameNode,NameNode检查并确认该块的副本与规定的不符,NameNode会通知闲置的DataNode去复制副本,并将挂掉的DataNode作下线处理。等挂掉的DataNode节点恢复后, 删除该节点中曾经拷贝的不完整副本数据。
1)会有什么影响
(1)存储层面
1个文件块,占用namenode多大内存150字节
128G能存储多少文件块? 128 g* 1024m*1024kb*1024byte/150字节 = 9.1亿文件块
(2)计算层面
每个小文件都会起到一个MapTask,1个MapTask默认内存1G。浪费资源。
2)怎么解决
(1)采用har归档方式,将小文件归档
(2)采用CombineTextInputFormat
(3)自己写一个MR程序将产生的小文件合并成一个大文件。如果是Hive或者Spark有merge功能自动帮助我们合并。
(4)有小文件场景开启JVM重用;如果没有小文件,不要开启JVM重用,因为会一直占用使用到的Task卡槽,直到任务完成才释放。
JVM重用可以使得JVM实例在同一个job中重新使用N次,N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间。
<property>
<name>mapreduce.job.jvm.numtasks</name>
<value>10</value>
<description>How many tasks to run per jvm,if set to -1 ,there is no limit</description>
</property>
1)Hadoop2.x系列,配置NameNode默认2000m
2)Hadoop3.x系列,配置NameNode内存是动态分配的
NameNode内存最小值1G,每增加100万个文件block,增加1G内存。
1)Hadoop调度器重要分为三类
FIFO、Capacity Scheduler(容量调度器)和Fair Sceduler(公平调度器)。
Apache默认的资源调度器是容量调度器。
CDH默认的资源调度器是公平调度器。
2)区别
FIFO调度器:支持单队列 、先进先出 生产环境不会用。
容量调度器:支持多队列。队列资源分配,优先选择资源占用率最低的队列分配资源;作业资源分配,按照作业的优先级和提交时间顺序分配资源;容器资源分配,本地原则(同一节点/同一机架/不同节点不同机架)。
公平调度器:支持多队列,保证每个任务公平享有队列资源。资源不够时可以按照缺额分配。
大厂:如果对并发度要求比较高,选择公平,要求服务器性能必须OK。
中小公司,集群服务器资源不太充裕选择容量。
4)在生产环境怎么创建队列?
(1)调度器默认就1个default队列,不能满足生产要求。
(2)按照部门:业务部门1、业务部门2。
(3)按照业务模块:登录注册、购物车、下单。
5)创建多队列的好处?
(1)因为担心员工不小心,写递归死循环代码,把所有资源全部耗尽。
(2)实现任务的降级使用,特殊时期保证重要的任务队列资源充足。
业务部门1(重要)=》业务部门2(比较重要)=》下单(一般)=》购物车(一般)=》登录注册(次要)
1)块大小
1.x 64m
2.x 3.x 128m
本地 32m
企业 128m 256m 512m
2)块大小决定因素
磁盘读写速度
普通的机械硬盘 100m/s => 128m
固态硬盘普通的 300m/s => 256m
内存镜像 500-600m/s => 512m
1)出现脑裂的原因
Leader出现故障,系统开始改朝换代,当Follower完成全部工作并且成为Leader后,原Leader又复活了(它的故障可能是暂时断开或系统暂时变慢,不能及时响应,但其NameNode进程还在),并且由于某种原因它对应的ZKFC并没有把它设置为Standby,所以原Leader还认为自己是Leader,客户端向它发出的请求仍会响应,于是脑裂就发生了。
2)Hadoop通常不会出现脑裂。
如果出现脑裂,意味着多个Namenode数据不一致,此时只能选择保留其中一个的数据。例如:现在有三台Namenode,分别为nn1、nn2、nn3,出现脑裂,想要保留nn1的数据,步骤为:
(1)关闭nn2和nn3
(2)在nn2和nn3节点重新执行数据同步命令:hdfs namenode -bootstrapStandby
(3)重新启动nn2和nn3
ls、get、create、delete、deleteall
半数机制(过半机制):2n + 1,安装奇数台。
10台服务器:3台。
20台服务器:5台。
100台服务器:11台。
台数多,好处:提高可靠性;坏处:影响通信延时。
Zookeeper采用过半选举机制,防止了脑裂。
(1)作为HA的协调者:如 HDFS的HA、YARN的HA。
(2)被组件依赖:如Kafka、HBase、CK。
1)Taildir Source
(1)断点续传、多目录
(2)taildir底层原理
(3)Taildir挂了怎么办?
不会丢数:断点续传
重复数据:有可能
(4)存在的问题及解决方案
①问题:
新文件判断条件 = iNode值 + 绝对路径(包含文件名)
日志框架凌晨修改了文件名称=》导致会再次重读一次昨天产生的数据
②解决:
方案一:建议生成的文件名称为带日期的。同时配置日志生成框架为不更名的;
方案二:修改TairDirSource源码,只按照iNode值去确定文件
修改源码视频地址:
https://www.bilibili.com/video/BV1wf4y1G7EQ?p=14&vd_source=891aa1a363111d4914eb12ace2e039af
2)file channel /memory channel/kafka channel
(1)File Channel
数据存储于磁盘,优势:可靠性高;劣势:传输速度低
默认容量:100万个event
注意:FileChannel可以通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
(2)Memory Channel
数据存储于内存,优势:传输速度快;劣势:可靠性差
默认容量:100个event
(3)Kafka Channel
数据存储于Kafka,基于磁盘;
优势:可靠性高;
传输速度快 Kafka Channel 大于Memory Channel + Kafka Sink 原因省去了Sink阶段
(4)生产环境如何选择
每天丢几百万数据 pb级 亿万富翁,掉1块钱会捡?
3)HDFS Sink
(1)时间(半个小时) or 大小128m 且 设置Event个数等于0,该值默认10
具体参数:hdfs.rollInterval=1800,hdfs.rollSize=134217728 且 hdfs.rollCount=0
4)事务
1)拦截器注意事项
(1)时间戳拦截器:主要是解决零点漂移问题
2)自定义拦截器步骤
(1)实现 Interceptor
(2)重写四个方法
(3)静态内部类,实现Interceptor.Builder
3)拦截器可以不用吗?
时间戳拦截器建议使用。如果不用需要采用延迟15-20分钟处理数据的方式,比较麻烦。
Replicating:默认选择器。功能:将数据发往下一级所有通道。
Multiplexing:选择性发往指定通道。
1)监控到异常现象
采用Ganglia监控器,监控到Flume尝试提交的次数远远大于最终成功的次数,说明Flume运行比较差。主要是内存不够导致的。
2)解决办法?
(1)自身:默认内存是20m,考虑增加flume内存,在flume-env.sh配置文件中修改flume内存为 4-6g
(2)找朋友:增加服务器台数
搞活动 618 =》增加服务器 =》用完在退出
日志服务器配置:8-16g内存、磁盘8T
生产者、Broker、消费者、Zookeeper。
注意:Zookeeper中保存Broker id和controller等信息,但是没有生产者信息。
Kafka官方为我们实现了三种Partitioner(分区器),分别是DefaultPartitioner(当未指定分区器时候所使用的默认分区器)、UniformStickyPartitioner、RoundRobinPartitioner。
1)DefaultPartitioner默认分区器
下图说明了默认分区器的分区分配策略:
2)UniformStickyPartitioner纯粹的粘性分区器
(1)如果指定了分区号,则会按照指定的分区号进行分配
(2)若没有指定分区好,则使用粘性分区器
3)RoundRobinPartitioner轮询分区器
(1)如果在消息中指定了分区则使用指定分区。
(2)如果未指定分区,都会将消息轮询每个分区,将数据平均分配到每个分区中。
4)自定义分区器
自定义分区策略:可以通过实现 org.apache.kafka.clients.producer.Partitioner 接口,重写 partition 方法来达到自定义分区效果。
例如我们想要实现随机分配,只需要以下代码:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
在项目中,如果希望把MySQL中某张表的数据发送到一个分区。可以以表名为key进行发送。
1)Producer角度
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
2)Broker角度
副本数大于等于2。
min.insync.replicas大于等于2。
ISR(In-Sync Replicas),副本同步队列。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。
任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。
Kafka分区中的所有副本统称为AR = ISR + OSR
去重 = 幂等性 + 事务
1)幂等性原理
2)幂等性配置参数
参数名称 | 描述 |
enable.idempotence | 是否开启幂等性,默认true,表示开启幂等性。 |
max.in.flight.requests.per.connection | 1.0.X版本前,需设置为1,1.0.X之后,小于等于5 |
retries | 失败重试次数,需要大于0 |
acks | 需要设置为all |
3)Kafka的事务一共有如下5个API
// 1初始化事务
void initTransactions();
// 2开启事务
void beginTransaction() throws ProducerFencedException;
// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 4提交事务
void commitTransaction() throws ProducerFencedException;
// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
4)总结
(1)生产者角度
(2)broker服务端角度
(3)消费者
1)Kafka 最多只保证单分区内的消息是有序的,所以如果要保证业务全局严格有序,就要设置 Topic 为单分区。
2)如何保证单分区内数据有序?
注:幂等机制保证数据有序的原理如下:
在ISR中存活为前提,按照AR中排在前面的优先。例如AR[1,0,2],ISR [1,0,2],那么Leader就会按照1,0,2的顺序轮询。
如果Kafka服务器只有4个节点,那么设置Kafka的分区数大于服务器台数,在Kafka底层如何分配存储副本呢?
1)创建16分区,3个副本
(1)创建一个新的Topic,名称为second。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 16 --replication-factor 3 --topic second
(2)查看分区和副本情况。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic second
Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
默认保存7天;生产环境建议3天。
日志清理的策略只有delete和compact两种。
1)delete日志删除:将过期数据删除
(1)基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳。
(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment。
log.retention.bytes,默认等于-1,表示无穷大。
思考:如果一个segment中有一部分数据过期,一部分没有过期,怎么处理?
2)compact日志压缩
1)Kafka本身是分布式集群,可以采用分区技术,并行度高
2)读数据采用稀疏索引,可以快速定位要消费的数据
3)顺序写磁盘
Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
4)页缓存 + 零拷贝技术
如果Broker端配置参数auto.create.topics.enable设置为true(默认值是true),那么当生产者向一个未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。生产环境建议将该参数设置为false。
(1)向一个没有提前创建five主题发送数据
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic five
>hello world
(2)查看five主题的详情
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic five
一般我们设置成2个或3个,很多企业设置为2个。
副本的优势:提高可靠性;副本劣势:增加了网络IO传输。
(1)创建一个只有1个分区的Topic。
(2)测试这个Topic的Producer吞吐量和Consumer吞吐量。
(3)假设他们的值分别是Tp和Tc,单位可以是MB/s。
(4)然后假设总的目标吞吐量是Tt,那么分区数 = Tt / min(Tp,Tc)。
例如:Producer吞吐量 = 20m/s;Consumer吞吐量 = 50m/s,期望吞吐量100m/s;
分区数 = 100 / 20 = 5分区
分区数一般设置为:3-10个
分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区个数。
1)可以通过命令行的方式增加分区,但是分区数只能增加,不能减少。
2)为什么分区数只能增加,不能减少?
(1)按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。
(2)实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理?
(3)反观这个功能的收益点却是很低,如果真的需要实现此类的功能,完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。
ODS层:2个
DWD层:20个
拉取数据。
粘性分区:
该分区分配算法是最复杂的一种,可以通过 partition.assignment.strategy 参数去设置,从 0.11 版本开始引入,目的就是在执行新分配时,尽量在上一次分配结果上少做调整,其主要实现了以下2个目标:
(1)Topic Partition 的分配要尽量均衡。
(2)当 Rebalance 发生时,尽量与上一次分配结果保持一致。
注意:当两个目标发生冲突的时候,优先保证第一个目标,这样可以使分配更加均匀,其中第一个目标是3种分配策略都尽量去尝试完成的,而第二个目标才是该算法的精髓所在。
1)Rebalance 的触发条件有三种
(1)当Consumer Group 组成员数量发生变化(主动加入、主动离组或者故障下线等)。
(2)当订阅主题的数量或者分区发生变化。
2)消费者故障下线的情况
描述 | |
session.timeout.ms | Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
3)主动加入消费者组
在现有集中增加消费者,也会触发Kafka再平衡。注意,如果下游是Flink,Flink会自己维护offset,不会触发Kafka再平衡。
可以在任意offset处消费数据。
kafkaConsumer.seek(topic, 1000);
可以通过时间来消费数据。
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
kafkaConsumer.offsetsForTimes(timestampToSearch);
公司自己开发的监控器。
开源的监控器:KafkaManager、KafkaMonitor、KafkaEagle。
1)发现数据积压
通过Kafka的监控器Eagle,可以看到消费lag,就是积压情况:
2)解决
(1)消费者消费能力不足
①可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)。
增加分区数;
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
②提高每批次拉取的数量,提高单个消费者的消费能力。
描述 | |
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次poll拉取数据返回消息的最大条数,默认是500条 |
(2)消费者处理能力不行
①消费者,调整fetch.max.bytes大小,默认是50m。
②消费者,调整max.poll.records大小,默认是500条。
如果下游是Spark、Flink等计算引擎,消费到数据之后还要进行计算分析处理,当处理能力跟不上消费能力时,会导致背压的出现,从而使消费的速率下降。
需要对计算性能进行调优(看Spark、Flink优化)。
(3)消息积压后如何处理
某时刻,突然开始积压消息且持续上涨。这种情况下需要你在短时间内找到消息积压的原因,迅速解决问题。
导致消息积压突然增加,只有两种:发送变快了或者消费变慢了。
假如赶上大促或者抢购时,短时间内不太可能优化消费端的代码来提升消费性能,此时唯一的办法是通过扩容消费端的实例数来提升总体的消费能力。如果短时间内没有足够的服务器资源进行扩容,只能降级一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,保证重要业务服务正常。
假如通过内部监控到消费变慢了,需要你检查消费实例,分析一下是什么原因导致消费变慢?
①优先查看日志是否有大量的消费错误。
②此时如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程卡在哪里「触发死锁或者卡在某些等待资源」。
如何提升吞吐量?
1)提升生产吞吐量
(1)buffer.memory:发送消息的缓冲区大小,默认值是32m,可以增加到64m。
(2)batch.size:默认是16k。如果batch设置太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
(3)linger.ms,这个值默认是0,意思就是消息必须立即被发送。一般设置一个5-100毫秒。如果linger.ms设置的太小,会导致频繁网络请求,吞吐量下降;如果linger.ms太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
(4)compression.type:默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的CPU开销。
2)增加分区
3)消费者提高吞吐量
(1)调整fetch.max.bytes大小,默认是50m。
(2)调整max.poll.records大小,默认是500条。
每天总数据量100g,每天产生1亿条日志,10000万/24/60/60=1150条/每秒钟
平均每秒钟:1150条
低谷每秒钟:50条
高峰每秒钟:1150条 *(2-20倍)= 2300条 - 23000条
每条日志大小:0.5k - 2k(取1k)
每秒多少数据量:2.0M - 20MB
用Kafka官方自带的脚本,对Kafka进行压测。
1)Kafka Producer压力测试
(1)创建一个test Topic,设置为3个分区3个副本
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic test
(2)在/opt/module/kafka/bin目录下面有这两个文件。我们来测试一下
[atguigu@hadoop105 kafka]$ bin/kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=16384 linger.ms=0
参数说明:
输出结果:
ap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 batch.size=16384 linger.ms=0
37021 records sent, 7401.2 records/sec (7.23 MB/sec), 1136.0 ms avg latency, 1453.0 ms max latency.
。。。 。。。
33570 records sent, 6714.0 records/sec (6.56 MB/sec), 4549.0 ms avg latency, 5049.0 ms max latency.
1000000 records sent, 9180.713158 records/sec (8.97 MB/sec), 1894.78 ms avg latency, 5049.00 ms max latency, 1335 ms 50th, 4128 ms 95th, 4719 ms 99th, 5030 ms 99.9th.
(3)调整batch.size大小
(4)调整linger.ms时间
(5)调整压缩方式
(6)调整缓存大小
2)Kafka Consumer压力测试
(1)修改/opt/module/kafka/config/consumer.properties文件中的一次拉取条数为500
max.poll.records=500
(2)消费100万条日志进行压测
[atguigu@hadoop105 kafka]$ bin/kafka-consumer-perf-test.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --messages 1000000 --consumer.config config/consumer.properties
参数说明:
输出结果:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2022-01-20 09:58:26:171, 2022-01-20 09:58:33:321, 977.0166, 136.6457, 1000465, 139925.1748, 415, 6735, 145.0656, 148547.1418
(3)一次拉取条数为2000
(4)调整fetch.max.bytes大小为100m
kafka底层主要是顺序写,固态硬盘和机械硬盘的顺序写速度差不多。
建议选择普通的机械硬盘。
每天总数据量:1亿条 * 1k ≈ 100g
100g * 副本2 * 保存时间3天 / 0.7 ≈ 1T
建议三台服务器硬盘总大小,大于等于1T。
Kafka内存组成:堆内存 + 页缓存
1)Kafka堆内存建议每个节点:10g ~ 15g
在kafka-server-start.sh中修改
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
fi
(1)查看Kafka进程号
[atguigu@hadoop102 kafka]$ jps
2321 Kafka
5255 Jps
1931 QuorumPeerMain
(2)根据Kafka进程号,查看Kafka的GC情况
[atguigu@hadoop102 kafka]$ jstat -gc 2321 1s 10
S0C S1C S0U S1U EC EU OC OU MC MU CCSC CCSU YGC YGCT FGC FGCT GCT
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 60416.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
0.0 7168.0 0.0 7168.0 103424.0 61440.0 1986560.0 148433.5 52092.0 46656.1 6780.0 6202.2 13 0.531 0 0.000 0.531
参数说明:
YGC:年轻代垃圾回收次数;
(3)根据Kafka进程号,查看Kafka的堆内存
[atguigu@hadoop102 kafka]$ jmap -heap 2321
… …
Heap Usage:
G1 Heap:
regions = 2048
capacity = 2147483648 (2048.0MB)
used = 246367744 (234.95458984375MB)
free = 1901115904 (1813.04541015625MB)
11.472392082214355% used
2)页缓存:
页缓存是Linux系统服务器的内存。我们只需要保证1个segment(1g)中25%的数据在内存中就好。
每个节点页缓存大小 =(分区数 * 1g * 25%)/ 节点数。例如10个分区,页缓存大小=(10 * 1g * 25%)/ 3 ≈ 1g
建议服务器内存大于等于11G。
1)默认配置
num.io.threads = 8 负责写磁盘的线程数。
num.replica.fetchers = 1 副本拉取线程数。
num.network.threads = 3 数据传输线程数。
2)建议配置
此外还有后台的一些其他线程,比如清理数据线程,Controller负责感知和管控整个集群的线程等等,这样算,每个Broker都会有上百个线程存在。根据经验,4核CPU处理几十个线程在高峰期会打满,8核勉强够用,而且再考虑到集群上还要运行其他的服务,所以部署Kafka的服务器一般建议在16核以上可以应对一两百个线程的工作,如果条件允许,给到24核甚至32核就更好。
num.io.threads = 16 负责写磁盘的线程数。
num.replica.fetchers = 2 副本拉取线程数。
num.network.threads = 6 数据传输线程数。
服务器建议购买 32核CPU
网络带宽 = 峰值吞吐量 ≈ 20MB/s ,选择千兆网卡即可。
100Mbps单位是bit;10M/s单位是byte ; 1byte = 8bit,100Mbps/8 = 12.5M/s。
一般百兆的网卡(100Mbps=12.5m/s)、千兆的网卡(1000Mbps=125m/s)、万兆的网卡(1250m/s)。
一般百兆的网卡(100Mbps)、千兆的网卡(1000Mbps)、万兆的网卡(10000Mbps)。100Mbps单位是bit;10M/s单位是byte ; 1byte = 8bit,100Mbps/8 = 12.5M/s。
通常选用千兆或者是万兆网卡。
在生产环境中,如果某个Kafka节点挂掉。
正常处理办法:
(1)先看日志,尝试重新启动一下,如果能启动正常,那直接解决。
(2)如果重启不行,检查内存、CPU、网络带宽。调优=》调优不行增加资源
(3)如果将Kafka整个节点误删除,如果副本数大于等于2,可以按照服役新节点的方式重新服役一个新节点,并执行负载均衡。
可以通过bin/kafka-reassign-partitions.sh脚本服役和退役节点。
Kafka对于消息体的大小默认为单条最大值是1M但是在我们应用场景中,常常会出现一条消息大于1M,如果不对Kafka进行配置。则会出现生产者无法将消息推送到Kafka或消费者无法去消费Kafka里面的数据,这时我们就要对Kafka进行以下配置:server.properties。
描述 | |
message.max.bytes | 默认1m,Broker端接收每个批次消息最大值。 |
max.request.size | 默认1m,生产者发往Broker每个请求消息最大值。针对Topic级别设置消息体的大小。 |
replica.fetch.max.bytes | 默认1m,副本同步数据,每个批次消息最大值。 |
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
重点调优参数:
(1)buffer.memory 32m
(2)batch.size:16k
(3)linger.ms默认0 调整 5-100ms
(4)compression.type采用压缩 snappy
(5)消费者端调整fetch.max.bytes大小,默认是50m。
(6)消费者端调整max.poll.records大小,默认是500条。
(7)单条日志大小:message.max.bytes、max.request.size、replica.fetch.max.bytes适当调整2-10m
(8)Kafka堆内存建议每个节点:10g ~ 15g
在kafka-server-start.sh中修改
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
fi
(9)增加CPU核数
num.io.threads = 8 负责写磁盘的线程数
num.replica.fetchers = 1 副本拉取线程数
num.network.threads = 3 数据传输线程数
(10)日志保存时间log.retention.hours 3天
(11)副本数,调整为2
(1)解析器(SQLParser):将SQL字符串转换成抽象语法树(AST)
(2)语义分析器(Semantic Analyzer):将AST进一步抽象为QueryBlock(可以理解为一个子查询划分成一个QueryBlock)
(2)逻辑计划生成器(Logical Plan Gen):由QueryBlock生成逻辑计划
(3)逻辑优化器(Logical Optimizer):对逻辑计划进行优化
(4)物理计划生成器(Physical Plan Gen):根据优化后的逻辑计划生成物理计划
(5)物理优化器(Physical Optimizer):对物理计划进行优化
(6)执行器(Execution):执行该计划,得到查询结果并返回给客户端
Hive 和数据库除了拥有类似的查询语言,再无类似之处。
1)数据存储位置
Hive 存储在 HDFS 。数据库将数据保存在块设备或者本地文件系统中。
2)数据更新
Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的。
3)执行延迟
Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
4)数据规模
Hive支持很大规模的数据计算;数据库可以支持的数据规模较小。
元数据、原始数据
1)删除数据时
内部表:元数据、原始数据,全删除
外部表:元数据 只删除
2)在公司生产环境下,什么时候创建内部表,什么时候创建外部表?
在公司中绝大多数场景都是外部表。
自己使用的临时表,才会创建内部表;
1)数值函数
(1)round:四舍五入;(2)ceil:向上取整;(3)floor:向下取整
2)字符串函数
(1)substring:截取字符串;(2)replace:替换;(3)regexp_replace:正则替换
(4)regexp:正则匹配;(5)repeat:重复字符串;(6)split:字符串切割
(7)nvl:替换null值;(8)concat:拼接字符串;
(9)concat_ws:以指定分隔符拼接字符串或者字符串数组;
(10)get_json_object:解析JSON字符串
3)日期函数
(1)unix_timestamp:返回当前或指定时间的时间戳
(2)from_unixtime:转化UNIX时间戳(从 1970-01-01 00:00:00 UTC 到指定时间的秒数)到当前时区的时间格式
(3)current_date:当前日期
(4)current_timestamp:当前的日期加时间,并且精确的毫秒
(5)month:获取日期中的月;(6)day:获取日期中的日
(7)datediff:两个日期相差的天数(结束日期减去开始日期的天数)
(8)date_add:日期加天数;(9)date_sub:日期减天数
(10)date_format:将标准日期解析成指定格式字符串
4)流程控制函数
(1)case when:条件判断函数
(2)if:条件判断,类似于Java中三元运算符
5)集合函数
(1)array:声明array集合
(2)map:创建map集合
(3)named_struct:声明struct的属性和值
(4)size:集合中元素的个数
(5)map_keys:返回map中的key
(6)map_values:返回map中的value
(7)array_contains:判断array中是否包含某个元素
(8)sort_array:将array中的元素排序
6)聚合函数
(1)collect_list:收集并形成list集合,结果不去重
(2)collect_set:收集并形成set集合,结果去重
1)在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤?
(1)目前项目中逻辑不是特别复杂就没有用自定义UDF和UDTF
(2)自定义UDF:继承G..UDF,重写核心方法evaluate
(3)自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close
2)企业中一般什么场景下使用UDF/UDTF?
(1)因为自定义函数,可以将自定函数内部任意计算过程打印输出,方便调试。
(2)引入第三方jar包时,也需要。
一般在场景题中出现手写:分组TopN、行转列、列转行。
按照功能,常用窗口可划分为如下几类:聚合函数、跨行取值函数、排名函数。
1)聚合函数
max:最大值。
min:最小值。
sum:求和。
avg:平均值。
count:计数。
2)跨行取值函数
(1)lead和lag
注:lag和lead函数不支持自定义窗口。
(2)first_value和last_value
3)排名函数
注:rank 、dense_rank、row_number不支持自定义窗口。
一个分组聚合的查询语句,默认是通过一个MapReduce Job完成的。Map端负责读取数据,并按照分组字段分区,通过Shuffle,将数据发往Reduce端,各组数据在Reduce端完成最终的聚合运算。
分组聚合的优化主要围绕着减少Shuffle数据量进行,具体做法是map-side聚合。所谓map-side聚合,就是在map端维护一个Hash Table,利用其完成部分的聚合,然后将部分聚合的结果,按照分组字段分区,发送至Reduce端,完成最终的聚合。
相关参数如下:
--启用map-side聚合,默认是true
set hive.map.aggr=true;
--用于检测源表数据是否适合进行map-side聚合。检测的方法是:先对若干条数据进行map-side聚合,若聚合后的条数和聚合前的条数比值小于该值,则认为该表适合进行map-side聚合;否则,认为该表数据不适合进行map-side聚合,后续数据便不再进行map-side聚合。
set hive.map.aggr.hash.min.reduction=0.5;
--用于检测源表是否适合map-side聚合的条数。
set hive.groupby.mapaggr.checkinterval=100000;
--map-side聚合所用的hash table,占用map task堆内存的最大比例,若超出该值,则会对hash table进行一次flush。
set hive.map.aggr.hash.force.flush.memory.threshold=0.9;
Hive中默认最稳定的Join算法是Common Join。其通过一个MapReduce Job完成一个Join操作。Map端负责读取Join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作。
优化Join的最为常用的手段就是Map Join,其可通过两个只有Map阶段的Job完成一个join操作。第一个Job会读取小表数据,将其制作为Hash Table,并上传至Hadoop分布式缓存(本质上是上传至HDFS)。第二个Job会先从分布式缓存中读取小表数据,并缓存在Map Task的内存中,然后扫描大表数据,这样在map端即可完成关联操作。
注:由于Map Join需要缓存整个小标的数据,故只适用于大表Join小表的场景。
相关参数如下:
--启动Map Join自动转换
set hive.auto.convert.join=true;
--开启无条件转Map Join
set hive.auto.convert.join.noconditionaltask=true;
--无条件转Map Join小表阈值,默认值10M,推荐设置为Map Task总内存的三分之一到二分之一
set hive.auto.convert.join.noconditionaltask.size=10000000;
上节提到,Map Join只适用于大表Join小表的场景。若想提高大表Join大表的计算效率,可使用Sort Merge Bucket Map Join。
需要注意的是SMB Map Join有如下要求:
(1)参与Join的表均为分桶表,且分桶字段为Join的关联字段。
(2)两表分桶数呈倍数关系。
(3)数据在分桶内是按关联字段有序的。
SMB Join的核心原理如下:只要保证了上述三点要求的前两点,就能保证参与Join的两张表的分桶之间具有明确的关联关系,因此就可以在两表的分桶间进行Join操作了。
若能保证第三点,也就是参与Join的数据是有序的,这样就能使用数据库中常用的Join算法之一——Sort Merge Join了,Merge Join原理如下:
在满足了上述三点要求之后,就能使用SMB Map Join了。
由于SMB Map Join无需构建Hash Table也无需缓存小表数据,故其对内存要求很低。适用于大表Join大表的场景。
Reduce端的并行度,也就是Reduce个数,可由用户自己指定,也可由Hive自行根据该MR Job输入的文件大小进行估算。
Reduce端的并行度的相关参数如下:
--指定Reduce端并行度,默认值为-1,表示用户未指定
set mapreduce.job.reduces;
--Reduce端并行度最大值
set hive.exec.reducers.max;
--单个Reduce Task计算的数据量,用于估算Reduce并行度
set hive.exec.reducers.bytes.per.reducer;
Reduce端并行度的确定逻辑如下:
若指定参数mapreduce.job.reduces的值为一个非负整数,则Reduce并行度为指定值。否则,Hive自行估算Reduce并行度,估算逻辑如下:
假设Job输入的文件大小为totalInputBytes
参数hive.exec.reducers.bytes.per.reducer的值为bytesPerReducer。
参数hive.exec.reducers.max的值为maxReducers。
则Reduce端的并行度为:
根据上述描述,可以看出,Hive自行估算Reduce并行度时,是以整个MR Job输入的文件大小作为依据的。因此,在某些情况下其估计的并行度很可能并不准确,此时就需要用户根据实际情况来指定Reduce并行度了。
需要说明的是:若使用Tez或者是Spark引擎,Hive可根据计算统计信息(Statistics)估算Reduce并行度,其估算的结果相对更加准确。
若Hive的Reduce并行度设置不合理,或者估算不合理,就可能导致计算结果出现大量的小文件。该问题可由小文件合并任务解决。其原理是根据计算任务输出文件的平均大小进行判断,若符合条件,则单独启动一个额外的任务进行合并。
相关参数为:
--开启合并map only任务输出的小文件
set hive.merge.mapfiles=true;
--开启合并map reduce任务输出的小文件
set hive.merge.mapredfiles=true;
--合并后的文件大小
set hive.merge.size.per.task=256000000;
--触发小文件合并任务的阈值,若某计算任务输出的文件平均大小低于该值,则触发合并
set hive.merge.smallfiles.avgsize=16000000;
谓词下推(predicate pushdown)是指,尽量将过滤操作前移,以减少后续计算步骤的数据量。开启谓词下推优化后,无需调整SQL语句,Hive就会自动将过滤操作尽可能的前移动。
相关参数为:
--是否启动谓词下推(predicate pushdown)优化
set hive.optimize.ppd = true;
Hive会将一个SQL语句转化成一个或者多个Stage,每个Stage对应一个MR Job。默认情况下,Hive同时只会执行一个Stage。但是某SQL语句可能会包含多个Stage,但这多个Stage可能并非完全互相依赖,也就是说有些Stage是可以并行执行的。此处提到的并行执行就是指这些Stage的并行执行。相关参数如下:
--启用并行执行优化,默认是关闭的
set hive.exec.parallel=true;
--同一个sql允许最大并行度,默认为8
set hive.exec.parallel.thread.number=8;
CBO是指Cost based Optimizer,即基于计算成本的优化。
在Hive中,计算成本模型考虑到了:数据的行数、CPU、本地IO、HDFS IO、网络IO等方面。Hive会计算同一SQL语句的不同执行计划的计算成本,并选出成本最低的执行计划。目前CBO在Hive的MR引擎下主要用于Join的优化,例如多表Join的Join顺序。
相关参数为:
--是否启用cbo优化
set hive.cbo.enable=true;
采用ORC列式存储加快查询速度。
id name age
1 zs 18
2 lishi 19
行:1 zs 18 2 lishi 19
列:1 2 zs lishi 18 19
select name from user
压缩减少磁盘IO:因为Hive底层计算引擎默认是MR,可以在Map输出端采用Snappy压缩。
Map(Snappy ) Reduce
(1)创建分区表 防止后续全表扫描
(2)创建分桶表 对未知的复杂的数据进行提前采样
1)MR/Tez/Spark区别:
MR引擎:多Job串联,基于磁盘,落盘的地方比较多。虽然慢,但一定能跑出结果。一般处理,周、月、年指标。
Spark引擎:虽然在Shuffle过程中也落盘,但是并不是所有算子都需要Shuffle,尤其是多算子过程,中间过程不落盘 DAG有向无环图。 兼顾了可靠性和效率。一般处理天指标。
2)Tez引擎的优点
(1)使用DAG描述任务,可以减少MR中不必要的中间节点,从而减少磁盘IO和网络IO。
(2)可更好的利用集群资源,例如Container重用、根据集群资源计算初始任务的并行度等。
(3)可在任务运行时,根据具体数据量,动态的调整后续任务的并行度。
(1)减少join的表数量:不影响业务前提,可以考虑将一些表进行预处理和合并,从而减少join操作。
(2)使用Map Join:将小表加载到内存中,从而避免了Reduce操作,提高了性能。通过设置hive.auto.convert.join为true来启用自动Map Join。
(3)使用Bucketed Map Join:通过设置hive.optimize.bucketmapjoin为true来启用Bucketed Map Join。
(4)使用Sort Merge Join:这种方式在Map阶段完成排序,从而减少了Reduce阶段的计算量。通过设置hive.auto.convert.sortmerge.join为true来启用。
(5)控制Reduce任务数量:通过合理设置hive.exec.reducers.bytes.per.reducer和mapreduce.job.reduces参数来控制Reduce任务的数量。
(6)过滤不需要的数据:join操作之前,尽量过滤掉不需要的数据,从而提高性能。
(7)选择合适的join顺序:将小表放在前面可以减少中间结果的数据量,提高性能。
(8)使用分区:可以考虑使用分区技术。只需要读取与查询条件匹配的分区数据,从而减少数据量和计算量。
(9)使用压缩:通过对数据进行压缩,可以减少磁盘和网络IO,提高性能。注意选择合适的压缩格式和压缩级别。
(10)调整Hive配置参数:根据集群的硬件资源和实际需求,合理调整Hive的配置参数,如内存、CPU、IO等,以提高性能。
数据倾斜问题,通常是指参与计算的数据分布不均,即某个key或者某些key的数据量远超其他key,导致在shuffle阶段,大量相同key的数据被发往同一个Reduce,进而导致该Reduce所需的时间远超其他Reduce,成为整个任务的瓶颈。以下为生产环境中数据倾斜的现象:
Hive中的数据倾斜常出现在分组聚合和join操作的场景中,下面分别介绍在上述两种场景下的优化思路。
1)分组聚合导致的数据倾斜
前文提到过,Hive中的分组聚合是由一个MapReduce Job完成的。Map端负责读取数据,并按照分组字段分区,通过Shuffle,将数据发往Reduce端,各组数据在Reduce端完成最终的聚合运算。若group by分组字段的值分布不均,就可能导致大量相同的key进入同一Reduce,从而导致数据倾斜。
由分组聚合导致的数据倾斜问题,有如下解决思路:
(1)判断倾斜的值是否为null
若倾斜的值为null,可考虑最终结果是否需要这部分数据,若不需要,只要提前将null过滤掉,就能解决问题。若需要保留这部分数据,考虑以下思路。
开启Map-Side聚合后,数据会现在Map端完成部分聚合工作。这样一来即便原始数据是倾斜的,经过Map端的初步聚合后,发往Reduce的数据也就不再倾斜了。最佳状态下,Map端聚合能完全屏蔽数据倾斜问题。
相关参数如下:
set hive.map.aggr=true;
set hive.map.aggr.hash.min.reduction=0.5;
set hive.groupby.mapaggr.checkinterval=100000;
set hive.map.aggr.hash.force.flush.memory.threshold=0.9;
(3)Skew-GroupBy优化
Skew-GroupBy是Hive提供的一个专门用来解决分组聚合导致的数据倾斜问题的方案。其原理是启动两个MR任务,第一个MR按照随机数分区,将数据分散发送到Reduce,并完成部分聚合,第二个MR按照分组字段分区,完成最终聚合。
相关参数如下:
--启用分组聚合数据倾斜优化
set hive.groupby.skewindata=true;
2)Join导致的数据倾斜
若Join操作使用的是Common Join算法,就会通过一个MapReduce Job完成计算。Map端负责读取Join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作。
如果关联字段的值分布不均,就可能导致大量相同的key进入同一Reduce,从而导致数据倾斜问题。
由Join导致的数据倾斜问题,有如下解决思路:
(1)Map Join
使用Map Join算法,Join操作仅在Map端就能完成,没有Shuffle操作,没有Reduce阶段,自然不会产生Reduce端的数据倾斜。该方案适用于大表Join小表时发生数据倾斜的场景。
相关参数如下:
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
(2)Skew Join
若参与Join的两表均为大表,Map Join就难以应对了。此时可考虑Skew Join,其核心原理是Skew Join的原理是,为倾斜的大key单独启动一个Map Join任务进行计算,其余key进行正常的Common Join。原理图如下:
相关参数如下:
--启用skew join优化
set hive.optimize.skewjoin=true;
--触发skew join的阈值,若某个key的行数超过该参数值,则触发
set hive.skewjoin.key=100000;
3)调整SQL语句
若参与Join的两表均为大表,其中一张表的数据是倾斜的,此时也可通过以下方式对SQL语句进行相应的调整。
假设原始SQL语句如下:A,B两表均为大表,且其中一张表的数据是倾斜的。
hive (default)>
select
*
from A
join B
on A.id=B.id;
其Join过程如下:
图中1001为倾斜的大key,可以看到,其被发往了同一个Reduce进行处理。
调整之后的SQL语句执行计划如下图所示:
调整SQL语句如下:
hive (default)>
select
*
from(
select --打散操作
concat(id,'_',cast(rand()*2 as int)) id,
value
from A
)ta
join(
select --扩容操作
concat(id,'_',1) id,
value
from B
union all
select
concat(id,'_',2) id,
value
from B
)tb
on ta.id=tb.id;
Hive 默认的字段分隔符为Ascii码的控制符\001(^A),建表的时候用fields terminated by '\001'。注意:如果采用\t或者\001等为分隔符,需要要求前端埋点和JavaEE后台传递过来的数据必须不能出现该分隔符,通过代码规范约束。
一旦传输过来的数据含有分隔符,需要在前一级数据中转义或者替换(ETL)。通常采用Sqoop和DataX在同步数据时预处理。
id name age
1 zs 18
2 li分隔符si 19
元数据备份(重点,如数据损坏,可能整个集群无法运行,至少要保证每日零点之后备份到其它服务器两个复本)。
(1)MySQL备份数据脚本(建议每天定时执行一次备份元数据)
#/bin/bash
#常量设置
MYSQL_HOST='hadoop102'
MYSQL_USER='root'
MYSQL_PASSWORD='000000'
# 备份目录,需提前创建
BACKUP_DIR='/root/mysql-backup'
# 备份天数,超过这个值,最旧的备份会被删除
FILE_ROLL_COUNT='7'
# 备份MySQL数据库
[ -d "${BACKUP_DIR}" ] || exit 1
mysqldump \
--all-databases \
--opt \
--single-transaction \
--source-data=2 \
--default-character-set=utf8 \
-h"${MYSQL_HOST}" \
-u"${MYSQL_USER}" \
-p"${MYSQL_PASSWORD}" | gzip > "${BACKUP_DIR}/$(date +%F).gz"
if [ "$(ls "${BACKUP_DIR}" | wc -l )" -gt "${FILE_ROLL_COUNT}" ]
then
ls "${BACKUP_DIR}" | sort |sed -n 1p | xargs -I {} -n1 rm -rf "${BACKUP_DIR}"/{}
fi
(2)MySQL恢复数据脚本
#/bin/bash
#常量设置
MYSQL_HOST='hadoop102'
MYSQL_USER='root'
MYSQL_PASSWORD='000000'
BACKUP_DIR='/root/mysql-backup'
# 恢复指定日期,不指定就恢复最新数据
RESTORE_DATE=''
[ "${RESTORE_DATE}" ] && BACKUP_FILE="${RESTORE_DATE}.gz" || BACKUP_FILE="$(ls ${BACKUP_DIR} | sort -r | sed -n 1p)"
gunzip "${BACKUP_DIR}/${BACKUP_FILE}" --stdout | mysql \
-h"${MYSQL_HOST}" \
-u"${MYSQL_USER}" \
-p"${MYSQL_PASSWORD}"
create table dept_partition2(
deptno int, -- 部门编号
dname string, -- 部门名称
)
partitioned by (day string, hour string)
row format delimited fields terminated by '\t';
(1)union会将联合的结果集去重
(2)union all不会对结果集去重
1)DataX与Sqoop都是主要用于离线系统中批量同步数据处理场景。
2)DataX和Sqoop区别如下:
(1)DataX底层是单进程多线程;Sqoop底层是4个Map;
(2)数据量大的场景优先考虑Sqoop分布式同步;数据量小的场景优先考虑DataX,完全基于内存;DataX数据量大,可以使用多个DataX实例,每个实例负责一部分(手动划分)。
(3)Sqoop是为Hadoop而生的,对Hadoop相关组件兼容性比较好;Datax是插件化开发,支持的Source和Sink更多一些。
(4)Sqoop目前官方不在升级维护;DataX目前阿里在升级维护
(5)关于运行日志与统计信息,DataX更丰富,Sqoop基于Yarn不容易采集
1)关键优化参数如下:
参数 | 说明 |
job.setting.speed.channel | 总并发数 |
job.setting.speed.record | 总record限速 |
job.setting.speed.byte | 总byte限速 |
core.transport.channel.speed.record | 单个channel的record限速,默认值为10000(10000条/s) |
core.transport.channel.speed.byte | 单个channel的byte限速,默认值1024*1024(1M/s) |
2)生效优先级:
(1)全局Byte限速 / 单Channel Byte限速
(2)全局Record限速 / 单Channel Record限速
两个都设置,取结果小的
(3)上面都没设置,总Channel数的设置生效
3)项目配置
只设置 总channel数=5,基本可以跑满网卡带宽。
建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json
1)MySQL(null) => Hive (\N) 要求Hive建表语句
解决该问题的方案有两个:
(1)修改DataX HDFS Writer的源码,增加自定义null值存储格式的逻辑,可参考记Datax3.0解决MySQL抽数到HDFSNULL变为空字符的问题_datax nullformat_谭正强的博客-CSDN博客。
(2)在Hive中建表时指定null值存储格式为空字符串(''),例如:
DROP TABLE IF EXISTS base_province;
CREATE EXTERNAL TABLE base_province
(
`id` STRING COMMENT '编号',
`name` STRING COMMENT '省份名称',
`region_id` STRING COMMENT '地区ID',
`area_code` STRING COMMENT '地区编码',
`iso_code` STRING COMMENT '旧版ISO-3166-2编码,供可视化使用',
`iso_3166_2` STRING COMMENT '新版IOS-3166-2编码,供可视化使用'
) COMMENT '省份表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/base_province/';
2)Hive(\N) => MySQL (null)
"reader": {
"name": "hdfsreader",
"parameter": {
"defaultFS": "hdfs://hadoop102:8020",
"path": "/base_province",
"column": [
"*"
],
"fileType": "text",
"compress": "gzip",
"encoding": "UTF-8",
"nullFormat": "\\N",
"fieldDelimiter": "\t",
}
}
(1)一个表一个配置,如果有几千张表,怎么编写的配置?
(2)脚本使用说明
python gen_import_config.py -d database -t table
1)全量同步的表如下
以上全部加一起30万条,约等于300m。
加购表(每天增量20万、全量100万 =》1g)
所以Datax每天全量同步的数据1-2g左右。
注意:金融、保险(平安 、民生银行),只有业务数据数据量大一些。
2)增量同步的表如下
增量数据每天1-2g
获取今天新增和变化的数据:通过sql过滤,创建时间是今天或者操作时间等于今天。
1)FlinkCDC、Maxwell、Canal都是主要用于实时系统中实时数据同步处理场景。
FlinkCDC | Maxwell | Canal | |
SQL与数据条数关系 | SQL影响几条出现几条 | SQL影响几条出现几条 | 只有一整条(后续可能需要炸开) |
数据初始化功能(同步全量数据) | 有(支持多库多表同时做) | 有(单表) | 无 |
断点续传功能 | 有(放在CK) | 有(存在MySQL) | 有(本地) |
支持断点续传。
全量初始化同步。
自动根据库名和表名把数据发往Kafka的对应主题。
MySQL主从复制。
同步速度慢,全量同步建议采用Sqoop或者DataX。
同步历史数据时,bootstrap会扫描所有数据。
同时maxwell会监听binlog变化。
例如:用bootstrap同步历史数据库时,历史数据库中新插入一条数据,这时bootstrap扫描到,maxwell进程也监控到了,这时就会出现数据重复问题。
1.3.9版本,支持邮件、企业微信。
2.0.3版本,支持的报警信息更全一些,配置更容易。
3.0.0以上版本,支持数据质量监控。
每天跑100多个指标,有活动时跑200个左右。
(1)运行成功或者失败都会发邮件、发钉钉、集成自动打电话。
(2)最主要的解决方案就是,看日志,解决问题。
(3)报警网站睿象云,睿象云-智能运维管理平台-智能运维系统-自动化运维性能监控平台
(4)双11和618活动需要24小时值班
看日志报错原因:直接重启,资源不够增加资源在重启。
(1)Local:运行在一台机器上。测试用。
(2)Standalone:是Spark自身的一个调度系统。 对集群性能要求非常高时用。国内很少使用。
(3)Yarn:采用Hadoop的资源调度器。 国内大量使用。
Yarn-client模式:Driver运行在Client上(不在AM里)
Yarn-cluster模式:Driver在AM上
(4)Mesos:国内很少使用。
(5)K8S:趋势,但是目前不成熟,需要的配置信息太多。
(1)4040 spark-shell任务端口
(2)7077 内部通讯端口。类比Hadoop的8020/9000
(3)8080 查看任务执行情况端口。 类比Hadoop的8088
(4)18080 历史服务器。类比Hadoop的19888
注意:由于Spark只负责计算,所有并没有Hadoop中存储数据的端口9870/50070。
主要表现为存储弹性、计算弹性、任务(Task、Stage)弹性、数据位置弹性,具体如下:
(1)自动进行内存和磁盘切换
(2)基于lineage的高效容错
(3)Task如果失败会特定次数的重试
(4)Stage如果失败会自动进行特定次数的重试,而且只会只计算失败的分片
(5)Checkpoint【每次对RDD操作都会产生新的RDD,如果链条比较长,计算比较笨重,就把数据放在硬盘中】和persist 【内存或磁盘中对数据进行复用】(检查点、持久化)
(6)数据调度弹性:DAG Task 和资源管理无关
(7)数据分片的高度弹性repartion
1)单Value
(1)map
(2)mapPartitions
(3)mapPartitionsWithIndex
(4)flatMap
(5)groupBy
(6)filter
(7)distinct
(8)coalesce
(9)repartition
(10)sortBy
2)双vlaue
(1)intersection
(2)union
(3)subtract
(4)zip
3)Key-Value
(1)partitionBy
(2)reduceByKey
(3)groupByKey
(4)sortByKey
(5)mapValues
(6)join
(1)reduce
(2)collect
(3)count
(4)first
(5)take
(6)save
(7)foreach
(1)map:每次处理一条数据
(2)mapPartitions:每次处理一个分区数据
1)关系:
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)。
2)区别:
repartition一定会发生Shuffle,coalesce根据传入的参数来判断是否发生Shuffle。
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。
reduceByKey:具有预聚合操作。
groupByKey:没有预聚合。
在不影响业务逻辑的前提下,优先采用reduceByKey。
宽依赖和窄依赖。有Shuffle的是宽依赖。
(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
DataFrame和DataSet的区别:前者是row类型
RDD和DataSet及DataSet的区别:前者没有字段和表信息
元数据 | 执行引擎 | 语法 | 生态 | |
Hive on Spark | MySQL | rdd | HQL | 更加完善 |
Spark on Hive (Spark SQL ) | MySQL | df ds | Spark SQL | 有欠缺(权限管理、元数据管理) |
内置Hive | derby | |||
外置Hive | MySQL |
1)提交流程(重点)
2)Shuffle流程(重点)
(1)SortShuffle:减少了小文件。
中间落盘应该是本地磁盘
生成的文件数 = Task数量*2
在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为10000条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个Task过程会产生多个临时文件。最后在每个Task中,将所有的临时文件合并,这就是merge过程,此过程将所有临时文件读取出来,一次写入到最终文件。
(4)bypassShuffle:减少了小文件,不排序,效率高。在不需要排序的场景使用。
1)统一内存管理的堆内内存结构如下图
2)统一内存管理的动态占用机制如下图
1)内存&硬盘
(1)MR在Map阶段会在溢写阶段将中间结果频繁的写入磁盘,在Reduce阶段再从磁盘拉取数据。频繁的磁盘IO消耗大量时间。
(2)Spark不需要将计算的中间结果写入磁盘。这得益于Spark的RDD,在各个RDD的分区中,各自处理自己的中间结果即可。在迭代计算时,这一优势更为明显。
2)Spark DAG任务划分减少了不必要的Shuffle
(1)对MR来说,每一个Job的结果都会落地到磁盘。后续依赖于次Job结果的Job,会从磁盘中读取数据再进行计算。
(2)对于Spark来说,每一个Job的结果都可以保存到内存中,供后续Job使用。配合Spark的缓存机制,大大的减少了不必要的Shuffle。
3)资源申请粒度:进程&线程
开启和调度进程的代价一般情况下大于线程的代价。
(1)MR任务以进程的方式运行在Yarn集群中。N个MapTask就要申请N个进程
(2)Spark的任务是以线程的方式运行在进程中。N个MapTask就要申请N个线程。
(1)Hadoop不用等所有的MapTask都结束后开启ReduceTask;Spark必须等到父Stage都完成,才能去Fetch数据。
(2)Hadoop的Shuffle是必须排序的,那么不管是Map的输出,还是Reduce的输出,都是分区内有序的,而Spark不要求这一点。
参考答案:
https://blog.csdn.net/gamer_gyt/article/details/79135118
1)在提交任务时的几个重要参数
executor-cores —— 每个executor使用的内核数,默认为1,官方建议2-5个,我们企业是4个
num-executors —— 启动executors的数量,默认为2
executor-memory —— executor内存大小,默认1G
driver-cores —— driver使用内核数,默认为1
driver-memory —— driver内存大小,默认512M
2)给一个提交任务的样式
spark-submit \
--master local[5] \
--driver-cores 2 \
--driver-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--executor-memory 8g \
--class PackageName.ClassName XXXX.jar \
--name "Spark Job Name" \
InputPath \
OutputPath
Shell脚本。海豚调度器可以通过页面提交Spark任务。
reduceBykey:
groupByKey:
…ByKey:
使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。
详见Hive on Spark数据倾斜讲解。
Flink程序在运行时主要有TaskManager,JobManager,Client三种角色。
JobManager是集群的老大,负责接收Flink Job,协调检查点,Failover 故障恢复等,同时管理TaskManager。 包含:Dispatcher、ResourceManager、JobMaster。
TaskManager是执行计算的节点,每个TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络。内部划分slot隔离内存,不隔离cpu。同一个slot共享组的不同算子的subtask可以共享slot。
Client是Flink程序提交的客户端,将Flink Job提交给JobManager。
Flink | Spark Streaming | |
计算模型 | 流计算 | 微批次 |
时间语义 | 三种 | 没有,处理时间 |
乱序 | 有 | 没有 |
窗口 | 多、灵活 | 少、不灵活(窗口长度必须是 批次的整数倍) |
checkpoint | 异步分界线快照 | 弱 |
状态 | 有,多 | 没有(updatestatebykey) |
流式sql | 有 | 没有 |
1)Flink提交流程(Yarn-Per-Job)
2)算子链路:Operator Chain
Flink自动做的优化,要求One-to-one,并行度相同。
代码disableOperatorChaining()禁用算子链。
3)Graph生成与传递
在哪里生成 | 传递给谁 | 做了什么事 | |
逻辑流图StreamGraph | Client | Client | 最初的DAG图 |
作业流图JobGraph | Client | JobManager | 算子链路优化 |
执行流图ExecutionGraph | JobManager | JobManager | 并行度的细化 |
物理流图 |
4)Task、Subtask的区别
Subtask:算子的一个并行实例。
Task:Subtask运行起来之后,就叫Task。
5)并行度和Slot的关系
Slot是静态的概念,是指TaskMangaer具有的并发执行能力。
并行度是动态的概念,指程序运行时实际使用的并发能力。
设置合适的并行度能提高运算效率,太多太少都不合适。
6)Slot共享组了解吗,如何独享Slot插槽
默认共享组时default,同一共享组的task可以共享Slot。
通过slotSharingGroup()设置共享组。
1)Local:本地模式,Flink作业在单个JVM进程中运行,适用于测试阶段
2)Standalone:Flink作业在一个专门的Flink集群上运行,独立模式不依赖于其他集群管理器(Yarn或者Kubernetes)
3)Yarn:
Per-job:独享资源,代码解析在Client
Application:独享资源,代码解析在JobMaster
Session:共享资源,一套集群多个job
4)K8s:支持云原生,未来的趋势
5)Mesos:国外使用,仅作了解
设置并行度有多种方式,优先级:算子 > 全局Env > 提交命令行 > 配置文件
1)并行度根据任务设置:
(1)常规任务:Source,Transform,Sink算子都与Kafka分区保持一致
(2)计算偏大任务:Source,Sink算子与Kafka分区保持一致,Transform算子可设置成2的n次方,64,128…
2)资源设置:通用经验 1CU = 1CPU + 4G内存
Taskmanager的Slot数:1拖1(独享资源)、1拖N(节省资源,减少网络传输)
TaskManager的内存数:4~8G
TaskManager的CPU:Flink默认一个Slot分配一个CPU
JobManager的内存:2~4G
JobManager的CPU:默认是1
3)资源是否足够:
资源设置,然后压测,看每个并行度处理上限,是否会出现反压
例如:每个并行度处理5000/s,开始出现反压,比如我们设置三个并行度,我们程序处理上限15000/s
事件时间Event Time:是事件创建的时间。数据本身携带的时间。
进入时间Ingestion Time:是数据进入Flink的时间。
处理时间Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。
水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。
1)分类:
间歇性:来一条数据,更新一次Watermark。
周期性:固定周期更新Watermark。
官方提供的API是基于周期的,默认200ms,因为间歇性会给系统带来压力。
2)生成原理:
Watermark = 当前最大事件时间 - 乱序时间 - 1ms
3)传递:
Watermark是一条携带时间戳的特殊数据,从代码指定生成的位置,插入到流里面。
一对多:广播。
多对一:取最小。
多对多:拆分来看,其实就是上面两种的结合。
在Apache Flink中,迟到时间(lateness)和乱序时间(out-of-orderness)是两个与处理时间和事件时间相关的概念。它们在流处理过程中,尤其是在处理不按事件时间排序的数据时非常重要。
(1)迟到时间(lateness):迟到时间可以影响窗口,在窗口计算完成后,仍然可以接收迟到的数据
迟到时间是指事件到达流处理系统的延迟时间,即事件的实际接收时间与其事件时间的差值。在某些场景下,由于网络延迟、系统故障等原因,事件可能会延迟到达。为了处理这些迟到的事件,Flink提供了一种机制,允许在窗口计算完成后仍然接受迟到的数据。设置迟到时间后,Flink会在窗口关闭之后再等待一段时间,以便接收并处理这些迟到的事件。
设置迟到时间的方法如下:
在定义窗口时,使用`allowedLateness`方法设置迟到时间。例如,设置迟到时间为10分钟:
```java
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(Time.minutes(10))
.<window function>;
```
(2)乱序时间(out-of-orderness)
乱序时间是通过影响水印来影响数据的摄入,它表示的是数据的混乱程度。
乱序时间是指事件在流中不按照事件时间的顺序到达。在某些场景下,由于网络延迟或数据源的特性,事件可能会乱序到达。
Flink提供了处理乱序事件的方法,即水位线(watermark)。
水位线是一种表示事件时间进展的机制,它告诉系统当前处理到哪个事件时间。
当水位线到达某个值时,说明所有时间戳小于该值的事件都已经处理完成。
为了处理乱序事件,可以为水位线设置一个固定的延迟。
设置乱序时间的方法如下:
在定义数据源时,使用`assignTimestampsAndWatermarks`方法设置水位线策略。例如,设置水位线延迟为5秒:
```java
DataStream<T> input = env.addSource(<source>);
input
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<T>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(<timestamp assigner>))
.<other operations>;
```
1)窗口分类:
Keyed Window和Non-keyed Window
基于时间:滚动、滑动、会话。
基于数量:滚动、滑动。
2)Window口的4个相关重要组件:
assigner(分配器):如何将元素分配给窗口。
function(计算函数):为窗口定义的计算。其实是一个计算函数,完成窗口内容的计算。
triger(触发器):在什么条件下触发窗口的计算。
可以使用自定义触发器,解决事件时间,没有数据到达,窗口不触发计算问题,还可以使用持续性触发器,实现一个窗口多次触发输出结果,详细看连接
问题展示:https://www.bilibili.com/video/BV1Gv4y1H7F8/?spm_id_from=333.999.0.0&vd_source=891aa1a363111d4914eb12ace2e039af
问题解决:https://www.bilibili.com/video/BV1mM411N7uP/?spm_id_from=333.999.0.0&vd_source=891aa1a363111d4914eb12ace2e039af
evictor(退出器):定义从窗口中移除数据。
3)窗口的划分:如,基于事件时间的滚动窗口
Start = 按照数据的事件时间向下取窗口长度的整数倍。
end = start + size
比如开了一个10s的滚动窗口,第一条数据是857s,那么它属于[850s,860s)。
4)窗口的创建:当属于某个窗口的第一个元素到达,Flink就会创建一个窗口,并且放入单例集合
5)窗口的销毁:时间进展 >= 窗口最大时间戳 + 窗口允许延迟时间
(Flink保证只删除基于时间的窗口,而不能删除其他类型的窗口,例如全局窗口)。
6)窗口为什么左闭右开:属于窗口的最大时间戳 = end - 1ms
7)窗口什么时候触发:如基于事件时间的窗口 watermark >= end - 1ms
分组和分区在 Flink 中具有不同的含义和作用:
分区:分区(Partitioning)是将数据流划分为多个子集,这些子集可以在不同的任务实例上进行处理,以实现数据的并行处理。
数据具体去往哪个分区,是通过指定的 key 值先进行一次 hash 再进行一次 murmurHash,通过上述计算得到的值再与并行度进行相应的计算得到。
分组:分组(Grouping)是将具有相同键值的数据元素归类到一起,以便进行后续操作(如聚合、窗口计算等)。key 值相同的数据将进入同一个分组中。
注意:数据如果具有相同的 key 将一定去往同一个分组和分区,但是同一分区中的数据不一定属于同一组。
API:window join
interval join
connect + coGroup
SQL:regular join : 默认不清理状态,结合ttl
inner : 没有回撤数据
left、right : 有回撤数据
interval join : where between xxx and xxx
lookup join (现查外部系统)
temporal join :时态表join(保存多版本)
底层调用的是keyby + connect ,处理逻辑:
(1)判断是否迟到(迟到就不处理了,直接return)
(2)每条流都存了一个Map类型的状态(key是时间戳,value是List存数据)
(3)任一条流,来了一条数据,遍历对方的map状态,能匹配上就发往join方法
(4)使用定时器,超过有效时间范围,会删除对应Map中的数据(不是clear,是remove)
Interval join不会处理join不上的数据,如果需要没join上的数据,可以用 coGroup+join算子实现,或者直接使用flinksql里的left join或right join语法。
(1)算子状态:作用范围是算子,算子的多个并行实例各自维护一个状态
(2)键控状态:每个分组维护一个状态
(3)状态后端:两件事=》 本地状态存哪里、checkpoint存哪里
1.13版本之前
本地状态 Checkpoint
内存 TaskManager的内存 JobManager内存
文件 TaskManager的内存 HDFS
RocksDB RocksDB HDFS
1.13版本之后
本地状态
Hashmap() TaskManager的内存
RocksDB RocksDB
Checkpoint存储 参数指定
barriers在数据流源处被注入并行数据流中。快照n的barriers被插入的位置(我们称之为Sn)是快照所包含的数据在数据源中最大位置。
例如,在Kafka中,此位置将是分区中最后一条记录的偏移量。 将该位置Sn报告给checkpoint协调器(Flink的JobManager)。
然后barriers向下游流动。当一个中间操作算子从其所有输入流中收到快照n的barriers时,它会为快照n发出barriers进入其所有输出流中。
一旦Sink操作算子(流式DAG的末端)从其所有输入流接收到barriers n,它就向Checkpoint协调器确认快照n完成。
在所有Sink确认快照后,意味快照着已完成。一旦完成快照n,Job将永远不再向数据源请求Sn之前的记录,因为此时这些记录(及其后续记录)将已经通过整个数据流拓扑,也即是已经被处理结束。
barriers在数据流源处被注入并行数据流中。快照n的barriers被插入的位置(我们称之为Sn)是快照所包含的数据在数据源中最大位置。
例如,在Apache Kafka中,此位置将是分区中最后一条记录的偏移量。 将该位置Sn报告给checkpoint协调器(Flink的JobManager)。
然后barriers向下游流动。当一个中间操作算子从其所有输入流中收到快照n的barriers时,它会为快照n发出barriers进入其所有输出流中。
一旦sink操作算子(流式DAG的末端)从其所有输入流接收到barriers n,它就向checkpoint协调器确认快照n完成。
在所有sink确认快照后,意味快照着已完成。一旦完成快照n,job将永远不再向数据源请求Sn之前的记录,因为此时这些记录(及其后续记录)将已经通过整个数据流拓扑,也即是已经被处理结束。
(1)间隔:兼顾性能和延迟,一般任务设置分钟级(1~5min),要求延迟低的设置秒级
(2)Task重启策略(Failover):
固定延迟重启策略:重试几次、每次间隔多久。
失败率重启策略:重试次数、重试区间、重试间隔。
无重启策略:一般在开发测试时使用。
Fallback重启策略:默认固定延迟重启策略。
(1)预加载:open()方法,查询维表,存储下来 ==》 定时查询
(2)热存储:存在外部系统Redis、HBase等
(3)广播维表
(4)Lookup Join:外部存储,connector创建,SQL用法
(1)FlinkCDC 1.x同步历史数据会锁表
设置参数不加锁,但只能保证至少一次。
(2)2.x 实现了无锁算法,同步历史数据的时候不会锁表
Flink Spark Streaming
计算模型 流计算 微批次
时间语义 三种 没有,处理时间
乱序 有 没有
窗口 多、灵活 少、不灵活(窗口长度必须是 批次的整数倍)
checkpoint 异步分界线快照 弱
状态 有,多 没有(updatestatebykey)
流式sql 有 没有
1)时间语义
Event Time:是事件创建的时间。数据本身携带的时间。
Ingestion Time:是数据进入Flink的时间。
Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。
(1)Watermark 是一种衡量 Event Time 进展的机制,是一个逻辑时钟
(2)Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark 机制结合 window 来实现
(3)基于事件时间,用来触发窗口、定时器等
(4)watermark主要属性就是时间戳,可以理解一个特殊的数据,插入到流里面
(5)watermark是单调不减的
(6)数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,如果后续还有timestamp 小于 Watermark 的数据到达,称为迟到数据
1)窗口分类:
Keyed Window和Non-keyed Window
基于时间:滚动、滑动、会话
基于数量:滚动、滑动
2)窗口的划分:如,基于事件时间的滚动窗口
start=按照数据的事件时间向下取窗口长度的整数倍
end=start+size
比如开了一个10s的滚动窗口,第一条数据是857s,那么它属于[850s,860s)
3)窗口的创建:当属于某个窗口的第一个元素到达,Flink就会创建一个窗口,并且放入单例集合
4)窗口的销毁:时间进展 >= 窗口最大时间戳 + 窗口允许延迟时间
(Flink保证只删除基于时间的窗口,而不能删除其他类型的窗口,例如全局窗口)。
1)watermark的乱序等待时间
2)使用窗口时,可以允许迟到
3)迟到特别久的,放到侧输出流处理
1) 算子状态:每个并行实例 各自维护状态
List
unionList
键控状态:经过keyby的,每个key各自维护状态
value、list、map
广播
2)状态后端
<1.13
本地状态 checkpoint存储
memory TM内存 JM内存
fs TM内存 HDFS
rocksdb TM的Rocksdb HDFS
>=1.13
本地状态
hashmap TM内存
rocksdb TM的rocksdb
checkpoint存储单独自己指定: JM内存、hdfs
又短又小的,一般作业: hashmap ===》 快
又大又长的: rocksdb ===》 增量,比hashmap快、节省资源
我们怎么用: 基本都是hashmap,个别状态大的用rocksdb(3个)
开启State访问性能监控
Flink 1.13 中引入了 State 访问的性能监控,即 latency trackig state。此功能不局限于 State Backend 的类型,自定义实现的 State Backend 也可以复用此功能。
开启增量检查点和本地恢复
1)开启增量检查点
RocksDB是目前唯一可用于支持有状态流处理应用程序增量检查点的状态后端,可以修改参数开启增量检查点:
state.backend.incremental: true #默认false,改为true。
或代码中指定
new EmbeddedRocksDBStateBackend(true)
2)开启本地恢复
当 Flink 任务失败时,可以基于本地的状态信息进行恢复任务,可能不需要从 hdfs 拉取数据。本地恢复目前仅涵盖键控类型的状态后端(RocksDB),MemoryStateBackend不支持本地恢复并忽略此选项。
state.backend.local-recovery: true
3)设置多目录
如果有多块磁盘,也可以考虑指定本地多目录
调整预定义选项
Flink针对不同的设置为RocksDB提供了一些预定义的选项集合,其中包含了后续提到的一些参数,如果调整预定义选项后还达不到预期,再去调整后面的block、writebuffer等参数。
当前支持的预定义选项有DEFAULT、SPINNING_DISK_OPTIMIZED、SPINNING_DISK_OPTIMIZED_HIGH_MEM或FLASH_SSD_OPTIMIZED。有条件上SSD的,可以指定为FLASH_SSD_OPTIMIZED
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
#设置为机械硬盘+内存模式
①反压的原因
短时间的负载高峰导致系统接收数据的速率远高于它处理数据的速率。许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或遇到大促、秒杀活动导致流量陡增。
②反压的危害
会影响到checkpoint 时长和 state 大小,甚至可能会导致资源耗尽甚至系统崩溃
checkpoint 是保证数据一致性的关键,checkpoint 时间变长有可能导致 checkpoint 超时失败,而 state 大小同样可能拖慢 checkpoint 甚至导致 OOM (使用 Heap-based StateBackend)或者物理内存使用超出容器资源(使用 RocksDBStateBackend)的稳定性问题。
③定位反压
利用web ui定位
定位到造成反压的节点,排查的时候,先把operator chain禁用,方便定位到具体算子。
Flink 现在在 UI 上通过颜色和数值来展示繁忙和反压的程度。
通过WebUI看具体算子的BackPressure显示High。
分析瓶颈算子
如果处于反压状态,那么有两种可能性:
(1)该节点的发送速率跟不上它的产生数据速率。这一般会发生在一条输入多条输出的 Operator(比如 flatmap)。这种情况,该节点是反压的根源节点,它是从 Source Task 到 Sink Task 的第一个出现反压的节点。
(2)下游的节点接受速率较慢,通过反压机制限制了该节点的发送速率。这种情况,需要继续排查下游节点,一直找到第一个为OK的一般就是根源节点。
总体来看,如果我们找到第一个出现反压的节点,反压根源要么是就这个节点,要么是它紧接着的下游节点。
通常来讲,第二种情况更常见。如果无法确定,还需要结合 Metrics进一步判断。
利用Metrics定位
可以根据指标分析反压
可以分析数据传输
④处理反压
反压可能是暂时的,可能是由于负载高峰、CheckPoint 或作业重启引起的数据积压而导致反压。如果反压是暂时的,应该忽略它。
查看是否数据倾斜
使用火焰图分析看顶层的哪个函数占据的宽度最大。只要有"平顶"(plateaus),就表示该函数可能存在性能问题。
分析GC日志
数据倾斜现象
相同 Task 的多个 Subtask 中,个别Subtask 接收到的数据量明显大于其他 Subtask 接收到的数据量,通过 Flink Web UI 可以精确地看到每个 Subtask 处理了多少数据,即可判断出 Flink 任务是否存在数据倾斜。通常,数据倾斜也会引起反压。
数据倾斜解决
- LocalKeyBy
Flink是实时流处理,如果keyby之后的聚合操作存在数据倾斜,且没有开窗口(没攒批)的情况下,简单的认为使用两阶段聚合,是不能解决问题的。在 keyBy 上游算子数据发送之前,首先在上游算子的本地对数据进行聚合后,再发送到下游,使下游接收到的数据量大大减少,从而使得 keyBy 之后的聚合操作不再是任务的瓶颈。类似MapReduce 中 Combiner 的思想,但是这要求聚合操作必须是多条数据或者一批数据才能聚合,单条数据没有办法通过聚合来减少数据量。从Flink LocalKeyBy 实现原理来讲,必然会存在一个积攒批次的过程,在上游算子中必须攒够一定的数据量,对这些数据聚合后再发送到下游。
- keyBy 之前发生数据倾斜
如果 keyBy 之前就存在数据倾斜,上游算子的某些实例可能处理的数据较多,某些实例可能处理的数据较少,产生该情况可能是因为数据源的数据本身就不均匀,例如由于某些原因 Kafka 的 topic 中某些 partition 的数据量较大,某些 partition 的数据量较少。对于不存在 keyBy 的 Flink 任务也会出现该情况。
这种情况,需要让 Flink 任务强制进行shuffle。使用shuffle、rebalance 或 rescale算子即可将数据均匀分配,从而解决数据倾斜的问题。
- keyBy 后的窗口聚合操作存在数据倾斜
第一阶段聚合:key拼接随机数前缀或后缀,进行keyby、开窗、聚合
注意:聚合完不再是WindowedStream,要获取WindowEnd作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起)
第二阶段聚合:按照原来的key及windowEnd作keyby、聚合
source端: 是kafka,可重发
flink:开启checkpoint,设置为精准一次(介绍checkpoint原理、barrier对齐)
sink端:幂等、事务(2pc),结合kafka介绍2pc过程
(1)固定延迟重启策略:如果发生故障,系统会尝试重新启动作业n次,并在连续重启尝试之间等待n秒。
(2)固定延迟重启策略:尝试给定次数重新启动作业,如果超过最大尝试次数,则作业最终会失败。在两次连续重启尝试之间,重启策略等待一段固定的时间。
(1)对指定的key调用自身的hashCode方法=》key.hashcode =》keyHash
(2)调用murmruhash算法,进行第二次hash =》MathUtils.murmurHash(keyHash) % maxParallelism =》keygroupid
(3)计算出当前数据应该去往哪个下游分区:
keyGroupId * parallelism / maxParallelism
(4)键组id * 下游算子并行度 / 最大并行度(默认128)
flink run -t yarn-per-job
-p 并行度3,与kafka分区保持一致,效率最高
-D JM内存,2G,不需要处理数据,不用给太大
-D TM内存,4G, 按照1cu原则,观察过webui的内存metrics
-D 每个TM的slot数 3,节省资源, 3不浪费slot
-D
-c 全类名
jar包路径
架构角色:
1)Master
实现类为HMaster,负责监控集群中所有的 RegionServer 实例。主要作用如下:
(1)管理元数据表格hbase:meta,接收用户对表格创建修改删除的命令并执行
(2)监控region是否需要进行负载均衡,故障转移和region的拆分。
通过启动多个后台线程监控实现上述功能:
①LoadBalancer负载均衡器
周期性监控region分布在regionServer上面是否均衡,由参数hbase.balancer.period控制周期时间,默认5分钟。
②CatalogJanitor元数据管理器
定期检查和清理HBase:meta中的数据。meta表内容在进阶中介绍。
③MasterProcWAL Master预写日志处理器
把Master需要执行的任务记录到预写日志WAL中,如果Master宕机,让backupMaster读取日志继续干。
2)Region Server
Region Server实现类为HRegionServer,主要作用如下:
(1)负责数据cell的处理,例如写入数据put,查询数据get等
(2)拆分合并Region的实际执行者,有Master监控,有regionServer执行。
3)Zookeeper
HBase通过Zookeeper来做Master的高可用、记录RegionServer的部署信息、并且存储有meta表的位置信息。
HBase对于数据的读写操作时直接访问Zookeeper的,在2.3版本推出Master Registry模式,客户端可以直接访问Master。使用此功能,会加大对Master的压力,减轻对Zookeeper的压力。
4)HDFS
HDFS为HBase提供最终的底层数据存储服务,同时为HBase提供高容错的支持。
写流程:
写流程顺序正如API编写顺序,首先创建HBase的重量级连接
(1)读取本地缓存中的Meta表信息;(第一次启动客户端为空)
(2)向ZK发起读取Meta表所在位置的请求;
(3)ZK正常返回Meta表所在位置;
(4)向Meta表所在位置的RegionServer发起请求读取Meta表信息;
(5)读取到Meta表信息并将其缓存在本地;
(6)向待写入表发起写数据请求;
(7)先写WAL,再写MemStore,并向客户端返回写入数据成功。
创建连接同写流程。
(1)读取本地缓存中的Meta表信息;(第一次启动客户端为空)
(2)向ZK发起读取Meta表所在位置的请求;
(3)ZK正常返回Meta表所在位置;
(4)向Meta表所在位置的RegionServer发起请求读取Meta表信息;
(5)读取到Meta表信息并将其缓存在本地;
(6)MemStore、StoreFile、BlockCache
同时构建MemStore与StoreFile的扫描器,
MemStore:正常读
StoreFile:
根据索引确定待读取文件;
再根据BlockCache确定读取文件;
(7)合并多个位置读取到的数据,给用户返回最大版本的数据,如果最大版本数据为删除标记,则不给不返回任何数据。
Compaction分为两种,分别是Minor Compaction和Major Compaction。
(1)rowkey长度原则
(2)rowkey散列原则
(3)rowkey唯一原则
大量用户信息保存在HBase中。
2)热点问题:
由于用户的id是连续的,批量导入用户数据后,很有可能用户信息都集中在同一个region中。如果用户信息频繁访问,很有可能该region的节点成为热点。
3)期望: 通过对Rowkey的设计,使用户数据能够分散到多个region中。
4)步骤:
(1)预分区
通过命令
create 'GMALL:DIM_USER_INFO','INFO',SPLITS=>['20','40','60','80']
把用户信息表(GMALL:DIM_USER_INFO) 分为5个region : [00-20), [20-40), [40-60), [60-80), [80-99]
(2)写入时反转ID
把用户ID左补零10位(根据最大用户数),然后反转顺序。
比如:用户id为1457,反转处理后变为7541000000; 根据前两位分到region [60-80),
用户id为1459,反转处理后变为9541000000;根据前两位分到 region [80-99]
这样连续的用户ID反转后由于Rowkey开头并不连续,会进入不同的region中。
最终达到的效果可以通过Web UI进行观察:
如上图,用户数据会分散到多个分区中。
注意:在用户查询时,也同样根据需要把ID进行反转后进行查询。
1)原理
协处理器:协助处理数据,可以在向原始表中写入数据之后向索引表中写入一条索引数据。
2)种类及用法
(1)全局 读多写少
单独创建表专门用于存储索引,索引表数据量比原始表小,读取更快速。但是写操作会写两张表的数据,跨Region,需要多个连接。
(2)本地 写多读少
将索引数据与原表放在一起(Region),加在一起比原表数据量大,读取相对变慢,但是由于在一个Region,所以写操作两条数据用的是同一个连接。
快:提供了丰富的表引擎,每个表引擎 都做了尽可能的优化。
为什么快?
(1)向量化
(2)列式
(3)尽可能使用本节点的 内存+cpu,不依赖其他组件,比如Hadoop
(4)提供了sql化的语言
(5)支持自定义函数
(6)提供了丰富的表引擎,引擎都经过了优化
(1)Log
(2)Special:Memory、Distributed
(3)MergeTree: replacingmergetree、summingmergetree
replicatedmergetree
(4)集成引擎: 外部系统映射,如MySQL
Clickhouse没有事务,Flink写入是至少一次语义。
利用Clickhouse的ReplacingMergeTree引擎会根据主键去重,但只能保证最终一致性。查询时加上final关键字可以保证查询结果的一致性。
10几张宽表,每天平均10来G,存储一年。
需要磁盘 10G * 365天 * 2副本/0.7 = 约11T
1)我们用的本地表,2个副本
2)分布式表写入存在的问题:
假如现有一个2分片的集群,使用clickhouse插入分布式表。
(1)资源消耗问题:在分片2的数据写入临时目录中会产生写放大现象,会大量消耗分片节点1的CPU和磁盘等资源。
(2)数据准确性和一致性问题:在写入分片2的时候,节点1或节点2的不正常都会导致数据问题。(节点1挂了数据丢失、节点2挂了或者节点2表删了节点1会无限制重试,占用资源)。
(3)part过多问题:每个节点每秒收到一个Insert Query,N个节点,分发N-1次,一共就是每秒生成Nx(N-1)个part目录。集群shard数越多,分发产生的小文件也会越多(如果写本地表就会相对集中些),最后会导致写入到MergeTree的Part的数会特别多,最后会拖垮整个文件的系统。
一种查询结果的持久化,记录了查询语句和对应的查询结果。
优点:查询速度快,要是把物化视图这些规则全部写好,它比原数据查询快了很多,总的行数少了,因为都预计算好了。
缺点:它的本质是一个流式数据的使用场景,是累加式的技术,所以要用历史数据做去重、去核这样的分析,在物化视图里面是不太好用的。在某些场景的使用也是有限的。而且如果一张表加了好多物化视图,在写这张表的时候,就会消耗很多机器的资源,比如数据带宽占满、存储一下子增加了很多。
1)内存优化
max_memory_usage: 单个查询的内存上限,128G内存的服务器==》 设为100G
max_bytes_before_external_group_by:设为 一半,50G
max_bytes_before_external_sort:设为一半,50G
2)CPU
max_concurrent_queries: 默认 100/s ===> 300/s
3)存储
SSD更快
4)物化视图
5)写入时攒批,避免写入过快导致 too many parts
Projection 意指一组列的组合,可以按照与原表不同的排序存储,并且支持聚合函数的查询。ClickHouse Projection 可以看做是一种更加智能的物化视图,它有如下特点:
1)part-level存储
相比普通物化视图是一张独立的表,Projection 物化的数据就保存在原表的分区目录中,支持明细数据的普通Projection 和 预聚合Projection。
2)无感使用,自动命中
可以对一张 MergeTree 创建多个 Projection ,当执行 Select 语句的时候,能根据查询范围,自动匹配最优的 Projection 提供查询加速。如果没有命中 Projection , 就直接查询底表。
3)数据同源、同生共死
因为物化的数据保存在原表的分区,所以数据的更新、合并都是同源的,也就不会出现不一致的情况了。
1)索引
(1)一级索引:稀疏索引(主键索引) 粒度8192
(2)二级索引:跳数索引 minmax、set、bloom_filter等
2)底层存储
Clickhouse默认数据目录在/var/lib/clickhouse/data目录中。所有的数据库都会在该目录中创建一个子文件夹。下图展示了Clickhouse对数据文件的组织。
202103_1_10_2 | 目录 | 分区目录,由分区+LSM生成的 |
detached | 目录 | 通过DETACH语句卸载后的表分区存放位置 |
format_version.txt | 文本文件 | 纯文本,记录存储的格式 |
分区目录命名 = 分区ID_最小数据块编号_最大数据块编号_层级构成。数据块编号从1开始自增,新创建的数据块最大和最小编号相同,当发生合并时会将其修改为合并的数据块编号。同时每次合并都会将层级增加1。
开源:Echarts(百度)、Kibana、Superset(功能一般)
收费:Tableau(功能强大)、QuickBI(阿里云面对实时)、DataV(阿里云面对实时)、Suga(百度实时)
1)Sqoop参数
/opt/module/sqoop/bin/sqoop import \
--connect \
--username \
--password \
--target-dir \
--delete-target-dir \
--num-mappers \
--fields-terminated-by \
--query "$2" ' and $CONDITIONS;'
2)Sqoop导入导出Null存储一致性问题
Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用--input-null-string和--input-null-non-string两个参数。导入数据时采用--null-string和--null-non-string。
3)Sqoop数据导出一致性问题
场景1:如Sqoop在导出到MySQL时,使用4个Map任务,过程中有2个任务失败,那此时MySQL中存储了另外两个Map任务导入的数据,此时老板正好看到了这个报表数据。而开发工程师发现任务失败后,会调试问题并最终将全部数据正确的导入MySQL,那后面老板再次看报表数据,发现本次看到的数据与之前的不一致,这在生产环境是不允许的。
官网:http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html
Since Sqoop breaks down export process into multiple transactions, it is possible that a failed export job may result in partial data being committed to the database. This can further lead to subsequent jobs failing due to insert collisions in some cases, or lead to duplicated data in others. You can overcome this problem by specifying a staging table via the --staging-table option which acts as an auxiliary table that is used to stage exported data. The staged data is finally moved to the destination table in a single transaction.
–staging-table方式
sqoop export --connect jdbc:mysql://192.168.137.10:3306/user_behavior --username root --password 123456 --table app_cource_study_report --columns watch_video_cnt,complete_video_cnt,dt --fields-terminated-by "\t" --export-dir "/user/hive/warehouse/tmp.db/app_cource_study_analysis_${day}" --staging-table app_cource_study_report_tmp --clear-staging-table --input-null-string '\N'
4)Sqoop底层运行的任务是什么
只有Map阶段,没有Reduce阶段的任务。默认是4个MapTask。
5)Sqoop一天导入多少数据
100万日活=》10万订单,1人10条,每天1g左右业务数据
Sqoop每天将1G的数据量导入到数仓。
6)Sqoop数据导出的时候一次执行多长时间
每天晚上00:10开始执行,Sqoop任务一般情况20-30分钟的都有。取决于数据量(11.11,6.18等活动在1个小时左右)。
7)Sqoop在导出数据的时候数据倾斜
Sqoop参数撇嘴: split-by:按照自增主键来切分表的工作单元。
num-mappers:启动N个map来并行导入数据,默认4个;
8)Sqoop数据导出Parquet(项目中遇到的问题)
Ads层数据用Sqoop往MySql中导入数据的时候,如果用了orc(Parquet)不能导入,需转化成text格式
(1)创建临时表,把Parquet中表数据导入到临时表,把临时表导出到目标表用于可视化
(2)ads层建表的时候就不要建Parquet表
1)集群每天跑多少指标?
每天跑100多个指标,有活动时跑200个左右。
2)任务挂了怎么办?
(1)运行成功或者失败都会发邮件、发钉钉、集成自动打电话(项目中遇到的问题)
(2)最主要的解决方案就是重新跑。
(3)报警网站睿象云-智能运维管理平台-智能运维系统-自动化运维性能监控平台
多线程是指程序中包含多个执行流,即一个程序中可以同时运行多个不同的线程来执行不同的任务。
优点:可以提高cpu的利用率。多线程中,一个线程必须等待的时候,cpu可以运行其它的线程而不是等待,这样大大提高了程序的效率。
Java 3种常见创建多线程的方式
(1)继承Thread类,重run()方法
(2)实现Runnable接口,重写run()方法
(3)通过创建线程池实现
Executors提供了线程工厂方法用于创建线程池,返回的线程池都实现了ExecutorServer接口。
虽然Java自带的工厂方法很便捷,但都有弊端,《阿里巴巴Java开发手册》中强制线程池不允许使用以上方法创建,而是通过ThreadPoolExecutor的方式,这样处理可以更加明确线程池运行规则,规避资源耗尽的风险。
(1)corePoolSize 创建线程池的线程数量
(2)maximumPoolSize 线程池的最大线程数
(3)keepAliveTime 当线程数量大于corePoolSize ,空闲的线程当空闲时间超过keepAliveTime时就会回收;
(4)unit { keepAliveTime} 时间单位
(5)workQueue 保留任务的队列
SynchronizedMap、ConcurrentHashMap
(1)StringBuffer中的方法大都采用synchronized关键字进行修饰,是线程安全的,效率低。
(2)StringBuilder是线程不安全的,效率高。
(1)ArrayList基于动态数据实现,LinkedList基于链表实现,两者都是线程不安全的
(2)ArrayList基于数组,查询快;linkedList基于链表,新增和删除更快
(3)LinkedList不支持高效的随机访问
1)继承的父类不同
HashMap继承AbstractMap类
HashTable继承Dictionary类(已经废弃的类),用比较少
2)是否线程安全
HashMap是线程不安全的效率高,HashTable是线程安全的,效率低。
3)key和value是否允许null值
Hashtable中,key和value都不允许出现null值。HashMap中,都可出现null。
1)HashMap的实现原理
HashMap实际上是一个数组和链表的结合体,HashMap基于Hash算法实现的;
(1)当我们向HashMap中Put元素时,利用key的hashCode重新计算出当前对象的元素在数组中的下标
(2)写入时,如果出现Hash值相同的key,此时分类,如果key相同,则覆盖原始值;如果key不同,value则放入链表中
(3)读取时,直接找到hash值对应的下标,在进一步判断key是否相同,进而找到对应值
2)HashMap在JDK1.7和JDK1.8中有哪些区别
JDK1.7:数组 + 链表
JDK1.8:数组+红黑树
3)HashMap的Put方法具体流程
4)HashMap的扩容
HashMap中的键值对大于阈值或者初始化时,就调用resize()进行扩容。
每次扩展的时候都是扩展2倍。
扩容因子0.75
100/0.75≈133.3
初始化134
对比项 | MyISAM | InnoDB |
外键 | 不支持 | 支持 |
事务 | 不支持 | 支持 |
行表锁 | 表锁,即使操作一条记录也会锁住整个表,不适合高并发的操作 | 行锁,操作时只锁某一行,不对其它行有影响,适合高并发的操作 |
缓存 | 只缓存索引,不缓存真实数据 | 不仅缓存索引还要缓存真实数据,对内存要求较高,而且内存大小对性能有决定性的影响 |
1)唯一索引
主键索引是唯一的,通常以表的ID设置为主键索引,一个表只能有一个主键索引,这是他跟唯一索引的区别。
2)聚簇索引
聚簇索引的叶子节点都包含主键值、事务 ID、用于事务 MVCC 的回滚指针以及所有的剩余列。
3)辅助索引(非聚簇索引|二级索引)
辅助索引也叫非聚簇索引,二级索引等,其叶子节点存储的不是行指针而是主键值,得到主键值再要查询具体行数据的话,要去聚簇索引中再查找一次,也叫回表。这样的策略优势是减少了当出现行移动或者数据页分裂时二级索引的维护工作。
4)联合索引
两个或两个以上字段联合组成一个索引。使用时需要注意满足最左匹配原则!
(1)事务的基本要素(ACID)
(2)事务的并发问题
脏读:事务A读取了事务B更新的数据,然后B回滚操作,那么A读取到的数据是脏数据
不可重复读:事务 A 多次读取同一数据,事务 B 在事务A多次读取的过程中,对数据作了更新并提交,导致事务A多次读取同一数据时,结果 不一致
幻读:系统管理员A将数据库中所有学生的成绩从具体分数改为ABCDE等级,但是系统管理员B就在这个时候插入了一条具体分数的记录,当系统管理员A改结束后发现还有一条记录没有改过来,就好像发生了幻觉一样,这就叫幻读。
小结:不可重复读的和幻读很容易混淆,不可重复读侧重于修改,幻读侧重于新增或删除。解决不可重复读的问题只需锁住满足条件的行,解决幻读需要锁表
事务隔离级别 | 脏读 | 不可重复读 | 幻读 |
读未提交(read-uncommitted) | 是 | 是 | 是 |
不可重复读(read-committed) | 否 | 是 | 是 |
可重复读(repeatable-read) | 否 | 否 | 是 |
串行化(serializable) | 否 | 否 | 否 |
(1)InnoDB的数据文件本身就是索引文件,而MyISAM索引文件和数据文件是分离的:
①InnoDB的表在磁盘上存储在以下文件中: .ibd(表结构、索引和数据都存在一起,MySQL5.7表结构放在.frm中)
②MyISAM的表在磁盘上存储在以下文件中: *.sdi(描述表结构,MySQL5.7是.frm)、*.MYD(数据),*.MYI(索引)
(2)InnoDB中主键索引是聚簇索引,叶子节点中存储完整的数据记录;其他索引是非聚簇索引,存储相应记录主键的值 。
(3)InnoDB要求表必须有主键 ( MyISAM可以没有 )。如果没有显式指定,则MySQL系统会自动选择一个可以非空且唯一标识数据记录的列作为主键。如果不存在这种列,则MySQL自动为InnoDB表生成一个隐含字段作为主键。
(4)MyISAM中无论是主键索引还是非主键索引都是非聚簇的,叶子节点记录的是数据的地址。
(5)MyISAM的回表操作是十分快速的,因为是拿着地址偏移量直接到文件中取数据的,反观InnoDB是通过获取主键之后再去聚簇索引里找记录,虽然说也不慢,但还是比不上直接用地址去访问。
1)B+ 树和 B 树的差异
(1)B+树中非叶子节点的关键字也会同时存在子节点中,并且是在子节点中所有关键字的最大值(或最小)。
(2)B+树中非叶子节点仅用于索引,不保存数据记录,跟记录有关的信息都放在叶子节点中。而B树中,非叶子节点既保存索引,也保存数据记录。
(3)B+树中所有关键字都在叶子节点出现,叶子节点构成一个有序链表,而且叶子节点本身按照关键字的大小从小到大顺序链接。
2)B+树为什么IO的次数会更少
真实环境中一个页存放的记录数量是非常大的(默认16KB),假设指针与键值忽略不计(或看做10个字节),数据占 1 kb 的空间:
如果B+树只有1层,也就是只有1个用于存放用户记录的节点,最多能存放16条记录。
如果B+树有2层,最多能存放1600×16 = 25600条记录。
如果B+树有3层,最多能存放1600×1600×16 = 40960000条记录。
如果存储千万级别的数据,只需要三层就够了。
B+树的非叶子节点不存储用户记录,只存储目录记录,相对B树每个节点可以存储更多的记录,树的高度会更矮胖,IO次数也会更少。
(1)缓存穿透是指查询一个一定不存在的数据。由于缓存命不中时会去查询数据库,查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到数据库去查询,造成缓存穿透。
解决方案:
①是将空对象也缓存起来,并给它设置一个很短的过期时间,最长不超过5分钟。
②采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。
(2)如果缓存集中在一段时间内失效,发生大量的缓存穿透,所有的查询都落在数据库上,就会造成缓存雪崩。
解决方案:尽量让失效的时间点不分布在同一个时间点。
(3)缓存击穿,是指一个key非常热点,在不停的扛着大并发,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。
解决方案:可以设置key永不过期。
(1)主从复制中反客为主的自动版,如果主机Down掉,哨兵会从从机中选择一台作为主机,并将它设置为其他从机的主机,而且如果原来的主机再次启动的话也会成为从机。
(2)哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。
(3)当哨兵监测到Redis主机宕机,会自动将Slave切换成Master,然后通过发布订阅模式通知其他服务器,修改配置文件,让他们换主机。
(4)当一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此可以使用哨兵进行监控, 各个哨兵之间还会进行监控,这就形成了多哨兵模式。
提供一种简单实现缓存失效的思路:LRU(最近少用的淘汰)。
即Redis的缓存每命中一次,就给命中的缓存增加一定TTL(过期时间)(根据具体情况来设定, 比如10分钟)。
一段时间后,热数据的TTL都会较大,不会自动失效,而冷数据基本上过了设定的TTL就马上失效了。
Redis 默认开启RDB持久化方式,在指定的时间间隔内,执行指定次数的写操作,则将内存中的数据写入到磁盘中。
RDB 持久化适合大规模的数据恢复但它的数据一致性和完整性较差。
Redis 需要手动开启AOF持久化方式,默认是每秒将写操作日志追加到AOF文件中。
AOF 的数据完整性比RDB高,但记录内容多了,会影响数据恢复的效率。
Redis 针对 AOF文件大的问题,提供重写的瘦身机制。
若只打算用Redis 做缓存,可以关闭持久化。
若打算使用Redis 的持久化。建议RDB和AOF都开启。其实RDB更适合做数据的备份,留一后手。AOF出问题了,还有RDB。
Redis的hash数据结构是一个键值对(key-value)集合,他是一个String类型的field和value的映射表,Redis本身就是一个key-value 类型的数据库,因此Hash数据结构等于在原来的value上又套了一层key-vlaue型数据。所以Redis 的hash数据类型特别适合存储关系型对象。
关注尚硅谷教育公众号,回复 java。
云上数据仓库解决方案:云上大数据仓库解决方案_阿里云大数据_ODPS_阿里云
随着公司的发展,老板需要详细的了解公司的运营情况。比如,日活、新增、留存、转化率等。所以公司决定招聘大数据人才来做这个项目,目的是为老板做决策提供数据支持。
(1)输入系统:前端埋点产生的用户行为数据、JavaEE后台产生的业务数据、个别公司有爬虫数据。
(2)输出系统:报表系统、用户画像系统、推荐系统。
1)Apache:运维麻烦,组件间兼容性需要自己调研。(一般大厂使用,技术实力雄厚,有专业的运维人员)。
2)CDH6.3.2:国内使用最多的版本。CDH和HDP合并后推出,CDP7.0。收费标准,10000美金一个节点每年。(不建议使用)
3)HDP:开源,可以进行二次开发,但是没有CDH稳定,国内使用较少。
4)云服务选择
(1)阿里云的EMR、MaxCompute、DataWorks
(2)腾讯云EMR、流计算Oceanus、数据开发治理平台WeData
(3)华为云EMR
(4)亚马逊云EMR
星环国际、金蝶。。。神策、数梦、袋鼠云
Apache框架各组件重要版本发版时间
框架 | 版本号 | 发版时间 | 框架 | 版本号 | 发版时间 |
Hadoop | 2.7.2 | 2017-06 | Spark | 1.6.0 | 2016-01 |
3.0.0 | 2018-03 | 2.0.0 | 2016-07 | ||
3.1.3 | 2020-07 | 2.2.0 | 2018-05 | ||
Zookeeper | 3.4.12 | 2018-05 | 2.4.0 | 2018-11 | |
3.4.14 | 2019-04 | 3.0.0 | 2020-06 | ||
3.5.8 | 2020-05 | 2.4.8 | 2022-06 | ||
3.7.0 | 2021-03 | 3.2.0 | 2021-10 | ||
3.8.0 | 2022-03 | 3.3.0 | 2022-06 | ||
Flume | 1.9.0 | 2019-01 | Flink | 1.7.0 | 2018-11 |
1.10.0 | 2022-03 | 1.8.0 | 2019-04 | ||
1.11.0 | 2022-10 | 1.9.0 | 2019-08 | ||
Kafka | 1.0.0 | 2017-11 | 1.10.0 | 2020-02 | |
2.0.0 | 2018-07 | 1.11.0 | 2020-07 | ||
2.3.0 | 2019-03 | 1.12.0 | 2020-12 | ||
2.4.0 | 2019-12 | 1.13.0 | 2021-04 | ||
2.7.0 | 2020-12 | 1.13.6 | 2022-02 | ||
3.0.0 | 2021-09 | 1.14.0 | 2021-09 | ||
Hive | 1.2.1 | 2015-06 | 1.15.0 | 2022-05 | |
2.0.0 | 2016-02 | 1.16.0 | 2022-10 | ||
2.2.0 | 2017-07 | DolphinScheduler | 1.2.0(最早) | 2020-01 | |
3.0.0 | 2018-05 | 1.3.9 | 2021-10 | ||
2.3.6 | 2019-08 | 2.0.0 | 2021-11 | ||
3.1.2 | 2019-08 | 3.0.0 | 2022-08 | ||
2.3.7 | 2020-04 | Doris | 0.13.0(最早) | 2020-10 | |
3.1.3 | 2022-04 | 0.14.0 | 2021-05 | ||
HBase | 1.2.0 | 2016-02 | 0.15.0 | 2021-11 | |
1.4.0 | 2017-12 | 1.1.0 | 2022-07 | ||
1.5.0 | 2019-10 | Hudi | 0.10.0 | 2021-12 | |
1.6.0 | 2020-07 | 0.11.0 | 2022-03 | ||
2.0.0 | 2018-05 | 0.12.0 | 2022-08 | ||
2.2.0 | 2019-06 | Sqoop | 1.4.6 | 2017-10 | |
2.4.0 | 2020-12 | 1.4.7 | 2020-07 | ||
2.5.0 | 2022-08 | ||||
Phoenix | 4.14.0 (1.4) | 2018-06 | |||
4.16.1 ( 1.3, 1.4, 1.5, 1.6) | 2021-05 | ||||
5.1.2 ( 2.1, 2.2, 2.3, 2.4) | 2021-07 |
*注:着重标出的为公司实际生产中的常用版本。
服务器使用物理机还是云主机?
1)机器成本考虑:
(1)物理机:以128G内存,20核物理CPU,40线程,8THDD和2TSSD硬盘,单台报价4W出头,惠普品牌。一般物理机寿命5年左右。
(2)云主机,以阿里云为例,差不多相同配置,每年5W。华为云、腾讯云、天翼云。
2)运维成本考虑:
(1)物理机:需要有专业的运维人员(1万 * 13个月)、电费(商业用户)、安装空调、场地。
(2)云主机:很多运维工作都由阿里云已经完成,运维相对较轻松。
3)企业选择
(1)金融有钱公司选择云产品(上海)。
(2)中小公司、为了融资上市,选择云产品,拉到融资后买物理机。
(3)有长期打算,资金比较足,选择物理机。
20核物理CPU 40线程 * 8 = 320线程 (指标 100-200)
内存128g * 8台 = 1024g (计算任务内存800g,其他安装框架需要内存)
128m =》512M内存
200g数据 、800g内存
根据数据规模搭建集群(在企业,干了三年 通常服务器集群 5-20台之间)
(1)参考腾讯云EMR官方推荐部署
(2)数据传输数据比较紧密的放在一起(Kafka、clickhouse)
(3)客户端尽量放在一到两台服务器上,方便外部访问
(4)有依赖关系的尽量放到同一台服务器(例如:Ds-worker和hive/spark,ClickHouse必须单独部署)
Master | Master | core | core | core | common | common | common |
nn | nn | dn | dn | dn | JournalNode | JournalNode | JournalNode |
rm | rm | nm | nm | nm | |||
zk | zk | zk | |||||
hive | hive | hive | hive | hive | |||
kafka | kafka | kafka | |||||
spark | spark | spark | spark | spark | |||
datax | datax | datax | datax | datax | |||
Ds-master | Ds-master | Ds-worker | Ds-worker | Ds-worker | |||
maxwell | |||||||
superset | |||||||
mysql | |||||||
flume | flume | ||||||
flink | flink | ||||||
redis | |||||||
hbase |
大数据开发工程师 =》 大数据组组长=》项目经理=》部门经理=》技术总监CTO
=》 高级架构师 =》 资深架构师
小公司:职级就分初级,中级,高级。晋升规则不一定,看公司效益和职位空缺。
大公司都有明确的职级:
小型公司(1-3人左右):组长1人,剩余组员无明确分工,并且可能兼顾JavaEE和前端。
中小型公司(3~6人左右):组长1人,离线2人左右,实时1人左右(离线一般多于实时),组长兼顾和JavaEE、前端。
中型公司(5~10人左右):组长1人,离线3~5人左右(离线处理、数仓),实时2人左右,组长和技术大牛兼顾和JavaEE、前端。
中大型公司(10~20人左右):组长1人,离线5~10人(离线处理、数仓),实时5人左右,JavaEE1人左右(负责对接JavaEE业务),前端1人(有或者没有人单独负责前端)。(发展比较良好的中大型公司可能大数据部门已经细化拆分,分成多个大数据组,分别负责不同业务)
上面只是参考配置,因为公司之间差异很大,例如ofo大数据部门只有5个人左右,因此根据所选公司规模确定一个合理范围,在面试前必须将这个人员配置考虑清楚,回答时要非常确定。
咱们自己公司:大数据组组长:1个人;离线3-4个人;实时1-3个人。
IOS多少人?安卓多少人?前端多少人?JavaEE多少人?测试多少人?
(IOS、安卓) 1-2个人 前端3个人; JavaEE一般是大数据的1-1.5倍,测试:有的有,1个左右,有的没有。 产品经理1个、产品助理1-2个,运营1-3个。
公司划分:
0-50 小公司;50-500 中等;500-1000 大公司;1000以上 大厂 领军的存在。
1)需要问项目经理的问题
(1)数据量(历史数据、增量、全量): 100g
(2)预算: 50万
(3)数据存储多久: 1年
(4)云主机、物理机: 云主机
(5)日活: 100万
(6)数据源: 接口、用户行为数据(文件)、业务数据(MySQL)
(7)项目周期: 1个月-3个月
(8)团队多少人: 3-5个
(9)首批指标: 1-10个
(10)未来的规划: 离线和实时 是否都要做
2)项目周期(2个月)
(1)数据调研(2周) + 集群搭建
(2)明确数据域(2天)
(3)构建业务矩阵(3天)
(4)建模 至下而上 (2周)
①ODS层 ②DWD层 ③DIM层
(5)指标体系建设 至上而下 (2周)
(6)处理bug 1周
1)数据仓库建模的意义
如果把数据看作图书馆里的书,我们希望看到它们在书架上分门别类地放置;
2)ER模型
如果对方问三范式问题。初步判断对方是一个java程序员,就不要和他深入聊,mysql高级、redis、多线程、JVM、SSM等框架了。
应该把话题转移到大数据技术。Spark、flink、海量数据如何处理、维度建模。
3)维度建模
星型模型:事实表周围一级维度 减少join => 大数据场景不适合频繁的join
雪花模型:事实表周围多级维度
星座:多个事实表
4)事实表
(1)如何判断一张表是事实表?
具有度量值的 可以累加的 个数、件数、金额、次数
(2)同步策略
数据量大 =》 通常增量 特殊的,加购 (周期快照事实表)
(3)分类
①事务型事实表
找原子操作。 例如:下单 加购 支付
①选择业务过程
②声明粒度
③确定维度
④确定事实
不足:
连续性指标,不好找原子操作。 例如,库存(周期快照事实表)
多事实表关联。 例如,统计加购到支付的平均使用时长 (累积型快照事实表)
②周期快照事实表
③累积型快照事实表
5)维度表
(1)如何判断一张表是维度表?
没有度量值,都是描述信息。 身高 体重、年龄、性别
(2)同步策略
数据量小 =》 通常 全量 特殊的 用户表
(3)维度整合 减少Join操作
①商品表、商品品类表、SPU、商品一级分类、二级分类、三级分类=》商品维度表
②省份表、地区表 =》 地区维度表
③活动信息表、活动规则表 =》 活动维度表
(4)拉链表
对用户表做了拉链。
缓慢变化维 场景
6)建模工具是什么?
PowerDesigner、EZDML
1)数据调研
(1)先和Java人员要表,表中最好有字段的描述或者有表和字段的说明文档。(项目经理帮助协调) =》 快速熟悉表中业务。梳理清楚业务线,找到事实表和维度表。
(2)和业务人员聊 =》 验证你猜测的是否正确
(3)和产品经理聊
需求:派生指标、衍生指标
派生指标 = 原子指标(业务过程 + 度量值 + 聚合逻辑) + 统计周期 + 统计粒度 + 业务限定
需求中的业务过程必须和实际的后台业务能对应上。
2)明确数据域
(1)用户域:登录、注册
(2)流量域:启动、页面、动作、故障、曝光
(3)交易域:加购、下单、支付、物流、取消下单、取消支付
(4)工具域:领取优惠卷、使用优惠卷下单、使用优惠卷支付
(5)互动域:点赞、评论、收藏
3)构建业务矩阵
用户、商品、活动、时间、地区、优惠卷
(1)用户域:
登录、注册
(2)流量域: √
启动、页面、动作、故障、曝光
(3)交易域:
加购、下单、支付、物流、取消下单、取消支付
(4)工具域:
领取优惠卷、使用优惠卷下单、使用优惠卷支付
(5)互动域:
点赞、评论、收藏
4)建模 至下而上
(1)ODS层
①保持数据原貌不做任何修改 起到备份作用
②采用压缩 减少磁盘空间,采用Gzip压缩
③创建分区表 防止后续全表扫描
(2)DWD层 事实表
①事务型事实表
找原子操作
a)选择业务过程
选择感兴趣的业务过程。 产品经理提出的指标中需要的。
b)声明粒度
粒度:一行信息代表什么含义。可以是一次下单、一周下单、一个月下单。
如果是一个月的下单,就没有办法统计一次下单情况。保持最小粒度。
只要你自己不做聚合操作就可以。
c)确定维度
确定感兴趣的维度。 产品经理提出的指标中需要的。
例如:用户、商品、活动、时间、地区、优惠卷
d)确定事实
确定事实表的度量值。 可以累加的值,例如,个数、件数、次数、金额。
事务型事实表的不足:
(2)周期快照事实表
①选择业务过程
②声明粒度 =》 1天
③确定维度
④确定事实
(3)累积型快照事实表
①选择业务过程
②声明粒度
③确定维度
④确定事实 确定多个事实表度量值
(3)DIM层 维度表
①维度整合 减少join
a)商品表、商品品类表、spu、商品一级分类、二级分类、三级分类=》商品维度表
b)省份表、地区表 =》 地区维度表
c)活动信息表、活动规则表 =》 活动维度表
②拉链表
对用户表做了拉链。
缓慢变化维 场景。
5)指标体系建设 至上而下
(1)ADS层
需求、日活、新增、留存、转化率、GMV
(2)DWS层 聚合层
需求:派生指标、衍生指标
派生指标 = 原子指标(业务过程 + 度量值 + 聚合逻辑) + 统计周期 + 统计粒度 + 业务限定
例如,统计,每天各个省份手机品牌交易总额
交易总额 (下单 + 金额 + sum ) + 每天 + 省份 + 手机品牌
找公共的:业务过程 + 统计周期 + 统计粒度 建宽表
1)ODS层做了哪些事?
(1)保持数据原貌,不做任何修改
(2)压缩采用gzip,压缩比是100g数据压缩完10g左右。
(3)创建分区表
2)DIM/DWD层做了哪些事?
建模里面的操作,正常写。
(1)数据清洗的手段
HQL、MR、SparkSQL、Kettle、Python(项目中采用SQL进行清除)
(2)清洗规则
金额必须都是数字,[0-9]、手机号、身份证、匹配网址URL
解析数据、核心字段不能为空、过期数据删除、重复数据过滤
json => 很多字段 =》 一个一个判断 =》 取数,根据规则匹配
(3)清洗掉多少数据算合理
参考,1万条数据清洗掉1条。
(4)脱敏
对手机号、身份证号等敏感数据脱敏。
①加*
135****0013 互联网公司经常采用
②加密算法 md5 需要用数据统计分析,还想保证安全
美团 滴滴 md5(12334354809)=》唯一值
③加权限 需要正常使用 军工、银行、政府
(5)压缩snappy
(6)orc列式存储
3)DWS层做了哪些事?
指标体系建设里面的内容再来一遍。
4)ADS层做了哪些事?
一分钟至少说出30个指标。
日活、月活、周活、留存、留存率、新增(日、周、年)、转化率、流失、回流、七天内连续3天登录(点赞、收藏、评价、购买、加购、下单、活动)、连续3周(月)登录、GMV、复购率、复购率排行、点赞、评论、收藏、领优惠卷人数、使用优惠卷人数、沉默、值不值得买、退款人数、退款率 topn 热门商品
产品经理最关心的:留转G复活
数据量的描述都是压缩前的数据量。
1)ODS层:
(1)用户行为数据(100g => 1亿条;1g => 100万条)
曝光(60g or 600万条)、页面(20g)、动作(10g)、故障 + 启动(10g)
(2)业务数据(1-2g => 100万-200万条)
登录(20万)、注册(100-1000);
加购(每天增量20万、全量100万)、下单(10万)、支付(9万)、物流(9万)、取消下单(500)、退款(500);
领取优惠卷(5万)、使用优惠卷下单(4万)、使用优惠卷支付(3万);
点赞(1000)、评论(1000)、收藏(1000);
用户(活跃用户100万、新增1000、总用户1千万)、商品SPU(1-2万)、商品SKU(10-20万)、活动(1000)、时间(忽略)、地区(忽略)
2)DWD层 + DIM层:
和ODS层几乎一致;
3)DWS层
轻度聚合后,20g-50g。
4)ADS层
10-50m之间,可以忽略不计。
1)Flume零点漂移
2)Flume挂掉及优化
3)Datax空值、调优
4)HDFS小文件处理
5)Kafka挂掉
6)Kafka丢失
7)Kafka数据重复
8)Kafka消息数据积压
9)Kafk乱序
10)Kafka顺序
11)Kafka优化(提高吞吐量)
12)Kafka底层怎么保证高效读写
13)Kafka单条日志传输大小
14)Hive优化(Hive on Spark)
15)Hive解决数据倾斜方法
19)疑难指标编写(7天内连续3次活跃、1 7 30指标、路径分析、用户留存率、最近7/30日各品牌复购率、最近30天发布的优惠券的补贴率、 同时在线人数)
20)DS任务挂了怎么办?
21)DS故障报警
SKU:一台银色、128G内存的、支持联通网络的iPhoneX。
SPU:iPhoneX。
Tm_id:品牌Id苹果,包括IPHONE,耳机,MAC等。
订单表的订单状态会变化,订单详情表不会,因为没有订单状态。
订单表记录user_id,订单id订单编号,订单的总金额order_status,支付方式,订单状态等。
订单详情表记录user_id,商品sku_id,具体的商品信息(商品名称sku_name,价格order_price,数量sku_num)
上卷:上卷是沿着维度的层次向上聚集汇总数据。
下探(钻):下探是上卷的逆操作,它是沿着维度的层次向下,查看更详细的数据。
比如这个经典的数据立方体模型:
维度有产品、年度、地区等,统计销售额。实际上,维度还可以更细粒度,如时间维可由年、季、月、日构成,地区也可以由国家、省份、市、区县构成等。
下钻可以理解为由粗粒度到细粒度来观察数据,比如对产品销售情况分析时,可以沿着时间维从年到月到日更细粒度的观察数据。
增加维度粒度“月”。
上卷和下钻是相逆的操作,所以上卷可以理解为删掉维的某些粒度,由细粒度到粗粒度观察数据,向上聚合汇总数据。
TOB(toBusiness):表示面向的用户是企业。
TOC(toConsumer):表示面向的用户是个人。
1)活跃
日活:100万 ;月活:是日活的2-3倍 300万
总注册的用户多少?1000万-3000万之间。
渠道来源:app 公众号 抖音 百度 36氪 头条 地推
2)GMV
GMV:每天 10万订单 (50 – 100元) 500万-1000万
10%-20% 100万-200万(人员:程序员、人事、行政、财务、房租、收电费)
3)复购率
某日常商品复购;(手纸、面膜、牙膏)10%-20%
电脑、显示器、手表 1%
4)转化率
商品详情 =》 加购物车 =》下单 =》 支付
1%-5% 50-60% 80%-95%
5)留存率
1/2/3-60日、周留存、月留存
搞活动: 10-20%
日活增加50%,GMV增加多少20%。(留转G复活)情人节,促销手纸。
集群资源都留有预量。11.11,6.18,数据量过大,提前动态增加服务器。
加多少机器:3-4台
面膜、手纸,每天销售5000个。下载APP根据自身业务
基本一个项目建一个库,表格个数为初始的原始数据表格加上统计结果表格的总数。(一般70-100张表格)。
用户行为5张;业务数据33张表 =》ods34 =》dwd=>32张=》dws 22张宽表=>ads=》15张 =》103张。
Datax:00:10 => 10-20分钟左右 第一次全量。
用户行为数据,每天0:30开始运行。=》ds =》 5-6个小时运行完指标。
所有离线数据报表控制在8小时之内。
大数据实时处理部分控制在5分钟之内。(分钟级别、秒级别)
如果是实时推荐系统,需要秒级响应。
1)用户行为数据
曝光(60g or 6000万条)、页面(20g)
2)业务数据(1-2g => 100万-200万条)
登录(20万)、注册(100-1000);
加购(20万)、下单(10万)
用户(活跃用户100万、新增1000、总用户1千万)
商品SKU(10万-20万)
最费时间,一般是发生数据倾斜时,会比较费时间。
1)Group By
(1)统计各个省份对应的交易额
第一个统计完的指标和最后一个统计完是时间相差20倍
我们从Yarn上看到的
一共执行了多长时间 4-5小时
你想:发生了数据倾斜 任务停止掉
(2)解决办法:
①开启map-side 预聚合
②skewindata
解决后的效果怎么样 ?
30-50分钟内执行完了
2)Join
统计 事实表 和维度表join => mapjoin
(1)小表 大表 join mapjoin
解决办法: mapjoin
(2)大表 =》 大表 join
项目中什么出现 统计 加购到支付的平均使用时长
执行时间 4-5小时 yarn
①:skewjoin
②:smbjoin 分桶有序join 使用的前提 (分桶且有序)
③:左表随机 右表扩容
④:通过建模 规避 大表join大表
累积型快照事实表
高峰期晚上7-12点。Kafka里面20m/s 2万/s 并发峰值在1-2万人
常用的包括:textFile,ORC,Parquet,一般企业里使用ORC或者Parquet,因为是列式存储,且压缩比非常高,所以相比于textFile,查询速度快,占用硬盘空间少。
(1)部分公司永久不删
(2)有一年、两年“删除”一次的,这里面说的删除是,先将超时数据压缩下载到单独安装的磁盘上。然后删除集群上数据。 很少有公司不备份数据,直接删除的。
修改表结构,将新增字段放置最后!
详细看第一章Hive多表join优化手段。
大小表join 大表join大表。
拉链表用于记录维度表中的历史变化。在拉链表中,当某个维度属性发生变化时,会插入一条新的记录,同时将原记录的有效期设置为截至。退链是指将一个已经生效的变更恢复到上一个状态。实现思路如下:
(1)定位要退链的记录:例如找到用户最近一次信息更新
(2)查询上一条记录:查询这条记录之前的一条记录(主键相同,不同版本记录,而不是单指上一条)
(3)更新有效期:将当前记录的生效时间或者有效开始时间更新为无效,将上条记录截至日期改为最大值。
补数:指重新处理一段历史时间范围内的数据,以修复数据问题。
利用调度框架,海豚调度器的补数功能进行补数。
(1)确定需求:与产品经理和业务部门沟通
(2)数据源分析:分析现有数据源确定是否可以满足新指标计算需求,如果不能支撑需要引入新的数据源或者扩展现有数据源
(3)数据模型设计:根据新指标,设计模型
(4)数据处理流程设计:利用sql进行数据的提取、清洗、转化和加载,实现指标
引入新的数据源或者扩展现有数据源
产品经理:需求、统一业务口径
后端开发:数据源以及存储,数据格式,业务逻辑
前端开发:数据可视化要求,用户体验
产品经理
累积型快照事实表。
1)申请资源
2)加载数据
3)如果存在于外部系统交互,获取连接慢
(1)使用Snappy压缩,原始文件大小不等,Map阶段数据倾斜。
(2)over开窗,partition by导致数据倾斜。
在离线数据仓库中,没有意义的用户通常指那些对业务价值贡献极低或没有贡献的用户。这类用户可能是僵尸用户、垃圾用户或恶意用户。
(1)低活跃度用户:这类用户在平台上的活跃度极低,可能很长时间没有登录或者几乎没有操作。可以通过用户登录频率、活动次数等指标来定义低活跃度用户。
(2)低价值用户:这类用户对平台的业务价值贡献很低,例如在电商平台上从未进行过购买,或者在内容平台上几乎没有观看、点赞和评论等操作。可以通过消费金额、互动次数等指标来定义低价值用户。
(3)僵尸用户:这类用户可能是批量注册的账户或者长时间不活跃的用户。僵尸用户可能会导致平台数据泡沫,影响业务分析和决策。可以通过注册信息、活跃度和价值指标综合判断僵尸用户。
(4)恶意用户:这类用户可能会进行恶意操作,如发布垃圾信息、刷单、刷点击等。这些操作可能会对平台的数据准确性和用户体验产生负面影响。可以通过用户行为分析、异常检测等方法来识别恶意用户。
我们的公司是自营的,有自己的商品,我们有自己经营的商品维度表,也有商品从采购到上架、发货、收货等一系列流程的业务数据和日志数据。
需要下载APP根据自身情况灵活回答!
订单状态的改变会引起状态码的变更,这对应了不同的业务过程,可以被提取为不同的事务事实表,无论退款发生在哪个结点,支付成功、发货、收货这些业务过程都可以被记录在对应的事务事实表中。
在离线数仓中,当天订单没有闭环结束的数据量通常指以下几种数据:
以当天10万订单为标准
(1)未支付订单:指当天生成的订单,但客户还未完成支付行为的数据。
300-500
(2)待发货订单:指当天已经收到付款的订单,但还未处理发货行为的数据。
6万
(3)退货订单:指当天客户发起的退货请求,但商家还未处理完退货流程的数据。
200-300
除了用户信息脱敏,我们还对用户埋点数据中的IP、UA等信息进行解析,同时我们对数据进行去重操作,还会对空值以及异常数据进行处理,对于部分维度数据我们将其合并。
对于MD5加密算法,由于它是一种不可逆的散列算法,无法恢复原始数据。
表的生命周期管理是数据仓库和数据库管理的一个重要方面,需要关注表的创建、使用、优化和删除等阶段。管理表的生命周期有如下几个阶段:
1)表设计
2)数据导入和更新
3)数据存储和备份
4)数据安全和权限管理
5)性能优化和监控
6)数据归档和删除
(1)样本数据验证:从计算结果抽取部分样本数据,与业务部门实际数据对比。
(2)逻辑验证:检查指标的计算sql是否正确
(3)指标间关系验证:比较不同指标间关系,检查它们是否符合预期。例如:某个指标是另外一个指标的累计值,那么这两个指标一定存在关系
(4)历史数据对比:将计算结果和过去数据进行对比,观察指标的变化趋势
(5)异常值检测:检查计算结果中是否存在异常值
(6)跨数据部门对比:可以将计算结果与其它部门或团队的数据进行对比,进一步验证
(1)确定错误范围:找出指标计算错误的时间范围,指标及相关维度,缩小排查范围
(2)检查数据处理逻辑:从ods层开始排查,找出可能导致计算错误的数据清洗,转换和聚合等步骤,确认处理逻辑不出错误
(3)审查数据质量:确保每层数据完整性,一致性,准确性和时效性
(4)重新计算指标:修复数据质量问题和处理逻辑后,重新计算
数仓脚本是串行执行的,ods=》dim=》dwd=》dws=》ads
所以占用集群资源比例,仅为当前执行层的脚本转化成底层Sparkjob需要的资源
CPU与内存比 1:4
离线: 128M数据 512M内存
实时:并行度与Kafka分区一致,CPU与Slot比 1:3
20M/s -> 3个分区 -> CPU与Slot比 1:3 -> 3个Slot -> Core数1个 -> CPU与内存比 1:4 -> TM 1 slot -> TM 4G资源
JobManager 2G内存 1CPU
平均 一个Flink作业6G内存,2Core
1)数仓搭建完成之前:
(1)需求分析:与产品了解需求,明确数仓实现功能
(2)数据源分析:分析数据源是否充足,是否需要引进新数据源
(3)数据模型设计
(4)技术选型
(5)数据处理流程设计:Hivesql
2)数仓搭建完成之后:
(1)开发与维护
(2)数据监控与报警
(3)数据质量管理
(4)性能优化
(5)新需求开发
(6)报表与可视化
(7)文档编写与知识分享
1)埋点选择
免费的埋点:上课演示。前端程序员自己埋点。
收费的埋点:神策、百度统计、友盟统计。
2)埋点方式主要有两种:
(1)按照页面埋点,有几个页面就创建几个表。
(2)按照用户行为:页面数据、事件数据、曝光数据、启动数据和错误数据。 咱们项目中采用的这种方式。
3)埋点数据日志格式
为了减少网络上数据的传输,日志格式设计时都会有公共信息。
{
"common": { -- 公共信息
"ar": "230000", -- 地区编码
"ba": "iPhone", -- 手机品牌
"ch": "Appstore", -- 渠道
"md": "iPhone 8", -- 手机型号
"mid": "YXfhjAYH6As2z9Iq", -- 设备id
"os": "iOS 13.2.9", -- 操作系统
"uid": "485", -- 会员id
"vc": "v2.1.134" -- app版本号
},
"actions": [ --动作(事件)
{
"action_id": "favor_add", --动作id
"item": "3", --目标id
"item_type": "sku_id", --目标类型
"ts": 1585744376605 --动作时间戳
}
],
"displays": [
{
"displayType": "query", -- 曝光类型
"item": "3", -- 曝光对象id
"item_type": "sku_id", -- 曝光对象类型
"order": 1 --出现顺序
},
{
"displayType": "promotion",
"item": "6",
"item_type": "sku_id",
"order": 2
},
{
"displayType": "promotion",
"item": "9",
"item_type": "sku_id",
"order": 3
}
],
"page": { --页面信息
"during_time": 7648, -- 持续时间毫秒
"item": "3", -- 目标id
"item_type": "sku_id", -- 目标类型
"last_page_id": "login", -- 上页类型
"page_id": "good_detail", -- 页面ID
"sourceType": "promotion" -- 来源类型
},
"err":{ --错误
"error_code": "1234", --错误码
"msg": "***********" --错误信息
},
"ts": 1585744374423 --跳入时间戳
}
随着公司不断业务不断发展,产品需求和内部决策对于数据实时性要求越来越迫切,传统离线数仓T+1模式已经不能满足,所以需要实时数仓的能力来赋能。
和离线保持一致。
和离线保持一致。
1)生产集群规模、Flink集群规模(10台为例)
项目中方便作业提交,Flink作为客户端,部署在所有的Worker节点
举例:Job数量在20左右,需要10台服务器
Clickhouse单独部署,服务器使用128G,64C
1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 |
nn | nn | dn | dn | dn | dn | dn | dn | ||
rm | rm | nm | nm | nm | nm | nm | nm | ||
zk | zk | zk | |||||||
Kafka | Kafka | Kafka | |||||||
Flume | Flume | Flume | |||||||
Hive | Hive | ||||||||
MySQL | |||||||||
Spark | Spark | ||||||||
DS | DS | Datax | maxwell | ||||||
Hbase | Hbase | Hbase | |||||||
Flink | Flink | Flink | Flink | Flink | Flink | ||||
CK | CK |
1)数据调研
(1)先和Java人员要表,表中最好有字段的描述或者有表和字段的说明文档。(项目经理帮助协调) =》 快速熟悉表中业务。梳理清楚业务线,找到事实表和维度表。
(2)和业务人员聊 =》 验证你猜测的是否正确
(3)和产品经理聊
需求:派生指标、衍生指标
派生指标 = 原子指标(业务过程 + 度量值 + 聚合逻辑) + 统计周期 + 统计粒度 + 业务限定
需求中的业务过程必须和实际的后台业务能对应上。
2)明确数据域
(1)用户域:登录、注册
(2)流量域:启动、页面、动作、故障、曝光
(3)交易域:加购、下单、支付、物流
(4)工具域:领取优惠卷、使用优惠卷下单、使用优惠卷支付
(5)互动域:点赞、评论、收藏
3)构建业务矩阵
用户、商品、活动、时间、地区、优惠卷
(1)用户域:
登录、注册
(2)流量域: √
启动、页面、动作、故障、曝光
(3)交易域:
加购、下单、支付、物流
(4)工具域:
领取优惠卷、使用优惠卷下单、使用优惠卷支付
(5)互动域:
点赞、评论、收藏
4)建模 至下而上
(1)ODS层
①存Kafka: topic_log\topic_db ,保持数据原貌不做处理
(2)DWD层 事实表
①事务型事实表
找原子操作
a)选择业务过程
选择感兴趣的业务过程。 产品经理提出的指标中需要的。
b)声明粒度
粒度:一行信息代表什么含义。可以是一次下单、一周下单、一个月下单。
如果是一个月的下单,就没有办法统计一次下单情况。保持最小粒度。
只要你自己不做聚合操作就可以。
c)确定维度
确定感兴趣的维度。 产品经理提出的指标中需要的。
例如:用户、商品、活动、时间、地区、优惠卷
d)确定事实
确定事实表的度量值。 可以累加的值,例如,个数、件数、次数、金额。
e)维度退化
通过Lookupjoin 将字典表中字段退化到明细表中
(3)DIM层 维度表
①维度数据存储Hbase,同时不做维度整合
5)指标体系建设 至上而下
(1)ADS层
需求、日活、新增、留存、转化率、GMV
(2)DWS层 聚合层
需求:派生指标、衍生指标
派生指标 = 原子指标(业务过程 + 度量值 + 聚合逻辑) + 统计周期 + 统计粒度 + 业务限定
例如,统计,每天各个省份手机品牌交易总额
交易总额 (下单 + 金额 + sum ) + 每天 + 省份 + 手机品牌
找公共的:业务过程 + 统计周期 + 统计粒度 建宽表
1)ODS层
(1)用户行为数据(100g => 1亿条;1g => 100万条)
曝光(60g or 600万条)、页面(20g)、动作(10g)、故障 + 启动(10g)
(2)业务数据(1-2g => 100万-200万条)
登录(20万)、注册(100-1000);
加购(每天增量20万、全量100万)、下单(10万)、支付(9万)、物流(9万)、取消下单(500)、退款(500);
领取优惠卷(5万)、使用优惠卷下单(4万)、使用优惠卷支付(3万);
点赞(1000)、评论(1000)、收藏(1000);
用户(活跃用户100万、新增1000、总用户1千万)、商品SPU(1-2万)、商品SKU(10-20万)、活动(1000)、时间(忽略)、地区(忽略)
2)DWD层 + DIM层
和ODS层几乎一致;
3)DWS层
轻度聚合后,20g-50g。
4)ADS层
10-50m之间,可以忽略不计。
1)Kafka:
ods层和dwd层数据
ods和dwd数据一致,每天约200G数据
考虑kafka副本2个,保存三天,kafka存储400G数据
2)Hbase:
存储 dim层数据,与离线一致
3)Clickhouse:
存储dws层数据,每天约20~30G数据
考虑dws层数据保存一年,Clickhouse三个副本
数据约15~20T
QPS峰值:20000条/s或者2M/s
详见第一章(FlinkCDC,Maxwell,Canal)对比
1)Dim动态分流使用广播状态,新老访客修复使用键控状态
状态中数据少使用HashMap,状态中数据多的使用RocksDB
2)大状态优化手段
(1)使用rocksdb
(2)开启增量检查点、本地恢复、设置多目录
(3)设置预定义选项为 磁盘+内存 的策略,自动设定 writerbuffer、blockcache等
1)项目中反压造成的原因
流量洪峰:不需要解决
频繁GC:比如代码中大量创建临时对象
大状态:新老访客修复
关联外部数据库:从Hbase读取维度数据或将数据写入Clickhouse
数据倾斜:keyby之后不同分组数据量不一致
2)反压的危害
问题:Checkpoint超时失败导致job挂掉
内存压力变大导致的OOM导致job挂掉
时效性降低
3)定位反压
(1)利用Web UI定位
定位到造成反压的节点,排查的时候,先把operator chain禁用,方便定位到具体算子。
Flink 现在在UI上通过颜色和数值来展示繁忙和反压的程度。
上游都是high,找到第一个为ok的节点就是瓶颈节点。
(2)利用Metrics定位
可以根据指标分析反压: buffer.inPoolUsage、buffer.outPoolUsage
可以分析数据传输
4)处理反压
反压可能是暂时的,可能是由于负载高峰、CheckPoint 或作业重启引起的数据积压而导致反压。如果反压是暂时的,应该忽略它。
(1)查看是否数据倾斜
(2)使用火焰图分析看顶层的哪个函数占据的宽度最大。只要有"平顶"(plateaus),就表示该函数可能存在性能问题。
(3)分析GC日志,调整代码
(4)资源不合理造成的:调整资源
(5)与外部系统交互:
写MySQL、Clickhouse:攒批写入
读HBase:异步IO、旁路缓存
1)数据倾斜现象:
相同Task 的多个 Subtask 中,个别Subtask 接收到的数据量明显大于其他 Subtask 接收到的数据量,通过 Flink Web UI 可以精确地看到每个 Subtask 处理了多少数据,即可判断出 Flink 任务是否存在数据倾斜。通常,数据倾斜也会引起反压。
2)数据倾斜解决
(1)数据源倾斜
比如消费Kafka,但是Kafka的Topic的分区之间数据不均衡
读进来之后调用重分区算子:rescale、rebalance、shuffle等
(2)单表分组聚合(纯流式)倾斜
API:利用flatmap攒批、预聚合
SQL:开启MiniBatch+LocalGlobal
(3)单表分组开窗聚合倾斜
第一阶段聚合:key拼接随机数前缀或后缀,进行keyby、开窗、聚合
注意:聚合完不再是WindowedStream,要获取WindowEnd作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起)
第二阶段聚合:按照原来的key及windowEnd作keyby、聚合
在我们项目中,用到了Clickhouse,我们可以第一阶段打散聚合后,直接写入Click house,查clickhouse再处理第二阶段
上游:kafka保证offset可重发,kafka默认实现
Flink:Checkpoint设置执行模式为Exactly_once
下游:使用事务写入Kafka,使用幂等写入Clickhouse且查询使用final查询
(1)设置空闲状态保留时间
(2)开启MiniBatch
(3)开启LocalGlobal
(4)开启Split Distinct
(5)多维Distinct使用Filter
设置Flink动态监控kafka分区的参数
注入水位线时,设置最小等待时间
详见Hbase的rowkey设计原则
对Redis和数据库的操作有2种方案:
(1)先操作(删除)Redis,再操作数据库
并发下可能产生数据一致性问题。
上面的图表示,Thread-1 是个更新流程,Thread-2 是个查询流程,CPU 执行顺序是:Thread-1 删除缓存成功,此时 Thread-2 获取到 CPU 执行查询缓存没有数据,然后查询数据库把数据库的值写入缓存,因为此时 Thread-1 更新数据库还没有执行,所以缓存里的值是一个旧值(old),最后 CPU 执行 Thread-1 更新数据库成功的代码,那么此时数据库的值是新增(new),这样就产生了数据不一致行的问题。
解决上述问题的两种方案:
①加锁,使线程顺序执行:如果一个服务部署到了多个机器,就变成了分布式锁,或者是分布式队列按顺序去操作数据库或者 Redis,带来的副作用就是:数据库本来是并发的,现在变成串行的了,加锁或者排队执行的方案降低了系统性能,所以这个方案看起来不太可行。
②采用双删:先删除缓存,再更新数据库,当更新数据后休眠一段时间再删除一次缓存。
(2)先操作数据库,再操作(删除) Redis
我们如果更新数据库成功,删除 Redis 失败,那么 Redis 里存放的就是一个旧值,也就是删除缓存失败导致缓存和数据库的数据不一致了
上述二种方案,都希望数据操作要么都成功,要么都失败,也就是最好是一个原子操作,我们不希望看到一个失败,一个成功的结果,因为这样就产生了数据不一致的问题。
(1)使用interval join调整上下限时间,但是依然会有迟到数据关联不上
(2)使用left join,带回撤关联
(3)可以使用Cogroup+connect关联两条流
项目中提交使用的per-job模式,因为每个job资源隔离、故障隔离、独立调优
JobManager:内存默认1G,cpu默认1核
TaskManager:数据量多的job,例如:Topic_log分流的job可以给8G
数据量少的job,例如:Topic_db分流的job可以给4G
实时:并行度与Kafka分区一致,CPU与Slot比 1:3
20M/s -> 3个分区 -> CPU与Slot比 1:3 -> 3个Slot -> Core数1个 -> CPU与内存比 1:4 -> TM 1 slot -> TM 4G资源
JobManager 2G内存 1CPU
平均 一个Flink作业6G内存,2Core
全局并行度设置和kafka分区数保持一致为5,Keyby后计算偏大的算子,单独指定。
Checkpoint间隔:作业多久触发一次Checkpoint,由job状态大小和恢复调整,一般建议3~5分钟,时效性要求高的可以设置s级别。
Checkpoint超时:限制Checkpoint的执行时间,超过此时间,Checkpoint被丢弃,建议10分钟。
Checkpoint最小间隔:避免Checkpoint过于频繁,可以设置分钟级别。
Checkpoint的执行模式:Exactly_once或At_least_once,选择Exactly_once。
Checkpoint的存储后端:一般存储HDFS 。
(1)设置乱序时间
(2)窗口允许迟到时间
(3)侧输出流
生产中侧输出流,需要Flink单独处理,在写入Clickhouse,通过接口再次计算
反压,状态大小,资源偏少,机器性能,checkpoint时间都会影响数仓延迟。
一般影响最大就是窗口大小,一般是5s。
如果启用两阶段提交写入Kafka,下游设置读已提交,那么需要加上CheckPoint间隔时间。
开发周期半年,维护半年多
初次启动,Redis没有缓存数据,大量读请求访问Habse,类似于缓存雪崩
从离线统计热门维度数据,最近三天用户购买,活跃的sku,手动插入Redis。
在Open方法中预加载配置信息到HashMap以防止配置信息后到。
Savepoint停止程序,通过Savepoint恢复程序。
代码改动较大,savepoint恢复不了怎么办,看历史数据要不要,要从头跑,不要就不适用savepoint恢复直接提交运行。
在Flink中,我们可以通过设置externalized-checkpoint来启用外部化检查点,要从特定的检查点(例如第三个检查点)恢复作业,我们需要手动指定要从哪个检查点(需要指定到chk-xx目录)恢复。
按照每个人KeyBy,将出发时间存入状态,当到达时使用到达时间减去出发时间。
内部的数据质量:内部一致性检查点
数据的时效:结合3.9.5回答
实时任务出现延迟时,可以从以下几个方面进行排查:
(1)监控指标:看是否反压
(2)日志信息:查看任务运行时的日志信息,定位潜在的问题和异常情况。例如,网络波动、硬件故障、不当的配置等等。
(3)外部事件:如果延迟出现在大量的外部事件后,则可能需要考虑其他因素(如外部系统故障、网络波动等)。框架混部,资源争抢!
未做优化之前,有几千QPS,做完Redis的缓存优化,下降到几十
是我们自己搭建的,用来监控Flink任务和集群的相关指标
1)TaskManager Metrics:这些指标提供有关TaskManager的信息,例如CPU使用率、内存使用率、网络IO等。
2)Task Metrics:这些指标提供有关任务的信息,例如任务的延迟时间、记录丢失数、输入输出速率等。
3)Checkpoint Metrics:这些指标提供有关检查点的信息,例如检查点的持续时间、成功/失败的检查点数量、检查点大小等。
4)Operator Metrics:这些指标提供有关Flink操作符的信息,例如操作符的输入/输出记录数、处理时间、缓存大小等。
动态分流,其他没有做过!
在HBase中彻底删除表中的数据,需要执行以下步骤:
(1)禁用表
(2)创建一个新表
(3)复制需要保留的数据,将需要保留的数据从旧表复制到新表。
(4)删除旧表
(5)重命名新表
在执行这些步骤之前,建议先进行数据备份以防止意外数据丢失。此外,如果旧表中的数据量非常大,复制数据到新表中的过程可能会需要很长时间。
Flink本身存在反压机制,短时间的数据倾斜问题可以自身消化掉,所以针对于这种偶然性数据倾斜,不做处理。
(1)我们采用的是低延迟增量更新,本身就有延迟,没办法保证完全的正确数据。
(2)如果必须要正确结果,只能直接读取MySQL数据,但是需要考虑并发,MySQL机器性能。
1)前端埋点的行为数据为什么又采集一份?
时效性
Kafka保存3天,磁盘够:原来1T,现在2T,没压力
2)为什么选择Kafka?
实时写、实时读
=》 消息队列适合,其他数据库受不了
3)为什么用Maxwell?历史数据同步怎么保证一致性?
FlinkCDC在20年7月才发布
Canal与Maxwell区别:
Maxwell支持同步历史数据
Maxwell支持断点还原(存在元数据库)
数据格式更轻量
保证至少一次,不丢
4)Kafka保存多久?如果需要以前的数据怎么办?
跟离线项目保持一致:3天
我们的项目不需要,如果需要的话可以去数据库或Hive现查,ClickHouse也有历史的宽表数据。
1)存储原始数据
2个topic:埋点的行为数据 ods_base_log、业务数据 ods_base_db
2)业务数据的有序性:
maxwell配置,指定生产者分区的key为 table。
1)存储位置,为什么维度表存HBase?
事实表存Kafka、维度表存HBase
基于热存储加载维表的Join方案:
随机查
长远考虑
适合实时读写
2)埋点行为数据分流
(1)修复新老访客(选择性):以前是前端试别新老访客,不够准确
(2)分流:侧输出流
分了3个topic: 启动、页面、曝光
(3)用户跳出、独立访客统计
3)业务数据处理
(1)动态分流:FlinkSQL读取topic_base_db数据,过滤出每张明细表写回kafka
(2)订单预处理表设计:双流join,使用leftjoin
(3)字典表维度退化
4)维度数据写入Hbase
(1)为了避免维度数据发生变化而重启任务,在mysql存一张配置表来动态配置。
动态实现:通过广播状态
=》 读取一张配置表 ===》 维护这张配置表
source来源 sink写到哪 操作类型 字段 主键 扩展
=》实时获取配置表的变化 ==》CDC工具
=》 FlinkCDC
=》 使用了sql的方式,去同步这张配置表
=》sql的数据格式比较方便
(2)怎么写HBase:借助phoenix
没有做维度退化
维表数据量小、变化频率慢
(3)Hbase的rowkey怎么设计的?有没有数据热点问题?
最大的维表:用户维表
=》百万日活,2000万注册用户为例,1条平均1k:2000万*1k=约20G
使用Phoenix创建的盐表,避免数据热点问题
1)为什么选择ClickHouse
(1)适合大宽表、数据量多、聚合统计分析 =》 快
(2)宽表已经不再需要Join,很合适
2)关联维度数据
(1)维度关联方案:预加载、读取外部数据库、双流Join、LookupJoin
(2)项目中读取Hbase中维度数据
(3)优化1:异步IO
异步查询实际上是把维表的查询操作托管给单独的线程池完成,这样不会因为某一个查询造成阻塞,单个并行可以连续发送多个请求,提高并发效率。
这种方式特别针对涉及网络IO的操作,减少因为请求等待带来的消耗。
Flink在1.2中引入了Async I/O,在异步模式下,将IO操作异步化,单个并行可以连续发送多个请求,哪个请求先返回就先处理,从而在连续的请求间不需要阻塞式等待,大大提高了流处理效率。
Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,解决与外部系统交互时网络延迟成为了系统瓶颈的问题。
(4)优化2:旁路缓存
旁路缓存模式是一种非常常见的按需分配缓存的模式。如图,任何请求优先访问缓存,缓存命中,直接获得数据返回请求。如果未命中则,查询数据库,同时把结果写入缓存以备后续请求使用。
(5)怎么保证缓存一致性
方案1:当我们获取到维表更新的数据,也就是拿到维度表操作类型为update时:
更新Hbase的同时,删除redis里对应的之前缓存的数据
Redis设置了过期时间:24小时
方案2:双写
3)轻度聚合
(1)DWS层要应对很多实时查询,如果是完全的明细那么查询的压力是非常大的。将更多的实时数据以主题的方式组合起来便于管理,同时也能减少维度查询的次数。
(2)开一个小窗口,5s的滚动窗口
(3)同时减轻了写ClickHouse的压力,减少后续聚合的时间
(4)几张表? 表名、字段
访客、商品、地区、关键词
1)实现方案
为可视化大屏服务,提供一个数据接口用来查询ClickHouse中的数据。
2)怎么保证ClickHouse的一致性?
ReplacingMergeTree只能保证最终一致性,查询时的sql语法加上去重逻辑。
3)Flink任务如何监控
Flink和ClickHouse都使用了Prometheus + Grafana。
1)用户信息标签化
2)对标签化的数据的应用(分群、洞察分析)
3)标签如何建模的,有哪些标签
根据用户需求,协调产品经理一起规划了四级标签。前两级是分类,第三级是标签,第四级是标签值。
四个任务:
(1)通过根据每个标签的业务逻辑编写SQL,生产标签单表。
(2)把标签单表合并为标签宽表。
(3)把标签宽表导出到Clickhouse中的标签宽表。
(4)把Clickhouse中的标签表转储为Bitmap表。
四个任务通过编写Spark程序完成。并通过画像平台调度,以后新增标签只需要在平台填写标签定义、SQL及相关参数即可。
(1)标签定义(2)标签任务设定(3)任务调度(4)任务监控(5)分群创建维护(6)人群洞察
(1)画像平台 分群(2)画像平台 其他功能(可选)(3)实时数仓 数据接口
(1)上游: 数仓系统
(2)下游: 写入到Redis中,由广告、运营系统访问。
Bitmap是一个二进制集合,用0或1 标识某个值是否存在。
在求两个集合的交集运算时,不需要遍历两个集合,只要对位进行与运算即可。无论是比较次数的降低(从O(N^2) 到O(N) ),还是比较方式的改善(位运算),都给性能带来巨大的提升。
业务场景:把每个标签的用户id集合放在一个Bitmap中,那多个标签求交集(比如:女性 + 90后)这种分群筛选时,就可以通过两个标签的Bitmap求交集运算即可。
数据湖(Data Lake)是一个存储企业的各种各样原始数据的大型仓库,其中的数据可供存取、处理、分析及传输。Hudi、Iceberg、Data Lake
(1)离线数仓痛点
时效性:T+1模式,时效性差
数据更新只能overwrite,耗费资源
(2)实时数仓痛点
数据一致性问题:维护麻烦
历史数据修正:没有持久化明细数据,需要重跑,流程繁琐
(3)传统的数仓发展方向
流批一体:一套架构,一套代码,可以跑批也可以跑流
==》节省 资源 人力
(4)Hudi数据湖的优势
将离线的时效性降低到了分钟级别(5~10分钟)
本身支持增量处理
数据更新支持upsert
随着大数据技术发展趋势,公司对单一的数据湖和数据架构并不满意,想要去融合数据湖和数据仓库,构建在数据湖低成本的数据存储架构之上,又继承数据仓库的数据处理和管理功能。
业务与实时数仓一致。
1)断点续传采集如何处理
FlinkCDC分为全量和binlog,他们都是基于Flink state的能力,同步过程会将进度存储在state中,如果失败了,下一次会从state中恢复即可。
2)写Hudi表数据倾斜问题
FlinkCDC在全量阶段,读取完一张表后在读取下一张表,如果下游接了多个Sink,则只有一个Sink有数据写入。使用多表混合读取方式解决。
rocksdb +增量
(1)MOR表,离线compaction(不跟写入过程绑定在一起)
(2)相关并发、内存
Compaction、write并发 =》4
内存:compaction =》1G
(3)大状态:rocksdb +增量
(1)解决大状态问题=》不使用多留join,使用“部分列”更新方案
=》hudi<=0.12,官方没有提供,需要自定义payload实现类(大厂实现)
=》0.13.0,官方加入了 部分列更新的payload类
(2)Dws层换成olap=》clickhouse,存储明细
希望dws是一种明细,支持灵活的自助分析==》未来连实时项目也可以干掉
测试服务器一般三台。
有钱的公司和生产环境电脑配置一样。
一般公司测试环境的配置是生产的一半。
一部分自己写Java程序自己造(更灵活),一部分从生产环境上取一部分(更真实)。
先在MySQL的业务库里面把结果计算出来;在给你在ads层计算的结果进行比较;
需要造一些特定的测试数据,测试。
从生产环境抓取一部分数据,数据有多少你是知道的,运算完毕应该符合你的预期。
离线数据和实时数据分析的结果比较。(日活1万 实时10100),倾向取离线。
算法异构
实时数据质量监控(脚本、调度器、可视化、故障报警)
(1)样本数据验证:从计算结果抽取部分样本数据,与业务部门实际数据对比。
(2)逻辑验证:检查指标的计算sql是否正确
(3)指标间关系验证:比较不同指标间关系,检查它们是否符合预期。例如:某个指标是另外一个指标的累计值,那么这两个指标一定存在关系
(4)历史数据对比:将计算结果和过去数据进行对比,观察指标的变化趋势
(5)异常值检测:检查计算结果中是否存在异常值
(6)跨数据部门对比:可以将计算结果与其它部门或团队的数据进行对比,进一步验证
大公司:上线的时候,将脚本打包,提交git。先发邮件抄送经理和总监,运维。运维负责上线。
小公司:跟项目经理说一下,项目经理技术把关,项目经理通过了就可以上线了。风险意识。
所谓的上线就是编写脚本,并在DolphinScheduler中进行作业调度。
1)什么是 A/B 测试?
A / B测试本质上是一种实验,即随机向用户显示变量的两个或多个版本,并使用统计分析来确定哪个变量更适合给定的转化目标。
2)为什么要做A/B测试?
举例:字节跳动有一款中视频产品叫西瓜视频,最早它叫做头条视频。为了提升产品的品牌辨识度,团队想给它起个更好的名字。经过一些内部调研和头脑风暴,征集到了西瓜视频、奇妙视频、筷子视频、阳光视频4个名字,于是团队就针对一共5个APP 名称进行了A/B实验。
这个实验中唯一改变的是应用市场里该产品的名称和对应的logo,实验目的是为了验证哪一个应用名称能更好地提升“头条视频”APP在应用商店的点击率。最后西瓜视频和奇妙视频的点击率位列前二,但差距不显著,结合用户调性等因素的综合考量,最终决定头条视频正式更名为西瓜视频。
通过这个案例可以看到,A/B测试可以帮助业务做最终决策。结合案例的直观感受,我们可以这样来定义A/B 测试:在同一时间对目标受众做科学抽样、分组测试以评估效果。
以上图图示为例,假设我们有100万用户要进行A/B测试:
先选定目标受众,比如一线城市的用户。A/B测试不可能对所有用户都进行实验,所以要进行科学抽样,选择小部分流量进行实验。
3)哪个首页新UI版本更受欢迎
今日头条UI整体风格偏大龄被诟病已久,不利于年轻和女性用户泛化,历史上几次红头改灰头实验都对大盘数据显著负向。因此团队设计了A/B实验,目标是在可接受的负向范围内,改一版用户评价更好的UI。通过控制变量法,对以下变量分别开展数次A/B实验:
头部色值饱和度、字号、字重、上下间距、左右间距、底部 tab icon。
结合用户调研(结果显示:年轻用户和女性用户对新 UI 更偏好)。
综合来看,效果最好的 UI 版本如下图所示,全量上线。
新 UI 上线后,Stay duration 显著负向从-0.38% 降至 -0.24%,图文类时长显著 +1.66%,搜索渗透显著 +1.47%,高频用户(占 71%)已逐渐适应新 UI。
以下是活跃用户需求的整体开发流程。
产品经理负责收集需求:需求来源与客户反馈、老板的意见。
第1步:确定指标的业务口径
由产品经理主导,找到提出该指标的运营负责人沟通。首先要问清楚指标是怎么定义的,比如活跃用户是指启动过APP的用户。设备id 还是用户id。
产品经理先编写需求文档并画原型图。=》需求不要口头说。
第2步:需求评审
由产品经理主导设计原型,对于活跃主题,我们最终要展示的是最近n天的活跃用户数变化趋势 ,效果如下图所示。此处大数据开发工程师、后端开发工程师、前端开发工程师一同参与,一起说明整个功能的价值和详细的操作流程,确保大家理解的一致。
工期:
接口:数据格式、字段类型、责任人。
第3步:大数据开发
大数据开发工程师,通过数据同步的工具如Flume、Datax、Maxwell等将数据同步到ODS层,然后就是一层一层的通过SQL计算到DWD、DWS层,最后形成可为应用直接服务的数据填充到ADS层。
第4步:后端开发
后端工程师负责,为大数据工程师提供业务数据接口。
同时还负责读取ADS层分析后,写入MySQL中的数据。
第5步:前端开发
前端工程师负责,前端埋点。
对分析后的结果数据进行可视化展示。
第6步:联调
此时大数据开发工程师、前端开发工程师、后端开发工程师都要参与进来。此时会要求大数据开发工程师基于历史的数据执行计算任务,大数据开发工程师承担数据准确性的校验。前后端解决用户操作的相关BUG保证不出现低级的问题完成自测。
第7步:测试
测试工程师对整个大数据系统进行测试。测试的手段包括,边界值、等价类等。
提交测试异常的软件有:禅道(测试人员记录测试问题1.0,输入是什么,结果是什么,跟预期不一样->需要开发人员解释,是一个bug,下一个版本解决1.1->测试人员再测试。测试1.1ok->测试经理关闭bug)
1周开发写代码 =》 2周测试时间
第8步:上线
运维工程师会配合我们的前后端开发工程师更新最新的版本到服务器。此时产品经理要找到该指标的负责人长期跟进指标的准确性。重要的指标还要每过一个周期内部再次验证,从而保证数据的准确性。
敏捷开发(少量需求=>代码编写=>测试=>少量需求=>代码编写=>测试…),又叫小步快跑。
差不多一个月会迭代一次。每月都有节日(元旦、春节、情人节、3.8妇女节、端午节、618、国庆、中秋、1111/6.1/5.1、生日、周末)新产品、新区域。
就产品或我们提出优化需求,然后评估时间。每周我们都会开会做下周计划和本周总结。(日报、周报、月报、季度报、年报)需求1周的时间,周三一定完成。周四周五(帮同事写代码、自己学习工作额外的技术)。
5.1.2
5是大版本号:必须是重大升级
1:一般是核心模块变动
2:一般版本变化
(1)刚入职第一个需求大概需要7天左右。对业务熟悉后,平均一天一个需求。
(2)影响时间的因素:对业务熟悉、开会讨论需求、表的权限申请、测试等。新员工培训(公司规章制度、代码规范)
(1)新需求(活动、优化、新产品、新市场)。 60%
(2)故障分析:数仓的任何步骤出现问题,需要查看问题,比如日活,月活下降或快速上升等。20%
(3)新技术的预言(比如湖仓一体 数据湖 Doris 实时数据质量监控)10%
(4)其临时任务 10%
(5)晨会-》10做操-》讨论中午吃什么-》12点出去吃1点-》睡到2点-》3点茶歇水果-》晚上吃啥-》吃加班餐-》开会-》晚上6点吃饭-》7点开始干活-10点-》11点
元数据管理目前开源的框架中,Atlas框架使用的较多。再就是采用自研的系统。
1)元数据管理底层实现原理
解析如下HQL,获取对应的原数据表和目标表直接的依赖关系。
insert into table ads_user
select id, name from dws_user
依赖关系能够做到:表级别和字段级别 neo4j
2)用处:作业执行失败,评估他的影响范围。主要用于表比较多的公司
atlas版本问题:
0.84版本:2019-06-21
2.0版本:2019-05-13
框架版本:
Apache 0.84 2.0 2.1
CDH 2.0
3)自研的元数据管理
实现数据质量检测的功能,我们需要首先明确数据质量的维度,例如准确性、完整性、唯一性、及时性和一致性。
1)确定数据源
确定需要进行数据质量检测的数据源。这可能是数据库表、文件、API等。
2)定义质量规则(*)
为每个数据质量维度定义具体的规则。例如:
–准确性:检查数据是否符合预期的范围或分布。
–完整性:检查数据是否存在缺失值或空值。
–唯一性:检查数据中是否存在重复项。
–及时性:检查数据是否在预期的时间范围内更新。
–一致性:检查数据是否符合预定义的格式或标准。
3)实现检测功能
使用编程语言(如SQL)编写检测数据质量的函数。这些函数可以包括:
–数据导入:从数据源导入数据。
–数据清理:对数据进行预处理,如去除空格、转换数据类型等。
–应用质量规则:根据定义的质量规则,实现相应的检测函数。例如,检查缺失值、重复项或数据范围等。
–输出报告:生成数据质量报告,如将检测结果汇总成表格或可视化图表。
4)自动化和监控
将数据质量检测功能集成到数据管道或ETL过程中,以实现自动化检测。此外,可以设置监控和警报机制,以便在检测到数据质量问题时及时通知相关人员。
一张表的记录数在一个已知的范围内,或者上下浮动不会超过某个阈值
日活、周活、月活、留存(日周月)、转化率(日、周、月)GMV(日、周、月)
复购率(日周月) 30%
某个字段为空的记录数在一个范围内,或者占总量的百分比在某个阈值范围内
一个或多个字段是否满足某些规则
一个或多个字段没有重复记录
主要针对同步流程,监控两张表的数据量是否一致
资产健康度量化模型。
根据数据资产健康管理的关键因素,明确量化积分规则。根据数据基础信息完整度、数据存储和数据计算健康度、数据质量监控规则合理性等,完整计算数据资产健康分。
1)资产健康分基础逻辑
(1)健康分基本设定原则:
(2)数据表资产健康分:
数据表资产健康分score =(规范合规健康分*10% + 存储健康分*30% + 计算健康分*30% + 数据质量健康分*15% + 数据安全健康分 * 15%);
2)数据资产特征列表:
资产健康类型 | 特征 | 特征分计算逻辑 |
规范 Specification 规范健康分= 100 * sum(特征分)/count(特征) | 有技术owner | 0/1 |
有业务owner | 0/1 | |
有分区信息 | 0/1 | |
有归属部门 | 0/1 | |
表命名合规 | 0/1 | |
数仓分层合规 | 0/1 | |
表有备注信息 | 0/1 | |
字段有备注信息 | 有备注字段数 / 总字段数 | |
存储 Storage 存储健康分= 100 * 完成度 | 生命周期合理性 |
|
计算 Calc 计算健康分= 100 * sum(特征分)/count(特征) | hdfs路径被删除 | 被删除,0分;否则为1 |
产出为空 | 连续15天无数据产出,0分;否则为1; | |
产出表未被读取 | 最近30天产出数据表无读取,0分;否则为1; | |
运行出错 | 最近3天任务运行有出错,0分;否则为1; | |
重复表/相似表 | 与其他数据表50%相似,则为0分,否则为1; | |
责任人不合理 | 对应的调度节点责任人已经离职,或调度节点责任人在职但与数据表责任人不一致 ,0分;否则为1; | |
简单加工 | 生产sql只简单 select字段出来,没有 join、group;where 条件只有分区字段 ;0分,否则为1; | |
暴力扫描 | 表中被查询分区大于 90 天,同时被查询分区的总存储量大于 100G ;0分,否则为1; | |
两侧类型不一致 | 类似这个例子: select ... from table1 t1 join table2 t2 on t1.a_bigint=t2.a_string; 这种情况在 on 条件中两边都被 double了,这个其实不合理;是个大坑, 会导致行为和用户期待不一致 | |
数据倾斜 | 长尾运行实例耗费时⻓高于平均值 20%,分数记为0,否则记为1; | |
需要列剪裁 | 判断 select 语句及后续使用逻辑,是否 select 出来的列都被使 用,用被使用的列数/总 select 列数计算使用率,低于 50% 就需要列剪裁,0分,否则为1; | |
质量 Quality 质量健康分= 100 * sum(特征分)/count(特征) | 表产出时效监控 | qdc有定义产出时间预警或已经归属于某个生产基线; 0/1; |
表内容监控 | 有配置表级规则; 0/1; | |
字段内容监控 | 有配置字段级规则; 0/1; | |
表产出SLA | X点及时性SLA测量函数:
| |
表内容SLA | 1-触发监控规则/总监控规则数 | |
字段内容SLA | 1-触发监控规则/总监控规则数 | |
安全 Security 安全健康分= 100 * sum(特征分)/count(特征) | 数据分类 | 有明确设置归属的“资产目录” ; 0/1; |
资产分级 | 有指定资产等级; 0/1; | |
字段级安全等级 | 有字段设置了安全等级; 0/1; |
1)什么是前台?
首先,这里所说的“前台”和“前端”并不是一回事。所谓前台即包括各种和用户直接交互的界面,比如web页面,手机app;也包括服务端各种实时响应用户请求的业务逻辑,比如商品查询、订单系统等等。
2)什么是后台?
后台并不直接面向用户,而是面向运营人员的配置管理系统,比如商品管理、物流管理、结算管理。后台为前台提供了一些简单的配置。
3)为什么要做中台
传统项目痛点:重复造轮子。
1)SuperCell公司
2)阿里巴巴提出了“大中台,小前台”的战略
3)华为提出了“平台炮火支撑精兵作战”的战略
1)业务中台 & 技术中台
图 业务中台 图 技术中台
2)数据中台 & 算法中台
图 数据中台 图 算法中台
1)从0到1的阶段,没有必要搭建中台。
从0到1的创业型公司,首要目的是生存下去,以最快的速度打造出产品,证明自身的市场价值。
这个时候,让项目野蛮生长才是最好的选择。如果不慌不忙地先去搭建中台,恐怕中台还没搭建好,公司早就饿死了。
2)从1到N的阶段,适合搭建中台。
当企业有了一定规模,产品得到了市场的认可,这时候公司的首要目的不再是活下去,而是活的更好。
这个时候,趁着项目复杂度还不是特别高,可以考虑把各项目的通用部分下沉,组建中台,以方便后续新项目的尝试和旧项目的迭代。
3)从N到N+1的阶段,搭建中台势在必行。
当企业已经有了很大的规模,各种产品、服务、部门错综复杂,这时候做架构调整会比较痛苦。
但是长痛不如短痛,为了项目的长期发展,还是需要尽早调整架构,实现平台化,以免日后越来越难以维护。
牵一发动全身,中台细小的改动,都需要严格测试。周期比较长。
大厂一般有总的中台,也有部门级别的中台,保证效率。
在计算机算法理论中,用时间复杂度和空间复杂度来分别从这两方面衡量算法的性能。
1)时间复杂度(Time Complexity)
算法的时间复杂度,是指执行算法所需要的计算工作量。
一般来说,计算机算法是问题规模n 的函数f(n),算法的时间复杂度也因此记做:T(n)= Ο(f(n))。
问题的规模n 越大,算法执行的时间的增长率与f(n)的增长率正相关,称作渐进时间复杂度(Asymptotic Time Complexity)。
2)空间复杂度
算法的空间复杂度,是指算法需要消耗的内存空间。有时候做递归调用,还需要考虑调用栈所占用的空间。
其计算和表示方法与时间复杂度类似,一般都用复杂度的渐近性来表示。同时间复杂度相比,空间复杂度的分析要简单得多。
所以,我们一般对程序复杂度的分析,重点都会放在时间复杂度上。
1)暴力求解
不推荐。
2)动态规划
动态规划(Dynamic Programming,DP)是运筹学的一个分支,是求解决策过程最优化的过程。
动态规划过程是:把原问题划分成多个“阶段”,依次来做“决策”,得到当前的局部解;每次决策,会依赖于当前“状态”,而且会随即引起状态的转移。
这样,一个决策序列就是在变化的状态中,“动态”产生出来的,这种多阶段的、最优化决策,解决问题的过程就称为动态规划(Dynamic Programming,DP)。
3)分支
对于复杂的最优化问题,往往需要遍历搜索解空间树。最直观的策略,就是依次搜索当前节点的所有分支,进而搜索整个问题的解。为了加快搜索进程,我们可以加入一些限制条件计算优先值,得到优先搜索的分支,从而更快地找到最优解:这种策略被称为“分支限界法”。
分支限界法常以广度优先(BFS)、或以最小耗费(最大效益)优先的方式,搜索问题的解空间树。
冒泡排序是一种简单的排序算法。
它的基本原理是:重复地扫描要排序的数列,一次比较两个元素,如果它们的大小顺序错误,就把它们交换过来。这样,一次扫描结束,我们可以确保最大(小)的值被移动到序列末尾。这个算法的名字由来,就是因为越小的元素会经由交换,慢慢“浮”到数列的顶端。
冒泡排序的时间复杂度为O(n2)。
public void bubbleSort(int nums[]) {
int n = nums.length;
for(int i = 0; i < n - 1; i++) {
for(int j = 0; j < n - i - 1; j++) {
if(nums[j + 1] < nums[j])
swap(nums, j, j + 1);
}
}
}
快速排序的基本思想:通过一趟排序,将待排记录分隔成独立的两部分,其中一部分记录的关键字均比另一部分的关键字小,则可分别对这两部分记录继续进行排序,以达到整个序列有序。
快排应用了分治思想,一般会用递归来实现。
快速排序的时间复杂度可以做到O(nlogn),在很多框架和数据结构设计中都有广泛的应用。
public void qSort(int[] nums, int start, int end){
if (start >= end) return;
int mid = partition(nums, start, end);
qSort(nums, start, mid - 1);
qSort(nums, mid + 1, end);
}
// 定义分区方法,把数组按一个基准划分两部分,左侧元素一定小于基准,右侧大于基准
private static int partition( int[] nums, int start, int end ){
// 以当前数组起始元素为pivot
int pivot = nums[start];
int left = start;
int right = end;
while ( left < right ){
while ( left < right && nums[right] >= pivot )
right --;
nums[left] = nums[right];
while ( left < right && nums[left] <= pivot )
left ++;
nums[right] = nums[left];
}
nums[left] = pivot;
return left;
}
归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用分治法(Divide and Conquer)的一个非常典型的应用。
将已有序的子序列合并,得到完全有序的序列;即先使每个子序列有序,再使子序列段间有序。若将两个有序表合并成一个有序表,称为2-路归并。
归并排序的时间复杂度是O(nlogn)。代价是需要额外的内存空间。
public void mergeSort(int[] nums, int start, int end){
if (start >= end ) return;
int mid = (start + end) / 2;
mergeSort(nums, start, mid);
mergeSort(nums, mid + 1, end);
merge(nums, start, mid, mid + 1, end);
}
private static void merge(int[] nums, int lstart, int lend, int rstart, int rend){
int[] result = new int[rend - lstart + 1];
int left = lstart;
int right = rstart;
int i = 0;
while (left <= lend && right <= rend){
if (nums[left] <= nums[right])
result[i++] = nums[left++];
else
result[i++] = nums[right++];
}
while (left <= lend)
result[i++] = nums[left++];
while (right <= rend)
result[i++] = nums[right++];
System.arraycopy(result, 0, nums, lstart, result.length);
}
题目:求下面二叉树的各种遍历(前序、中序、后序、层次)
给定一个n个元素有序的(升序)整型数组nums和一个目标值target,写一个函数搜索nums中的target,如果目标值存在返回下标,否则返回-1。
二分查找也称折半查找(Binary Search),它是一种效率较高的查找方法,前提是数据结构必须先排好序,可以在对数时间复杂度内完成查找。
二分查找事实上采用的就是一种分治策略,它充分利用了元素间的次序关系,可在最坏的情况下用O(log n)完成搜索任务。
/**
* @param a 要查找的有序int数组
* @param key 要查找的数值元素
* @return 返回找到的元素下标;如果没有找到,返回-1
*/
public int binarySearch(int[] a, int key){
int low = 0;
int high = a.length - 1;
if ( key < a[low] || key > a[high] )
return -1;
while ( low <= high){
int mid = ( low + high ) / 2;
if( a[mid] < key)
low = mid + 1;
else if( a[mid] > key )
high = mid - 1;
else
return mid;
}
return -1;
}
题目:一只青蛙一次可以跳上1级台阶,也可以跳上2级台阶。求该青蛙上一个n级台阶总共有多少种跳法?
题目:给你一个字符串s,找到s中最长的回文子串。
实例:
输入:s = “babad”
输出:“bab”
解释:“aba”也是符合题意答案
题目:现在有一个只包含数字的字符串,将该字符串转化成IP地址的形式,返回所有可能的情况。
例如:
给出的字符串为“25525511135”
返回["255.255.11.135", "255.255.111.35"](顺序没有关系)
1)方法1:
(1)按照key对数据进行聚合(groupByKey)
(2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)数据量太大,会OOM。
2)方法2:
(1)取出所有的key
(2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序
方法3:
(1)自定义分区器,按照key进行分区,使不同的key进到不同的分区
(2)对每个分区运用spark的排序算子进行排序
外排序(External Sorting)是解决大数据排序问题的一种常用方法。它将整个数据集分为多个小块,每个小块可以完全放入内存中进行排序,然后再将这些有序的小块逐一合并,直到整个数据集被完全排序。
具体实现上,可以采用以下步骤:
将100G数据分为若干个大小相等的块(如1G/块),将每个块读入内存中,使用快速排序等排序算法进行排序,然后将这些有序的块暂存到磁盘上。
依次从每个排好序的小块中读取第一个数,将它们归并成一个有序序列,并将这个序列输出到磁盘中。
如果有多个块输出到了同一个归并块中,则需要在内存中对这些数进行归并排序,然后再输出到磁盘中。
重复步骤2和步骤3,直到所有块都被归并排序,并得到最终的有序序列。
此外,为了提高效率,还可以采用多路归并排序(Merge Sort),将多个有序序列同时归并,提高归并排序的效率。
select 2d from t_order where 2d in (SELECT 2d from t_order_f)
对于这条 SQL 语句,可以使用内连接(INNER JOIN)来代替子查询(IN)。这通常可以提高查询性能,因为内连接在大多数数据库系统中的性能优化更为成熟。以下是优化后的 SQL 语句:
SELECT t1.2d
FROM t_order t1
INNER JOIN t_order_f t2 ON t1.2d = t2.2d
优化后,如何判断效率提升了多少?
查看执行时间:执行优化前后的 SQL 语句,比较它们的执行时间。执行时间的减少表示性能得到了提升。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。