
Spark Error Notebook

1. Task not serializable

[screenshot]

Cause: the job uses MySQL over JDBC, and a Connection has to be created on each executor separately; a single connection cannot be shared across the cluster.

In the screenshot below, the driver and the executors are not on the same machine, so the connection cannot be shared.
[screenshot]
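A minimal sketch of the usual workaround: create the connection inside foreachPartition so it is built on the executor instead of being captured from the driver. It assumes rdd is an RDD[(String, String)]; the JDBC URL, credentials and SQL are made-up placeholders.

import java.sql.DriverManager

rdd.foreachPartition { rows =>
  // Opened on the executor, so no Connection object has to be serialized from the driver
  val conn = DriverManager.getConnection("jdbc:mysql://db-host:3306/test", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO teacher(name, subject) VALUES (?, ?)")
  rows.foreach { case (name, subject) =>
    stmt.setString(1, name)
    stmt.setString(2, subject)
    stmt.executeUpdate()
  }
  stmt.close()
  conn.close()
}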

Second case: an anonymous inner class was used to implement an interface. The anonymous class carries no serialization marker, so the task fails to serialize. Instead of an anonymous inner class, create a named class that implements the required interface together with Serializable:

    // A named class that also implements Serializable, so Spark can serialize it
    // and ship it to the executors.
    static class MyMapFunction implements scala.Function1<Row, Teacher>, Serializable {

        @Override
        public Teacher apply(Row row) {
            String info = row.getString(0);
            String[] split = info.split(",");
            return new Teacher(split[0], split[1], split[2]);
        }

        @Override
        public <A> Function1<A, Teacher> compose(Function1<A, Row> g) {
            return Function1.super.compose(g);
        }

        @Override
        public <A> Function1<Row, A> andThen(Function1<Teacher, A> g) {
            return Function1.super.andThen(g);
        }
    }

2. value toDF is not a member of org.apache.spark.rdd.RDD[Person]

val dfFromRddPerson = rddPerson.toDF

Cause: the Person case class was defined inside the method that calls toDF. Define the case class outside that method (at the top level of the file or object) and make sure import spark.implicits._ is in scope.
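A minimal sketch of the layout that works; the Person fields and sample values are illustrative:

import org.apache.spark.sql.SparkSession

// The case class is defined at the top level, NOT inside the method that calls toDF
case class Person(name: String, age: Int)

object ToDFExample {
  def run(spark: SparkSession): Unit = {
    import spark.implicits._   // brings the toDF implicit conversion into scope
    val rddPerson = spark.sparkContext.parallelize(Seq(Person("Tom", 20), Person("Ann", 31)))
    val dfFromRddPerson = rddPerson.toDF()
    dfFromRddPerson.show()
  }
}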

3. Submitting a job from IDEA to a Spark standalone cluster

The job fails with:

 Asked to remove non-existent executor 203

It looked like an infinite loop: the line above was logged over and over.

Someone online suggested the cause is that the driver's IP is not specified. The driver runs on the local machine, so set the driver host in SparkConf:

val conf= new SparkConf().setAppName(this.getClass.getName)
      .setMaster("spark://node1:7077")
      .set("spark.driver.host","192.168.239.1")

After adding this setting the job ran.


4. Submitting a Spark job to YARN from IDEA

The job fails with:

 Could not parse Master URL: 'yarn'

Looking at the Spark source, there is this code:

private def getClusterManager(url: String): Option[ExternalClusterManager] = {
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoaders =
      ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
    if (serviceLoaders.size > 1) {
      throw new SparkException(
        s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
    }
    serviceLoaders.headOption
  }

Tracing upward: when a Spark job needs an external cluster, an ExternalClusterManager implementation must be registered. For YARN that is YarnClusterManager, which lives in the spark-yarn module.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>

After adding the dependency the error persisted, so I added this line to the code:

 val value = this.getClass.getClassLoader.loadClass("org.apache.spark.scheduler.cluster.YarnClusterManager")

It failed as well: the class could not be found.

The jar was clearly on the dependency list, so why could the class not be found? Then I remembered an earlier issue: the dependency is declared with provided scope, so the IDEA run configuration has to be set to include provided-scope dependencies on the classpath.
[screenshot]
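For reference, a minimal sketch of the kind of setup this converges to when running against YARN straight from the IDE. The client deploy mode and the assumption that yarn-site.xml / HADOOP_CONF_DIR is visible to the process are mine, not from the original notes.

import org.apache.spark.sql.SparkSession

// Needs spark-yarn on the runtime classpath and the YARN client configuration
// (core-site.xml / yarn-site.xml) visible via the classpath or HADOOP_CONF_DIR.
val spark = SparkSession.builder()
  .appName("yarn-from-ide")
  .master("yarn")
  .config("spark.submit.deployMode", "client")
  .getOrCreate()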

5. Submitting Spark to YARN from IDEA fails at runtime

/usr/local/hadoop$ hadoop pipes -Dhadoop.pipes.java.recordreader=true -Dhadoop.pipes.java.recordwriter=true -input someFile.txt -output /out -program bin/wordcount
DEPRECATED: Use of this script to execute mapred command is deprecated.
Instead use the mapred command for it.

13/12/14 20:12:06 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
13/12/14 20:12:06 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
13/12/14 20:12:07 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:08 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:09 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:10 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:11 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:12 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:13 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 6 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:14 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032.

Cause: the first time the job was packaged, yarn-site.xml was missing from the resources directory. The file was added later, but the jar was not rebuilt, so the configuration was still absent at runtime and the client fell back to the default ResourceManager address 0.0.0.0:8032.

6. Submitting to YARN from IDEA: NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig

Exception in thread "main" java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
	at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:45)
	at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:163)
	at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
	at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:161)
	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:183)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:501)
	at spark2.SparkWithYarn$.main(SparkWithYarn.scala:34)
	at spark2.SparkWithYarn.main(SparkWithYarn.scala)

First fix: disable the timeline service in yarn-site.xml:

<property>
  <name>yarn.timeline-service.enabled</name>
  <value>false</value>
</property>

Second fix: pin the jersey-core and jersey-client versions in the pom:

<dependency>
    <groupId>com.sun.jersey</groupId>
    <artifactId>jersey-core</artifactId>
    <version>1.9</version>
</dependency>
<dependency>
    <groupId>com.sun.jersey</groupId>
    <artifactId>jersey-client</artifactId>
    <version>1.9</version>
</dependency>

7. ClassCastException after submitting to YARN

8/05/22 17:13:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
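The fix I used came from a reference article (link not preserved). One common cause of this particular ClassCastException when driving a cluster job from the IDE, offered here as an assumption rather than a confirmed diagnosis, is that the application jar is never shipped to the executors, so the driver and the executors deserialize the RDD lineage against different classpaths. A sketch of shipping the jar explicitly; the jar path is a placeholder:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("my-app")
  .setMaster("spark://node1:7077")
  // ship the assembled application jar to the executors
  .setJars(Seq("target/my-app-1.0-SNAPSHOT.jar"))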

8. Running a Spark demo locally fails

The error:

Cannot assign requested address: bind: Service 'sparkDriver' failed

Fix: set this environment variable:
[screenshot]

SPARK_LOCAL_IP=127.0.0.1

The exact cause was not identified. Most likely the driver could not bind to the address its hostname resolved to, and setting SPARK_LOCAL_IP to 127.0.0.1 makes it bind to the loopback address instead.
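The same effect can also be expressed in code via spark.driver.bindAddress (a standard Spark configuration key); whether it behaves exactly like SPARK_LOCAL_IP in this scenario is an assumption on my part:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("local-demo")
  .setMaster("local[*]")
  .set("spark.driver.bindAddress", "127.0.0.1")  // force the driver to bind to loopback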

9. Connecting to the Spark Thrift server over JDBC

Exception in thread "main" java.sql.SQLException: No suitable driver found for  jdbc:hive2://localhost:10000/mytest
	at java.sql.DriverManager.getConnection(DriverManager.java:689)
	at java.sql.DriverManager.getConnection(DriverManager.java:247)
	at spark2.sql.JdbcSparkPort10000$.main(JdbcSparkPort10000.scala:7)
	at spark2.sql.JdbcSparkPort10000.main(JdbcSparkPort10000.scala)

1. First, there was a stray space at the start of the JDBC URL (" jdbc:hive2://node3:10000/mytest").

2. Second, the driver dependency was missing:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive-thriftserver_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
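A minimal sketch of the connection code these two fixes unblock; host, database and credentials are placeholders:

import java.sql.DriverManager

// HiveServer2 JDBC driver, also used for the Spark Thrift server
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://node3:10000/mytest", "", "")
val rs = conn.createStatement().executeQuery("show tables")
while (rs.next()) println(rs.getString(1))
conn.close()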

10. java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row

Two fixes were referenced in the original post (method one and method two), both as external links.
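Since those links are not reproduced here, one common fix, offered as an assumption rather than a summary of them: when a Dataset transformation produces Row values, supply an explicit RowEncoder for the result schema. Here df stands for an existing DataFrame with two string columns:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(StructField("name", StringType), StructField("subject", StringType)))
implicit val rowEncoder = RowEncoder(schema)   // explicit encoder for Row results

val mapped = df.map(row => Row(row.getString(0).trim, row.getString(1)))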

11. object apache is not a member of package org

Starting the Spark project in IDEA fails:

[screenshot]

Cause: the project has four modules, and one module's dependency on another module was not set up correctly.

12. Scala Spark - java.lang.UnsupportedOperationException: empty.init

CAST(value AS STRING) had been miswritten as CAST(value) AS STRING.

13. Cannot resolve overloaded method 'createTable'

Calling the kuduContext API to create a table is flagged as an error in the IDE:

Cannot resolve overloaded method 'createTable'

[screenshot]

At first the reason was unclear.

Workaround: put the cursor on the highlighted code and press Alt+Enter:
[screenshot]

Follow the prompt and choose "Convert to infix expression".

Later the real cause turned out to be a wrong import.

14. Exception in thread "main" org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for LEFT OUTER join between logical plans

The message says the LEFT OUTER join contains an implicit cartesian product. An article I found online attributes this to two DataFrames sharing the same lineage.

Checking my code, these two DataFrames could share lineage:

 val codesDF: DataFrame = getKuduSource(spark, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY)

val codeCustomerDF: DataFrame = codesDF
      .where($"type" === CodeTypeMapping.CustomerType)
      .select($"code", $"codeDesc")
      
val codeChannelDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType)
      .select($"code", $"codeDesc")

Both codeCustomerDF and codeChannelDF are derived from codesDF.

I had used the same pattern before without hitting this error, so I compared the old code with the new one. The difference: the old code gave aliases to the two selected columns:

 val codeCustomerDF: DataFrame = codesDF
      .where($"type" === CodeTypeMapping.CustomerType)
      .select($"code".as("type"), $"codeDesc".as("name"))
    val codeChannelDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType)
      .select($"code".as("type"), $"codeDesc".as("name"))

After changing my code the same way it ran fine. I still don't fully understand why; a plausible explanation is that the aliases give the two branches distinct output attributes, so the analyzer no longer resolves both sides of the join condition to the same columns of the shared parent.

15. spark isn't [Type: int64, Type: unixtime_micros], it's string

When writing to Kudu, the job first checks whether the table exists; if it does not, the job creates the table and then inserts the data.

On the first run the column types were wrong. That run failed, but the table had already been created.

After fixing the column types the job still failed: the table had been created with the wrong types on the first run, so although the code and the data now agreed, the data's actual types no longer matched the column types of the existing table.

Fix: drop the table and let the job recreate it.

16. Exception in thread "main" java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V

Exception in thread "main" java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
	at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
	at org.apache.spark.sql.execution.SparkPlan.org$apache$spark$sql$execution$SparkPlan$$decodeUnsafeRows(SparkPlan.scala:276)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeTake$1.apply(SparkPlan.scala:368)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeTake$1.apply(SparkPlan.scala:368)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:368)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)

Cause: a conflicting lz4 version on the classpath; see https://blog.csdn.net/m0_37914799/article/details/84992275

Fix: add this dependency:

<dependency>
    <groupId>net.jpountz.lz4</groupId>
    <artifactId>lz4</artifactId>
    <version>1.3.0</version>
</dependency>


17. Custom data source error (NullPointerException)

The custom data source fails with:

Exception in thread "main" java.lang.NullPointerException
	at org.apache.spark.sql.execution.datasources.LogicalRelation$.apply(LogicalRelation.scala:77)
	at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:415)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
	at SparkTest$.main(SparkTest.scala:37)
	at SparkTest.main(SparkTest.scala)

[screenshot]

Cause: DefaultSource passed a null schema when creating the HBaseRelation. Even though HBaseRelation never uses that parameter, it must not be null.

[screenshot]

18. Custom data source error (scala.MatchError)

Cause: when converting the query result into Row objects, the values were not converted from bytes to String:

scala.MatchError: [B@53a3605e (of class [B)

[screenshot]

19. UnknownHostException

Cause: the hostname contained a space.

21/08/02 15:16:03 ERROR client.StaticHostProvider: Unable to connect to server: bigdatacdh01.cn:2181
java.net.UnknownHostException: bigdatacdh01.cn
	at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
	at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:907)
	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1302)
	at java.net.InetAddress.getAllByName0(InetAddress.java:1255)
	at java.net.InetAddress.getAllByName(InetAddress.java:1171)
	at java.net.InetAddress.getAllByName(InetAddress.java:1105)
	at org.apache.zookeeper.client.StaticHostProvider.<init>(StaticHostProvider.java:60)
	at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:445)
	at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:380)
	at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.checkZk(RecoverableZooKeeper.java:141)
	at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.<init>(RecoverableZooKeeper.java:128)
	at org.apache.hadoop.hbase.zookeeper.ZKUtil.connect(ZKUtil.java:137)
	at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.<init>(ZooKeeperWatcher.java:185)
	at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.<init>(ZooKeeperWatcher.java:153)
	at org.apache.hadoop.hbase.client.ZooKeeperKeepAliveConnection.<init>(ZooKeeperKeepAliveConnection.java:43)
	at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getKeepAliveZooKeeperWatcher(ConnectionManager.java:1823)
	at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:104)
	at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:931)
	at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.<init>(ConnectionManager.java:658)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
	at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
	at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:218)
	at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
	at org.apache.hadoop.hbase.mapreduce.TableInputFormat.initialize(TableInputFormat.java:186)
	at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:241)
	at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:240)
	at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:125)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
	at com.lcy.models.tags.BaseModel$class.readBusinessData(BaseModel.scala:89)
	at com.lcy.models.tags.PayTypeModel$.readBusinessData(PayTypeModel.scala:4)
	at com.lcy.models.tags.BaseModel$class.execute(BaseModel.scala:186)
	at com.lcy.models.tags.PayTypeModel$.execute(PayTypeModel.scala:4)
	at com.lcy.models.tags.PayTypeModel$.main(PayTypeModel.scala:14)
	at com.lcy.models.tags.PayTypeModel.main(PayTypeModel.scala)
21/08/02 15:16:03 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=bigdatacdh01.cn:2181 sessionTimeout=90000 watcher=hconnection-0x3b2114ce0x0, quorum=bigdatacdh01.cn:2181, baseZNode=/hbase
21/08/02 15:16:03 ERROR client.StaticHostProvider: Unable to connect to server: bigdatacdh01.cn:2181

20. Map lookups return nothing when the key is an Int

val basicRuleMap: Map[String, String] = basicDF
      .filter($"level" === 5)
      .collect().map(row => {

      // TODO: if the key is an Int, the later map.getOrElse never finds the value
      // row.getAs[Int]("rule") -> row.getAs[String]("name")
      // Likely cause (unverified): the rule column is really a string, and getAs[Int]
      // does not actually convert it because of type erasure, so the map keys stay
      // String values at runtime and an Int key never matches them.

      row.getAs[String]("rule") -> row.getAs[String]("name")
    }).toMap


    val centerNameMap = kMeansModel.clusterCenters.zipWithIndex.map(row => {
      (row._1.toArray.sum, row._2)
    }).sortWith((kv1, kv2) => kv1._1 > kv2._1)
      .zipWithIndex.map(
      kv => {
        println("kv_2:" + kv._2)
        // if basicRuleMap's key were an Int this lookup would fail; it has to be a String
        (kv._1._2, basicRuleMap.getOrElse(kv._2.toString, ""))
      }
    ).toMap

21. Spark SQL error when using cast

org.apache.spark.sql.AnalysisException: cannot resolve '`total`' given input columns: [CAST(count(label) AS `total` AS BIGINT), label, CAST(sum(female) AS `cnt_female` AS DOUBLE), CAST(sum(male) AS `cnt_male` AS DOUBLE), userId];;
'Project [userId#83, UDF('total, 'cnt_male, 'cnt_female) AS usg#117]
+- Aggregate [userId#83, label#45], [userId#83, label#45, cast(count(label#45) as bigint) AS CAST(count(label) AS `total` AS BIGINT)#108L, cast(sum(cast(male#89 as bigint)) as double) AS CAST(sum(male) AS `cnt_male` AS DOUBLE)#109, cast(sum(cast(female#90 as bigint)) as double) AS CAST(sum(female) AS `cnt_female` AS DOUBLE)#110]
   +- Project [userId#83, label#45, CASE WHEN (label#45 = 0) THEN 1 ELSE 0 END AS male#89, CASE WHEN (label#45 = 1) THEN 1 ELSE 0 END AS female#90]
      +- Project [memberId#31 AS userId#83, color#50, product#55, label#45]
         +- Join Inner, (orderSn#32 = cOrderSn#14)
            :- Project [cOrderSn#14, CASE WHEN (ogColor#15 = 蓝色) THEN 17 WHEN (ogColor#15 = 金色) THEN 8 WHEN (ogColor#15 = 月光银) THEN 11 WHEN (ogColor#15 = 银色) THEN 20 WHEN (ogColor#15 = 黑色) THEN 2 WHEN (ogColor#15 = 梦境极光【布朗灰】) THEN 5 WHEN (ogColor#15 = 香槟金) THEN 14 WHEN (ogColor#15 = 梦境极光【卡其金】) THEN 4 WHEN (ogColor#15 = 香槟色) THEN 13 WHEN (ogColor#15 = 樱花粉) THEN 16 WHEN (ogColor#15 = 金属灰) THEN 7 WHEN (ogColor#15 = 香槟金色) THEN 1 WHEN (ogColor#15 = 布鲁钢) THEN 10 WHEN (ogColor#15 = 玫瑰金) THEN 19 WHEN (ogColor#15 = 金属银) THEN 18 WHEN (ogColor#15 = 乐享金) THEN 9 WHEN (ogColor#15 = 白色) THEN 3 WHEN (ogColor#15 = 时尚光谱【浅金棕】) THEN 12 WHEN (ogColor#15 = 灰色) THEN 15 WHEN (ogColor#15 = 粉色) THEN 6 ELSE 0 END AS color#50, CASE WHEN (productType#16 = 电热水器) THEN 23 WHEN (productType#16 = 前置过滤器) THEN 8 WHEN (productType#16 = 滤芯) THEN 17 WHEN (productType#16 = 电饭煲) THEN 26 WHEN (productType#16 = 嵌入式厨电) THEN 11 WHEN (productType#16 = Haier/海尔冰箱) THEN 2 WHEN (productType#16 = 燃气灶) THEN 20 WHEN (productType#16 = 冰吧) THEN 5 WHEN (productType#16 = 料理机) THEN 14 WHEN (productType#16 = 挂烫机) THEN 13 WHEN (productType#16 = Leader/统帅冰箱) THEN 4 WHEN (productType#16 = 电水壶/热水瓶) THEN 22 WHEN (productType#16 = 净水机) THEN 7 WHEN (productType#16 = 波轮洗衣机) THEN 16 WHEN (productType#16 = 电风扇) THEN 25 WHEN (productType#16 = 吸尘器/除螨仪) THEN 10 WHEN (productType#16 = 4K电视) THEN 1 WHEN (productType#16 = 空气净化器) THEN 28 WHEN (productType#16 = 烤箱) THEN 19 WHEN (productType#16 = 破壁机) THEN 27 WHEN (productType#16 = 取暖电器) THEN 9 WHEN (productType#16 = 烟灶套系) THEN 18 WHEN (productType#16 = 微波炉) THEN 12 WHEN (productType#16 = LED电视) THEN 3 WHEN (productType#16 = 燃气热水器) THEN 21 WHEN (productType#16 = 智能电视) THEN 15 WHEN (productType#16 = 冷柜) THEN 6 WHEN (productType#16 = 电磁炉) THEN 24 ELSE 0 END AS product#55, CASE WHEN (((((((ogcolor#15 = 樱花粉) || (ogcolor#15 = 白色)) || (ogcolor#15 = 香槟色)) || (ogcolor#15 = 香槟金)) || (productType#16 = 料理机)) || (productType#16 = 挂烫机)) || (productType#16 = 吸尘器/除螨仪)) THEN 1 ELSE 0 END AS label#45]
            :  +- LogicalRDD [cOrderSn#14, ogColor#15, productType#16]
            +- Relation[memberId#31,orderSn#32] com.lcy.models.datasource.HBaseRelation@6700374f


The cause: .cast() was written after .as(), so the alias never took effect.
Wrong:

count("label").as("total").cast(LongType)

Correct:

count("label").cast(LongType).as("total")

22. java.io.NotSerializableException on a lambda

Caused by: java.io.NotSerializableException: com.asinking.profit.ods.ods_s3_to_hudi_eg2$$Lambda$853/535708280


Fix: replace the anonymous lambda / inner class with a named class that implements Serializable:

    // Named class that implements Serializable so Spark can ship it to the executors
    static class MyFunction implements Function1<String, Row>, Serializable {
        @Override
        public Row apply(String item) {
            final String[] items = item.split(",");
            return RowFactory.create(items[0], items[1], items[2], items[3]);
        }

        @Override
        public <A> Function1<A, Row> compose(Function1<A, String> g) {
            return Function1.super.compose(g);
        }

        @Override
        public <A> Function1<String, A> andThen(Function1<Row, A> g) {
            return Function1.super.andThen(g);
        }
    }

23. java.lang.ExceptionInInitializerError when creating the SparkConf

Exception in thread "main" java.lang.ExceptionInInitializerError
	at org.apache.spark.unsafe.array.ByteArrayMethods.<clinit>(ByteArrayMethods.java:54)
	at org.apache.spark.internal.config.package$.<init>(package.scala:1006)
	at org.apache.spark.internal.config.package$.<clinit>(package.scala)
	at org.apache.spark.SparkConf$.<init>(SparkConf.scala:639)
	at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
	at org.apache.spark.SparkConf.set(SparkConf.scala:94)
	at org.apache.spark.SparkConf.set(SparkConf.scala:83)
	at com.asinking.utils.SparkUtils.sparkConf(SparkUtils.java:11)
	at com.asinking.ch.ods.OdsBase.exec(OdsBase.java:42)
	at com.asinking.ch.ods.OdsCHFinanceshipmentEvent.main(OdsCHFinanceshipmentEvent.java:6)
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make private java.nio.DirectByteBuffer(long,int) accessible: module java.base does not "opens java.nio" to unnamed module @2362f559
	at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
	at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
	at java.base/java.lang.reflect.Constructor.checkCanSetAccessible(Constructor.java:188)
	at java.base/java.lang.reflect.Constructor.setAccessible(Constructor.java:181)
	at org.apache.spark.unsafe.Platform.<clinit>(Platform.java:56)
	... 10 more

This looks like a JDK version problem: the root cause in the trace is the Java module system refusing reflective access to java.nio, which only happens on JDK 9 and newer. Run this Spark version on JDK 8, or add the corresponding --add-opens JVM options on newer JDKs.

24. NoSuchMethodError: PartitionedFile.<init> when reading Hudi

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.PartitionedFile.<init>(Lorg/apache/spark/sql/catalyst/InternalRow;Ljava/lang/String;JJ[Ljava/lang/String;)V
	at org.apache.hudi.MergeOnReadSnapshotRelation.$anonfun$buildFileIndex$1(MergeOnReadSnapshotRelation.scala:151)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.Iterator.foreach(Iterator.scala:941)


Cause: the Spark jars on the cluster are Amazon's modified build, while the Hudi jar there was built against stock Apache Spark; the two Spark builds differ in part of this code.

25. If the schema is inferred from a Scala tuple/case class, or a Java bean, ple…

When a DataFrame's schema is inferred from a class, every field in the schema is treated as non-nullable; a null value in any of those fields triggers this error.
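A minimal sketch of the usual remedy, wrapping fields that can be null in Option so the inferred schema marks them nullable; the case class is illustrative and spark is an existing SparkSession:

// age may be missing in the source data, so model it as Option[Int];
// the inferred schema then marks the column as nullable
case class Person(name: String, age: Option[Int])

import spark.implicits._
val ds = Seq(Person("Tom", Some(20)), Person("Ann", None)).toDS()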

26. Caused by: java.net.URISyntaxException: Relative path in absolute URI: file:

This came up while reading a Hudi table with Spark; the cause was invalid data in the Hudi table.

27. Multi-column joins in Spark

The following pattern made the subsequent select fail:

Seq<String> conditionSeq = JavaConverters
        .asScalaIteratorConverter(Arrays.asList("event_type", "financial_event_group_id", "company_id")
                .listIterator()).asScala().toSeq();
Dataset<Row> finalDS = newDS.join(oldDataSet, oldDataSet.col("event_type").equalTo(newDS.col("event_type")), "full").select(columns(oldDataSet, newDS));

The working version is below. Do not pass a Seq of column names as the join condition: with a Seq the join key columns are deduplicated in the result, i.e. each key appears only once, which is what broke the later select. Spell the condition out column by column instead:

            Dataset<Row> finalDS = newDS.join(oldDataSet,
                    oldDataSet.col(Constant.COMPANY_ID).equalTo(newDS.col(Constant.COMPANY_ID))
                            .and(oldDataSet.col(Constant.EVENT_TYPE).equalTo(newDS.col(Constant.EVENT_TYPE)))
                            .and(oldDataSet.col(Constant.EVENT_GROUP_ID).equalTo(newDS.col(Constant.EVENT_GROUP_ID)))
//                        .and(oldDataSet.col("index").equalTo(newDS.col("index")))
                    , Constant.FULL_JOIN).select(columns(oldDataSet, newDS));

28. ExitCodeException exitCode=13 with Spark on YARN

Different exit codes correspond to different errors; exitCode=13 means the cluster/deploy mode conflicts.

The code built its SparkSession with .master(MASTER) (or SparkConf.setMaster(MASTER)), while the job was being submitted in yarn mode.
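A minimal sketch of the fix: leave the master out of the code and let spark-submit supply it (the app name is a placeholder):

import org.apache.spark.sql.SparkSession

// No .master(...) here; pass --master yarn and the deploy mode to spark-submit instead
val spark = SparkSession.builder()
  .appName("my-yarn-job")
  .getOrCreate()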

29. cannot resolve 'page' given input columns

[screenshot]

Scroll further down the log to find the line of code that failed:

[screenshot]
Looking at that code, the likely cause is that the queried column does not exist in the target table.

30. cannot resolve '(named_struct('company_id', hudi.company_id, 'financial_event_group_id', hudi.financial_event_group_id) IN (named_struct('col1', '90136216048254976'…

[screenshot]
Cause: company_id is resolved as a long, so the values in the IN list must not be wrapped in single quotes.

31. java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.CosFileSystem not found

The job needs the hadoop-cos dependency, and its scope in the pom must not be provided.

32. Listing leaf files and directories for 40212 paths:

A Spark job performance problem caused by large numbers of small files.

33. Unable to find encoder for type String. An implicit Encoder[String] is needed to store String instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

  resDf.map( x => x.toString().replace("B35C300672","B35C300672_1").replace(",",Consts.starrocksSep))

Fix: bring an explicit String encoder into scope:

implicit val encoder=org.apache.spark.sql.Encoders.STRING

34. value hasNext is not a member of Object

A Scala 2.12 issue. The workaround is to go through the RDD wherever a DataFrame needs foreachPartition, i.e. call df.rdd.foreachPartition instead of df.foreachPartition.
[screenshot]
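A minimal sketch of the change (df stands for an existing DataFrame; the partition body is illustrative):

// Before: df.foreachPartition(it => ...) fails on Scala 2.12 because the overload
// resolution leaves the lambda parameter typed as Object.
// After: go through the underlying RDD.
df.rdd.foreachPartition { it =>
  it.foreach(row => println(row))
}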

35. No implicits found for parameter evidence$6: Encoder[String]

implicit val encoder=org.apache.spark.sql.Encoders.STRING

[screenshot]

36. AnalysisException: cannot resolve 'c.sid_seller' given input columns:

[screenshot]
This error can have several causes. In my case the ON condition of a LEFT JOIN referenced a table that is only defined later in the query, as shown below:

[screenshot]

37. .c000.snappy.parquet is not a Parquet file

Spark reported this while reading parquet. The data had been extracted from a tar archive that was incomplete; re-uploading the archive fixed it.

38. A Spark job with dynamic resource allocation enabled was first given 10 executors and finished job 0, but did not reuse those 10 executors to run job 1. It waited several hours until the cluster had enough resources to allocate executors for job 1 as a whole. Why?

Job 0 needed 10 executors and job 1 needed 100. In theory job 1 could have started partially as soon as job 0 finished, but in practice it did not start until the full set of resources had been granted.

39. WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

https://stackoverflow.com/questions/46907447/meaning-of-apache-spark-warning-calling-spill-on-rowbasedkeyvaluebatch

40. java.lang.String is not a valid external type for schema of int

I needed to number the rows within each partition: the first 5,000,000 rows get index 1, and the rest get their row number. My first attempt:

val structType = StructType(Array(
      StructField("batchNum", IntegerType),
      StructField("carNum", IntegerType),
      StructField("idx", IntegerType))
    )
    val rddRows: RDD[Row] = spark.read.parquet("hdfs://HD501/data/apply/")
      .rdd.mapPartitions(itr => {
      var i = 0;
      itr.map(row => {
        i = i + 1
        val buffer = Row.unapplySeq(row).get.toBuffer
        if (i < 5000000) {
          buffer.append(1)
        } else {
          buffer.append(i)
        }
        val schema: StructType = row.schema
          .add("idx", IntegerType)
        val newRow: Row = new GenericRowWithSchema(buffer.toArray, schema)
        newRow
      })
    })

    spark.createDataFrame(rddRows,structType)

The root cause: carNum is actually a String in the data, while the schema above declares it as IntegerType. The version below, which declares carNum as StringType and builds each Row explicitly, works:

val structType = StructType(Array(
      StructField("batchNum", IntegerType),
      StructField("carNum", StringType),
      StructField("idx", IntegerType))
    )
    val rddRows: RDD[Row] = spark.read.parquet("hdfs://HDFS16501/data/apply/")
      .rdd.mapPartitions(itr => {
      var i = 0;
      itr.map(row => {
        i = i + 1
        var num = i;
        if (i < 5000000) {
          num = 1
        }
        Row.fromSeq(Seq(
          row.getAs[Int]("batchNum"),
          row.getAs[String]("carNum"),
          num
        ))
      })
    })

    spark.createDataFrame(rddRows,structType)

41. Exception in thread "main" org.sparkproject.guava.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: org.apache.avro.Schema.createUnion([Lorg/apache/avro/Schema;)Lorg/apache/avro/Schema;

at org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2261)
at org.sparkproject.guava.cache.LocalCache.get(LocalCache.java:4000)
at org.sparkproject.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:155)
at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:249)
at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:288)
at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:278)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:221)
Adding an explicit avro dependency fixed it (presumably an avro version conflict on the classpath):

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>

42. Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.rdd.RDDOperationScope$

at org.apache.spark.SparkContext.withScope(SparkContext.scala:786)
at org.apache.spark.SparkContext.parallelize(SparkContext.scala:803)
at org.apache.spark.api.java.JavaSparkContext.parallelize(JavaSparkContext.scala:136)
at org.apache.hudi.client.common.HoodieSparkEngineContext.map(HoodieSparkEngineContext.java:100)
at org.apache.hudi.metadata.FileSystemBackedTableMetadata.getAllPartitionPaths(FileSystemBackedTableMetadata.java:81)
at org.apache.hudi.common.fs.FSUtils.getAllPartitionPaths(FSUtils.java:291)
Adding an explicit jackson-databind dependency fixed it (RDDOperationScope uses Jackson internally, and a conflicting Jackson version can keep the class from initializing):

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
</dependency>

with the version property set to:

<jackson.version>2.6.5</jackson.version>
