赞
踩
Flink 是一个分布式的流式数据的处理引擎,对于有界和无界数据进行状态计算,提供了很多便于用户编写分布式任务的 API,有 DataSetAPI,但是新版本中已经被舍弃了,即将淘汰了,现在用的是 DataStreamAPI,还有一些 TbaleAPI,但是做的并不是十分完善,比起 SqarkSQL 还是有很大的差距,Flink 里面还提供了容错机制,FlinkCEP实时预警等功能。
1)从架构角度上:SparkStreaming 的 Task 的运行依赖于 Driver,Executor,Worker,Flink 运行主要依赖于JobManager,TaskManager。
2)从数据处理的角度:SparkStreaming 是微批处理,需要指定微批处理的时间间隔,而 Flink 是通过事件时间作为驱动的,是真正意义上的流处理。
3)从时间机制上:Flink 提供了事件时间,注入时间,处理时间,最主要的就是事件时间。同时 Flink 支持 WaterMark 水位线机制,支持数据的延迟处理,这方面 SaprkStreaming 只有处理时间、StructStreaming 支持事件时间和 WaterMark,但是并没有 Flink 做的好,Flink 做的更加完善。
4)checkpoint方面,Flink 的 checkpoint 有 exactly-once,保证数据刚好被处理一次,spark 可能会处理多次。
1、checkpoint
这点是最重要的,并且会延伸出很多其他的问题,首先搞清楚什么是 checkpoint:
数据在处理的过程中,保存了处理的状态还有处理的位置等,保存的方式和拍快照差不多,如果系统出现故障了,重新启动之后,就会从 checkpoint 记录的位置或者状态继续进行计算,这样数据就能保证不丢失了。
说到 checkpoint 就要说到栅栏机制:
JobManager 启动一个 checkpoint 的协调器线程(如果记不住名字,可以直接说启动一个线程就行了),然后这个线程会定时的发送 barrier(栅栏),栅栏会按照顺序,依次经过各个 operator,比如 Source、Transfomation、Sink 这个顺序,经过 Source 的时候,Source 就会进行一个短暂的暂停(极其短暂,并不影响数据的运行,因为 Flink 是分布式的),然后将当前 Source 的状态拍快照,暂时存储在 HDFS 中,然后栅栏通过,进行下一个 Transfomation 的状态存储,然后重复上面的步骤,然后栅栏通过,进行 Sink,也是重复上面的步骤,最后栅栏返回到 checkpoint 的协调器线程中,checkpoint 的协调器线程判断上面所有的步骤都 OK 的时候,才算是 checkpoint 成功的执行了一次,中间有一个 operator 失败了,都算做 checkpoint 失败。
2、checkpoint 的执行模式(一致性):
at-most-once:至多一次。故障发生之后,计算结果可能丢失;
at-least-once:至少一次。就算故障发生了,也保证每个至少处理一次数据,所以可能会出现重复处理的情况;
exactly-once:精确一次。不管系统是否发生故障,结果只会精确的出现一次,也成为恰好一次;
(大多数 checkpoint 执行模式都是选用 exactly-once 进行一致性处理)
3、State(状态):
把状态存储到每个 taskManager(就是节点,如 node1,node2,node3 等)的内存中,说到内存肯定是不如磁盘安全的
注意:checkpoint就是对每个 operator 的 State 做了持久化存储(持久化就是保存在磁盘中,如HDFS)
代码更改后只要 checkpoint 中的数据结构不发生变化,是不影响数据恢复的。
1、开始事务创建一个临时文件夹,把数据写入到这个文件夹里面。
2、预提交将内存中缓存的数据写入文件并关闭。
3、正式提交将之前写完的临时文件放入目标目录下。这代表这最终的数据。
4、丢失执行成功的时候会吧临时文件都干掉。
5、若失败发生在预提交之后,正式提交前,可以根据状态来提交预提交的数据,也可以删除预提交的数据。
1、考虑使用处理时间。
2、加机器。
3、使用滚动事件减少数据的重复。
4、kafka缓存。
solt 就是 TaskManager 上面的槽,属于提供方;
并行度是在 slot 上执行的,属于使用方。
是基于 Flink 的 checkpoint 才可以启动的,默认就是固定间隔重启,比如重启几次、不重启等,不设置 checkpoint 就是没有重启策略,还有 FallBack 重启策略,还有故障率重启策略。
因为 Flink 中的任务是在 slot 中并行执行的,如果一旦多个 slot 都需要一份变量数据的时候,那么我们可以用到广播变量,将这个变量分别发送到各个 TaskManager(节点)上,这样每个 slot 就可以直接到自己的 TaskManager 节点和上获取变量数据了,减少了远程传输带来的性能的损耗,但是每个 TaskManager 上只能出现同一个广播变量一份。
没有固定的时间间隔,就是当用户退出了这个会话,就对会话阶段内的窗口进行数据统计。
三个:MemoryStateBackend,FSStateBackend,RocksDBstateBackend
1、在 windows 之前进行预聚合。
2、重新对 key 进行设置或者修改。
其实flink是自带反压策略的,jobManager 和 taskManager 是有通讯的,一旦出现下游处理时间较长,那么 source 阶段就会调整自己拉取的数据量大小,如果自动处理的策略还是没有减少压力的效果,那么可以通过设置 parrilizims 通过提高并行度来解决,还可以通过设置 slot 的数量来提高数据处理的速率来解决。
能减少线程之间的切换,减少消息的序列化还有反序列化,减少延迟的同时提高了吞吐量,至于什么情况下才会出现 operatorChain 呢,当连续的 task 之间出现 OneToOne 模式就会自动将这几个多个 task 连接,形成一条算子链,那么什么是 OneToOne 呢,OneToOne 就是上下游的并行度相同,也没有发生数据的排序变化,此时就认为是 OneToOne 模式,一般在代码调试阶段会将operatorChain关闭,这样方便在 Flink-dashboard 中观察数据流转信息。
1、首先在业务上避免这种问题,比如上海北京的订单比较多,其他的少,那我们可以对上海北京的数据做单独处理。
2、在key上进行处理。
3。参数设置,缓存一定的数据后再进行触发,以减少对state的访问,从而减少数据短时间的传输,减少数据倾斜。
Flink 中的 TaskManager 是执行任务的真正的 worker,每个 TaskManager 都会创建一个独立的线程来进行 task 任务的执行,所以设置了 slot 就是将 TaskManager 的资源平均分配出来,这样可以让多个 Task 同时执行,这样就避免了多个 task 之间导致的资源的竞争问题,但是注意 solt 只会对内存进行隔离,CPU 是不会进行隔离的。
map,filter,window,windowAll,apply,process,keyby,reduce
分区策略是指数据如何发送至下游,flink中提供了8中分区策略:
shufflepartitioner,broadcast,rebalance,forwards,keygroupstream,custom,global,rescale
slot 可以说是 taskManager 上一个设置了三个,有3个 taskmanager 那么就是9个,但是当我们的parallelism设置了1,那么就会有8个 slot 空闲了。
水印就是 watermark,本质上就是时间戳,是用来解决flink中的数据延迟问题导致的事件乱序问题。
在内部 catalog 中注册表,注册外部 catalog,执行 sql 的查询,注册用户自定义函数,将 dataStream 转换为表。
首先用 calcite 对 sql 进行语法检验,语法检验通过之后转换成 calcite 的逻辑树节点,生成 calcite 逻辑计划,然后采用枫林客自定义的优化规则和 calcite 火山模型,启发式模型,共同对逻辑树进行优化,形成最优的物理执行计划,对物理执行计划生成代码,提交到 Flink 平台上进行运行。
通过 typeinformation 这个基类,根据数据集中的数据类型来自动进行 typeserializer 的匹配,所以可以高效的对数据进行序列化和反序列化。
Flink 是可以独立于 Hadoop 之上独立完成使用的,但是 Hadoop 使我们大数据环境中无法躲避的技术栈,比如我们的 HDFS,大多数数据的存储都依赖于此,还有 Yarn 作为集群的资源调度,也是我们所依赖的。
Source+Transformation+Sink
Flink 的任务在执行的时候会被并行执行,每个并行执行的实例处理一部分,有几个并行执行的实例就有几个并行度,Flink 的并行度可以通过这么几种方式进行设置,分别是:算子,env,客户端,系统端,优先级是从前到后依次递减。
就是将数据存放在每个 taskManager 节点中,防止 task 重复拉取。
简单来说就是 Flink 的生产者认为,批处理是流处理的一种特殊情况,批处理是有限的流处理。
Flink 的数据交换是在 slot 中进行的,但是 slot 是归于 taskManager 所管理的,taskManager 从 buffer 中获取 record,但是并不是获取一个 record 就发送一条出去,而是以 batch (批)的形式等待累积再发送。
checkpoint 进行分布式的快照处理存储在 HDFS 上,state 对每个 operator 的计算过程中的状态进行存储。
barrier
机制中有做介绍。
Flink 中有这么一个特殊的组件为 connector (连接器),在 Flink1.9 版本之前,不同的 kafka 版本在 Flink 中使用需要用不同的连接器的版本,Flink1.9 版本之后不需要改变连接器的版本就可以使用不同版本的 kafka了。
Flink 集群下的 taskManager 会将资源平均的分配给各个 slot,所以在一个节点上就可以运行多个 slot,所以多个 slot 可以运行在同一个 jvm 中,这样就可以减少了数据的远程传输,也能共享一些数据结构,在一定程度上减少了每个 task 对资源的消耗,并且一个 slot 上可以进行多个算子的计算,也可以满足 Flink 计算资源调度的优化。
FlinkCEP 是对某个事件在指定时间内的触发次数做判断然后可以进行实时或离线的预警功能,首先说一下 FlinkCEP 的执行的一个流程。
首先自定义一个触发的逻辑,Pattern<泛型>.begin…,主要的算子有begin(指定begin的name),where(返回的是一个boolean的数据类型,如果返回的是true,证明begin被出发了),followBy(指定followBy的name),where(同上),within(指定触发的时间范围)
然后将这个自定义的CEP规则,应用到数据流中,CEP.pattern,指定input和自定义的逻辑,返回一个PatternStream。
将这个PatternStream利用select算子,处理的是一个Map集合,key是刚才的followBy设置的name,value就是收集的input,然后通过get(followBy的name)获取value,返回的是List,通过判断List集合的长度,来进行相关的预警业务,如发送邮件等…
相关算子
begin,where,next(挨着),followby(宽松临近),notnext(不挨着),notFollowBy(不临近),within,select,flatSelect(提取事件结果进行结果的处理),timeOrMore(最少几次…)等…
FlinkCEP 底层的原理(一半)
FlinkCEP 在运行的时候会将逻辑转换成为一个NFA对象,对象里面包含中间状态,以及连接中间状态的边,当一个state调到另一个state,需要通过一条边StateTransition,这条边中包含一个Condition对象,也就是 where() 中返回的boolean的方法,也就是说Condition对象中包含着是否可以完成状态跳变的条件,一个状态向跳到另一个状态,就需要满足边中的条件,其中StateTransition分为三种,take(状态满足后直接跳到下一个),ignore(状态满足条件后,回到原来的状态),process(这条边可以忽略,也可以不忽略)
用户编写的CEP逻辑会被返回成一个Pattern,此时就会自动生成一个NFAFactory,NFAFactory中包含着中间的State,工厂将所有的State存放到Map中,key就是我们设置的name。
此时NFA会取出事件时间,然后判断是否符合watermark的条件,如果不符合有侧道输出,就输出到侧道中,没有的话直接丢弃,然后初始化就完成了,然后有一个queue的队列,总结起来就两部,来一条数据,遍历queue中所有的state,看看有哪些state能匹配上,根据遍历queue的结果更新queue,用于下一条数据的匹配,最后调用select方法处理结果。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//老版本需要手动设置,新版本是过期的
env.enableCheckpointing(5000);
env.getCheckpointConfig(、setMinPauseBetweenCheckpoints(500);//两个barriers之间的最小间隔
env.getCheckpointConfig(、setFailOnCheckpointingErrors(false);//checkpoint中间出现失误,整体任务是否直接失败,默认是true
--默认就用,可以不写的:
//设置checkpoint的执行模式为EXACTLY_ONCE(默认),注意:得需要外部支持,如Source和Sink的支持
env.getCheckpointConfig(、setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置checkpoint的超时时间,如果Checkpoint在60s内尚未完成说明该次Checkpoint失败,则丢弃。
env.getCheckpointConfig(、setCheckpointTimeout(60000);//默认10分钟
//设置同一时间有多少个checkpoint可以同时执行
env.getCheckpointConfig(、setMaxConcurrentCheckpoints(1);*///默认为1*/
检查数据源是否堆积,还有就是检查 Flink 是否有数据积压,通过 FlinkWebUI 端口号查看进行中的任务(8081),查看是否出现了反压,webUI 提供了可视化的反压监控功能。
Flink 有 exactly-once 一致性语义,spark 不能做到恰好一次消费,可能需要整合其他框架如 redis 等。
Flink 中 watermark 通常与 window 窗口进行结合使用,正常情况下数据全部到达 window 中后,才对数据进行分析处理,但是无法避免有延迟的数据,watermark 就是为了解决这个问题,watermark 可以保证数据尽量完整的到达 window 窗口中(当事件进入窗口中,会根据事件最大的事件时间产生watermark时间戳)
watermark 的 window是怎么触发窗口函数的?
watermark>=窗口结束时间即可触发
如何计算 watermark 的值?
watermark=事件时间-最大允许的延迟时间
最小值
checkpoint+exactly-once
checkpoint:
有一些 barriers 机制,主要是做分布式快照,对状态和计算过程进行快照存储,防止计算过程中状态的丢失导致数据要重新计算,容错机制 Flink 重新在服务器启动的时候,要基于 checkpoint 重新启动。
savepoint:
savepoint 是 checkpoint 的一种特殊实现,底层其实也是使用的是 checkpoint 的机制,savepoint 的用户一般使用手动命令的方式来触发 checkpoint,并且将结果持久化到指定的存储路径中(hdfs 路径),其重要的目的就是帮助用户在升级或者维护集群过程中保存系统中的状态数据,避免防止运维的一些正常操作使作业无法恢复到原有的计算状态,从而无法实现端到端的 Excatly-Once 的语义保证。
SavePoint 的路径需要在 flink-conf.yaml
中配置。
state.savepoints.dir:hdfs://node01:8020/flink/state/savepoint
系统大规模的升级维护的时候用到,先手动执行savepoint,然后停止job,然后重启job
checkpoint 就是数据跟着 barriers 走,一般运行一次的时间就是流式数据执行一条数据的时间,一般是秒级,state 的大小 300M 左右即可。
checkpoint 执行一次的时间根据数据量来决定,一般 Flink on Hive 中,数据量较小的 checkpoint 可以达到秒级,数据量较大时要达到几分钟级。
首先 Driver 启动创建 SparkContext,sc 向 ClusterManager 注册资源,CM 向 Worker 分配资源,Worker 启动 executor 进程,然后 executor 等待 task 的提交,Driver 构建 DAG,DAG 提交到 DAGSchedule 进行划分 Stage,然后到 TaskSchedule 提交 task 到 executor 中,最后注销资源。
Spark 的并行度取决于 Stage 中的 task 的数量,代表各个阶段 Stage 的并行度,一般 task 的数量为 SparkCore 总数的2~3倍最佳。
client 是将 driver 运行在本地,然后和 yarn 集群进行远程通信,所以说有远程通信带来的资源的损耗,但是好处是运行结果可以再本地显示出来。
cluster 是将 driver 分配到 yarn 集群中,减少了远程传输带来的资源的损耗,结果不能再本地显示,但是可以通过 webUI8088 中的 application 中的 log 文件中查看结果。
driver-core:默认是1
driver-memory:driver的内存大小,默认是512M
executor-core:默认是1官方建议2~5个,我们是4个
num-executor:默认是2个,设置可以为10个
executor-memory:executor的内存大小,默认为1G
提交的样板:
spark-submit\
--masterlocal[5]\
--driver-cores2\
--driver-memory8g\
--executor-cores4\
--num-executors10\
--executor-memory8g\
--classPackageName.ClassNameXXXX.jar\
--name"SparkJobName"\
InputPath\
OutputPath
首先 Yarn 是支持动态的资源分配,而 Sparkalone 的资源分配是固定的,按照 job 顺序依次分配资源,资源不够的时候就排队,而且 Sparkalone 具有单点故障的问题,最后 Yarn 作为一个成熟的资源调度平台不光可以对 Spark 提供资源调度,还有Hadoop,MR,Hive等。
血统可以用来进行容错,Spark 中的 RDD 之间存在依赖关系,一旦某个 RDD 的数据发生丢失,可以从父 RDD 中进行重新计算,不用进行整个的计算。
Transformation
map flatMap reduceByKey groupByKey filter mapPartition combineByKey aggregateByKey union coalesce persist cache join leftOutJoin rightOutJoin textFile mapValues flatMapValues
Action–触发Job的算子
first
take
top
reduce
saveAsTextFile
count
countByKey
countByValue
collect
foreach
foreachPartition
distinct
Shuffle–产生Stage划分的算子
reduceByKey
groupByKey
join
coalesce
repartition
repartition 的底层就是 coalesce,但是试讲 shuffle 开启的,coalesce 是根据传入的参数来决定 shuffle 是否开启,一般增大 rdd 的分区数用 repartition,减少 rdd 的分区数用 coalesce。
两个都有 shuffle 的操作,都是按照 key 进行分区,但是 reduceByKey 在 shuffle 之前进行了预聚合,在使用中 reduceByKey 是优于 groupByKey 的。
cache的底层就是persist,但是persist是缓存到内存中的,checkpoint是持久化到磁盘的
persist使用后要使用unpersist进行内存的释放,而checkpoint不需要
persist保留着RDD之间的依赖关系,而checkpoint切断了血缘关系
cache只有一种缓存就是MEMORY_ONLY,但是persist可以根据需求进行不同的缓存位置
累加器是一种分布式的变量的机制,记录分布式的改变,并聚合这些改变,通常用于记录事件变化的次数。
共享变量就是由于Spark是分布式的,如果有一个数据集,集合中的每一个任务都要用到这个数据集,那么可以用共享变量的方式将变量分布到每一个机器中,减少了数据的远程传输带来的资源损耗。
广播变量只能由 Driver 端定义
在每个 executor 端只能读取不能修改
只能在 Driver 端进行修改
广播变量使用 BlockManager 进行管理,存在每个 executor 的内存中
每次遇到一个 action 都会生成一个 job,用户提交的 job 会提交给 DAGScheduler,job 会被分解成 Stage 和 Task。
可以从血统或者窄依赖这个角度进行分析,还有就是 checkpoint。
worker 就是每个节点上启动的进程,负责管理本节点的进程,其实也可以理解为就是程序真正运行的节点,executor 是 Spark 程序启动之后启动的一个进程,一旦 Spark 程序结束了那么 executor 也会随之消失,和 Spark 程序有着相同的生命周期。
Master 是在 Local 模式和 Standalone 下的负责分配资源的和资源管理工作的进程。
当 Spark 进行相对应的操作的时候,Spark 会将操作记录在一个日志中,然后执行操作,一旦执行的过程中操作出现了中断,那么就重新读取日志,重新进行日志中的操作即可。
Driver 是程序启动后启动的一个进行,里面带有程序的 main 方法,是程序的入口点
Driver 的作用是,如果是 Master 模式就向 Master 申请资源注册资源,如果是 Yarn 模式就向 Yarn 集群进行资源的申请,生成 Stage,并调度 Task 到 executor 上。
首先是基于内存进行计算的,减少了大量的磁盘交互,其次是高效的调度方法,基于 DAG 进行任务执行,有 Lingage 血统容错机制,即使 Spark 不基于内存速度也会比 MR 块。
RDD是一个分布式的数据集,是Spark中最基本的数据抽象,RDD有这么五个特点,分别是:
依赖性:RDD之间有依赖关系。
计算位置:RDD会自己选择最佳的计算位置,可以避免数据移动带来的开销。
计算函数:用来计算RDD各个分区上的数据。
分区列表:每个分区里是RDD的部分数据。
分区器(可选):用于键/值类型的RDD,比如某个RDD是按散列来分区。
RDD 不支持细粒度的写和更新操作,不支持迭代计算,Flink支持。
transformation 和 action;还有 crontroller,如 persist,cache 等控制算子,对性能效率有很好的支持。
1)使用程序中的集合创建rdd。
2)使用本地文件系统创建rdd。
3)使用hdfs创建rdd。
4)基于数据库db创建rdd。
5)基于Nosql创建rdd,如hbase。
6)基于s3创建rdd。
7)基于数据流,如socket创建rdd。
如果只回答了前面三种,是不够的,只能说明你的水平还是入门级的,实践过程中有很多种创建方式。
首先输出多个小文件的时候会产生多个 task,还有就是分区数量影响 task 的数量,可以再配置文件中进行更改。
使用 foreachPartition 代替 foreach,因为 foreach 是有几个元素就进行几次连接,而 foreachPartition 是有几个分区进行几个连接,减少连接带来的资源的消耗。
Job 是每次触发 Action 函数的时候都会形成一个 Job,Task 是 Stage 中的 RDD 分区数是多少就有多少个 task。
executor内存:主要是记录一些shuffle等计算过程中的临时数据。
storage内存:主要是cache/persist时的内存。
用户内存:主要是记录RDD之间的Transformation和依赖关系的地方。
预留内存:用来存储Spark内部对象。
使用wholeTextFile,读取目录下的所有小文件。
当一个 RDD 的计算过程比较复杂,并且这个 RDD 后续要被多次使用的时候进行缓存。
RDD 的好处是安全,编译时就能检查出类型的错误,面向对象的方式进行打点调方法,但是缺点就是进行大量的序列化和反序列化的操作,还会进行频繁的创建和销毁对象的操作,势必会增加 GC。
DataFrame 在 RDD 的基础上加入了 schema 约束,让数据变得更加的结构化,减少了序列化和反序列化操作。
DataSet 在 DataFrame 的基础上添加了泛型的操作。
三者转化:
//rdd->DF:
.toDF
//rdd->DS:
.toDS
//DS->rdd:
.rdd
//DF->rdd:
.rdd
//DF->DS:
.toDS
//DS->DF:
.as[泛型]
相当于 map 和 reducebykey 算子,但是还是有一定的区别的,MR 是自动排序的,spark 要看用的是什么 partitioner。
shuffle翻译过来是洗牌的意思,将带有共同特征的数据汇聚到一个节点上进行计算时用shuffle。
并行度在 Spark 中非常重要,指的是 Stage 中可以并行执行的task的数量,官网建议的是 CPU 核数 * executor 数量的 2~3 倍最佳。
valconf=newSparkConf()
valsc=newSparkcontext(conf、setMaster("locla[*]")
valresult=sc.textFile("xxx.txt")
.flatMap(_.split(""))
.map((_,1))
.reduceByKey(_+_)
.saveAsTextFile()//.foreach(println)
sc.stop()
cache 后面可以加其他算子,但是加了其他的算子就起不到缓存的作用了,因为会重新触发 cache,cache 不是 action 操作,是 transformation 操作。
spark-submit
https://www.cnblogs.com/haozhengfei/p/e19171de913caf91228d9b432d0eeefb.html
spark 的并行度就是 stage 上同时运行的task的数量,一般 spark 设置并行度和 cpu 的 core 相同即可,官方建议是 cpu 的 core 的 2~3 倍,但是一般设置相同即可。
SparkStreaming 的执行主要是靠 Driver 和 Executor之 间的相互协作进行的。
首先 Driver 端创建病启动 SparkContext 对象,SparkContext 对象中管理着 DStreamGraph 和 JobSchedule,DStreamGraph 主要是来定义 DStream 并且管理 DStream 之间的依赖关系,JobSchedule 主要是生成和调度 Job,executor 是在节点中的一个进程,是随着 Spark 程序的结束而消失的,可以说 executor 和 Spark 程序有着相同的生命周期。
executor 端不断地接收消息,调用 store() 方法,将数据存储,然后处理数据,处理数据就是生成 job,执行 job,job 是 DStreamGraph 根据 DStream 之间的依赖关系生成的,生成后会提交到 submitjobSet 中,然后执行 job,job 会根据我们编写的代码执行输出。
receiver
receiver 是使用 Kafka 的高层次的 Consumer-API 进行实现的,数据是存储在 executor 的内存中,如果数据短时间内大量增加,batch 数据堆积,可能会出现内存溢出问题,然后 SparkStreaming 会启动job处理这些数据,但是这种处理数据依然可能会因为失败而丢失数据,所以底层还会有预写日志机制,会将从 Kafka 处理的数据同步到 HDFS 上的,所以即使出现问题也是可以再日志中进行恢复的。
direct
Spark1.3 中引用了 direct 方式,摒弃了 receiver 这种方式,这种方式会周期性的查询 Kafka,获取 Topic+Partition 的 offset,通过 Kafka 简单的 Consumer-API,获取 Kafka 的 offset 区间的数据。
对比
receiver 是使用 Kafka 高层次 Consumer-API 在 ZK 中保存消费过的 offset,这是消费 Kafka 数据的传统方式,这种方式要配合 WAL 机制,可以保证数据不丢失,但是无法保证数据只被处理一次,因为 Spark 和 ZK 无法做到同步,可能被处理多次,但是 direct 模式使用 Kafka 简单的 API,采用 SparkStreaming 自动追踪 offset,并保存在 checkpoint 中,Spark 自己本身肯定是能保证同步的,因此可以保证数据消费一次,生产中大多使用 direct 模式。
窗口用来处理指定时间内的数据,并且需要通过设置参数指定窗口滑动的步长,用来设置当前任务计算完成后,下次任务从什么时间进行计算。
Spark 的 Job 的运行信息
//TODO2.transformation //RDD[(单词,数量)] valresult:RDD[(String,Int)]=lines.filter(StringUtils.isNoneBlank(_)) .flatMap(_.split("")) .map((_,1)) .reduceByKey(_+_) //需求:对WordCount的结果进行排序,取出top3 valsortResult1:Array[(String,Int)]=result .sortBy(_._2,false)//按照数量降序排列 .take(3)//取出前3个 //result.map(t=>(t._2,t._1)) valsortResult2:Array[(Int,String)]=result.map(_.swap) .sortByKey(false)//按照数量降序排列 .take(3)//取出前3个 valsortResult3:Array[(String,Int)]=result.top(3)(Ordering.by(_._2))//topN默认就是降序 //TODO3.sink/输出 result.foreach(println) sortResult1.foreach(println) sortResult2.foreach(println) sortResult3.foreach(println)
RDD 间的 lineage 机制,RDD 之间有依赖关系,宽依赖和窄依赖。
checkpoint 机制,DAG 中的血缘过长,如果一旦重新计算消耗的资源太大,所以需要进行检查点的设置,另外在宽依赖上进行 checkpoint 获得的收益更大。
packagecn.itcast.sql importorg.apache.spark.SparkContext importorg.apache.spark.sql.expressions.UserDefinedFunction importorg.apache.spark.sql.{Dataset,SparkSession} /** *Authoritcast *Desc演示SparkSQL-使用SparkSQL-UDF将数据转为大写 */ objectDemo09_Hive{ defmain(args:Array[String]):Unit={ //TODO0.准备环境---需要增加参数配置和开启hivesql语法支持 valspark:SparkSession=SparkSession.builder(、appName("sparksql"、master("local[*]") .config("spark.sql.shuffle.partitions","4")//本次测试时将分区数设置小一点,实际开发中可以根据集群规模调整大小,默认200 .config("spark.sql.warehouse.dir","hdfs://node1:8020/user/hive/warehouse")//指定Hive数据库在HDFS上的位置 .config("hive.metastore.uris","thrift://node2:9083") .enableHiveSupport()//开启对hive语法的支持 .getOrCreate() valsc:SparkContext=spark.sparkContext sc.setLogLevel("WARN") importspark.implicits._ //TODO1.操作Hive spark.sql("showdatabases"、show(false) spark.sql("showtables"、show(false) spark.sql("CREATETABLEperson4(idint,namestring,ageint)rowformatdelimitedfieldsterminatedby''") spark.sql("LOADDATALOCALINPATH'file:///D:/person.txt'INTOTABLEperson4") spark.sql("showtables"、show(false) spark.sql("select*fromperson4"、show(false) spark.stop() } }
作为目前最火爆的开发语言,使用Java已经是大数据开发者的基本技能,很多大数据组件也适用于Java开发,如:Flink
1、继承Thread类。
2、实现Runnable接口。
3、实现Callable接口。
4、线程池:提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应的速度。
详细:https://www.cnblogs.com/big-keyboard/p/16813151.html
1、利用 HashSet,泛型指定我们创建好的 JavaBean,通过 hashSet 中的 add() 方法,进行去重。
public static void main(String[] args) { Date date=new Date(); //获取创建好的javaBean对象并进行赋值 JavaBean t1 = new JavaBean(); t1.setLat("121"); t1.setLon("30"); t1.setMmsi("11"); t1.setUpdateTime(date); //再创建一个javaBean对象,赋同样的值 JavaBean t2=new JavaBean(); t2.setLat("121"); t2.setLon("30"); t2.setMmsi("11"); t2.setUpdateTime(date); //用HashSet HashSet<JavaBean> hashSet = new HashSet<JavaBean>(); hashSet.add(t1); hashSet.add(t2); System.out.println(hashSet); System.out.println(); for(JavaBean t:hashSet){ //只会出现一个值 System.out.println(t); }
2、利用 ArrayList,泛型指定我们创建好的 javaBean,通过 ArrayList 中的 contain() 方法进行判断后去重。
//用List
List<TestMain18> lists = new ArrayList<TestMain18>();
if(!lists.contains(t1)){
lists.add(0, t1);
}
if(!lists.contains(t2)){//重写equals
lists.add(0, t2);
}
System.out.println("长度:"+lists.size());
==
用来判断对象在内存中的地址是否相等,equals
用来判断对象中的内容是否相等。
1、Timer timer = new Timer() timer.schedule(重写 new TimerTask 方法)
//方式1:
Timer timer = new Timer();
//TimerTask task, 要定时执行的任务
//long delay,延迟多久开始执行
//long period,每隔多久执行延迟
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("每隔1s执行一次");
}
}, 5000,1000 );*/
2、Executors.newScheduleThreadPool(线程池数量)返回的对象 scheduleAtFixeRate,重写 run 方法。
//方式2:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("每隔1s执行一次");
}
},5,1, TimeUnit.SECONDS);
3、SprintBoot 中提供了定时任务的相关注解,使用起来特别方便,利用corn定义触发的规则。
@Component//表示该类是Spring的组件,会由Spring创建并管理
@EnableScheduling//表示开启定时任务扫描
public class TestTimedTask2 {
//https://cron.qqe2.com/
@Scheduled(cron = "0/3 * * * * ? ")
public void task(){
System.out.println("每隔3s执行一次");
}
}
SQL 作为计算机行业最基本的语言之一,也是必须要了解的。
avg(),max(),min(),sum(),count()
等
注意:
聚合函数不会自己使用,也不会和 where 一起使用,一般使用的时候都是和 group by 一起使用(分组必聚合),还有就是在 having 语句后面进行使用。
inner join
(内连接):就是找到两个表中的交集。
left join
(左外连接):以左边的表为主,如果没有与右边的表相对应的用 null 补充在表中。
right join
(右外连接):以右边的表为主,同上。
full join
(满外连接):Hive 中特有的,MySQL 中没有,保留两边的表的所有内容,没有对应的互相都用 null 补充在表中。
MySQL的数据结构为 B + 树
1、关系型数据库的特点:
结构化的存储
采用结构化的查询语言sql
操作数据要具有一致性,比如事务操作
可以进行join等复杂查询
无法进行大量数据的高并发读写
2、nosql数据库的特点:
非结构化的数据库
高并发大数据下读写能力强
事务性差
join 的复杂操作能力弱
在大数据环境安装部署、提交 jar 包的时候,都会应用到 Linux 操作系统,所以了解需要了解 Linux 常用命令。
1、由客户端向 RM 提交任务(MR,Spark…)
2、RM 接收任务,并根据任务随机找一台NM,启动 AppMaster,通知以 container 方式。
container:资源信息容器(节点信息,内存信息,CPU信息),运行:AppMaster
3、指定 NM 启动 AppMaster,启动后和 RM 保持心跳机制,用于报告当前已经启动了,并且通过心跳来传递相关信息。
4、根据 RM 给定任务信息,根据任务信息,对任务进行分配,主要会分配出要启动多少个 map 和多少个 reduce,以及每个 map 和每个 reduce 需要使用多大资源空间,然后将资源申请相关信息发送给 RM(心跳发送)
5、RM 接收到资源申请信息后,将申请信息交给内部资源调度器,由资源调度器,根据相关的资源调度方案,进行资源分配即可,如果当下没有资源,在此处等待。
注意:
资源并不是一次性全部给到 AppMaster,一般会采用极可能满足方案,如果满足不了,会先给与一定资源进行运行,如果空闲资源连一个 container 都不足,就会将这些资源挂起,等待资源充足。
6、AppMaster 基于心跳机制,不断询问RM是否已经准备好了资源了,如果发现已经准备好了,然后直接将资源信息获取。
7、根据资源信息说明,到指定的 NM 上启动 container 资源容器,开始运行相关任务。
8、NM 接收启动的信息后,开始启动执行,此时会和 AppMaster 以及 RM 保持心跳连接。
RM 将任务的相关信息根据心跳通知 AppMaster
AppMaster 将资源的使用信息根据心跳通知 RM
9、当 NM 运行完成后,会通知 AppMaster 并将资源使用完成情况通知给 RM。
10、AppMaster 告知给 RM 任务已经运行完成了, RM 回收资源,通知AppMaster进行自毁即可。
注意:
当 NM 在运行过程中,如果发生错误了,此时 RM 会立即将资源回收,此时 AppMaster 就需要重新和 RM 申请资源。
1、FIFO scheduler
:先进先出调度方案
当一个调度任务进入到调度器之后,那么调度器会优先满足第一个MR任务全部资源,此时就有可能将资源全部都获取到了,导致后续的任务本身的运行时间很短,但是由于第一个MR将资源全部抢走了, 导致后续任务全部等待。
此种调度器在生产中 一般不会使用,因为生产中yarn平台不是你自己的。
2、Fair scheduler
:公平调度器
可以预先分配出多个队列, 相当于对资源进行预先的划分。
3、capacity scheduler
:容量调度器
此种调度器是有 Yahoo 提供一种调度方案,同时也是当下Apache版本的hadoop默认调度方案。每个队列,可以指定占用多少的百分比的资源,从而保证,大的任务可以有单独的队列来运行,并且小的任务,也可以正常的运行。
clustermanager、nodemanager、applicationmaster。
【Hadoop-HDFS】HDFS的读写流程 & SNN的数据写入流程
HDFS 擅长存储大文件,我们知道,HDFS 中每个文件都有各自的元数据信息,如果 HDFS 中有大量的小文件,就会导致元数据爆炸,集群管理的元数据的内存压力会非常大(namenode 节点)
1、使用官方工具 parquet-tools 合并指定的 parquet 文件。
# 合并 HDFS 上的 parquet 文件
hadoop jar parquet-tools-1.9.0.jar merge /tmp/a.parquet /tmp/b.parquet
# 合并本地的 parquet 文件
java -jar parquet-tools-1.9.0.jar merge /tmp/a.parquet /tmp/b.parquet
2、合并本地的小文件,上传到 HDFS(通过 HDFS 客户端的 appendToFile 命令对小文件进行合并上传)
hdfs dfs -appendToFile user1.txt user2.txt /test/upload/merged_user.txt
3、合并 HDFS 的小文件,下载到本地,可以通过 HDFS 客户端的 getmerge 命令,将很多小文件合并成一个大文件,然后下载到本地,最后重新上传至 HDFS。
hdfs dfs -getmerge /test/upload/user*.txt ./merged_user.txt
4、Hadoop Archives (HAR files)是在 0.18.0 版本中引入到 HDFS 中的,它的出现就是为了缓解大量小文件消耗 NameNode 内存的问题。
HAR 文件是通过在 HDFS 上构建一个分层文件系统来工作。HAR 文件通过 hadoop archive 命令来创建,而这个命令实际上是运行 MapReduce 作业来将小文件打包成少量的 HDFS 文件(将小文件进行合并成几个大文件)
# Usage: hadoop archive -archiveName name -p <parent> <src> <dest>
# har命令说明
# 参数 “-p” 为 src path 的前缀,src 可以写多个 path
# 归档文件:
hadoop archive -archiveName m3_monitor.har -p /tmp/test/archive_test/m3_monitor/20220809 /tmp/test/archive
# 删除数据源目录:
hdfs dfs -rm -r /tmp/test/archive_test/m3_monitor/20220809
# 查看归档文件:
hdfs dfs -ls -R har:///tmp/test/archive/m3_monitor.har
# 解归档:将归档文件内容拷贝到另一个目录
hdfs dfs -cp har:///tmp/test/archive/m3_monitor.har/part-1-7.gz /tmp/test/
namenode、datanode、secondarynamenode;namenode 有 active 和 standby。
SecondaryNameNode 并不是 NameNode 的备份节点,主要是将内存中的 Fsimage 和磁盘上的 Fsimage 文件进行合并。
【Hadoop-HDFS】HDFS中Fsimage与Edits详解
【Hadoop-MapReduce】MapReduce编程步骤及工作原理(详见标题4:map 阶段的工作机制)
【Hadoop-MapReduce】MapReduce编程步骤及工作原理(详见标题5:reduce 阶段的工作机制)
不管是 map 阶段还是 reduce 阶段,大量进行磁盘到内存,内存到磁盘相关的 IO 操作,主要目的能够解决处理海量数据计算问题。
带来好处:能够处理海量的数据。
带来的弊端:造成大量的磁盘 IO 工作导致效率比较低。
配置 | 默认值 | 释义 |
---|---|---|
mapreduce.task.io.sort.mb | 100 | 设置环型缓冲区的内存值大小 |
mapreduce.map.sort.spill.percent | 0.8 | 设置溢写的比例 |
mapreduce.cluster.local.dir | ${hadoop.tmp.dir}/mapred/local | 溢写数据目录 |
mapreduce.task.io.sort.factor | 10 | 设置一次合并多少个溢写文件 |
Hive 的元数据存储在mysql中(默认derby,不支持多客户端访问),数据存储在 HDFS,执行引擎为 MR。
1、建表时用 external 区分。
2、删除外表时删除元数据,删除内表是删除的是元数据和存储数据。
3、外表存储位置自己定,内表存储位置在 /uer/hive/warehouse 中。
1、创建表时指定字段进行分区。
2、使用 alter table 对已经创建完成的表进行分区的增加和删除操作。
# 建表:
create table tablename(col1 string) partitioned by(col2 string);
# 添加分区:
alter table tablename add partition(col2=’202101’);
# 删除分区:
alter table tablename drop partition(col2=’202101’);
3、修改分区。
alter table db.tablename
set location '/warehouse/tablespace/external/hive/test.db/tablename'
alter table db.tablename add if not exists partition (sample_date='20220102',partition_name='r') location '/tmp/db/tablename/sample_date=20220102/partition_name=r';
msck repair table test;
1、Hive 中的排序方式有:order by,sort by,distribute by,cluster by
2、四种排序方式的区别:
order by
: 对数据进行全局排序,只有一个 reduce 工作
sort by
: 一般和 distribute by 一起使用,当 task 为 1 时效果和 order by 一样
distribute by
: 对 key 进行分区,和 sort by 一起实现对分区内数据的排序工作
cluster by
: 当 order by 和 distribute by 的字段相同时可以用 cluster by 代替,但是只能升序
注意:
生产环境中使用 order by 比较少,容易造成 oom,使用 sort by 和 distribute by 较多。
三者都是对数据进行标号排序
row_number()
:不会出现序号的增加或减少,当数值相同时也会有排名先后之分
rank()
:当排序相同时序号重复,总序不会变化
dense_rank()
:当排序相同时序号重复,总序会减少
导入数据:
① load data 的方式可以对本地或HDFS上的数据进行导入
② Location方式
create external if not exists stu2 like student location '/user/hive/warehouse/student/student.txt';
③ sqoop方式
导出数据:一般用sqoop方式
导出数据时采用 --input-null-string
和 --input-null-non-string
两个参数。
导入数据时采用 --null-string
和 --null-non-string
。
Sqoop 中遇到特殊字符可以使用 hive-drop-import-delims
丢弃,也可以使用 --hive-delims-replacement
,它会将特殊字符替换为我们设定的字符。
只有 map 阶段,没有 reduce 阶段的任务。默认是 4 个 MapTask。
1、Oozie 一般不单独使用,因为需要配置 xml 文件很麻烦。
2、Oozie 一般与 Hue 一起使用,可以调度各种任务比如 Shell,MR,Hive等。
3、无论是单独使用还是与 Hue 集成,Oozie中最重要的点都在于 workflow 的配置。
4、与 Hue 整合时在 Schedule 中配置定时任务的时间,在 workflow 中配置任务的相关位置信息。
1、元数据存储在 Mysql 中。
2、有三种部署模式: solo-server(所有服务在一台服务器上), tow-server(web,executor在不同服务器上),multiple-executor-server(一般不常用)。
3、azkaban的任务调度:
Hive脚本: test.sql
use default;
drop table aztest;
create table aztest(id int,name string) row format delimited fields terminated by ',';
load data inpath '/aztest/hiveinput' into table aztest;c
reate table azres as select * from aztest;insert overwrite directory '/aztest/hiveoutput' select count(1) from aztest;
hive.job(名称.job)
type=command #固定
dependencies=xx #有依赖的任务的时候添加这个
command=/home/hadoop/apps/hive/bin/hive -f 'test.sql'
将所有文件打包成 zip 包上传到 Azkaban 上,然后点击 summary,然后选择 schedule 进行时间的配置。
Flume 是一个用来实时采集流数据的分布式数据采集系统,容错性强,可靠性高。
(source,channel,sink)
1、Flume 中的 agent:包含 source,channel,sink 的统称。
2、source:是用来采集数据的组件,可以监控一个文件的变化,可以监控一个目录下新文件的变化,可以监控一个目录下所有文件的内容变化,也可以自定义数据源,在配置的时候要配置 source 的名称。
3、channnel:可以配置两种方式进行数据的缓存,一种是内存,另一种是以生成文件的方式。
4、sink:支持 HDFS,Kafka,自定义目标源等,还可以支持下一个 agent。
5、event:Flume 将采集到的数据封装到 event 中进行传输,本质上是一个字节数组。
1、Flume 可以以 agent 的方式进行串联。
2、Flume 可以并联,将多个 agent 的 sink 传输到新的 agent 的 source 中。
3、Flume 可以 串联+并联+多sink 等。
1、source,channel,sink 的名称。
2、channel 是基于内存还是基于文件。
3、sink 对应的 channel。
4、sink 到 kafka 的话要配置 kafka 的 topic,端口号,ack 等。
1、查询速度快
分区,文件分段。
二分查找法定位消息在哪个段(文件)中。
2、写入速度快
顺序写入。
零拷贝:数据直接从磁盘文件复制到网卡设备中,不需要经过应用程序之手。零拷贝技术通过 DMA 技术,将文件内容复制到内核模式下的 ReadBuffer,直接将数据内核的数据传递到网卡设备中,所以零拷贝是针对于内核,数据再内核模式下实现了零拷贝。
批量发送:通过 batch.size 参数来设置批量提交数据的大小,默认是16K,当数据积压到这一值时就会统一发送,数据会发送到一个分区中。
数据压缩:Producer 端压缩,Broker 端保持,Consumer 端解压缩。
可以利用两阶段事务提交(Flink)或者容器去重(HashSet,Redis,布隆过滤器)
Kafka 是全局无序但是局部有序,只要我们在推送消息的时候都推送到同一个分区,消费时也指定一个分区消费即可。
提升读写效率 + 方便集群扩容 + 消费者负载均衡
1、生产者端,设置 ack(0、1、-1 / all)
0
:生产者端不会等到 Broker 端返回 ack,继续生产数据。
1
:当 Broker 中的 Leader 端收到数据就返回 ack。
-1 / all
:当 Broker 中的 Leader 和 所有的Follower 都接收到数据才返回 ack。
2、消费者端,采用先消费后提交的方式,宁愿重复消费也不能让数据丢失。
3、Broker 端:有副本机制保证数据安全性。
同一时间一条消息,只能被同一个消费者组的一个消费者消费,不能被同一个消费者组的其他消费者消费。但是可以被不同消费者的消费者组所消费。
【Kafka-架构及基本原理】Kafka生产者、消费者、Broker原理解析 & Kafka原理流程图
HMaster
HRegionServer
Region
zookeeper
HBase:Client -> Zookeeper -> HRegionServer -> HLog -> Region -> store -> memorystore -> storeFile -> HFile
Hash
时间戳倒转
HBase 在创建表的时候可以指定预分区规则。
HBase 建表后和可以通过 split 命令进行分区的更改。
HBase 也可以再建表的时候通过 split.txt 文件中的信息通过 SPLIT_FILE 命令进行预分区。
优点:
支持非结构化数据的存储
相对于关系型数据库HBase采用列式存储,写入的效率很快
HBase中的null值不会被记录在内,节省空间并提高了读写性能
支持高并发的读写
支持大量数据的存储工作
缺点:
本身并不支持sql查询
不适合大范围的扫描查询
CH 建表时要指定引擎。
更新:Alter table update (xx="xx") where 主键 = ?
删除:Alter table delete where 主键= ?
写入:insert into table() values()
MergeTree:
MergeTree引擎共同特点:
用插入来表示数据的更新。因为在 CH 中,主键列不是唯一的,仅仅是为了创建索引,提高我们的查询效率,再次插入数据,数据重复。在 MergeTree 引擎中,后台会有一个进程,在定期执行数据合并操作,但是什么时候不知道。
注意:
如果在插入了一条数据之后,马上使用 select * from tablename
,这个操作不会读取到最新的数据,要等后台合并之后查询到的数据才是最新的,因此为了确保每次查询都是最新的数据有两种解决办法:
1、手动触发后台进程合并操作:optimize table 表名 final
手动合并,但是如果大量数据进行合并会变慢,而且合并过程中表不可以(不推荐)
2、每次查询的时候对数据进行分组聚合
操作,已达到去重的目的。
日志引擎:
TinyLog:
TinyLog 最小最轻量级的引擎,当我们需要快速写入小表(100w 行以内),以后整体读取它们这个引擎是最高效的,追加的方式。
TinyLog缺点:
没有并发控制,同时读取和写入的时候,读取操作报异常,同时写入多个表,数据会被破坏,不支持索引。
集成引擎:
Kafka,MySQL,HDFS,ODBC,JDBC,与其他存储数据库或者存储介质进行整合时使用。
项目没使用集成MySQL的引擎原因:
因为 CH 是映射 MySQL 的表,本质上还是在 MySQL 中做 select 查询,对数据库造成访问压力。
注意:
使用了 MySQL 引擎关联 MySQL 中指定的库,那么在 CH 中就会出现一个库,这个库中是用 MySQL 的 sql 语法来进行操作的。
其他特定功能引擎(用的比较少)
MergeTree:
1、orderby 一般用于排序主键,使用了 orderby 就不用 primary by 了。
2、建表后,在指定的目录下,会按照分区进行分文件夹,分区中每个列都是一个 .bin 文件,是一个列被压缩的表现,.idx 文件代表主键索引,.mrk 保存块偏移量。
ReplacingMergeTree:
1、如果表会进行进行修改使用此引擎,但是此引擎只保证数据最终被修改,无法保证查询过程中主键重复 。
2、ENGINE=ReplacingMergeTree([ver])
,ver 为版本,如果 ver 没有指定,那么会保留最后一次修改的内容,如果 ver 指定,保留第一次的内容。
SummingMergeTree:
1、聚合功能,只能进行sum聚合。
2、SummingMergeTree([columns])
,如果指定了 columns,那么所选列不能是主键和非数值类型,如果没有选,会自动把非主键列中的数值列进行聚合。
3、如果不手动刷新,就要等待后台自动进行聚合,或者使用分组聚合的 sql 查询可以得到刷新后的结果。
AggregatingMergeTree:
1、聚合功能,功能更加全面,除了 sum 外还有别的聚合功能。
2、一般和 MergeTree 一起使用,把明细数据存放到 MergeTree 中,然后 insert into 插入到 AggregatingMergeTree 表中,然后再使用分组聚合查询就能得到聚合后的结果。
CollapsinMergeTree:
CH 中不支持 update 和 delete 操作(不能使用标准的更新和删除语法操作 CH),因此 CH 推荐的方式使用 CollapsinMergeTree 进行删除,在表中增加一个字段 sign,1 表示状态行,-1 表示取消行,用取消行来表示状态行的数据被干掉了,当触发合并操作之后,会对 1 和 -1 进行折叠操作,也就是同时删除了这两条数据。
状态行和取消行不折叠的两种情况:
1、由于合并机制是后台发生的,具体的执行时间无法预测,所以可能会出现数据冗余的情况。
2、如果是先出现的取消行,然后出现的状态行,也无法进行折叠。
3、CH不能保证数据插入的时候相同主键落到同一个节点上,不同节点上的数据是无法进行折叠的。
解决方式:在查询数据的时候,使用 sum 将 sign 字段求和,having 出大于 0 的数据即可。
注意:取消行的特点是除了 sign 列其他的数据都是状态行的拷贝,状态行的数据可以来自于 ogg 和 canal,但是取消行的数据我们难道要通过根据主键查询 CH 表的数据,然后复制拼接取消行数据的 sql 语句吗?
答案:不需要,ogg 中的删除操作有 before 字段可以直接作为取消行插入到 CH 表中,canal 中的删除操作有 data 字段可以直接作为取消行插入到 CH 表中。
VersionCollapsinMergeTree:
相对于 CollapsinMergeTree 来说对状态行和取消行插入的顺序没有严格的要求了,但是不仅要添加 sign 列还要增加一个 version 列,用上一条数据的 update 时间来作为 version 的值。
1、查询依旧使用分组聚合的方式。
2、或者使用 select * from tablename final
,此操作并没有真正触发合并,而是后台将数据计算好了呈现出来的效果(这是一个非常低效的方式来选择数据,尽量不要用来查询数据量较大的表)
本地表:部署在单机环境下
集群表:部署在集群环境下
在 CH 中不能使用常规的 sql 语句进行 update 和 delete 操作,而是要使用 alter table 进行操作。
如何查看操作的数据:
在CH的system库mutations表中,记录着CH修改操作的记录
mutation 具体过程:
首先用 where 条件找到具体的分区,重建每个分区,用新的分区覆盖老的分区,分区一旦被替换,不可退回(即使修改一条数据,也会覆盖,所以效率上来说比较慢)
ROLAP:(对明细层的数据进行实时聚合计算查询)
impala、presto
MOLAP:(对数据进行多维预聚合加工,然后对预聚合数据进行实时计算查询)
Druid、Kylin
HyBirdOLAP:(ROLAP+MOLAP)
TiDB
ClickHouse 属于 ROLAP,但是也可以做 MOLAP(通过物化视图的方式进行预聚合)
CH 与其他 OLAP 的区别:
Druid
:
实时查询,但是不支持复杂sql查询,不支持数据更新操作。
Kylin
:
亚秒级查询,支持复杂sql查询,但是会构建 cube,容易出现维度爆炸,维度控制在10个左右,不支持数据更新操作。
ClickHouse
:
1、即可以对明细数据进行实时OLAP分析,也可以对数据进行预聚合计算。
2、单表查询优势巨大。
3、支持数据更新操作。
4、支持sql查询,多表联查等操作(不支持窗口函数以及相关子查询)
Kudu 的建表必须要有主键。
Kudu 不需要依赖 Hadoop 集群,Kudu 有自己的存储机制。
hash、range、hash&range(混合分区)三种方式,项目中使用hash方式进行分区。
Kudu 不依赖 HDFS。
master:
负责管理从节点,并且存储数据在从节点中的位置,也就是元数据。
catelog:
元数据的存储位置,无法直接访问,只能通过 API 查询,记录着表的结构,表的位置,状态,以及各个 tablet 开始和结束的 key(分区)
作用:
管理元数据,负载均衡,监听 tablet-server 状态。
tablet-server:
只负责存储数据,类似于 datanode。
1、Kudu 的存储是以表的形式,有分区,有列名,列的类型,列值,类似于一个关系型数据库,但是还不是关系型数据库。
2、Kudu 是主从结构,有 master 和 tablet-server,主节点又分为 action 和 wait 两个。
3、Kudu 的表分区,分区的副本存在于各个节点上,类似于 Kafka。
4、Kudu 的分区类似于 HBase 的 Region,一张表按照一定间隔规则分成若干个分区。
5、Kudu 的主从节点分工类似于 HDFS 中的 datanode 和 namenode。
6、Kudu 表(tablet-server)中的一个分区会被拆分成多个 tablet,每一个分区对应一个 tablet,分区有三种方式:Hash、Range、Hash+Range。
根据DataFrame建表:
创建KuduContext对象(DataFrame,SparkConf)
kuduContext.tableExists(tableName)
dataFream.schema(),返回变结构 StructType。
StructType.fields.map(field=>{StructField(field.name, field.dataType, true/false[判断字段是否为主键])})
将更改好的 StructField 创建成新的表结构。
newCreateTableOption(),指定表的分区方式根据主键字段进行分区,设置副本数。
kuduContext.createTable(tableName, kuduStructType, List(primaryFieldName), options)
Kudu 创建表需要 new 一个 createOptions 对象,通过这个对象指定副本数和分区方式,指定分区方式时要指定主键字段。
OGG 是一种基于日志的结构化数据复制软件,通过解析源数据库在线日志或归档日志获得数据的增删改变化。
OGG 能够实现大量交易数据的获取,转换,发送。
首先 Oracle 要开启 archivelog(归档日志)和辅助日志功能。
ogg源端:(开启管理进程 mgr,extract 进程和 pump 进程)
1、mgr 进程是负责管理两个子进程的。
2、extract 进程是到 oracle 中拉取增删改等日志的作用,然后将日志转换成一个中间文件(LocalTrail)
3、pump 进程是请求目标端的 mgr 进程开启 collect 进程,然后压缩中间文件传送到 connect 进程。
ogg目标端:(开启管理进程 mgr,connect 进程和 replicat 进程)
1、mgr 进程是负责管理两个子进程,并完成与远端 pump 进程的通信。
2、connect 进程是接收 pump 进程传输的中间文件的,connect 进程可以没有,没有的时候 replicat 会代替 connect 接收文件。
3、replicat 进程是用来接收文件,并将文件转换成目标端对应的 SQL 或者对应的插入语句,在目标端执行即可实现数据同步。
Canal 是模仿 MySQL 的主从架构,主备原理:
在 MySQL 主从关系中,salve 会向 master 发送 dump 命令,master 会根据配置情况判断是否是自己的从节点,如果是自身的从节点,那么 MySQL 每更新一条数据,salve 也会随之变更。
Canal 实现原理:
Canal 像是一个伪装者一样,装成 MySQL的从节点,也想 MySQL 的 master 发送 dump 命令,从而达到 MySQL 与 Canal 的数据同步。
首先 MySQL 要开启 bin-log 日志(binary-log)功能,顾名思义,这个日志中记录的都是二进制的数据
(canal是一个cs架构,分别有canal-server和canal-client)
canal-server:
canal-server 有 1~n 个 instance,每个 instance 下有四个进程来协作同步数据,通常一个 instance 对应一个数据库,方便对数据的不同目标端发送。
EventParse:从 bin-log 日志中拉取数据。
MetaManager:记录读取 bin-log 日志的位置,也就是 offset,同时记录 client 端在 EventStore 拉取数据的 offset。
EventSink:从 EventParse 中获取数据,进行解析转换,然后将数据写到 EventSotre。
EventStore:client 从此处拉取数据(是一个内存环形缓冲区)
canal-client:
canal-client 从 server 端拉取数据,每次拉取多少,拉取到了哪里,由 canal-server 中的 MetaManager 所管理。
拉取到的数据经过数据处理,最终将数据写到 Kafka 集群。
Druid 支持海量数据的实时毫秒级查询,适合我们的行情分时数据业务(指数项目中应用)
1、优秀的分布式架构设计,各个角色各司其职分工协作,提供了整体的效率。
2、底层存储支持 Chunk 分文件夹,Segment/Partition 分文件。
3、预聚合,加快查询效率。
4、位图索引(了解即可,只需要知道是用空间换时间,用二进制位运算避免了表扫描即可)
1、Kylin 支持 sql,但是不支持数据更新。
2、Kylin 会对数据进行预计算,也就是构建 cube。
3、Kylin 在预计算的时候比较耗费时间和空间,但是构建完 cube 后面查询效率就提升了。
1、存储在 HBase 中。
2、一个 cube 由多个 cuboid 组成,cuboid 的计算方式为 2 的(维度 -1)次方。
构建完 cube 后的数据量大小 ÷ 原始数据量大小,膨胀率小于 1000% 都是正常。
1、使用衍生维度,会将表的非主键维度排除掉。
2、指定字段不进行 cube 的构建。
3、指定多维度的组合同时出现时才会进行 cube 的构建,单独出现时不会构建 cube。
一张大表,并发访问量不是很高的情况下,想要实现亚秒级查询,可以使用 Kylin。
FE、BE
FE 中有三个进程:leader、follower、observer,leader 和follower 是主从节点的概念,防止单点故障,observer 是当查询压力过大时的拓展查询功能,并且完成元数据的备份工作,所以 observer 只读不写。
BE 作为数据存储的节点,是分布式的,多借点并行进行数据查询,同时 BE 还会将数据存储多个副本中,可按照数据配置。
Broker 是一个无状态进程,可以帮助 Doris 访问外部数据源,通常在每台节点上部署一个 Broker 实例。
主要用到的端口号:
8030:FEHttpServer
8040:BEHttpServer
9030:FEMysqlServer
1、Doris 对于数据查询可以做到毫秒级别的响应速度,但是对于高并发写入不太友好,如 streamload 写入的时候最好是每 1s 或者多秒执行一次 streamload。
2、Doris 支持 jdbc 协议。
3、Doris 的 FE,BE 节点都可以进行扩容和缩容。
1、Doris 安装后会用自身拥有的 jdk,如果我们自行安装就会和 Doris 进行冲突导致 Doris 的宕机。
2、使用 Mysql 客户端登录,查看 FE 的状态:mysql-hip-P9030-uroot-pxxxxx
。
3、Mysql 中的 varchar 到 Doris 中转变成的字符类型要×3。
4、Doris 中的文本类型为 String。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。