当前位置:   article > 正文

【spark_01】_hive spark 批处理

hive spark 批处理

Spark_01_笔记

大数据:
linux 、 shell、mysql、hadoop、hive、sqoop、superset、xxl、flume、kafka

sqoop => datax 、自己开发 数据同步
xxl =》 dolphin
hive => sparksql

数据:
日志数据:log文件
业务数据:mysql 业务库
其他数据:excel、文本数据

hive:table 数据仓库
1.日志数据 =》采集 =》flume=》 hdfs/hive
2.业务数据 =》采集 =》 sqoop =》 hdfs/hive
3. 其他数据 => hadoop fs -put hdfs

数据分析:
业务周期性:

1.离线计算:批处理
hive:构建离线数仓
1.数据分层:把数据解耦
ods层:原始数据层
ods_xxx_table
dwd层: 数据清洗 【脏数据 =》 数据转换】 20张表
dws层:宽表层 【dwd层表进行join操作】
rpt层、app层:指标、报表层
group by +聚合指标

业务周期性:T+1 => 延迟一天处理
rpt:
20230103 =》
处理的数据 20230102的数据

业务周期性:H+1 => 延迟一小时处理
20230103=》
处理的数据 20230103的数据
rpt:
sparksql

2.实时计算:实时处理
spark、flink

日志数据 =》 flume =》kafka
业务数据 =》 maxwell/cannel/flinkcdc => kafka

kafka + spark/flink/storm
kafka 实时数仓: 业务周期性 1min 5min 10min
ods层:业务数据+ 日志数据
dws层:宽表层
rpt层: 报表层

topic:
ods_xx
dws_xx
rpt_xx

数据可视化展示:

离线计算、实时计算 =》 rpt层 =》 数据存储的库 查询速度一定要快 =》 数据可视化展示

​ 离线计算:
​ sqoop =》 mysql/clickhouse/drois => 数据可视化展示
​ 实时计算 =》
​ spark、flink 支持外部数据源 =》 mysql/clickhouse/drois => 数据可视化展示

实时展示: 1min

实时指标: 业务周期性 很快
s级别出数据 :
1s 3s 5s

spark

1.spark产生的背景:

​ 1.mr、hive 批处理,离线处理 存在一些局限性:

​ 1.mr api 开发复杂

​ 2.只能做离线计算 不嫩做实时计算

​ 3.性能不高

​ 需求: sql => mr

​ mr1=> mr2=>mr3

​ map => reduce

​ map处理完数据 =》 disk 数据落盘 =》 reduce mr

​ kv进行操作 =》 k进行排序

2.什么是spark?

​ 1.官网:spark.apache.org

​ 2.计算引擎 【不关注数据存储】

​ 3.特点:

	1.Batch/streaming data =》 批流一体
	2.SQL analytics
	3.Data science at scale
	4.Machine learning
  • 1
  • 2
  • 3
  • 4

​ 4.速度块:

​ 1.spark是基于内存计算(内存不够就使用磁盘)

​ 2.DAG

​ mr1=>mr2=>mr3

​ 算子 链式编程

​ 3.pipline 【通信的】

​ 4.编程模型 线程级别

​ 5.易用性:

	1.开发语言: java、scala、python、sql
	2.外部数据源
	3.80多个高级算子 =》 scala算子

	mr => 去读 mysql 数据
		1.DBinputformat
	spark =》
		封装好了 多种外部数据源
			jdbc json csv

	mr:
		map
		reduce
	spark :
		80
5.通用性:
	子模块
	sparkcore   =》 离线计算
	sparksql    =》 离线计算
	sparkstreaming、structstreaming  =》 实时计算
	mllib  =》 机器学习
	图计算 =》 图处理

	spark的子模块 之间可以进行交互式使用
6.运行作业的地方
	1.yarn  ***
	2.mesos
	3.k8s  ***
	4.standalone[spark本身集群 ]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

3.hadoop生态圈 vs spark 生态圈

1.Batch:mr、hive vs sparkcore、sparksql
2.SQL:hive、impala vs sparksql
3.stream:storm vs sparkstreaming、sss
4.MLLib: Mahout vs MLlib
5.real 存储: HBase、cassandra vs DataSouce Api

思考: spark 能不能替换hadoop?
1.替换不了
spark =》 可以mr

spark版本:
1.spark 1.x
2.x 主流
3.x 主流
编程模型:
sparkcore =》 RDD
sparksql =》 dataframe & dataset
sparkstreaming =》 DS

1.sparkcore
RDD
1.rdd 开发 降低开发人员的开发成本 vs mr

2.什么是rdd ?

low level => mr
high level => spark 高级算子

1.resilient distributed dataset (RDD)
Represents an immutable,

  • partitioned collection of elements that can be operated on in parallel

1.“弹性 分布式 数据集”
2.数据集:
partitions:
元素 =》 一条一条的数据

  1. 可以以并行的方式进行计算

1.弹性?
容错 =》 计算的时候 可以重试

2.分布式?
1.存储
rdd: 1 2 3 4 5 6
partition1:1 2 3
partition2:4 5
partition3:6

	bigdata22:partition1
	bigdata23:partition2
	bigdata24:partition3
2.计算
	对rdd进行操作实际上是操作 rdd分区里面的数据

	rdd: 1 2 3 4 5 6         + 1
		partition1:1 2 3   + 1
		partition2:4 5		+ 1
		partition3:6        + 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

3.数据集:
就是构建rdd本身的数据

4.immutable 不可变的
scala中: val var
rdda => rddb
不可变=》 rdda 通过计算/操作得到一个新的rdd

