赞
踩
使用coalesce()算子,缩小分区
插入hive表 write.mode 选择插入模式,inserinto 兼容hive 根据字段顺序去匹配hive表。saveastable 不兼容hive 根据字段名字进行匹配
core-site.xml, hdfs-site.xml ,hive-site.xml,yarn-site.xml放到resource源码包下
HA支持:
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val ssc = sparkSession.sparkContext
ssc.hadoopConfiguration.set("fs.defaultFS", "hdfs://mycluster")
ssc.hadoopConfiguration.set("dfs.nameservices", "mycluster")
将hive-site.xml配置文件移动到spark conf下即可
join 分为三种
(1)hash join 老版本spark用的,条件苛刻,现在spark已经弃用了
(2)broadcast hast join 适用于小表join大表 只能广播小表
(3)sortmerge join spark默认走的一个join 大表join大表
广播join api 方式
导包 import org.apache.spark.sql.functions._
broadcast(表).join(表)
spark sql 方式
通过spark.sql.autoBroadcastJoinThreshold 控制默认10mb,即小于等于10mb的表都属于小表
spark 内存分为两块
(1)storage内存:用于存储 cache ,缓存数据用的
(2)shuffle(execution)内存:用于计算 join groupby reduce
在1.6之前是静态管理的,1.6动态管理的 只要把executor内存分配足就可以了
rdd dataset/dataframe
例子:rdd java序列化cache 默认缓存 1.7G
rdd kryo序列化并且使用 ser 序列化缓存 缓存大小降到 269.6 MB
dataset 默认cache 34.2mb
dataset ser序列化缓存 33.8 MB
spark ui界面storage页签可以监控存储内存大小
分区个数得是cpu个数2倍到3倍,目的为了充分利用cpu
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.other.streaming.RawLogSparkStreaming --queue spark com_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar
–num-executors 2 * --executor-cores 2 的个数是总cpu个数,那么spark 分区个数得是这个个数2倍或3倍那么是最优的
spark.reducer.maxSizeInFilght 控制reduce去map端拉取的数据量
spark.shuffle.file.buffer 控制shuffle文件大小
优化效果:5%
产生数据倾斜解决方案
(1)打散大表,扩容小表 实验结果:虽然解决了数据倾斜,但是更加耗时了
(2)广播join 本地join就能解决数据倾斜问题,也是spark 优化经常会用的
大表join大表优化
smb join 针对桶的join 优化的是排序时间 数据量一定要大不然会产生大量小文件
stream.foreachRDD(rdd => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (or <- offsetRanges) {
sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
Array(groupid, or.topic, or.partition.toString, or.untilOffset))
}
3.spark streaming初始化的时候,去mysql查询下偏移量,如果说mysql里有偏移量则根据偏移量,如果mysql里没有偏移量那么证明是第一次启动,从earliest
spark.streaming.backpressure.enabled 背压 解决spark streaming解压问题,动态去拉取数据,上限值由spark.streaming.kafka.maxRatePerPartition 这个参数决定
spark.streaming.stopGracefullyOnShutdown 优雅关闭 开启之后接收到kill命令不会立马kill,而是等当前批次数据处理完毕没有问题再kill的
spark streaming操作数据库:
resultDStream.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
//创建数据库的连接
partition.foreach( //使用连接)
//退到分区处 关闭连接
}
}
1.加分布式锁 zk redis 都可以 但是加了肯定会影响效率
2.reducebykey或者groupby groupbykey 将相同 key先聚合到同一个分区 那么相同key发出的查询请求只会有一次
spark submit提交时 --num-executors 5 --executor-core 2 两参数相乘 等于分区数 cpu比上分区数是1:1 那么spark streaming运行速度最快,spark streaming当中不会去刻意增大分区或减小分区的,因为增大会进行shuffle,减小分区并行度就减少了 所以不会刻意用
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。