赞
踩
目前一直在做大数据平台的开发,所以有机会看到全公司的小伙伴们提交spark任务的时候,常见的错误。所以这里列举下遇到的错误以及解决方案,做个记录,以后定期更新:
这个错误很简单,就是表或者视图没找到。引起这个原因的可能如下:
- 如果你是连接hive中的数据表,程序有没有正确加载hive-site.xml。在你的代码resource里面加上hive-site.xml或者在spark-submit提交的时候–files指定hive-site.xml目录(如果是采用–files加载hive配置,spark on yarn hive-site.xml最好放在hdfs上,如果放在本地目录,则需要在所有的nodemanager上都创建hive-site.xml目录)或者在安装的spark目录下的conf中将hive-site.xml拷贝过去,以上3种方案完成一种即可。
- 创建sparkSession的时候,没有加enableHiveSupport()这个选项。SparkSession.builder().appName(“HiveMySQLApp”).master(“local[2]”).enableHiveSupport().getOrCreate()
- 可能是你hive当中或者spark sql当中真的没存在那个表格。这里可能要细心下,仔细检查下库名表名,查看hive当中是否存在该表。或者spark sql中是否存在该视图
- 是否正确使用了spark的临时视图和全局视图,如果你跨session使用了临时视图而不是全局视图,也会出现这个问题。
HiveFileFormat
. It doesn’t match the specified format `ParquetFileFormat这两个错误可以归为一类错误,所以就放在一起说了。首先,这个问题常见于spark写入hive当中,其中hive的存储格式为parquet,通过spark saveAsTable 无法写入hive parquet表。
下面通过一个简单的例子来阐述一下这个错误的原因。
1. 创建一个实验表
CREATE TABLE parquet_fzy (
name string,
sex string,
age int
)
STORED AS PARQUET;
2.
3. val spark = SparkSession.builder().appName(“insertParquet”).enableHiveSupport().getOrCreate();
import spark.implicits._
val df = Seq(
(“fzy”, “nan”, 25)
).toDF(“name”, “sex”, “age”).write.mode(“append”).saveAsTable(“test_db.parquet_fzy”)
然后我们插入数据,看一下会报什么错
The format of the existing table test_db.parquet_fzy is HiveFileFormat. It doesn't match the specified format ParquetFileFormat.;
解决方法:ALTER TABLE parquet_fzy SET TBLPROPERTIES (‘spark.sql.sources.provider’=‘parquet’);
然后继续执行,报错如下
The column number of the existing table test_db.parquet_fzy(struct<>) doesn't match the data schema(struct<name:string,sex:string,age:int>);
到这为止,我已经是怀疑我这个表建的有问题了,所以我就用spark sql从新建了一个parquet_fzy_4表,建表语句一模一样,然后让我们看看结构是什么
少了一个这个属性spark.sql.sources.schema.part.0(这里不建议手动去插,因为这个不好构造,所以建议如果你想spark sql 插入hive parquet表,直接用spark sql写sql的方式插入表)
但是我们仍然发现,还是少了ALTER TABLE parquet_fzy_6 SET TBLPROPERTIES (‘spark.sql.sources.provider’=‘parquet’);这个属性。然后手动加上就行了。
正确的表结构如下:
这里总结一下:
1. Exception in thread “main” org.apache.spark.sql.AnalysisException: The format of the existing table db_src.parquet_test isHiveFileFormat
. It doesn’t match the specified formatParquetFileFormat
.;
//解决办法
ALTER TABLE parquet_fzy_6 SET TBLPROPERTIES (‘spark.sql.sources.provider’=‘parquet’);
//报错信息
2.Exception in thread “main” org.apache.spark.sql.AnalysisException: The column number of the existing table db_src.parquet_test(struct<>) doesn’t match the data schema(structname:string,sex:string,age:int);
//解决办法
用spark sql(写sql的方式)的方式插入hive parquet表!(不推荐你们用手动去更新,你构建schema的json很难)
------------------------------------------------分割线----------------------------------
同理!如果你用hive去更新hive parquet表结构的话,Exception in thread “main” org.apache.spark.sql.AnalysisException: The column number of the existing table db_src.parquet_test(struct<>) doesn’t match the data schema(structname:string,sex:string,age:int);
//解决办法
用spark sql 去更新表结构
当然,究极解决办法当然是用其他格式存储,orc等啥的格式存储数据
aaa
.bbbb
because the number of columns are different: need 30 columns, but query has 31 columns.;
- 你select出来的数据和要插入的数据的columns对不上就会报此错误
- 如果是静态分区,你需要insert overwrite table a
partition (dt = ‘2019-12-12’, group_id= 222) select e.id, e.name from user e
!!!具体列举出你需要select的字段!!!
**注意:**而不是insert overwrite table a partition (dt = ‘2019-12-12’, group_id= 222) select * from user e。这里要特别注意!- 如果是动态分区,你需要insert overwrite table a
partition (dt , group_id) select e.id, e.name,e.dt as dt,e.group_id as group_id from user e像这样把字段列举出来。
这里说一下,你如果设置动态分区,你需要常用的3个设置
1、set hive.exec.dynamic.partition=true; 是开启动态分区
2、set hive.exec.dynamic.partition.mode=nonstrict; 这个属性默认值是strict,就是要求分区字段必须有一个是静态的分区值(如上面2号标题所示),当前设置为nonstrict,那么可以全部动态分区。
3、SET hive.exec.max.dynamic.partitions=100000; 如果你本次插入动态分区的数量超过这个数,就会报错(Fatal error occurred when node tried to create too many dynamic partitions.),这个可以根据实际情况更改。
这个非常简单,一般就是某个目录下的写权限是某个用户没有的,只需要找集群管理者或者管理员给你适配下权限即可
因为发现yarn日志中有-XX:OnOutOfMemoryError=‘kill %p’ \ org.apache.spark.executor.CoarseGrainedExecutorBackend \ 这么一句话,所以怀疑可能是内存溢出,把executor运行的容器给kill掉了/yarn自动给你kill掉了。所以解决方案就是加大你的程序申请的资源,优先加大内存。
当你更新了一个表的某些列,并且马上查询的话,就会报这种错误。如果想解决这个问题,请刷新与该表关联的所有缓存项。调用命令如下:
REFRESH TABLE [db_name.]table_name
spark.sql.autoBroadcastJoinThreshold 单位字节,如果小于这个数字,则spark采用广播变量的方式进行join,默认是10M(10485760)。如果出现此问题,可能就是driver或者executor的内存溢出了。
解决办法有三种:1. spark.sql.autoBroadcastJoinThreshold 设置为-1关闭掉大表join小表的功能。 2. 将driver和executor申请的内存够调大 3. 将spark.sql.autoBroadcastJoinThreshold 数值调小。
- 调整上述参数(spark.sql.autoBroadcastJoinThreshold ),使join变为broadcast join 或者在 join 之前,filter掉无用的数据
- 增大shuffle并行度。 如果是使用RDD计算:spark.default.parallelism一般设置为container的cores的2-3倍。如果是spark sql 通过spark.sql.shuffle.partitions控制分区数,默认为200。可以设置为400,500等试试。
- 增加shuffle失败的重试次数和重试的时间间隔,提高容错性,稳定性。
spark.shuffle.io.maxRetries控制重试次数,默认是3,可适当增加,例如60。
spark.shuffle.io.retryWait控制重试的时间间隔,默认是5s,可适当增加,例如60s。- 终极懒人杀招:加大executor内存(–executor-memory)以及堆外内存(–conf spark.executor.memoryoverhead)
- 可能有数据倾斜,去spark-ui看一看各个的shuffle read 和shuffle write的大小和时间,确定下有没有数据倾斜的存在
将代码中设置master的方法去掉 .master(sparkMaster) 不要出现这句。
spark写parquet数据的时候,会报如上错误
原因:当你读取的数据中array或者map为空集合,或者map中存在key为空的情况下会出现这种情况。
解决方案:
- 不用spark sql 执行,采用hive sql执行该sql
- spark读取hive数据的时候,对空集合或者map key为null的数据统一修改为null(spark dataframe程序好实现,直接写个filter就行)
- 将hive中的空集合数据或者map key 为null的统一刷为null
- spark sql ,写一个udf函数来协助过滤
spark.udf.register(“filter_map”, ((map : Map[String, String]) => {if (map != null && !map.isEmpty) map.filter(_._1 != null) else null}))
dt
’ given input columns:
- sql select了不存在的字段就会报这个错误。如果是dataframe操作数据,在每次get每个column的时候,判断下字段字段是否存在,就不会报错了。
- 如果是spark读取的csv文件,将csv文件的后缀修改为.txt。csv编码好像有点问题。
- 将pom中的spark版本升至2.2.0以上的版本
设置这个参数就行set hive.exec.dynamic.partition.mode=nonstrict; 默认是严格模式,严格模式的意思就是至少要有一个静态分区的存在,如果你的插入数据全部都是动态分区,就要加上上述的配置。
dataframe写入的一种模式,dataframe写入的模式一共有4种,如果不在这4种当中,就会报此异常
def mode(saveMode: String): DataFrameWriter = {
this.mode = saveMode.toLowerCase match {
case "overwrite" => SaveMode.Overwrite // 覆盖已经存在的文件
case "append" => SaveMode.Append // 向存在的文件追加
case "ignore" => SaveMode.Ignore // 如果文件已存在,则忽略保存操作
case "error" | "default" => SaveMode.ErrorIfExists // 如果文件存在,则报错
case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
"Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
}
}
- 将代码中setMaster去掉,是什么模式通过spark-submit提交。
- –conf spark.yarn.am.waitTime=1000s 提高超时时间
当时同take(1) 或者 first()方法的时候,如果rdd的数据内容为空,会报此种异常。
def first(self):
rs = self.take(1)
if rs:
return rs[0]
raise ValueError("RDD is empty")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。