5.partitioned collection of elements 【rdd 可以被分区存储/计算的】

rdd: 1 2 3 4 5 6
		partition1:1 2 3
		partition2:4 5
		partition3:6
bigdata22:partition1
bigdata23:partition2
bigdata24:partition3

一个rdd是由多个partition所构成
rdd数据存储是分布式的 ,是跨节点进行存储的
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

6.elements that can be operated on in parallel =》 计算

	rdd: 1 2 3 4 5 6         + 1
		partition1:1 2 3   + 1
		partition2:4 5		+ 1
		partition3:6        + 1

	对rdd进行操作 就是对rdd底层partition里面的元素进行操作
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

abstract class RDD[T: ClassTag](
@transient private var sc: SparkContext,
@transient private var deps: Seq[Dependency[
]]
) extends Serializable with Logging {

1.abstract
2.T 泛型 =》 限定rdd里面数据 是什么类型的
RDD[String]、RDD[Int] 、RDD[Student]
3.Serializable 序列化 =》数据经过网络传输
4.@transient 注解 这个属性就不需要序列化 【了解】

RDD五大特性:

1.A list of partitions
2.A function for computing each split
3.A list of dependencies on other RDDs
4.Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
5.Optionally, a list of preferred locations to compute each split on (e.g. block locations for
an HDFS file)

1.rdd底层存储一系列partition
2.针对rdd做计算/操作其实就是对rdd底层的partition进行计算/操作
3.rdd之间的依赖关系 (血缘关系)
rdda => rddb
rdd 不可变
rdda => rddb => rddc
4.Partitioner => kv类型的rdd

rdd: 1 2 3 4 5 6
partition1:1 2 3
partition2:4 5
partition3:6
默认分区:hash

5.数据本地性 =》 减少数据传输的网络io 优点
rdd进行操作

​ 好处:

​ 有限把作业调度在数据所在节点 => 【理想状况】

	【常见计算】
		作业调度在别的节点上 ,数据在另外一台节点上吗,
		只能把数据通过网络把数据传输到 作业所在节点上去进行计算;
  • 1
  • 2
  • 3

1.def getPartitions 一系列partition

eg:
scala中: List(1,2,3,4).map(_*2) scala list 单机的

RDD中:
	数据集
	rdd.map(_*2)   对每个分区里面的元素做计算 是分布式
  • 1
  • 2
  • 3
  1. def compute

3.def getDependencies rdd之间的依赖关系
rdda => rddb => rddc

  1. val partitioner
    5.def getPreferredLocations(split: Partition):

RDD操作:
1.构建sparkcore 作业 【idea】
添加依赖:


org.apache.spark
spark-core_2.12
3.2.1

mapreduce:
程序入口:job

Initializing Spark:
1. SparkContext =》 sparkcore 程序入口
tells Spark how to access a cluster.
2.SparkConf =》 指定 spark app 详细信息
1.AppName => 作业名字
2.Master =》 作业运行在什么地方 spark作业的运行模式
1.local、yarn、standalone、k8s、mesos
工作中:
yarn
k8s
local
code编写测试阶段 :
local

spark作业里面只能有一个 sparkcontext

如何指定Master spark作业运行模式?
1.local模式
k 指的是线程数
2.standalone
spark://HOST:PORT
3.yarn
yarn 两种模式:
client模式
cluster模式
4.k8s
k8s://HOST:PORT

rdd 进行编程:
1.创建rdd
1. parallelize existing collection =》 已经存在的集合
2.referencing a dataset in an external storage system
hdfs、hbase、其他数据存储系统

	1.外部数据源存储
		hdfs、local、hbase、s3、cos、
	2.数据文件类型:
		text files, SequenceFiles, and any other Hadoop InputFormat.
  • 1
  • 2
  • 3
  • 4

spark部署: 分布式计算框架
spark app =》 yarn

spark 不是部署分布式的 client而已 =》 hive
spark 支持分布式部署 =》 standalone

1.下载安装包
2.解压
[hadoop@bigdata32 software]$ tar -zxvf ./spark-3.3.1-bin-hadoop3.tgz -C ~/app/
3.软连接
[hadoop@bigdata32 app]$ ln -s ./spark-3.3.1-bin-hadoop3/ spark
4.环境变量
[hadoop@bigdata32 spark]$ vim ~/.bashrc
[hadoop@bigdata32 spark]$ source ~/.bashrc

spark-core:
脚本:spark-shell

启动spark-shell : 测试code
1.web ui =》 每个spark作业 http://bigdata32:4040
2.参数:
–master =》 spark shell 以什么方式去运行

思考: 为什么spark-shell webui名字 是Spark shell?
可不可以改?

作业name在Spark shell 脚本里面 ,可以更改,通过 --name

思考:
spark-core 如何触发作业的执行呢?

​ action 算子

思考:
task数为什么是2?

并行度: 并行度高 处理的效率高 【前提 资源够】
针对spark处理数据
rdd :
partition
有多少个partition就有多少个task去处理我们的数据

1T => 1 partion => 1 task 1 cpu =>
10 partition => 10 task =>

跟我们运行模式有关系:
local[2] => rdd 分区默认就是2

创建rdd:
1.存在的集合
2.外部数据源
local、hdfs

spark-core:
1.texFile() =》 hdfs
2.mem => hdfs

思考: spark =》 hdfs 生成的文件数量 跟什么有关系?
mr => hdfs reduce task

spark => hdfs => tasks数 =》 parition数
rdd 里面parition有多少个 =》 文件落地有多少个文件

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/593310
推荐阅读
相关标签
  

闽ICP备14008679号