赞
踩
本文最新版已发布至公众号【五分钟学大数据】
获取此套面试题最新pdf版,请搜索公众号【五分钟学大数据】,对话框发送 面试宝典
扫码获取最新PDF版:
版本 | 时间 | 描述 |
V1.0 | 2020-02-18 | 创建 |
V1.2 | 2020-06-17 | 新增 spark 、flink相关面试题 |
V1.3 | 2021-03-18 | 新增 java、JVM、mysql、JUC等 |
本文档更新只发于公众号:五分钟学大数据
本套面试题堪称史上最全,既有面试技巧,面试流程,还有技术总结,面试真题,包含算法,Java,Mysql,大数据框架,大数据项目等(持续更新中… 最新版请扫描下方二维码关注公众号:五分钟学大数据,回复【面试宝典】获取)。
第一版是按照大数据技术进行划分(另一版,可在公众号【五分钟学大数据】后台发送 面试 获取),第二版是综合版 (此版)。
扫码关注公众号,获取最新PDF版:(对话框发送 面试宝典) |
目 录
2.2.4 CTO/技术架构师等面试(如果过了技术经理面试). 4
2.7.3 自我介绍(控制在4分半以内,不超过5分钟). 8
5.2.2 Hadoop配置文件以及简单的Hadoop集群搭建. 25
5.2.4 MapReduce的Shuffle过程及Hadoop优化(包括:压缩、小文件、集群优化). 26
5.2.6 Yarn的默认调度器、调度器分类、以及他们之间的区别. 29
5.4.1 Flume组成,Put事务,Take事务. 34
5.4.5 Flume采集数据会丢失吗?(防止数据丢失的机制). 36
5.5.14 Kafka消息数据积压,Kafka消费能力不足怎么处理?. 40
5.7.6 hbase-default.xml中相关参数. 53
5.8.1 Sqoop导入导出Null存储一致性问题. 54
5.8.4 Sqoop数据导出的时候一次执行多长时间. 55
5.9.7 Some、None、Option的正确使用. 57
5.10.1 Spark有几种部署方式?请分别简要论述. 57
5.10.2 Spark任务使用什么进行提交,javaEE界面还是脚本. 58
5.10.4 简述Spark的架构与作业提交流程(画图讲解,注明各个部分的作用)(重点). 59
5.10.5 如何理解Spark中的血统概念(RDD)(笔试重点). 59
5.10.6 简述Spark的宽窄依赖,以及Spark如何划分stage,每个stage又根据什么决定task个数? (笔试重点). 59
5.10.7 请列举Spark的transformation算子(不少于8个),并简述功能(重点). 59
5.10.8 请列举Spark的action算子(不少于6个),并简述功能(重点). 60
5.10.9 请列举会引起Shuffle过程的Spark算子,并简述功能。. 61
5.10.11 Spark常用算子reduceByKey与groupByKey的区别,哪一种更具优势?(重点). 62
5.10.12 Repartition和Coalesce关系与区别. 63
5.10.13 分别简述Spark中的缓存机制(cache和persist)与checkpoint机制,并指出两者的区别与联系. 63
5.10.14 简述Spark中共享变量(广播变量和累加器)的基本原理与用途。(重点). 63
5.10.15 当Spark涉及到数据库的操作时,如何减少Spark运行中的数据库连接数?. 64
5.10.16 简述SparkSQL中RDD、DataFrame、DataSet三者的区别与联系? (笔试重点). 64
5.10.17 SparkSQL中join操作与left join操作的区别?. 65
5.10.18 SparkStreaming有哪几种方式消费Kafka中的数据,它们之间的区别是什么? (重点). 65
5.10.19 简述SparkStreaming窗口函数的原理(重点). 66
5.10.20 请手写出wordcount的Spark代码实现(Scala)(手写代码重点). 67
5.10.21 如何使用Spark实现topN的获取(描述思路或使用伪代码)(重点). 67
5.10.22 京东:调优之前与调优之后性能的详细对比(例如调整map个数,map个数之前多少、之后多少,有什么提升). 67
5.11.2 Flink相比Spark Streaming有什么区别?. 68
5.11.4 Flink的并行度有了解吗?Flink中设置并行度需要注意什么?. 76
5.11.5 Flink支持哪几种重启策略?分别如何配置?. 76
5.11.6 Flink的分布式缓存有什么作用?如何使用?. 76
5.11.7 Flink中的广播变量,使用广播变量需要注意什么事项?. 77
5.11.8 Flink中对窗口的支持包括哪几种?说说他们的使用场景. 77
5.11.9 Flink 中的 State Backends是什么?有什么作用?分成哪几类?说说他们各自的优缺点?. 78
5.11.10 Flink中的时间种类有哪些?各自介绍一下?. 79
5.11.11 WaterMark是什么?是用来解决什么问题?如何生成水印?水印的原理是什么?. 79
5.11.12 Flink的table和SQL熟悉吗?Table API和SQL中TableEnvironment这个类有什么作用. 79
5.11.14 Flink是如何做到批处理与流处理统一的?. 80
5.11.15 Flink中的数据传输模式是怎么样的?. 81
5.11.17 Flink在使用Window时出现数据倾斜,你有什么解决办法?. 82
5.11.18 Flink任务,delay极高,请问你有什么调优策略?. 82
7.5 Kafka消息数据积压,Kafka消费能力不足怎么处理?. 92
8.2.4 分析过哪些指标(一分钟至少说出30个指标). 99
8.2.6 数据仓库每天跑多少张表,大概什么时候运行,运行多久?. 103
8.2.8 数仓中用到过哪些Shell脚本及具体功能. 104
8.2.13 项目在3年内迭代次数,每一个项目具体是如何迭代的。. 105
9.5 String buffer和String build区别. 110
9.6 Final、Finally、Finalize 110
10.1 缓存雪崩、缓存穿透、缓存预热、缓存更新、缓存降级. 111
12.1 JVM内存分哪几个区,每个区的作用是什么? 128
12.4 如何判断一个对象是否存活?(或者GC对象的判定方法) 131
12.6 简述Java内存分配与回收策略以及Minor GC和Major GC(full GC) 132
13.1 Synchronized与Lock的区别. 132
1.1.1
简历编写可在公众号【五分钟学大数据】后台发送:简历,获取大数据简历的模板,包含各个行业及各个项目,非常全。可直接扫码:
如能内推,尽量走内推路线,可免简历筛选,如果找不到内推路径,可在牛客网上搜内推码,上面有很多公司的员工发内推码。
招聘软件推荐 Boss直聘,这个软件还是很靠谱的,其他的智联招聘,拉勾,前程无忧,猎聘也不错。
选择题:基础知识类型的选择题(编程语言,数据库,操作系统,大数据要点知识,网络,计算机组成原理等)
简答题:
编程题:算法题(考察逻辑思维,考察解答问题的方式方法等)
数据库类型的SQL编写题目(MySQL或者Hive)这是必出题,也是分必拿题。否则就全盘覆没了。
SQL题:必有(Hive和MySQL)
场景题:
给你假定一种场景,让你给出解决方案,面试问的最多的也是这种
遇到了问题(集群节点宕机,任务运行出错,数据丢失等),该如何解决?
简单脚本题:
写出一个简单的数据处理脚本,或者运维脚本(会的就自己写,不会的就百度搜,或者求救于小伙伴)
聊基本情况,聊工作经历,聊人生价值观,聊对工作的态度,聊方方面面,就是不聊技术
请事先准备好针对你个人基本情况的面试题
1、比如你原来在上海,为何10月份来北京找工作?
2、比如工作经历中,有4个月是不上班的,怎么解释?
.....
也就是做题
当然技术面试过程中,也有可能会出现手写代码的问题
经典手写代码:
基本只聊技术。
基本套路:
考察广度的同时,也会考察深度
通俗的说,也就是会在不同的方向,不同的领域问各种问题,然后针对你能回答的问题,就深入探讨,以此得知你对这门技术的掌握程度
所以:
考察广度,就是看你的技术领域分布
考察深度,就是看你你对这个技术掌握程度如何
总体来说:
到了这种级别的面试,一般考察技术的就少了。
更多的是考察发展,眼光更长远的,考察你的职业生涯规划,考察你的价值观,考察你是否符合公司的长期发展战略
能了解公司更多信息,就切合公司实际去描述
如果不了解公司的,那就尽量按照通用套路说
少数场景会有的。但是不要慌。按照自己的本事来。
有些是现场的,这种很少。有些是给出需求之后让你回来之后自己做。
编程语言方向
java是重点(重中之重),其次:scala、python
java中的重点考察方向:
集合(优缺点,底层实现,更好的替代方案,如何根据场景选择和使用)
并发(锁,JMM,各种关键字(volite),技术点,线程池,......)
面向对象
数据库方向
mysql是重点,其次是hbase,redis
sql语句的编写和优化就不说了,没有不考查的
Hadoop体系/Spark体系
架构原理
工作机制
典型的常见流程
某个功能的详细分析
问题和运维难点
集群规模/集群规划
任务多少
任务运行总时长
每天数据量
总数据量
多少条记录
每条记录多大
每条记录多少个字段
hive的总表数等等
其他知识:
ElasticSearch
Flink
机器学习
架构
优化
源码
数据结构
算法
追根究底的问各种你答的上来的东西的底层实现细节,直到你答不上来为止,或者到他满意为止
关于HashMap的问题:
别人说的合理的给予赞同和钦佩
别人说的你不赞同的你不要反对,你可以用另外一种方式表现出你的看法和意见,这是讨论和交流,不是针锋相对
大胆的说出自己各个方面的优势和特长
不要写消遣类的爱好,比如爬上,唱歌看电影
不要谈自己真实问题;用“缺点”衬托自己的优点
公司希望我入职后的3-6个月内,给公司解决什么样的问题
公司(或者对这个部门)未来的战略规划是什么样子的?
一周左右,如果公司需要,可以适当提前
时间、公司名称、任职岗位、主要工作内容、工作业绩、离职原因
刨根问底下沉式追问(注意是下沉式,而不是发散式的)
基本技巧:往自己熟悉的方向说
|
实现代码:
|
拓展需求:当一个有序数组中,有多个相同的数值时,如何将所有的数值都查找到。
代码实现如下:
|
代码实现:
|
核心思想:不断的将大的数组分成两个小数组,直到不能拆分为止,即形成了单个值。此时使用合并的排序思想对已经有序的数组进行合并,合并为一个大的数据,不断重复此过程,直到最终所有数据合并到一个数组为止。
代码实现:
|
定义节点以及前序、中序、后序遍历
|
定义二叉树,前序、中序、后序遍历,前序、中序、后序查找,删除节点
|
|
数据仓库的输入数据源和输出系统分别是什么?
输入系统:埋点产生的用户行为数据、JavaEE后台产生的业务数据。
输出系统:报表系统、用户画像系统、推荐系统
服务器使用物理机还是云主机?
属于研发部,技术总监下面有各个项目组,我们属于数据组,其他还有后端项目组,基础平台等。总监上面就是副总等级别了。其他的还有产品运营部等。
职级就分初级,中级,高级。晋升规则不一定,看公司效益和职位空缺。
小型公司(3人左右):组长1人,剩余组员无明确分工,并且可能兼顾javaEE和前端。
中小型公司(3~6人左右):组长1人,离线2人左右,实时1人左右(离线一般多于实时),JavaEE 1人(有或者没有人单独负责JavaEE,有时是有组员大数据和JavaEE一起做,或者大数据和前端一起做)。
中型公司(5~10人左右):组长1人,离线3~5人左右(离线处理、数仓),实时2人左右,JavaEE 1人左右(负责对接JavaEE业务),前端1人左右(有或者没有人单独负责前端)。
中大型公司(5~20人左右):组长1人,离线5~10人(离线处理、数仓),实时5人左右,JavaEE2人左右(负责对接JavaEE业务),前端1人(有或者没有人单独负责前端)。(发展比较良好的中大型公司可能大数据部门已经细化拆分,分成多个大数据组,分别负责不同业务)
上面只是参考配置,因为公司之间差异很大,例如ofo大数据部门只有5个人左右,因此根据所选公司规模确定一个合理范围,在面试前必须将这个人员配置考虑清楚,回答时要非常确定。
序号 | 命令 | 命令解释 |
1 | top | 查看内存 |
2 | df -h | 查看磁盘存储情况 |
3 | iotop | 查看磁盘IO读写(yum install iotop安装) |
4 | iotop -o | 直接查看比较高的磁盘读写程序 |
5 | netstat -tunlp | grep 端口号 | 查看端口占用情况 |
6 | uptime | 查看报告系统运行时长及平均负载 |
7 | ps aux | 查看进程 |
awk、sed、cut、sort
Hadoop 2.x 版本:
Hadoop 3.x版本:(仅列出相对于2.x版本有更改的端口,箭头前为2.x,箭头后为3.x)
NameNode 的端口:
50070 --> 9870
8020 --> 9820,
50470 --> 9871
Secondary NameNode的端口:
50091 --> 9869
50090 --> 9868
DataNode 的端口:
50020 --> 9867
50010 --> 9866
50475 --> 9865
50075 --> 9864
Hadoop KMS 的端口:
16000 --> 9600
core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml
hadoop-env.sh、yarn-env.sh、mapred-env.sh、slaves
JDK安装
配置SSH免密登录
配置hadoop核心文件:
格式化namenode
HDFS读数据:
HDFS写数据:
MapReduce的详细工作流程:
一、Shuffle机制
二、Hadoop优化
三、压缩
压缩格式 | Hadoop自带? | 算法 | 文件扩展名 | 支持切分 | 换成压缩格式后,原来的程序是否需要修改 |
DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 和文本处理一样,不需要修改 |
Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 和文本处理一样,不需要修改 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本处理一样,不需要修改 |
LZO | 否,需要安装 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
Snappy | 否,需要安装 | Snappy | .snappy | 否 | 和文本处理一样,不需要修改 |
提示:如果面试过程问起,我们一般回答压缩方式为Snappy,特点速度快,缺点无法切分(可以回答在链式MR中,Reduce端输出使用bzip2压缩,以便后续的map任务对数据进行split)
四、切片机制
提示:切片大小公式:max(0,min(Long_max,blockSize))
FIFO(先进先出调度器)、Capacity Scheduler(容量调度器)和Fair Sceduler(公平调度器)。
Hadoop2.7.2默认的资源调度器是 容量调度器
FIFO调度器:先进先出,同一时间队列中只有一个任务在执行。
容量调度器:多队列;每个队列内部先进先出,同一时间队列中只有一个任务在执行。队列的并行度为队列的个数。
公平调度器:多队列;每个队列内部按照缺额大小分配资源启动任务,同一时间队列中有多个任务执行。队列的并行度大于等于队列的个数。
启用lzo的压缩方式对于小规模集群是很有用处,压缩比率大概能降到原始日志大小的1/3。同时解压缩的速度也比较快。
Hadoop默认不支持LZO压缩,如果需要支持LZO压缩,需要添加jar包,并在hadoop的cores-site.xml文件中添加相关压缩配置。
maven(下载安装,配置环境变量,修改sitting.xml加阿里云镜像)
gcc-c++
zlib-devel
autoconf
automake
libtool
通过yum安装即可
yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool
wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz
tar -zxvf lzo-2.10.tar.gz
cd lzo-2.10
./configure -prefix=/usr/local/hadoop/lzo/
make
make install
下载地址:https://github.com/twitter/hadoop-lzo/archive/master.zip
<hadoop.current.version>2.7.2</hadoop.current.version>
export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include
export LIBRARY_PATH=/usr/local/hadoop/lzo/lib
mvn package -Dmaven.test.skip=true
如${HADOOP_HOME}/share/hadoop/common
<configuration>
<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
</configuration>
注意:
LZO本身是不支持分片的,但是我们给LZO压缩的文件加上索引,就支持分片了
dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群规模为10台时,此参数设置为60
Hadoop 带有一些基准测试程序,可以最少的准备成本轻松运行。基准测试被打包在测试程序JAR文件中,通过无参调用JAR文件可以得到其列表
[root@node1 hadoop-2.6.0-cdh5.14.0]# bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.14.0-tests.jar
测试写入10个10M文件
[root@node1 hadoop-2.6.0-cdh5.14.0]# bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.14.0-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 10MB
[root@node1 hadoop-2.6.0-cdh5.14.0]# bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.14.0-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 10MB
[root@node1 hadoop-2.6.0-cdh5.14.0]# cat TestDFSIO_results.log
----- TestDFSIO ----- : write
Date & time: Fri Nov 08 10:43:20 CST 2019
Number of files: 10
Total MBytes processed: 100.0
Throughput mb/sec: 4.551039912620034
Average IO rate mb/sec: 5.2242536544799805
IO rate std deviation: 2.1074511594328245
Test exec time sec: 52.394
----- TestDFSIO ----- : read
Date & time: Fri Nov 08 10:45:28 CST 2019
Number of files: 10
Total MBytes processed: 100.0
Throughput mb/sec: 73.85524372230428
Average IO rate mb/sec: 135.5804901123047
IO rate std deviation: 111.20953898062095
Test exec time sec: 28.231
半数机制
三个核心选举原则:
比如:
集群半数以上存活,集群可用
假设有五台服务器组成的zookeeper集群,它们的id从1-5,同时它们都是最新启动的
ls、get、create
Taildir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
File Channel:数据存储在磁盘,宕机数据可以保存。但是传输速率慢。适合对数据传输可靠性要求高的场景,比如,金融行业。
Memory Channel:数据存储在内存中,宕机数据丢失。传输速率快。适合对数据传输可靠性要求不高的场景,比如,普通的日志数据。
Kafka Channel:减少了Flume的Sink阶段,提高了传输效率。
Source到Channel是Put事务
Channel到Sink是Take事务
项目中自定义了:ETL拦截器和区分类型拦截器。
采用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些
Ganglia监控
加州伯克利大学千禧计划的其中一个开源项目.是一个集群汇总监控用的的软件,和Cacti不同,cacti是详细监控集群中每台服务器的运行状态,而Ganglia是将集群中的服务器数据进行汇总然后监控。有时通过cacti或者zabbix(监控软件)看不出来的集群总体负载问题,却能够在Ganglia中体现。被监控的主机(即client)安装ganglia-gmond并启动该进程。服务器端需要安装gmetad和web程序。大致大构图如下:
参考链接:https://www.cnblogs.com/yinzhengjie/p/9798739.html
不会,Channel存储可以存储在File中,数据传输自身有事务。
开发中在flume-env.sh中设置JVM heap为4G或更高,部署在单独的服务器上(4核8线程16G内存)
-Xmx(设定程序运行期间最大可占用的内存大小)与-Xms(设定程序启动时占用内存大小)最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0,hdfs.roundValue=10,hdfs.roundUnit= second几个参数综合作用,效果如下:
(1)tmp文件在达到128M时会滚动生成正式文件
(2)tmp文件创建超10分时会滚动生成正式文件
举例:在2018-01-01 05:23的时侯sink接收到数据,那会产生如下tmp文件:
/itcast/20180101/itcast.201801010520.tmp
即使文件内容没有达到128M,也会在05:33时滚动生成正式文件
Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。
Kafka机器数量=2*(峰值生产速度*副本数/100)+1
7天
每天的数据量*7天
公司自己开发的监控器;
开源的监控器:KafkaManager、KafkaMonitor
分区数并不是越多越好,一般分区数不要超过集群机器数量。分区数越多占用内存越大(ISR等),一个节点集中的分区也就越多,当它宕机的时候,对系统的影响也就越大。
分区数一般设置为:3-10个
一般我们设置成2个或3个,很多企业设置为2个。
通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的。
Ack=0,相当于异步发送,消息发送完毕即offset增加,继续生产。
Ack=1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
Ack=-1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。
ISR(In-Sync Replicas),副本同步队列。ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列。
任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。
每天总数据量100g,每天产生1亿条日志, 10000万/24/60/60=1150条/每秒钟
平均每秒钟:1150条
低谷每秒钟:400条
高峰每秒钟:1150条*(2-20倍)=2300条->23000条
每条日志大小:0.5k-2k
每秒多少数据量:2.3M-20MB
所谓的再平衡,指的是在kafka consumer所订阅的topic发生变化时发生的一种分区重分配机制。一般有三种情况会触发再平衡:
Kafka提供的再平衡策略主要有三种:Round Robin,Range和Sticky,默认使用Range。这三种分配策略的主要区别在于:
本文主要会通过几个示例来对上面讲解的三种分区重分配策略的基本实现原理进行讲解。
关于Round Robin重分配策略,其主要采用的是一种轮询的方式分配所有的分区,该策略主要实现的步骤如下。这里我们首先假设有三个topic:t0、t1和t2,这三个topic拥有的分区数分别为1、2和3,那么总共有六个分区,这六个分区分别为:t0-0、t1-0、t1-1、t2-0、t2-1和t2-2。这里假设我们有三个consumer:C0、C1和C2,它们订阅情况为:C0订阅t0,C1订阅t0和t1,C2订阅t0、t1和t2。那么这些分区的分配步骤如下:
按照上述的步骤将所有的分区都分配完毕之后,最终分区的订阅情况如下:
从上面的步骤分析可以看出,轮询的策略就是简单的将所有的partition和consumer按照字典序进行排序之后,然后依次将partition分配给各个consumer,如果当前的consumer没有订阅当前的partition,那么就会轮询下一个consumer,直至最终将所有的分区都分配完毕。但是从上面的分配结果可以看出,轮询的方式会导致每个consumer所承载的分区数量不一致,从而导致各个consumer压力不均一。
所谓的Range重分配策略,就是首先会计算各个consumer将会承载的分区数量,然后将指定数量的分区分配给该consumer。这里我们假设有两个consumer:C0和C1,两个topic:t0和t1,这两个topic分别都有三个分区,那么总共的分区有六个:t0-0、t0-1、t0-2、t1-0、t1-1和t1-2。那么Range分配策略将会按照如下步骤进行分区的分配:
最终上面六个分区的分配情况如下:
可以看到,如果按照Range分区方式进行分配,其本质上是依次遍历每个topic,然后将这些topic的分区按照其所订阅的consumer数量进行平均的范围分配。这种方式从计算原理上就会导致排序在前面的consumer分配到更多的分区,从而导致各个consumer的压力不均衡。
Sticky策略是新版本中新增的策略,顾名思义,这种策略会保证再分配时已经分配过的分区尽量保证其能够继续由当前正在消费的consumer继续消费,当然,前提是每个consumer所分配的分区数量都大致相同,这样能够保证每个consumer消费压力比较均衡。关于这种分配方式的分配策略,我们分两种情况进行讲解,即初始状态的分配和某个consumer宕机时的分配情况。
初始状态分配的特点是,所有的分区都还未分配到任意一个consumer上。这里我们假设有三个consumer:C0、C1和C2,三个topic:t0、t1和t2,这三个topic分别有1、2和3个分区,那么总共的分区为:t0-0、t1-0、t1-1、t2-0、t2-1和t2-2。关于订阅情况,这里C0订阅了t0,C1订阅了t0和1,C2则订阅了t0、t1和t2。这里的分区分配规则如下:
上面的分配过程中,需要始终注意的是,虽然示例中的consumer顺序始终没有变化,但这是由于各个分区分配之后正好每个consumer所分配的分区数量的排序结果与初始状态一致。这里读者也可以比较一下这种分配方式与前面讲解的Round Robin进行对比,可以很明显的发现,Sticky重分配策略分配得更加均匀一些。
由于前一个示例中最终的分区分配方式模拟宕机的情形比较简单,因而我们使用另一种订阅策略。这里我们的示例的consumer有三个:C0、C1和C2,topic有四个:t0、t1、t2和t3,每个topic都有两个分区,那么总的分区有:t0-0、t0-1、t1-0、t1-1、t2-0、t2-1、t3-0和t3-1。这里的订阅情况为三个consumer订阅所有的主题,那么如果按照Sticky的分区分配策略,初始状态时,分配情况如下,读者可以按照前一示例讲解的方式进行推算:
这里我们假设在消费的过程中,C1发生了宕机,此时就会发生再平衡,而根据Sticky策略,其再分配步骤如下:
在上面的分区分配过程中,我们可以看到,由于分区的不断分配,各个consumer所拥有的分区数量也在不断变化,因而其排序情况也在变化,但是最终可以看到,各个分区是均匀的分配到各个consumer的,并且还保证了当前consumer已经消费的分区是不会分配到其他的consumer上的。
Hive 和数据库除了拥有类似的查询语言,再无类似之处。
Hive存储在HDFS。数据库将数据保存在块设备或者本地文件系统中。
Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的,
Hive执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
Hive支持很大规模的数据计算;数据库可以支持的数据规模较小。
RANK() 排序相同时会重复,总数不会变
DENSE_RANK() 排序相同时会重复,总数会减少
ROW_NUMBER() 会根据顺序计算
在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤?
自定义UDF:继承UDF,重写evaluate方法
自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close
为什么要自定义UDF/UDTF,因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试.
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。
列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。
行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤。
主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。
答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。
答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。
针对上面的问题2和3,我们需要采取两种方式来解决:即减少map数和增加map数;
在Map执行前合并小文件,减少Map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。
Reduce个数并不是越多越好
在设置Reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的Reduce数;使单个Reduce任务处理数据量大小要合适;
// 输出合并小文件
SET hive.merge.mapfiles = true; -- 默认true,在map-only任务结束时合并小文件
SET hive.merge.mapredfiles = true; -- 默认false,在map-reduce任务结束时合并小文件
SET hive.merge.size.per.task = 268435456; -- 默认256M
SET hive.merge.smallfiles.avgsize = 16777216; -- 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
图2 HBase写数据流程
Flush: <!--当memstore的大小超过这个值的时候,会flush到磁盘。128M--> <property> <name>hbase.hregion.memstore.flush.size</name> <value>134217728</value> </property> <!--单个regionserver的全部memstore的最大值。超过这个值总容量(Max Heap=983.4 M)*0.4, 一个新的put插入操作会被挂起,强制执行flush操作。 --> <property> <name>hbase.regionserver.global.memstore.upperLimit</name> <value>0.4</value> </property> <!--当强制执行flush操作的时候,当低于这个值的时候,flush会停止。默认是堆大小的 35% . --> <property> <name>hbase.regionserver.global.memstore.lowerLimit</name> <value>0.35</value> </property> Compact: <!--当一个HStore含有多于这个值的HStoreFiles(每一个memstore flush产生一个HStoreFile)的时候,会执行一个合并操作,把这HStoreFiles写成一个--> <property> <name>hbase.hstore.compactionThreshold</name> <value>3</value> </property> <!--一个Region中的所有HStoreFile的major compactions的时间间隔。默认是1天。--> <property> <name>hbase.hregion.majorcompaction</name> <value>86400000</value> </property> Split: <!--最大HStoreFile大小。若某个列族的HStoreFile增长达到这个值,这个Hegion会被切割成两个。 默认: 10G.--> <property> <name>hbase.hregion.max.filesize</name> <value>10737418240</value> </property> |
/opt/module/sqoop/bin/sqoop import \
--connect \
--username \
--password \
--target-dir \
--delete-target-dir \
--num-mappers \
--fields-terminated-by \
--query "$2" ' and $CONDITIONS;'
Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用--input-null-string和--input-null-non-string两个参数。导入数据时采用--null-string和--null-non-string。
1)场景1:如Sqoop在导出到Mysql时,使用4个Map任务,过程中有2个任务失败,那此时MySQL中存储了另外两个Map任务导入的数据,此时老板正好看到了这个报表数据。而开发工程师发现任务失败后,会调试问题并最终将全部数据正确的导入MySQL,那后面老板再次看报表数据,发现本次看到的数据与之前的不一致,这在生产环境是不允许的。
官网:http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html
Since Sqoop breaks down export process into multiple transactions, it is possible that a failed export job may result in partial data being committed to the database. This can further lead to subsequent jobs failing due to insert collisions in some cases, or lead to duplicated data in others. You can overcome this problem by specifying a staging table via the --staging-table option which acts as an auxiliary table that is used to stage exported data. The staged data is finally moved to the destination table in a single transaction.
–staging-table方式
sqoop export --connect jdbc:mysql://192.168.137.10:3306/user_behavior --username root --password 123456 --table app_cource_study_report --columns watch_video_cnt,complete_video_cnt,dt --fields-terminated-by "\t" --export-dir "/user/hive/warehouse/tmp.db/app_cource_study_analysis_${day}" --staging-table app_cource_study_report_tmp --clear-staging-table --input-null-string '\N'
2)场景2:设置map数量为1个(不推荐,面试官想要的答案不只这个)
多个Map任务时,采用–staging-table方式,仍然可以解决数据一致性问题。
只有Map阶段,没有Reduce阶段的任务。
Sqoop任务5分钟-2个小时的都有。取决于数据量。
val tuple1 = (1, 2, 3, "heiheihei")
println(tuple1)
val value1 = tuple1._4
println(value1)
方式1:
for (elem <- tuple1.productIterator ) {
print(elem)
}
println()
方式2:
tuple1.productIterator.foreach(i => println(i))
tuple1.produIterator.foreach(print(_))
隐式转换函数是以implicit关键字声明的带有单个参数的函数。这种函数将会自动应用,将值从一种类型转换为另一种类型。
implicit def a(d: Double) = d.toInt
//不加上边这句你试试
val i1: Int = 3.5
println(i1)
1)Scala中函数的地位:一等公民
2) Scala中的匿名函数(函数字面量)
3)Scala中的高阶函数
4)Scala中的闭包
5)Scala中的部分应用函数
6)Scala中的柯里化函数
case class Person(name:String,age:Int)
一般使用在 ds=df.as[Person]
函数编程中,接受多个参数的函数都可以转化为接受单个参数的函数,这个转化过程就叫柯里化,柯里化就是证明了函数只需要一个参数而已。其实我们刚的学习过程中,已经涉及到了柯里化操作,所以这也印证了,柯里化就是以函数为主体这种思想发展的必然产生的结果。
1)柯里化的示例
def mul(x: Int, y: Int) = x * y
println(mul(10, 10))
def mulCurry(x: Int) = (y: Int) => x * y
println(mulCurry(10)(9))
def mulCurry2(x: Int)(y:Int) = x * y
println(mulCurry2(10)(8))
2)柯里化的应用
比较两个字符串在忽略大小写的情况下是否相等,注意,这里是两个任务:
针对这两个操作,我们用一个函数去处理的思想,其实无意间也变成了两个函数处理的思想。示例如下:
val a = Array("Hello", "World")
val b = Array("hello", "world")
println(a.corresponds(b)(_.equalsIgnoreCase(_)))
其中corresponds函数的源码如下:
def corresponds[B](that: GenSeq[B])(p: (A,B) => Boolean): Boolean = {
val i = this.iterator
val j = that.iterator
while (i.hasNext && j.hasNex t)
if (!p(i.next(), j.next() ))
return fals
!i.hasNext && !j.hasNext
}
尖叫提示:不要设立柯里化存在义这样的命题,柯里化,是面向函数思想的必然产生结果。
一个函数把外部的那些不属于自己的对象也包含(闭合)进来。
案例1:
def minusxy(x: Int) = (y: Int) => x - y
这就是一个闭包:
1) 匿名函数(y: Int) => x -y嵌套在minusxy函数中。
2) 匿名函数(y: Int) => x -y使用了该匿名函数之外的变量x
3) 函数minusxy返回了引用了局部变量的匿名函数
案例2
def minusxy(x: Int) = (y: Int) => x - y
val f1 = minusxy(10)
val f2 = minusxy(10)
println(f1(3) + f2(3))
此处f1,f2这两个函数就叫闭包。
val map = Map("Tom"-> 23)
map("Jack") // 抛出异常 java.util.NoSuchElementException: key not found: Jack
map.get("Jack") // None
map("Tom") // 23
map.get("Tom") // Some(23)
使用模式匹配取出最后结果
val optionAge = map.get("Tom")
val age = optionAge match {
case Some(x) => optionAge.get
case None => 0
}
Shell 脚本。
参考答案:
https://blog.csdn.net/gamer_gyt/article/details/79135118
executor-cores —— 每个executor使用的内核数,默认为1,官方建议2-5个,我们企业是4个
num-executors —— 启动executors的数量,默认为2
executor-memory —— executor内存大小,默认1G
driver-cores —— driver使用内核数,默认为1
driver-memory —— driver内存大小,默认512M
spark-submit \
--master local[5] \
--driver-cores 2 \
--driver-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--executor-memory 8g \
--class PackageName.ClassName XXXX.jar \
--name "Spark Job Name" \
InputPath \
OutputPath
RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错时的高效性以及划分任务时候起到重要作用。
Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。
Task:Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。
对相同K,把V合并成一个集合。
…
根据自身情况选择比较熟悉的算子加以介绍。
reduceBykey:
groupByKey:
…ByKey:
未经优化的HashShuffle:
优化后的Shuffle:
普通的SortShuffle:
当 shuffle read task 的 数 量 小 于 等 于 spark.shuffle.sort。
bypassMergeThreshold 参数的值时(默认为 200),就会启用 bypass 机制。
reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。
groupByKey:按照key进行分组,直接进行shuffle。
开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce
都是做RDD持久化的
cache:内存,不会截断血缘关系,使用计算过程中的数据缓存。
checkpoint:磁盘,截断血缘关系,在ck之前必须没有任何任务提交才会生效,ck过程会额外提交一次任务。
累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。而广播变量用来高效分发较大的对象。
共享变量出现的原因:
通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。
Spark的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。
使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。
优点:
编译时类型安全
编译时就能检查出类型错误
面向对象的编程风格
直接通过类名点的方式来操作数据
缺点:
序列化和反序列化的性能开销
无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化。
GC的性能开销,频繁的创建和销毁对象, 势必会增加GC
DataFrame引入了schema和off-heap
schema : RDD每一行的数据, 结构都是一样的,这个结构就存储在schema中。 Spark通过schema就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了。
DataSet结合了RDD和DataFrame的优点,并带来的一个新的概念Encoder。
当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。Spark还没有提供自定义Encoder的API,但是未来会加入。
三者之间的转换:
join和sql中的inner join操作很相似,返回结果是前面一个集合和后面一个集合中匹配成功的,过滤掉关联不上的。
leftJoin类似于SQL中的左外关联left outer join,返回结果以第一个RDD为主,关联不上的记录为空。
部分场景下可以使用left semi join替代left join:
因为 left semi join 是 in(keySet) 的关系,遇到右表重复记录,左表会跳过,性能更高,而 left join 则会一直遍历。但是left semi join 中最后 select 的结果中只许出现左表中的列名,因为右表只有 join key 参与关联计算了
一、基于Receiver的方式
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),然后Spark Streaming启动的job会去处理那些数据。
然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
二、基于Direct的方式
这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
优点如下:
简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
一次且仅一次的事务机制。
三、对比:
基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。
基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
在实际生产环境中大都用Direct方式
窗口函数就是在原来定义的SparkStreaming计算批次大小的基础上再次进行封装,每次计算多个批次的数据,同时还需要传递一个滑动步长的参数,用来设置当次计算任务完成之后下一次从什么地方开始计算。
图中time1就是SparkStreaming计算批次大小,虚线框以及实线大框就是窗口的大小,必须为批次的整数倍。虚线框到大实线框的距离(相隔多少批次),就是滑动步长。
|
方法1:
(1)按照key对数据进行聚合(groupByKey)
(2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)数据量太大,会OOM。
方法2:
(1)取出所有的key
(2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序
方法3:
(1)自定义分区器,按照key进行分区,使不同的key进到不同的分区
(2)对每个分区运用spark的排序算子进行排序
这里举个例子。比如我们有几百个文件,会有几百个map出现,读取之后进行join操作,会非常的慢。这个时候我们可以进行coalesce操作,比如240个map,我们合成60个map,也就是窄依赖。这样再shuffle,过程产生的文件数会大大减少。提高join的时间性能。
Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务:DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。此外,Flink还针对特定的应用领域提供了领域库,例如:Flink ML,Flink的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。Gelly,Flink的图计算库,提供了图计算的相关API及多种图计算算法实现。
架构模型上:Spark Streaming 的task运行依赖driver 和 executor和worker,当然driver和excutor还依赖于集群管理器Standalone或者yarn等。而Flink运行时主要是JobManager、TaskManage和TaskSlot。另外一个最核心的区别是:Spark Streaming 是微批处理,运行的时候需要指定批处理的时间,每次运行 job 时处理一个批次的数据;Flink 是基于事件驱动的,事件可以理解为消息。事件驱动的应用程序是一种状态应用程序,它会从一个或者多个流中注入事件,通过触发计算更新状态,或外部动作对注入的事件作出反应。
任务调度上:Spark Streaming的调度分为构建 DGA 图,划分 stage,生成 taskset,调度 task等步骤,而Flink首先会生成 StreamGraph,接着生成 JobGraph,然后将 jobGraph 提交给 Jobmanager 由它完成 jobGraph 到 ExecutionGraph 的转变,最后由 jobManager 调度执行。
时间机制上:flink 支持三种时间机制事件时间,注入时间,处理时间,同时支持 watermark 机制处理滞后数据。Spark Streaming 只支持处理时间,Structured streaming则支持了事件时间和watermark机制。
容错机制上:二者保证exactly-once的方式不同。spark streaming 通过保存offset和事务的方式;Flink 则使用两阶段提交协议来解决这个问题。
Flink中默认提供了八大分区策略(也叫分区器)。八大分区策略继承关系图
ChannelSelector: 接口,决定将记录写入哪个Channel。有3个方法:
注意:
这里以及下边提到的Channel可简单理解为下游Operator的某个实例。Flink 中改变并行度,默认RebalancePartitioner分区策略。分区策略,可在Flink WebUI上直观看出,如REBALANCE,即使用了RebalancePartitioner分区策略;SHUFFLE,即使用了ShufflePartitioner分区策略。
GlobalPartitioner,GLOBAL分区。将记录输出到下游Operator的第一个实例。
selectChannel实现
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { //对每条记录,只选择下游operator的第一个Channel return 0; } |
API使用
dataStream .setParallelism(2) // 采用GLOBAL分区策略重分区 .global() .print() .setParallelism(1); |
ShufflePartitioner,SHUFFLE分区。将记录随机输出到下游Operator的每个实例。
selectChannel实现
private Random random = new Random(); @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { //对每条记录,随机选择下游operator的某个Channel return random.nextInt(numberOfChannels); } |
API使用
dataStream .setParallelism(2) // 采用SHUFFLE分区策略重分区 .shuffle() .print() .setParallelism(4); |
RebalancePartitioner,REBALANCE分区。将记录以循环的方式输出到下游Operator的每个实例。
selectChannel实现
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { //第一条记录,输出到下游的第一个Channel;第二条记录,输出到下游的第二个Channel...如此循环 nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels; return nextChannelToSendTo; } |
API使用
dataStream .setParallelism(2) // 采用REBALANCE分区策略重分区 .rebalance() .print() .setParallelism(4); |
RescalePartitioner,RESCALE分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。举例: 上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。
selectChannel实现
private int nextChannelToSendTo = -1; @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { if (++nextChannelToSendTo >= numberOfChannels) { nextChannelToSendTo = 0; } return nextChannelToSendTo; } |
API使用
dataStream .setParallelism(2) // 采用RESCALE分区策略重分区 .rescale() .print() .setParallelism(4); |
BroadcastPartitioner,BROADCAST分区。广播分区将上游数据集输出到下游Operator的每个实例中。适合于大数据集Join小数据集的场景。
selectChannel实现
@Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { //广播分区不支持选择Channel,因为会输出到下游每个Channel中 throw new UnsupportedOperationException("Broadcast partitioner does not support select channels."); } @Override public boolean isBroadcast() { //启用广播模式,此时Channel选择器会选择下游所有Channel return true; } |
API使用
dataStream .setParallelism(2) // 采用BROADCAST分区策略重分区 .broadcast() .print() .setParallelism(4); |
ForwardPartitioner,FORWARD分区。将记录输出到下游本地的operator实例。ForwardPartitioner分区器要求上下游算子并行度一样。上下游Operator同属一个SubTasks。
selectChannel实现
@Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { return 0; } |
API使用
dataStream .setParallelism(2) // 采用FORWARD分区策略重分区 .forward() .print() .setParallelism(2); |
KeyGroupStreamPartitioner,HASH分区。将记录按Key的Hash值输出到下游Operator实例。
selectChannel实现
@Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { K key; try { key = keySelector.getKey(record.getInstance().getValue()); } catch (Exception e) { throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e); } return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels); } // KeyGroupRangeAssignment中的方法 public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) { return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); } // KeyGroupRangeAssignment中的方法 public static int assignToKeyGroup(Object key, int maxParallelism) { return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); } // KeyGroupRangeAssignment中的方法 public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { return MathUtils.murmurHash(keyHash) % maxParallelism; } |
API使用
dataStream .setParallelism(2) // 采用HASH分区策略重分区 .keyBy((KeySelector<Tuple3<String, Integer, String>, String>) value -> value.f0) .print() .setParallelism(4); |
CustomPartitionerWrapper,CUSTOM分区。通过Partitioner实例的partition方法(自定义的)将记录输出到下游。
selectChannel实现
Partitioner<K> partitioner; KeySelector<T, K> keySelector; public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) { this.partitioner = partitioner; this.keySelector = keySelector; } @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { K key; try { key = keySelector.getKey(record.getInstance().getValue()); } catch (Exception e) { throw new RuntimeException("Could not extract key from " + record.getInstance(), e); } return partitioner.partition(key, numberOfChannels); } |
自定义分区器将指定的Key分到指定的分区
// 自定义分区器,将不同的Key(用户ID)分到指定的分区 // key: 根据key的值来分区 // numPartitions: 下游算子并行度 static class CustomPartitioner implements Partitioner<String> { @Override public int partition(String key, int numPartitions) { switch (key){ case "user_1": return 0; case "user_2": return 1; case "user_3": return 2; default: return 3; } } } |
dataStream .setParallelism(2) // 采用CUSTOM分区策略重分区 .partitionCustom(new CustomPartitioner(),0) .print() .setParallelism(4); |
Flink程序由多个任务(Source、Transformation、Sink)组成。任务被分成多个并行实例来执行,每个并行实例处理任务的输入数据的子集。任务的并行实例的数量称之为并行度。Flink中人物的并行度可以从多个不同层面设置:操作算子层面(Operator Level)、执行环境层面(Execution Environment Level)、客户端层面(Client Level)、系统层面(System Level)。Flink可以设置好几个level的parallelism,其中包括Operator Level、Execution Environment Level、Client Level、System Level在flink-conf.yaml中通过parallelism.default配置项给所有execution environments指定系统级的默认parallelism;在ExecutionEnvironment里头可以通过setParallelism来给operators、data sources、data sinks设置默认的parallelism;如果operators、data sources、data sinks自己有设置parallelism则会覆盖ExecutionEnvironment设置的parallelism。
重启策略种类:固定延迟重启策略(Fixed Delay Restart Strategy)故障率重启策略(Failure Rate Restart Strategy)无重启策略(No Restart Strategy)Fallback重启策略(Fallback Restart Strategy)
Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。
此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。
当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。
在Flink中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个Slot中进行,不同算子之间更是如此,因此不同算子的计算数据之间不能像Java数组之间一样互相访问,而广播变量Broadcast便是解决这种情况的。我们可以把广播变量理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。
假如我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(Tumbling Time Window)。翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
我们可以每30秒计算一次最近一分钟用户购买的商品总数。这种窗口我们称为滑动时间窗口(Sliding Time Window)。在滑窗中,一个元素可以对应多个窗口。
当我们想要每100个用户购买行为事件统计购买总数,那么每当窗口中填满100个元素了,就会对窗口进行计算,这种窗口我们称之为翻滚计数窗口(Tumbling Count Window),上图所示窗口大小为3个。
在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。如上图所示,就是需要计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开(假设raw data stream是单个用户的购买行为流)。一般而言,window 是在无限的流上定义了一个有限的元素集合。这个集合可以是基于时间的,元素个数的,时间和个数结合的,会话间隙的,或者是自定义的。Flink 的 DataStream API 提供了简洁的算子来满足常用的窗口操作,同时提供了通用的窗口机制来允许用户自己定义窗口分配逻辑。
Flink流计算中可能有各种方式来保存状态:
当开始做checkpointing的时候,状态会被持久化到checkpoints里来规避数据丢失和状态恢复。选择的状态存储策略不同,会导致状态持久化如何和checkpoints交互。
Flink内部提供了这些状态后端:
如果没有其他配置,系统将使用MemoryStateBackend。
Flink中的时间与现实世界中的时间是不一致的,在flink中被划分为事件时间,摄入时间,处理时间三种。如果以EventTime为基准来定义时间窗口将形成EventTimeWindow,要求消息本身就应该携带EventTime如果以IngesingtTime为基准来定义时间窗口将形成IngestingTimeWindow,以source的systemTime为准。如果以ProcessingTime基准来定义时间窗口将形成ProcessingTimeWindow,以operator的systemTime为准。
Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。watermark是用于处理乱序事件的,处理乱序事件通常用watermark机制结合window来实现。详细参考:https://www.jianshu.com/p/1c2542f11da0
TableEnvironment是Table API和SQL集成的核心概念。它负责:
StreamSQL API的执行原理如下:
Flink设计者认为:有限流处理是无限流处理的一种特殊情况,它只不过在某个时间点停止而已。Flink通过一个底层引擎同时支持流处理和批处理。详细参考:https://cloud.tencent.com/developer/article/1501348
大概的原理,上游的task产生数据后,会写在本地的缓存中,然后通知JM自己的数据已经好了,JM通知下游的Task去拉取数据,下游的Task然后去上游的Task拉取数据,形成链条。
但是在何时通知JM?这里有一个设置,比如pipeline还是blocking,pipeline意味着上游哪怕产生一个数据,也会去通知,blocking则需要缓存的插槽存满了才会去通知,默认是pipeline。
虽然生产数据的是Task,但是一个TaskManager中的所有Task共享一个NetworkEnvironment,下游的Task利用ResultPartitionManager主动去上游Task拉数据,底层利用的是Netty和TCP实现网络链路的传输。
那么,一直都在说Flink的背压是一种自然的方式,为什么是自然的了?
从上面的图中下面的链路中可以看到,当下游的process逻辑比较慢,无法及时处理数据时,他自己的local buffer中的消息就不能及时被消费,进而导致netty无法把数据放入local buffer,进而netty也不会去socket上读取新到达的数据,进而在tcp机制中,tcp也不会从上游的socket去读取新的数据,上游的netty也是一样的逻辑,它无法发送数据,也就不能从上游的localbuffer中消费数据,所以上游的localbuffer可能就是满的,上游的operator或者process在处理数据之后进行collect.out的时候申请不能本地缓存,导致上游的process被阻塞。这样,在这个链路上,就实现了背压。
如果还有相应的上游,则会一直反压上去,一直影响到source,导致source也放慢从外部消息源读取消息的速度。一旦瓶颈解除,网络链路畅通,则背压也会自然而然的解除。
Flink基于分布式快照与可部分重发的数据源实现了容错。用户可自定义对整个Job进行快照的时间间隔,当任务失败时,Flink会将整个Job恢复到最近一次快照,并从数据源重发快照之后的数据。
详细参考:https://www.jianshu.com/p/1fca8fb61f86
注意:这里window产生的数据倾斜指的是不同的窗口内积攒的数据量不同,主要是由源头数据的产生速度导致的差异。核心思路:1.重新设计key 2.在窗口计算前做预聚合可以参考这个:https://blog.csdn.net/it_lee_j_h/article/details/88641894
首先要确定问题产生的原因,找到最耗时的点,确定性能瓶颈点。比如任务频繁反压,找到反压点。主要通过:资源调优、作业参数调优。资源调优即是对作业中的Operator的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State的设置,checkpoint的设置。
SKU(库存量单位):一台银色、128G内存的、支持联通网络的iPhoneX
SPU(标准产品单位):iPhoneX
标签 | 含义 | |
id | 订单编号 | |
total_amount | 订单金额 | |
order_status | 订单状态 | |
user_id | 用户id | |
payment_way | 支付方式 | |
out_trade_no | 支付流水号 | |
create_time | 创建时间 | |
operate_time | 操作时间 | |
标签 | 含义 | |
id | 订单编号 | |
order_id | 订单号 | |
user_id | 用户id | |
sku_id | 商品id | |
sku_name | 商品名称 | |
order_price | 商品价格 | |
sku_num | 商品数量 | |
create_time | 创建时间 | |
标签 | 含义 | |
id | skuId | |
spu_id | spuid | |
price | 价格 | |
sku_name | 商品名称 | |
sku_desc | 商品描述 | |
weight | 重量 | |
tm_id | 品牌id | |
category3_id | 品类id | |
create_time | 创建时间 | |
标签 | 含义 | |
id | 用户id | |
name | 姓名 | |
birthday | 生日 | |
gender | 性别 | |
| 邮箱 | |
user_level | 用户等级 | |
create_time | 创建时间 | |
标签 | 含义 | |
id | id | |
name | 名称 | |
标签 | 含义 | |
id | id | |
name | 名称 | |
category1_id | 一级品类id | |
标签 | 含义 | |
id | id | |
name | 名称 | |
Category2_id | 二级品类id | |
标签 | 含义 | |
id | 编号 | |
out_trade_no | 对外业务编号 | |
order_id | 订单编号 | |
user_id | 用户编号 | |
alipay_trade_no | 支付宝交易流水编号 | |
total_amount | 支付金额 | |
subject | 交易内容 | |
payment_type | 支付类型 | |
payment_time | 支付时间 | |
订单表跟订单详情表有什么区别?
订单表的订单状态会变化,订单详情表不会,因为没有订单状态。
订单表记录user_id,订单id订单编号,订单的总金额order_status,支付方式,订单状态等。
订单详情表记录user_id,商品sku_id ,具体的商品信息(商品名称sku_name,价格order_price,数量sku_num)
实体表,维度表,事务型事实表,周期性事实表
其实最终可以把事务型事实表,周期性事实表统称实体表,实体表,维度表统称维度表
订单表(order_info)(周期型事实表)
订单详情表(order_detail)(事务型事实表)
商品表(实体表)
用户表(实体表)
商品一级分类表(维度表)
商品二级分类表(维度表)
商品三级分类表(维度表)
支付流水表(事务型实体表)
实体表,维度表统称维度表,每日全量或者每月(更长时间)全量
事务型事实表:每日增量
周期性事实表:拉链表
1NF:属性不可再分割(例如不能存在5台电脑的属性,坏处:表都没法用)
2NF:不能存在部分函数依赖(例如主键(学号+课名)-->成绩,姓名,但学号--》姓名,所以姓名部分依赖于主键(学号+课名),所以要去除,坏处:数据冗余)
3NF:不能存在传递函数依赖(学号--》宿舍种类--》价钱,坏处:数据冗余和增删异常)
Mysql关系模型:关系模型主要应用与OLTP系统中,为了保证数据的一致性以及避免冗余,所以大部分业务系统的表都是遵循第三范式的。
Hive 维度模型:维度模型主要应用于OLAP系统中,因为关系模型虽然冗余少,但是在大规模数据,跨表分析统计查询过程中,会造成多表关联,这会大大降低执行效率。所以HIVE把相关各种表整理成两种:事实表和维度表两种。所有维度表围绕着事实表进行解释。
雪花模型、星型模型和星座模型
(在维度建模的基础上又分为三种模型:星型模型、雪花模型、星座模型。)
星型模型(一级维度表),雪花(多级维度),星座模型(星型模型+多个事实表)
sqoop
导数据的原理是mapreduce,
import 把数据从关系型数据库 导到 数据仓库,自定义InputFormat,
export 把数据从数据仓库 导到 关系型数据库,自定义OutputFormat,
用sqoop从mysql中将八张表的数据导入数仓的ods原始数据层
全量无条件,增量按照创建时间,增量+变化按照创建时间或操作时间。
origin_data
sku_info商品表(每日导全量)
user_info用户表(每日导全量)
base_category1商品一级分类表(每日导全量)
base_category2商品二级分类表(每日导全量)
base_category3商品三级分类表(每日导全量)
order_detail订单详情表(每日导增量)
payment_info支付流水表(每日导增量)
order_info订单表(每日导增量+变化)
(八张表,表名,字段跟mysql完全相同)
从origin_data把数据导入到ods层,表名在原表名前加ods_
对ODS层数据进行判空过滤。对商品分类表进行维度退化(降维)。其他数据跟ods层一模一样
订单表 dwd_order_info
订单详情表 dwd_order_detail
用户表 dwd_user_info
支付流水表 dwd_payment_info
商品表 dwd_sku_info
其他表字段不变,唯独商品表,通过关联3张分类表,增加了
category2_id` string COMMENT '2id',
`category1_id` string COMMENT '3id',
`category3_name` string COMMENT '3',
`category2_name` string COMMENT '2',
`category1_name` string COMMENT '1',
小结:
如果被退化的维度,还有其他业务表使用,退化后处理起来就麻烦些。
还有如果要删除数据,对应的维度可能也会被永久删除。
城市的三级分类(省、市、县)等
从订单表 dwd_order_info 中获取 下单次数 和 下单总金额
从支付流水表 dwd_payment_info 中获取 支付次数 和 支付总金额
从事件日志评论表 dwd_comment_log 中获取评论次数
最终按照user_id聚合,获得明细,跟之前的mid_id聚合不同
从用户行为宽表中dws_user_action,根据统计日期分组,聚合,直接sum就可以了。
从日活跃数表 ads_uv_count 和 日新增设备数表 ads_new_mid_count 中取即可。
从用户行为宽表dws_user_action中取,下单人数(只要下单次数>0),支付人数(只要支付次数>0)
从日活跃数表 ads_uv_count 中取活跃人数,然后对应的相除就可以了。
需求:以月为单位统计,购买2次以上商品的用户
从用户购买商品明细宽表dws_sale_detail_daycount中,根据品牌id--sku_tm_id聚合,计算每个品牌购买的总次数,购买人数a=购买次数>=1,两次及以上购买人数b=购买次数>=2,三次及以上购买人数c=购买次数>=3,
单次复购率=b/a,多次复购率=c/a
宽表要3-5张,用户行为宽表,用户购买商品明细行为宽表,商品宽表,购物车宽表,物流宽表、登录注册、售后等。
需求目标,把每个用户单日的行为聚合起来组成一张多列宽表,以便之后关联用户维度信息后进行,不同角度的统计分析。
订单表拉链表 dwd_order_info_his
`id` string COMMENT '订单编号',
`total_amount` decimal(10,2) COMMENT '订单金额',
`order_status` string COMMENT '订单状态',
`user_id` string COMMENT '用户id' ,
`payment_way` string COMMENT '支付方式',
`out_trade_no` string COMMENT '支付流水号',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '操作时间' ,
`start_date` string COMMENT '有效开始日期',
`end_date` string COMMENT '有效结束日期'
1)创建订单表拉链表,字段跟拉链表一样,只增加了有效开始日期和有效结束日期
初始日期,从订单变化表ods_order_info导入数据,且让有效开始时间=当前日期,有效结束日期=9999-99-99
(从mysql导入数仓的时候就只导了新增的和变化的数据ods_order_info,dwd_order_info跟ods_order_info基本一样,只多了一个id的判空处理)
2)建一张拉链临时表dwd_order_info_his_tmp,字段跟拉链表完全一致
3)新的拉链表中应该有这几部分数据,
(1)增加订单变化表dwd_order_info的全部数据
(2)更新旧的拉链表左关联订单变化表dwd_order_info,关联字段:订单id, where 过滤出end_date只等于9999-99-99的数据,如果旧的拉链表中的end_date不等于9999-99-99,说明已经是终态了,不需要再更新
如果dwd_order_info.id is null , 没关联上,说明数据状态没变,让end_date还等于旧的end_date
如果dwd_order_info.id is not null , 关联上了,说明数据状态变了,让end_date等于当前日期-1
把查询结果插入到拉链临时表中
4)把拉链临时表覆盖到旧的拉链表中
Ganglia监控Flume发现尝试提交的次数大于最终成功的次数
Flume上传文件到HDFS时参数大量小文件?
调整hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount这三个参数的值。
在下一级消费者中去重。(redis、SparkStreaming)
Hive的metadata存储在MySql中(配置MySql的高可用(主从复制和读写分离和故障转移))
自定义UDF(extends UDF 实现evaluate方法) 解析公共字段
自定义UDTF(extends Genertic UDTF->实现三个方法init(指定返回值的名称和类型)、process(处理字段一进多出)、close方法) -> 更加灵活以及方便定义bug
Ads层数据用Sqoop往MySql中导入数据的时候,如果用了orc(Parquet)不能导入,需转化成text格式
Sqoop中导入导出Null存储一致性问题:
Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用--input-null-string和--input-null-non-string两个参数。导入数据时采用--null-string和--null-non-string。
当Sqoop导出数据到MySql时,使用4个map怎么保证数据的一致性
因为在导出数据的过程中map任务可能会失败,可以使用—staging-table –clear-staging
sqoop export --connect jdbc:mysql://192.168.137.10:3306/user_behavior --username root --password 123456 --table app_cource_study_report --columns watch_video_cnt,complete_video_cnt,dt --fields-terminated-by "\t" --export-dir "/user/hive/warehouse/tmp.db/app_cource_study_analysis_${day}" --staging-table app_cource_study_report_tmp --clear-staging-table --input-null-string '\N'
任务执行成功首先在tmp临时表中,然后将tmp表中的数据复制到目标表中(这个时候可以使用事务,保证事务的一致性)
如何优雅的关闭SparkStreaming任务(将写好的代码打包,Spark-Submit)
Kill -9 xxx ?
开启另外一个线程每5秒监听HDFS上一个文件是否存在。如果检测到存在,调用ssc.stop()方法关闭SparkStreaming任务(当你要关闭任务时,可以创建你自定义监控的文件目录)
如果不配置datanode.data.dir多目录,每次插入一块新的硬盘都需要重启服务器
配置了即插即用
Namenode有一个工作线程池,用来处理与datanode的心跳(报告自身的健康状况和文件恢复请求)和元数据请求 dfs.namenode.handler.count=20 * log2(Cluster Size)
4)编辑日志存储路径dfs.namenode.edits.dir设置与镜像文件存储路径 dfs.namenode.name.dir尽量分开,达到最低写入延迟(提高写入的吞吐量)
5)YARN参数调优yarn-site.xml
(1)服务器节点上YARN可使用的物理内存总量,默认是8192(MB)
(2)单个任务可申请的最多物理内存量,默认是8192(MB)。
6)HDFS和硬盘空闲控制在70%以下。
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
每天数据总量100g(1亿条) 10000万/24/60/60 = 1150条/s
平均每秒钟:1150条
低谷每秒:400条
高峰每秒钟:1150 * 10 = 11000 条
每条日志大小: 1K左右
每秒多少数据量:20MB
Tez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少,从而大大提升作业的计算性能。
–staging-table方式 --clear-staging
Sqoop任务5分钟-2个小时的都有。取决于数据量。
Zip a.job b.job c.job job.zip 把压缩的zip包放到azkaban的web界面上提交(指定sechduler)
压缩采用Snappy,存储采用orc,压缩比是100g数据压缩完10g左右。
select case when a is null then b else a end as JZR,
...
from A
Sql、mr、rdd、kettle、Python(项目中采用sql进行清除)
1万条数据清洗掉1条。
具体宽表名称:用户行为宽表,用户购买商品明细行为宽表,商品宽表,购物车宽表,物流宽表、登录注册、售后等。
最宽的是用户行为宽表。大概有60-100个字段
评论、打赏、收藏、关注--商品、关注--人、点赞、分享、好价爆料、文章发布、活跃、签到、补签卡、幸运屋、礼品、金币、电商点击、gmv
CREATE TABLE `app_usr_interact`(
`stat_dt` date COMMENT '互动日期',
`user_id` string COMMENT '用户id',
`nickname` string COMMENT '用户昵称',
`register_date` string COMMENT '注册日期',
`register_from` string COMMENT '注册来源',
`remark` string COMMENT '细分渠道',
`province` string COMMENT '注册省份',
`pl_cnt` bigint COMMENT '评论次数',
`ds_cnt` bigint COMMENT '打赏次数',
`sc_add` bigint COMMENT '添加收藏',
`sc_cancel` bigint COMMENT '取消收藏',
`gzg_add` bigint COMMENT '关注商品',
`gzg_cancel` bigint COMMENT '取消关注商品',
`gzp_add` bigint COMMENT '关注人',
`gzp_cancel` bigint COMMENT '取消关注人',
`buzhi_cnt` bigint COMMENT '点不值次数',
`zhi_cnt` bigint COMMENT '点值次数',
`zan_cnt` bigint COMMENT '点赞次数',
`share_cnts` bigint COMMENT '分享次数',
`bl_cnt` bigint COMMENT '爆料数',
`fb_cnt` bigint COMMENT '好价发布数',
`online_cnt` bigint COMMENT '活跃次数',
`checkin_cnt` bigint COMMENT '签到次数',
`fix_checkin` bigint COMMENT '补签次数',
`house_point` bigint COMMENT '幸运屋金币抽奖次数',
`house_gold` bigint COMMENT '幸运屋积分抽奖次数',
`pack_cnt` bigint COMMENT '礼品兑换次数',
`gold_add` bigint COMMENT '获取金币',
`gold_cancel` bigint COMMENT '支出金币',
`surplus_gold` bigint COMMENT '剩余金币',
`event` bigint COMMENT '电商点击次数',
`gmv_amount` bigint COMMENT 'gmv',
`gmv_sales` bigint COMMENT '订单数')
PARTITIONED BY ( `dt` string)
5% 30% 70%
网站流量指标 独立访问数UV 页面访客数PV
流量质量指标类 跳出率 平均页面访问时长 人均页面访问数
加入购物车次数 加入购物车买家次数 加入购物车商品数 购物车支付转化率
下单笔数 下单金额 下单买家数 浏览下单转化率
支付金额 支付买家数 支付商品数 浏览-支付买家转化率
下单-支付金额转化率 下单-支付买家数转换率
交易成功订单数 交易成功金额 交易成功买家数 交易成功商品数
交易失败订单数 交易失败订单金额 交易失败买家数
交易失败商品数 退款总订单量 退款金额 退款率
新增访问人数 新增注册人数 广告投资回报率 UV订单转化率
买家评价数 买家上传图片数 买家评价率 买家好评率 买家差评率
物流平均配送时间
发起投诉数 投诉率 撤销投诉(申诉数)
产品总数 SKU数 SPU数
上架商品SKU数 上架商品SPU数 上架商品数
日活跃用户,
月活跃用户,
各区域Top10商品统计,
季度商品品类点击率top10,
用户留存,
月APP的用户增长人数,
广告区域点击数top3,
活跃用户每天在线时长,
投诉人数占比,
沉默用户占比,
用户的新鲜度,
商品上架的sku数,
同种品类的交易额排名,
统计买家的评价率,
用户浏览时长,
统计下单的数量,
统计支付的数量,
统计退货的数量,
用户的(日活、月活、周活),
统计流失人数
日活,周活,月活,沉默用户占比,增长人数,活跃用户占比,在线时长统计,歌曲访问数,歌曲访问时长,各地区Top10歌曲统计 ,投诉人数占比,投诉回应时长,留存率,月留存率,转化率,GMV,复购vip率,vip人数,歌榜,挽回率,粉丝榜,打赏次数,打赏金额,发布歌曲榜单,歌曲热度榜单,歌手榜单,用户年龄组,vip年龄组占比,收藏数榜单,评论数
最近连续3周活跃用户数:
最近7天连续3天活跃用户数:
基本一个项目建一个库,表格个数为初始的原始数据表格加上统计结果表格的总数。(一般70-100张表格)
每天0:30开始运行。
所有离线数据报表控制在8小时之内
大数据实时处理部分控制在5分钟之内。
常用的包括:textFile,rcFile,ORC,Parquet,一般企业里使用ORC或者Parquet,因为是列式存储,且压缩比非常高,所以相比于textFile,查询速度快,占用硬盘空间少
Echarts、kibana、supesrset
测试服务器一般三台
一部分自己写Java程序自己造,一部分从生产环境上取一部分。
需要造一些特定的测试数据,测试。
离线数据和实时数据分析的结果比较。
测试环境的配置是生产的一半
上线的时候,将脚本打包,提交git。先发邮件抄送经理和总监,运维。通过之后跟运维一起上线。
刚入职第一个需求大概需要7天左右。
对业务熟悉后,平均一天一个需求。
影响时间的因素:开会讨论需求、表的权限申请、测试等
差不多一个月会迭代一次。就产品或我们提出优化需求,然后评估时间。每周我们都会开会做下周计划和本周总结。
有时候也会去预研一些新技术。
新需求比如埋点或是报表来了之后,需要设计做的方案,设计完成之后跟产品讨论,再开发。
数仓的任何步骤出现问题,需要查看问题,比如日活,月活下降等。
hashMap的底层结构在jdk1.7中由数组+链表实现,在jdk1.8中由数组+链表+红黑树实现,以数组+链表的结构为例。
JDK1.8之前Put方法:
JDK1.8之后Put方法:
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。这种类型的线程池特点是:
工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行。延迟3秒执行。
HashMap是线程不安全的,HashTable是线程安全的,其中的方法是Synchronize的,在多线程并发的情况下,可以直接使用HashTabl,但是使用HashMap时必须自己增加同步处理。
HashMap只有containsValue和containsKey方法;HashTable有contains、containsKey和containsValue三个方法,其中contains和containsValue方法功能相同。
Hashtable中,key和value都不允许出现null值。HashMap中,null可以作为键,这样的键只有一个;可以有一个或多个键所对应的值为null。
HashTable在不指定容量的情况下的默认容量为11,而HashMap为16,Hashtable不要求底层数组的容量一定要为2的整数次幂,而HashMap则要求一定为2的整数次幂。
Hashtable扩容时,将容量变为原来的2倍加1,而HashMap扩容时,将容量变为原来的2倍。
HashSet是采用hash表来实现的。其中的元素没有按顺序排列,add()、remove()以及contains()等方法都是复杂度为O(1)的方法。
TreeSet是采用树结构实现(红黑树算法)。元素是按顺序进行排列,但是add()、remove()以及contains()等方法都是复杂度为O(log (n))的方法。它还提供了一些方法来处理排序的set,如first(), last(), headSet(), tailSet()等等。
final:修饰符(关键字)有三种用法:修饰类、变量和方法。修饰类时,意味着它不能再派生出新的子类,即不能被继承,因此它和abstract是反义词。修饰变量时,该变量使用中不被改变,必须在声明时给定初值,在引用中只能读取不可修改,即为常量。修饰方法时,也同样只能使用,不能在子类中被重写。
finally:通常放在try…catch的后面构造最终执行代码块,这就意味着程序无论正常执行还是发生异常,这里的代码只要JVM不关闭都能执行,可以将释放外部资源的代码写在finally块中。
finalize:Object类中定义的方法,Java中允许使用finalize() 方法在垃圾收集器将对象从内存中清除出去之前做必要的清理工作。这个方法是由垃圾收集器在销毁对象时调用的,通过重写finalize() 方法可以整理系统资源或者执行其他清理工作。
== : 如果比较的是基本数据类型,那么比较的是变量的值
如果比较的是引用数据类型,那么比较的是地址值(两个对象是否指向同一块内存)
equals:如果没重写equals方法比较的是两个对象的地址值。
如果重写了equals方法后我们往往比较的是对象中的属性的内容
equals方法是从Object类中继承的,默认的实现就是使用==
缓存雪崩
缓存雪崩我们可以简单的理解为:由于原有缓存失效,新缓存未到期间(例如:我们设置缓存时采用了相同的过期时间,在同一时刻出现大面积的缓存过期),所有原本应该访问缓存的请求都去查询数据库了,而对数据库CPU和内存造成巨大压力,严重的会造成数据库宕机。从而形成一系列连锁反应,造成整个系统崩溃。
缓存正常从Redis中获取,示意图如下:
缓存失效瞬间示意图如下:
缓存失效时的雪崩效应对底层系统的冲击非常可怕!大多数系统设计者考虑用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。还有一个简单方案就时讲缓存失效时间分散开,比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
以下简单介绍两种实现方式的伪代码:
(1)碰到这种情况,一般并发量不是特别多的时候,使用最多的解决方案是加锁排队,伪代码如下:
加锁排队只是为了减轻数据库的压力,并没有提高系统吞吐量。假设在高并发下,缓存重建期间key是锁着的,这是过来1000个请求999个都在阻塞的。同样会导致用户等待超时,这是个治标不治本的方法!
注意:加锁排队的解决方式分布式环境的并发问题,有可能还要解决分布式锁的问题;线程还会被阻塞,用户体验很差!因此,在真正的高并发场景下很少使用!
(2)还有一个解决办法解决方案是:给每一个缓存数据增加相应的缓存标记,记录缓存的是否失效,如果缓存标记失效,则更新数据缓存,实例伪代码如下:
解释说明:
1、缓存标记:记录缓存数据是否过期,如果过期会触发通知另外的线程在后台去更新实际key的缓存;
2、缓存数据:它的过期时间比缓存标记的时间延长1倍,例:标记缓存时间30分钟,数据缓存设置为60分钟。 这样,当缓存标记key过期后,实际缓存还能把旧数据返回给调用端,直到另外的线程在后台更新完成后,才会返回新缓存。
关于缓存崩溃的解决方法,这里提出了三种方案:使用锁或队列、设置过期标志更新缓存、为key设置不同的缓存失效时间,还有一各被称为“二级缓存”的解决方法,有兴趣的读者可以自行研究。
缓存穿透
缓存穿透是指用户查询数据,在数据库没有,自然在缓存中也不会有。这样就导致用户查询的时候,在缓存中找不到,每次都要去数据库再查询一遍,然后返回空(相当于进行了两次无用的查询)。这样请求就绕过缓存直接查数据库,这也是经常提的缓存命中率问题。
有很多种方法可以有效地解决缓存穿透问题,最常见的则是采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。
另外也有一个更为简单粗暴的方法,如果一个查询返回的数据为空(不管是数据不存在,还是系统故障),我们仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟。通过这个直接设置的默认值存放到缓存,这样第二次到缓冲中获取就有值了,而不会继续访问数据库,这种办法最简单粗暴!
把空结果,也给缓存起来,这样下次同样的请求就可以直接返回空了,即可以避免当查询的值为空时引起的缓存穿透。同时也可以单独设置个缓存区域存储空值,对要查询的key进行预先校验,然后再放行给后面的正常缓存处理逻辑。
缓存预热
缓存预热这个应该是一个比较常见的概念,相信很多小伙伴都应该可以很容易的理解,缓存预热就是系统上线后,将相关的缓存数据直接加载到缓存系统。这样就可以避免在用户请求的时候,先查询数据库,然后再将数据缓存的问题!用户直接查询事先被预热的缓存数据!
解决思路:
缓存更新
除了缓存服务器自带的缓存失效策略之外(Redis默认的有6种策略可供选择),我们还可以根据具体的业务需求进行自定义的缓存淘汰,常见的策略有两种:
两者各有优劣,第一种的缺点是维护大量缓存的key是比较麻烦的,第二种的缺点就是每次用户请求过来都要判断缓存失效,逻辑相对比较复杂!具体用哪种方案,大家可以根据自己的应用场景来权衡。
缓存降级
当访问量剧增、服务出现问题(如响应时间慢或不响应)或非核心服务影响到核心流程的性能时,仍然需要保证服务还是可用的,即使是有损服务。系统可以根据一些关键数据进行自动降级,也可以配置开关实现人工降级。
降级的最终目的是保证核心服务可用,即使是有损的。而且有些服务是无法降级的(如加入购物车、结算)。
在进行降级之前要对系统进行梳理,看看系统是不是可以丢卒保帅;从而梳理出哪些必须誓死保护,哪些可降级;比如可以参考日志级别设置预案:
主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。
哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。
这里的哨兵有两个作用
然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。
用文字描述一下故障切换(failover)的过程。假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。这样对于客户端而言,一切都是透明的。
string | 字符串 |
list | 可以重复的集合 |
set | 不可以重复的集合 |
hash | 类似于Map<String,String> |
zset(sorted set) | 带分数的set |
Redis重新启动时读取这个文件,重新执行新建、修改数据的命令恢复数据。
保存策略:
推荐(并且也是默认)的措施为每秒持久化一次,这种策略可以兼顾速度和安全性。
缺点:
选择策略:
官方推荐:
如果对数据不敏感,可以选单独用RDB;不建议单独用AOF,因为可能出现Bug;如果只是做纯内存缓存,可以都不用
所谓悲观锁:具有强烈的独占和排他特性。它指的是对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度,因此,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。
简单来说:执行操作前假设当前的操作肯定(或有很大几率)会被打断(悲观)。基于这个假设,我们在做操作前就会把相关资源锁定,不允许自己执行期间有其他操作干扰。
悲观锁的并发性能差,但是能保证不会发生脏数据的可能性小一点。
执行操作前假设当前操作不会被打断(乐观)。基于这个假设,我们在做操作前不会锁定资源,万一发生了其他操作的干扰,那么本次操作将被放弃。Redis使用的就是乐观锁。
对比项 | MyISAM | InnoDB |
外键 | 不支持 | 支持 |
事务 | 不支持 | 支持 |
行表锁 | 表锁,即使操作一条记录也会锁住整个表,不适合高并发的操作 | 行锁,操作时只锁某一行,不对其它行有影响, 适合高并发的操作 |
缓存 | 只缓存索引,不缓存真实数据 | 不仅缓存索引还要缓存真实数据,对内存要求较高,而且内存大小对性能有决定性的影响 |
数据结构:B+Tree
一般来说能够达到range就可以算是优化了
口诀(两个法则加6种索引失效的情况)
全值匹配我最爱,最左前缀要遵守;
带头大哥不能死,中间兄弟不能断;
索引列上少计算,范围之后全失效;
LIKE百分写最右,覆盖索引不写*;
不等空值还有OR,索引影响要注意;
VAR引号不可丢,SQL优化有诀窍。
一、事务的基本要素(ACID)
二、事务的并发问题
小结:不可重复读的和幻读很容易混淆,不可重复读侧重于修改,幻读侧重于新增或删除。解决不可重复读的问题只需锁住满足条件的行,解决幻读需要锁表
三、MySQL事务隔离级别
事务隔离级别 脏读 不可重复读 幻读
读未提交(read-uncommitted) 是 是 是
不可重复读(read-committed) 否 是 是
可重复读(repeatable-read) 否 否 是
串行化(serializable) 否 否 否
name kecheng fenshu
张三 语文 81
张三 数学 75
李四 语文 76
李四 数学 90
王五 语文 81
王五 数学 100
王五 英语 90
A: select distinct name from table where name not in (select distinct name from table where fenshu<=80)
B:select name from table group by name having min(fenshu)>80
自动编号 学号 姓名 课程编号 课程名称 分数
1 2005001 张三 0001 数学 69
2 2005002 李四 0001 数学 89
3 2005001 张三 0001 数学 69
删除除了自动编号不同, 其他都相同的学生冗余信息
A: delete tablename where 自动编号 not in(select min(自动编号) from tablename group by学号, 姓名, 课程编号, 课程名称, 分数)
select a.name, b.name
from team a, team b
where a.name < b.name
year month amount
1991 1 1.1
1991 2 1.2
1991 3 1.3
1991 4 1.4
1992 1 2.1
1992 2 2.2
1992 3 2.3
1992 4 2.4
查成这样一个结果
year m1 m2 m3 m4
1991 1.1 1.2 1.3 1.4
1992 2.1 2.2 2.3 2.4
答案
select year,
(select amount from aaa m where month=1 and m.year=aaa.year) as m1,
(select amount from aaa m where month=2 and m.year=aaa.year) as m2,
(select amount from aaa m where month=3 and m.year=aaa.year) as m3,
(select amount from aaa m where month=4 and m.year=aaa.year) as m4
from aaa group by year
SQL: select * into b from a where 1<>1 (where1=1,拷贝表结构和数据内容)
ORACLE:create table b
As
Select * from a where 1=2
[<>(不等于)(SQL Server Compact)
比较两个表达式。 当使用此运算符比较非空表达式时,如果左操作数不等于右操作数,则结果为 TRUE。 否则,结果为 FALSE。]
购物人 商品名称 数量
A 甲 2
B 乙 4
C 丙 1
A 丁 2
B 丙 5
……
给出所有购入商品为两种或两种以上的购物人记录
答:select * from 购物信息 where 购物人 in (select 购物人 from 购物信息 group by 购物人 having count(*) >= 2);
date result
2005-05-09 win
2005-05-09 lose
2005-05-09 lose
2005-05-09 lose
2005-05-10 win
2005-05-10 lose
2005-05-10 lose
如果要生成下列结果, 该如何写sql语句?
win lose
2005-05-09 2 2
2005-05-10 1 2
答案:
(1) select date, sum(case when result = "win" then 1 else 0 end) as "win", sum(case when result = "lose" then 1 else 0 end) as "lose" from info group by date;
(2) select a.date, a.result as win, b.result as lose
from
(select date, count(result) as result from info where result = "win" group by date) as a
join
(select date, count(result) as result from info where result = "lose" group by date) as b
on a.date = b.date;
java虚拟机主要分为以下几个区:
本地方法栈和虚拟机栈类似,只不过本地方法栈为Native方法服务。
java堆是所有线程所共享的一块内存,在虚拟机启动时创建,几乎所有的对象实例都在这里创建,因此该区域经常发生垃圾回收操作。
内存空间小,字节码解释器工作时通过改变这个计数值可以选取下一条需要执行的字节码指令,分支、循环、跳转、异常处理和线程恢复等功能都需要依赖这个计数器完成。该内存区域是唯一一个java虚拟机规范没有规定任何OOM情况的区域。
Java类加载需要经历一下几个过程:
加载时类加载的第一个过程,在这个阶段,将完成一下三件事情:
验证的目的是为了确保Class文件的字节流中的信息不回危害到虚拟机.在该阶段主要完成以下四钟验证:
准备阶段是为类的静态变量分配内存并将其初始化为默认值,这些内存都将在方法区中进行分配。准备阶段不分配类中的实例变量的内存,实例变量将会在对象实例化时随着对象一起分配在Java堆中。
该阶段主要完成符号引用到直接引用的转换动作。解析动作并不一定在初始化动作完成之前,也有可能在初始化之后。
初始化时类加载的最后一步,前面的类加载过程,除了在加载阶段用户应用程序可以通过自定义类加载器参与之外,其余动作完全由虚拟机主导和控制。到了初始化阶段,才真正开始执行类中定义的Java程序代码。
判断一个对象是否存活有两种方法:
实现通过类的权限定名获取该类的二进制字节流的代码块叫做类加载器。
主要有一下四种类加载器:
内存分配:
回收策略以及Minor GC和Major GC:
当Eden区没有足够的空间进行分配时,虚拟机会执行一次Minor GC.Minor GC通常发生在新生代的Eden区,在这个区的对象生存期短,往往发生GC的频率较高,回收速度比较快;Full Gc/Major GC 发生在老年代,一般情况下,触发老年代GC的时候不会触发Minor GC,但是通过配置,可以在Full GC之前进行一次Minor GC这样可以加快老年代的回收速度。
当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。分布式锁可以将标记存在内存,只是该内存不是某个进程分配的内存而是公共内存,如 Redis,通过set (key,value,nx,px,timeout)方法添加分布式锁。
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。
-----------------------------------------------------------------------------------
如果你是大数据从业者,建议关注下公众号【五分钟学大数据】,本号专注于大数据技术研究,绝对能让你在大数据技术方面有所收获!
可直接扫码关注
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。