当前位置:   article > 正文

Spark 任务常见错误以及解决方案_valueerror: rdd is empty

valueerror: rdd is empty

目前一直在做大数据平台的开发,所以有机会看到全公司的小伙伴们提交spark任务的时候,常见的错误。所以这里列举下遇到的错误以及解决方案,做个记录,以后定期更新:

  1. Table or view not found: aaa.bbb

这个错误很简单,就是表或者视图没找到。引起这个原因的可能如下:

  1. 如果你是连接hive中的数据表,程序有没有正确加载hive-site.xml。在你的代码resource里面加上hive-site.xml或者在spark-submit提交的时候–files指定hive-site.xml目录(如果是采用–files加载hive配置,spark on yarn hive-site.xml最好放在hdfs上,如果放在本地目录,则需要在所有的nodemanager上都创建hive-site.xml目录)或者在安装的spark目录下的conf中将hive-site.xml拷贝过去,以上3种方案完成一种即可。
  2. 创建sparkSession的时候,没有加enableHiveSupport()这个选项。SparkSession.builder().appName(“HiveMySQLApp”).master(“local[2]”).enableHiveSupport().getOrCreate()
  3. 可能是你hive当中或者spark sql当中真的没存在那个表格。这里可能要细心下,仔细检查下库名表名,查看hive当中是否存在该表。或者spark sql中是否存在该视图
  4. 是否正确使用了spark的临时视图和全局视图,如果你跨session使用了临时视图而不是全局视图,也会出现这个问题。
  1. 第一个:The format of the existing table aaa.bbb is HiveFileFormat. It doesn’t match the specified format `ParquetFileFormat
    第二个:The column number of the existing table aaa.bbb(struct<>) doesn’t match the data schema(struct<user_id:string,dt:string,list:array>);

这两个错误可以归为一类错误,所以就放在一起说了。首先,这个问题常见于spark写入hive当中,其中hive的存储格式为parquet,通过spark saveAsTable 无法写入hive parquet表。
下面通过一个简单的例子来阐述一下这个错误的原因。
1. 创建一个实验表
CREATE TABLE parquet_fzy (
name string,
sex string,
age int
)
STORED AS PARQUET;
2. 在这里插入图片描述
3. val spark = SparkSession.builder().appName(“insertParquet”).enableHiveSupport().getOrCreate();
import spark.implicits._
val df = Seq(
(“fzy”, “nan”, 25)
).toDF(“name”, “sex”, “age”).write.mode(“append”).saveAsTable(“test_db.parquet_fzy”)
然后我们插入数据,看一下会报什么错
The format of the existing table test_db.parquet_fzy is HiveFileFormat. It doesn't match the specified format ParquetFileFormat.;
解决方法:ALTER TABLE parquet_fzy SET TBLPROPERTIES (‘spark.sql.sources.provider’=‘parquet’);
然后继续执行,报错如下
The column number of the existing table test_db.parquet_fzy(struct<>) doesn't match the data schema(struct<name:string,sex:string,age:int>);
到这为止,我已经是怀疑我这个表建的有问题了,所以我就用spark sql从新建了一个parquet_fzy_4表,建表语句一模一样,然后让我们看看结构是什么
在这里插入图片描述
少了一个这个属性spark.sql.sources.schema.part.0(这里不建议手动去插,因为这个不好构造,所以建议如果你想spark sql 插入hive parquet表,直接用spark sql写sql的方式插入表)
但是我们仍然发现,还是少了ALTER TABLE parquet_fzy_6 SET TBLPROPERTIES (‘spark.sql.sources.provider’=‘parquet’);这个属性。然后手动加上就行了。
正确的表结构如下:
在这里插入图片描述
这里总结一下:
1. Exception in thread “main” org.apache.spark.sql.AnalysisException: The format of the existing table db_src.parquet_test is HiveFileFormat. It doesn’t match the specified format ParquetFileFormat.;
//解决办法
ALTER TABLE parquet_fzy_6 SET TBLPROPERTIES (‘spark.sql.sources.provider’=‘parquet’);
//报错信息
2.Exception in thread “main” org.apache.spark.sql.AnalysisException: The column number of the existing table db_src.parquet_test(struct<>) doesn’t match the data schema(structname:string,sex:string,age:int);
//解决办法
用spark sql(写sql的方式)的方式插入hive parquet表!(不推荐你们用手动去更新,你构建schema的json很难)

------------------------------------------------分割线----------------------------------
同理!如果你用hive去更新hive parquet表结构的话,Exception in thread “main” org.apache.spark.sql.AnalysisException: The column number of the existing table db_src.parquet_test(struct<>) doesn’t match the data schema(structname:string,sex:string,age:int);
//解决办法
用spark sql 去更新表结构
当然,究极解决办法当然是用其他格式存储,orc等啥的格式存储数据

  1. Cannot insert into table aaa.bbbb because the number of columns are different: need 30 columns, but query has 31 columns.;
  1. 你select出来的数据和要插入的数据的columns对不上就会报此错误
  2. 如果是静态分区,你需要insert overwrite table a
    partition (dt = ‘2019-12-12’, group_id= 222) select e.id, e.name from user e

    !!!具体列举出你需要select的字段!!!
    **注意:**而不是insert overwrite table a partition (dt = ‘2019-12-12’, group_id= 222) select * from user e。这里要特别注意!
  3. 如果是动态分区,你需要insert overwrite table a
    partition (dt , group_id) select e.id, e.name,e.dt as dt,e.group_id as group_id from user e
    像这样把字段列举出来。
    这里说一下,你如果设置动态分区,你需要常用的3个设置
    1、set hive.exec.dynamic.partition=true; 是开启动态分区
    2、set hive.exec.dynamic.partition.mode=nonstrict; 这个属性默认值是strict,就是要求分区字段必须有一个是静态的分区值(如上面2号标题所示),当前设置为nonstrict,那么可以全部动态分区。
    3、SET hive.exec.max.dynamic.partitions=100000; 如果你本次插入动态分区的数量超过这个数,就会报错(Fatal error occurred when node tried to create too many dynamic partitions.),这个可以根据实际情况更改。
  1. Permission denied: user=aaa_community, access=READ_EXECUTE, inode="/warehouse/aaa/ods/user_data/":hive:hive:drwxrwx–x

这个非常简单,一般就是某个目录下的写权限是某个用户没有的,只需要找集群管理者或者管理员给你适配下权限即可

  1. Application attempt appattempt_1596454596856_199421_000001 doesn’t exist in ApplicationMasterService cache.。 Could not find CoarseGrainedScheduler。

因为发现yarn日志中有-XX:OnOutOfMemoryError=‘kill %p’ \ org.apache.spark.executor.CoarseGrainedExecutorBackend \ 这么一句话,所以怀疑可能是内存溢出,把executor运行的容器给kill掉了/yarn自动给你kill掉了。所以解决方案就是加大你的程序申请的资源,优先加大内存。

  1. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running ‘REFRESH TABLE tableName’ command in SQL or by recreating the Dataset/DataFrame involved.

当你更新了一个表的某些列,并且马上查询的话,就会报这种错误。如果想解决这个问题,请刷新与该表关联的所有缓存项。调用命令如下:
REFRESH TABLE [db_name.]table_name

  1. Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value

spark.sql.autoBroadcastJoinThreshold 单位字节,如果小于这个数字,则spark采用广播变量的方式进行join,默认是10M(10485760)。如果出现此问题,可能就是driver或者executor的内存溢出了。
解决办法有三种:1. spark.sql.autoBroadcastJoinThreshold 设置为-1关闭掉大表join小表的功能。 2. 将driver和executor申请的内存够调大 3. 将spark.sql.autoBroadcastJoinThreshold 数值调小。

  1. Missing an output location for shuffle 3
  1. 调整上述参数(spark.sql.autoBroadcastJoinThreshold ),使join变为broadcast join 或者在 join 之前,filter掉无用的数据
  2. 增大shuffle并行度。 如果是使用RDD计算:spark.default.parallelism一般设置为container的cores的2-3倍。如果是spark sql 通过spark.sql.shuffle.partitions控制分区数,默认为200。可以设置为400,500等试试。
  3. 增加shuffle失败的重试次数和重试的时间间隔,提高容错性,稳定性。
    spark.shuffle.io.maxRetries控制重试次数,默认是3,可适当增加,例如60。
    spark.shuffle.io.retryWait控制重试的时间间隔,默认是5s,可适当增加,例如60s。
  4. 终极懒人杀招:加大executor内存(–executor-memory)以及堆外内存(–conf spark.executor.memoryoverhead)
  5. 可能有数据倾斜,去spark-ui看一看各个的shuffle read 和shuffle write的大小和时间,确定下有没有数据倾斜的存在
  1. User did not initialize spark context!

将代码中设置master的方法去掉 .master(sparkMaster) 不要出现这句。

  1. Caused by: parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead

spark写parquet数据的时候,会报如上错误
原因:当你读取的数据中array或者map为空集合,或者map中存在key为空的情况下会出现这种情况。
解决方案:

  1. 不用spark sql 执行,采用hive sql执行该sql
  2. spark读取hive数据的时候,对空集合或者map key为null的数据统一修改为null(spark dataframe程序好实现,直接写个filter就行)
  3. 将hive中的空集合数据或者map key 为null的统一刷为null
  4. spark sql ,写一个udf函数来协助过滤
    spark.udf.register(“filter_map”, ((map : Map[String, String]) => {if (map != null && !map.isEmpty) map.filter(_._1 != null) else null}))
  1. cannot resolve ‘dt’ given input columns:
  1. sql select了不存在的字段就会报这个错误。如果是dataframe操作数据,在每次get每个column的时候,判断下字段字段是否存在,就不会报错了。
  2. 如果是spark读取的csv文件,将csv文件的后缀修改为.txt。csv编码好像有点问题。
  3. 将pom中的spark版本升至2.2.0以上的版本
  1. Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict

设置这个参数就行set hive.exec.dynamic.partition.mode=nonstrict; 默认是严格模式,严格模式的意思就是至少要有一个静态分区的存在,如果你的插入数据全部都是动态分区,就要加上上述的配置。

  1. Unknown save mode: overview. Accepted save modes are ‘overwrite’, ‘append’, ‘ignore’, ‘error’, ‘errorifexists’.

dataframe写入的一种模式,dataframe写入的模式一共有4种,如果不在这4种当中,就会报此异常

def mode(saveMode: String): DataFrameWriter = {
	this.mode = saveMode.toLowerCase match {
	      case "overwrite" => SaveMode.Overwrite              // 覆盖已经存在的文件
	      case "append" => SaveMode.Append                    // 向存在的文件追加
	      case "ignore" => SaveMode.Ignore                    // 如果文件已存在,则忽略保存操作
	      case "error" | "default" => SaveMode.ErrorIfExists  // 如果文件存在,则报错
	      case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
	        "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. Futures timed out after [100000 milliseconds]
  1. 将代码中setMaster去掉,是什么模式通过spark-submit提交。
  2. –conf spark.yarn.am.waitTime=1000s 提高超时时间
  1. ValueError: RDD is empty

当时同take(1) 或者 first()方法的时候,如果rdd的数据内容为空,会报此种异常。

    def first(self):
        rs = self.take(1)
        if rs:
            return rs[0]
        raise ValueError("RDD is empty")
  • 1
  • 2
  • 3
  • 4
  • 5
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/474565
推荐阅读
相关标签
  

闽ICP备14008679号