1. Task not serializable
Cause one: a MySQL JDBC connection was used. The connection has to be created separately on each server; a single connection object cannot be shared across the cluster. The driver and the executors run on different machines, so the connection cannot be serialized and shared between them.
Cause two: an anonymous inner class implementing some interface was used. The anonymous class carries no serialization marker, hence the error. In this situation, don't use an anonymous inner class; create a named class that implements the required interface and also implements Serializable:
static class MyMapFunction implements scala.Function1<Row, Teacher>, Serializable {
    @Override
    public Teacher apply(Row row) {
        String info = row.getString(0);
        String[] split = info.split(",");
        return new Teacher(split[0], split[1], split[2]);
    }

    @Override
    public <A> Function1<A, Teacher> compose(Function1<A, Row> g) {
        return Function1.super.compose(g);
    }

    @Override
    public <A> Function1<Row, A> andThen(Function1<Teacher, A> g) {
        return Function1.super.andThen(g);
    }
}
val dfFromRddPerson = rddPerson.toDF
The toDF call above fails because a case class cannot be defined inside the method that uses it; the case class has to be declared outside that method (at the top level or in a companion object), as sketched below.
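A minimal sketch of the working layout, assuming a hypothetical Person case class and a locally created SparkSession: the case class is declared at the top level, not inside the method that calls toDF.
import org.apache.spark.sql.SparkSession

// Declare the case class at the top level, NOT inside the method that uses it.
case class Person(name: String, age: Int)

object ToDFDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("toDF-demo").master("local[*]").getOrCreate()
    import spark.implicits._

    val rddPerson = spark.sparkContext.parallelize(Seq(Person("a", 1), Person("b", 2)))
    val dfFromRddPerson = rddPerson.toDF()   // works because Person is top-level
    dfFromRddPerson.show()
  }
}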
Error:
Asked to remove non-existent executor 203
It looked like an infinite loop: the log line above kept being printed.
A search online suggested the driver IP had not been specified. Since the driver runs on the local machine, set the driver host in SparkConf:
val conf = new SparkConf()
  .setAppName(this.getClass.getName)
  .setMaster("spark://node1:7077")
  .set("spark.driver.host", "192.168.239.1")
After this configuration, the job ran normally.
Error:
Could not parse Master URL: 'yarn'
Looking into the Spark source, there is this snippet:
private def getClusterManager(url: String): Option[ExternalClusterManager] = {
  val loader = Utils.getContextOrSparkClassLoader
  val serviceLoaders =
    ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
  if (serviceLoaders.size > 1) {
    throw new SparkException(
      s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
  }
  serviceLoaders.headOption
}
Tracing upward from here: when a Spark job needs an external cluster, it requires an ExternalClusterManager. For YARN that is YarnClusterManager, which lives in the spark-yarn package:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-yarn_2.11</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>
After adding the dependency the error persisted, so I added this line to the code to check:
val value = this.getClass.getClassLoader.loadClass("org.apache.spark.scheduler.cluster.YarnClusterManager")
This line itself failed: the class could not be found.
The jar was clearly declared as a dependency, so why was the class missing? Then I remembered an earlier issue: IDEA has to be configured for this. Since the dependency uses provided scope, the IDEA run configuration must be set to include provided-scope dependencies on the classpath (or the scope removed temporarily) when running locally.
/usr/local/hadoop$ hadoop pipes -Dhadoop.pipes.java.recordreader=true -Dhadoop.pipes.java.recordwriter=true -input someFile.txt -output /out -program bin/wordcount
DEPRECATED: Use of this script to execute mapred command is deprecated.
Instead use the mapred command for it.
13/12/14 20:12:06 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
13/12/14 20:12:06 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
13/12/14 20:12:07 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:08 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:09 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:10 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:11 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:12 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:13 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 6 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
13/12/14 20:12:14 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032.
Cause: yarn-site.xml was not placed under the resources directory when the jar was first built. The file was added later, but the jar was not rebuilt, so the configuration file could not be found at run time.
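A quick check (my own addition, not from the original note) for whether yarn-site.xml actually made it into the packaged jar:
// Returns null if yarn-site.xml was not packaged at the classpath root of the jar.
val url = getClass.getClassLoader.getResource("yarn-site.xml")
println(s"yarn-site.xml on classpath: $url")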
Exception in thread "main" java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:45)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:163)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:161)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:183)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:501)
at spark2.SparkWithYarn$.main(SparkWithYarn.scala:34)
at spark2.SparkWithYarn.main(SparkWithYarn.scala)
First solution: disable the timeline service in yarn-site.xml:
<property>
  <name>yarn.timeline-service.enabled</name>
  <value>false</value>
</property>
Second solution: pin the jersey-core and jersey-client versions in the pom:
<dependency>
  <groupId>com.sun.jersey</groupId>
  <artifactId>jersey-core</artifactId>
  <version>1.9</version>
</dependency>
<dependency>
  <groupId>com.sun.jersey</groupId>
  <artifactId>jersey-client</artifactId>
  <version>1.9</version>
</dependency>
8/05/22 17:13:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
The error message is as follows:
Cannot assign requested address: bind: Service 'sparkDriver' failed
Solution: set this environment variable:
SPARK_LOCAL_IP=127.0.0.1
The root cause was not identified.
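As an alternative (my own assumption, not from the original note), the same bind failure can usually also be addressed in code via the driver bind-address settings; a sketch with placeholder values:
import org.apache.spark.SparkConf

// Force the driver to bind to the loopback address instead of relying on
// the SPARK_LOCAL_IP environment variable.
val conf = new SparkConf()
  .setAppName("bind-address-demo")
  .set("spark.driver.bindAddress", "127.0.0.1")
  .set("spark.driver.host", "127.0.0.1")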
Exception in thread "main" java.sql.SQLException: No suitable driver found for jdbc:hive2://localhost:10000/mytest
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:247)
at spark2.sql.JdbcSparkPort10000$.main(JdbcSparkPort10000.scala:7)
at spark2.sql.JdbcSparkPort10000.main(JdbcSparkPort10000.scala)
1. First, there was a leading space before "jdbc" in " jdbc:hive2://node3:10000/mytest".
2. Second, the required dependency was missing:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive-thriftserver_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
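For reference, a minimal connection sketch (the URL, user, and query are placeholders). There must be no leading space in the JDBC URL, and the Hive JDBC driver has to be on the classpath:
import java.sql.DriverManager

// Loading the driver explicitly is optional with JDBC 4+, but it makes the
// "No suitable driver" failure obvious when the dependency is missing.
Class.forName("org.apache.hive.jdbc.HiveDriver")

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/mytest", "user", "")
val rs = conn.createStatement().executeQuery("SELECT 1")
while (rs.next()) println(rs.getInt(1))
conn.close()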
Starting the Spark project from IDEA reported an error. The cause: the project has four modules, one of which depends on another, and that inter-module dependency was not set up correctly.
The correct form is CAST(value AS STRING); it had mistakenly been written as CAST(value) AS STRING.
Calling the kuduContext API to create a table reported an error:
Cannot resolve overloaded method 'createTable'
At first I did not know why.
Workaround: move the cursor to the code marked in red and press Alt+Enter, then follow the prompt and choose "Convert to infix expression".
Later I found the real cause: the wrong package had been imported.
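For context, a hedged sketch of the kudu-spark createTable call with the imports that are easy to get wrong; the Kudu master address, table name, schema, and partitioning below are placeholders, and spark is assumed to be an existing SparkSession:
import scala.collection.JavaConverters._
import org.apache.kudu.client.CreateTableOptions   // make sure this comes from org.apache.kudu.client
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.types._

val kuduContext = new KuduContext("node1:7051", spark.sparkContext)

val schema = StructType(Seq(
  StructField("id", StringType, nullable = false),
  StructField("name", StringType, nullable = true)
))

if (!kuduContext.tableExists("demo_table")) {
  kuduContext.createTable(
    "demo_table",
    schema,
    Seq("id"),                                      // primary-key columns
    new CreateTableOptions()
      .addHashPartitions(List("id").asJava, 3)
      .setNumReplicas(1)
  )
}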
The error message says the left outer join contains an implicit Cartesian product. Searching online, one article attributes this error to two DataFrames sharing the same lineage.
Checking my code, these two DataFrames could indeed share lineage:
val codesDF: DataFrame = getKuduSource(spark, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY)
val codeCustomerDF: DataFrame = codesDF
.where($"type" === CodeTypeMapping.CustomerType)
.select($"code", $"codeDesc")
val codeChannelDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType)
.select($"code", $"codeDesc")
Both codeCustomerDF and codeChannelDF derive from codesDF.
But I had used this pattern before without any error, so I dug out the older code and compared: the difference is that in the older code the two selected columns were given aliases:
val codeCustomerDF: DataFrame = codesDF
.where($"type" === CodeTypeMapping.CustomerType)
.select($"code".as("type"), $"codeDesc".as("name"))
val codeChannelDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType)
.select($"code".as("type"), $"codeDesc".as("name"))
After changing the code to match, it ran fine, but I still cannot explain why the aliases matter.
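One workaround I am aware of (not mentioned in the original note) is to allow the implicit cross join explicitly, or to break the shared lineage before the join; a sketch under that assumption:
// Option 1: allow Spark to plan the implicit cartesian product (Spark 2.x setting;
// the check is relaxed by default in 3.x).
val spark = SparkSession.builder()
  .appName("crossjoin-demo")
  .config("spark.sql.crossJoin.enabled", "true")
  .getOrCreate()

// Option 2: break the common lineage of the two derived DataFrames, e.g. by aliasing
// the selected columns (as above) or by checkpointing / re-reading one side.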
When Spark writes to Kudu, the job first checks whether the table exists; if not, it creates the table and then inserts the data.
On the first run a column type was wrong, so the job failed mid-execution, but by then the table had already been created.
After fixing the column type, the job still failed: the table had already been created with the wrong type on the first run. The corrected code was internally consistent, but the actual data types no longer matched the column types of the existing table.
Solution: drop the table and let the job recreate it automatically.
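A minimal sketch of the drop step using kudu-spark's KuduContext (the table name is a placeholder and kuduContext is assumed to exist as above):
// Drop the table that was created with the wrong column types so the job can
// recreate it with the corrected schema on the next run.
if (kuduContext.tableExists("ods_demo_table")) {
  kuduContext.deleteTable("ods_demo_table")
}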
Exception in thread "main" java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
at org.apache.spark.sql.execution.SparkPlan.org$apache$spark$sql$execution$SparkPlan$$decodeUnsafeRows(SparkPlan.scala:276)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeTake$1.apply(SparkPlan.scala:368)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeTake$1.apply(SparkPlan.scala:368)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:368)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
Cause: see https://blog.csdn.net/m0_37914799/article/details/84992275
Solution: add this dependency:
<dependency>
  <groupId>net.jpountz.lz4</groupId>
  <artifactId>lz4</artifactId>
  <version>1.3.0</version>
</dependency>
A custom data source reported an error:
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.sql.execution.datasources.LogicalRelation$.apply(LogicalRelation.scala:77)
at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:415)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at SparkTest$.main(SparkTest.scala:37)
at SparkTest.main(SparkTest.scala)
Cause: DefaultSource passed a null schema when constructing HBaseRelation; even though HBaseRelation never uses that parameter, it still must not be passed as null.
Cause of the next error: when converting the query results into Rows, the values were not converted from bytes to String.
scala.MatchError: [B@53a3605e (of class [B)
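A sketch of the missing conversion, assuming the values come from HBase Result cells: decode each byte array with Bytes.toString before putting it into the Row, otherwise Spark sees a raw Array[Byte] and fails with a MatchError. The helper below is hypothetical.
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Row

// Hypothetical helper: build a Row of Strings from an HBase Result for the given columns.
def resultToRow(result: Result, family: String, columns: Seq[String]): Row = {
  val values = columns.map { col =>
    Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(col)))
  }
  Row.fromSeq(values)
}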
Cause of the error below: there is a space in the host name.
21/08/02 15:16:03 ERROR client.StaticHostProvider: Unable to connect to server: bigdatacdh01.cn:2181
java.net.UnknownHostException: bigdatacdh01.cn
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:907)
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1302)
at java.net.InetAddress.getAllByName0(InetAddress.java:1255)
at java.net.InetAddress.getAllByName(InetAddress.java:1171)
at java.net.InetAddress.getAllByName(InetAddress.java:1105)
at org.apache.zookeeper.client.StaticHostProvider.<init>(StaticHostProvider.java:60)
at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:445)
at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:380)
at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.checkZk(RecoverableZooKeeper.java:141)
at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.<init>(RecoverableZooKeeper.java:128)
at org.apache.hadoop.hbase.zookeeper.ZKUtil.connect(ZKUtil.java:137)
at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.<init>(ZooKeeperWatcher.java:185)
at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.<init>(ZooKeeperWatcher.java:153)
at org.apache.hadoop.hbase.client.ZooKeeperKeepAliveConnection.<init>(ZooKeeperKeepAliveConnection.java:43)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getKeepAliveZooKeeperWatcher(ConnectionManager.java:1823)
at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:104)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:931)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.<init>(ConnectionManager.java:658)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:218)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.initialize(TableInputFormat.java:186)
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:241)
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:240)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:125)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
at com.lcy.models.tags.BaseModel$class.readBusinessData(BaseModel.scala:89)
at com.lcy.models.tags.PayTypeModel$.readBusinessData(PayTypeModel.scala:4)
at com.lcy.models.tags.BaseModel$class.execute(BaseModel.scala:186)
at com.lcy.models.tags.PayTypeModel$.execute(PayTypeModel.scala:4)
at com.lcy.models.tags.PayTypeModel$.main(PayTypeModel.scala:14)
at com.lcy.models.tags.PayTypeModel.main(PayTypeModel.scala)
21/08/02 15:16:03 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=bigdatacdh01.cn:2181 sessionTimeout=90000 watcher=hconnection-0x3b2114ce0x0, quorum=bigdatacdh01.cn:2181, baseZNode=/hbase
21/08/02 15:16:03 ERROR client.StaticHostProvider: Unable to connect to server: bigdatacdh01.cn:2181
val basicRuleMap: Map[String, String] = basicDF
  .filter($"level" === 5)
  .collect().map(row => {
    // todo: unclear why, but if the key is an Int, map.getOrElse later fails to find the value
    // row.getAs[Int]("rule") -> row.getAs[String]("name")
    row.getAs[String]("rule") -> row.getAs[String]("name")
  }).toMap

val centerNameMap = kMeansModel.clusterCenters.zipWithIndex.map(row => {
    (row._1.toArray.sum, row._2)
  }).sortWith((kv1, kv2) => kv1._1 > kv2._1)
  .zipWithIndex.map(kv => {
    println("kv_2:" + kv._2)
    // if basicRuleMap's key were an Int the lookup would fail; it has to be a String
    (kv._1._2, basicRuleMap.getOrElse(kv._2.toString, ""))
  }).toMap
org.apache.spark.sql.AnalysisException: cannot resolve '`total`' given input columns: [CAST(count(label) AS `total` AS BIGINT), label, CAST(sum(female) AS `cnt_female` AS DOUBLE), CAST(sum(male) AS `cnt_male` AS DOUBLE), userId];;
'Project [userId#83, UDF('total, 'cnt_male, 'cnt_female) AS usg#117]
+- Aggregate [userId#83, label#45], [userId#83, label#45, cast(count(label#45) as bigint) AS CAST(count(label) AS `total` AS BIGINT)#108L, cast(sum(cast(male#89 as bigint)) as double) AS CAST(sum(male) AS `cnt_male` AS DOUBLE)#109, cast(sum(cast(female#90 as bigint)) as double) AS CAST(sum(female) AS `cnt_female` AS DOUBLE)#110]
+- Project [userId#83, label#45, CASE WHEN (label#45 = 0) THEN 1 ELSE 0 END AS male#89, CASE WHEN (label#45 = 1) THEN 1 ELSE 0 END AS female#90]
+- Project [memberId#31 AS userId#83, color#50, product#55, label#45]
+- Join Inner, (orderSn#32 = cOrderSn#14)
:- Project [cOrderSn#14, CASE WHEN (ogColor#15 = 蓝色) THEN 17 WHEN (ogColor#15 = 金色) THEN 8 WHEN (ogColor#15 = 月光银) THEN 11 WHEN (ogColor#15 = 银色) THEN 20 WHEN (ogColor#15 = 黑色) THEN 2 WHEN (ogColor#15 = 梦境极光【布朗灰】) THEN 5 WHEN (ogColor#15 = 香槟金) THEN 14 WHEN (ogColor#15 = 梦境极光【卡其金】) THEN 4 WHEN (ogColor#15 = 香槟色) THEN 13 WHEN (ogColor#15 = 樱花粉) THEN 16 WHEN (ogColor#15 = 金属灰) THEN 7 WHEN (ogColor#15 = 香槟金色) THEN 1 WHEN (ogColor#15 = 布鲁钢) THEN 10 WHEN (ogColor#15 = 玫瑰金) THEN 19 WHEN (ogColor#15 = 金属银) THEN 18 WHEN (ogColor#15 = 乐享金) THEN 9 WHEN (ogColor#15 = 白色) THEN 3 WHEN (ogColor#15 = 时尚光谱【浅金棕】) THEN 12 WHEN (ogColor#15 = 灰色) THEN 15 WHEN (ogColor#15 = 粉色) THEN 6 ELSE 0 END AS color#50, CASE WHEN (productType#16 = 电热水器) THEN 23 WHEN (productType#16 = 前置过滤器) THEN 8 WHEN (productType#16 = 滤芯) THEN 17 WHEN (productType#16 = 电饭煲) THEN 26 WHEN (productType#16 = 嵌入式厨电) THEN 11 WHEN (productType#16 = Haier/海尔冰箱) THEN 2 WHEN (productType#16 = 燃气灶) THEN 20 WHEN (productType#16 = 冰吧) THEN 5 WHEN (productType#16 = 料理机) THEN 14 WHEN (productType#16 = 挂烫机) THEN 13 WHEN (productType#16 = Leader/统帅冰箱) THEN 4 WHEN (productType#16 = 电水壶/热水瓶) THEN 22 WHEN (productType#16 = 净水机) THEN 7 WHEN (productType#16 = 波轮洗衣机) THEN 16 WHEN (productType#16 = 电风扇) THEN 25 WHEN (productType#16 = 吸尘器/除螨仪) THEN 10 WHEN (productType#16 = 4K电视) THEN 1 WHEN (productType#16 = 空气净化器) THEN 28 WHEN (productType#16 = 烤箱) THEN 19 WHEN (productType#16 = 破壁机) THEN 27 WHEN (productType#16 = 取暖电器) THEN 9 WHEN (productType#16 = 烟灶套系) THEN 18 WHEN (productType#16 = 微波炉) THEN 12 WHEN (productType#16 = LED电视) THEN 3 WHEN (productType#16 = 燃气热水器) THEN 21 WHEN (productType#16 = 智能电视) THEN 15 WHEN (productType#16 = 冷柜) THEN 6 WHEN (productType#16 = 电磁炉) THEN 24 ELSE 0 END AS product#55, CASE WHEN (((((((ogcolor#15 = 樱花粉) || (ogcolor#15 = 白色)) || (ogcolor#15 = 香槟色)) || (ogcolor#15 = 香槟金)) || (productType#16 = 料理机)) || (productType#16 = 挂烫机)) || (productType#16 = 吸尘器/除螨仪)) THEN 1 ELSE 0 END AS label#45]
: +- LogicalRDD [cOrderSn#14, ogColor#15, productType#16]
+- Relation[memberId#31,orderSn#32] com.lcy.models.datasource.HBaseRelation@6700374f
The cause is that cast was written after as, so the alias never took effect.
Wrong:
count("label").as("total").cast(LongType)
Correct:
count("label").cast(LongType).as("total")
Caused by: java.io.NotSerializableException: com.asinking.profit.ods.ods_s3_to_hudi_eg2$$Lambda$853/535708280
Solution: replace the anonymous inner class with a named class that implements Serializable:
static class MyFunction implements Function1<String, Row>, Serializable {
    @Override
    public Row apply(String item) {
        final String[] items = item.split(",");
        return RowFactory.create(items[0], items[1], items[2], items[3]);
    }

    @Override
    public <A> Function1<A, Row> compose(Function1<A, String> g) {
        return null;
    }

    @Override
    public <A> Function1<String, A> andThen(Function1<Row, A> g) {
        return null;
    }
}
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.spark.unsafe.array.ByteArrayMethods.<clinit>(ByteArrayMethods.java:54)
at org.apache.spark.internal.config.package$.<init>(package.scala:1006)
at org.apache.spark.internal.config.package$.<clinit>(package.scala)
at org.apache.spark.SparkConf$.<init>(SparkConf.scala:639)
at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
at org.apache.spark.SparkConf.set(SparkConf.scala:94)
at org.apache.spark.SparkConf.set(SparkConf.scala:83)
at com.asinking.utils.SparkUtils.sparkConf(SparkUtils.java:11)
at com.asinking.ch.ods.OdsBase.exec(OdsBase.java:42)
at com.asinking.ch.ods.OdsCHFinanceshipmentEvent.main(OdsCHFinanceshipmentEvent.java:6)
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make private java.nio.DirectByteBuffer(long,int) accessible: module java.base does not "opens java.nio" to unnamed module @2362f559
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
at java.base/java.lang.reflect.Constructor.checkCanSetAccessible(Constructor.java:188)
at java.base/java.lang.reflect.Constructor.setAccessible(Constructor.java:181)
at org.apache.spark.unsafe.Platform.<clinit>(Platform.java:56)
... 10 more
This is most likely a JDK version problem rather than OpenJDK itself: the InaccessibleObjectException comes from the Java module system (JDK 9+), which blocks the reflective access Spark needs. Running on JDK 8, or adding the corresponding --add-opens JVM options (e.g. for java.base/java.nio), should avoid it.
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.PartitionedFile.<init>(Lorg/apache/spark/sql/catalyst/InternalRow;Ljava/lang/String;JJ[Ljava/lang/String;)V
at org.apache.hudi.MergeOnReadSnapshotRelation.$anonfun$buildFileIndex$1(MergeOnReadSnapshotRelation.scala:151)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
Cause: the Spark jars on the server are Amazon's modified build, while the Hudi jar there is the unmodified one built against the Apache Spark packages; parts of the two Spark builds differ.
When a DataFrame's schema is inferred from a class, the fields (at least the primitive-typed ones) are marked non-nullable; if a null value then appears in such a field, the job fails.
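A small sketch of this behaviour, assuming hypothetical case classes: primitive fields (Int, Long, Double) are inferred as nullable = false, so a null in the data breaks the job; wrapping them in Option makes the column nullable.
import org.apache.spark.sql.Encoders

// Define the case classes at the top level, per the earlier note.
case class Order(id: Long, amount: Double)              // amount: nullable = false
case class OrderSafe(id: Long, amount: Option[Double])  // amount: nullable = true

println(Encoders.product[Order].schema.treeString)
println(Encoders.product[OrderSafe].schema.treeString)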
While reading a Hudi table, Spark failed; the cause was invalid data inside Hudi.
The following code causes the subsequent select to fail:
Seq<String> conditionSeq = JavaConverters
.asScalaIteratorConverter(Arrays.asList("event_type","financial_event_group_id","company_id")
.listIterator()).asScala().toSeq();
Dataset<Row> finalDS = newDS.join(oldDataSet, oldDataSet.col("event_type").equalTo(newDS.col("event_type")),"full").select(columns(oldDataSet,newDS));
The correct version is below. Do not pass a Seq as the join condition: with a Seq the join key columns are deduplicated in the result, i.e. only a single copy of each key column remains.
Dataset<Row> finalDS = newDS.join(oldDataSet,
oldDataSet.col(Constant.COMPANY_ID).equalTo(newDS.col(Constant.COMPANY_ID))
.and(oldDataSet.col(Constant.EVENT_TYPE).equalTo(newDS.col(Constant.EVENT_TYPE)))
.and(oldDataSet.col(Constant.EVENT_GROUP_ID).equalTo(newDS.col(Constant.EVENT_GROUP_ID)))
// .and(oldDataSet.col("index").equalTo(newDS.col("index")))
, Constant.FULL_JOIN).select(columns(oldDataSet, newDS));
Different exitCode values correspond to different errors; exitCode=13 comes from a cluster-mode conflict: the SparkSession was built with a hard-coded .master(MASTER) (or .setMaster(MASTER)), while the job was actually being submitted in YARN mode.
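A sketch of the fix, assuming the job is submitted with spark-submit --master yarn: don't hard-code the master in code and let the submit command decide.
import org.apache.spark.sql.SparkSession

// No .master(...) here: the master comes from spark-submit (--master yarn),
// which avoids the client/cluster mode conflict behind exitCode 13.
val spark = SparkSession.builder()
  .appName("yarn-demo")
  .getOrCreate()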
Error: cannot resolve '`page`' given input columns. Scroll further down the log to find which line of code triggered it.
Looking at the code, my guess is that the queried column does not exist in the target table.
A fragment of the failing predicate from the log:
company_id, 'financial_event_group_id', hudi.financial_event_group_id) IN (named_struct('col1', '90136216048254976'
Cause: company_id is inferred as a long, so its value must not be wrapped in single quotes.
First, the hadoop-cos dependency is required; second, its scope in the pom must not be provided.
Spark job performance problems caused by small files.
resDf.map( x => x.toString().replace("B35C300672","B35C300672_1").replace(",",Consts.starrocksSep))
Solution:
implicit val encoder=org.apache.spark.sql.Encoders.STRING
This is a Scala 2.12 issue; afterwards, wherever a DataFrame needs foreachPartition, just prefix the call with .rdd (i.e. df.rdd.foreachPartition).
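A sketch of that workaround, reusing the resDf from the snippet above: call foreachPartition on the underlying RDD instead of on the Dataset, so the Java/Scala overload ambiguity under Scala 2.12 never arises.
// foreachPartition on the RDD side avoids the ambiguous Dataset overloads.
resDf.rdd.foreachPartition { rows =>
  rows.foreach(row => println(row))
}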
Error: cannot resolve '`c.sid_seller`' given input columns:
This error can have several causes; in my case, the ON condition of a LEFT JOIN referenced a table that was only defined later in the query.
Spark reported the above error when reading parquet. The cause: the data had been extracted from a tar archive that was incomplete. Re-uploading the archive fixed it.
The first job needs 10 executors and the second needs 100. In theory the second job could start partially running once the first finishes, but in practice it did not start until all of its requested resources had been granted.
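If it helps, the Spark settings I know of that govern when a job starts relative to registered resources are spark.scheduler.minRegisteredResourcesRatio and spark.scheduler.maxRegisteredResourcesWaitingTime; a hedged sketch with placeholder values:
import org.apache.spark.SparkConf

// Start computing once 80% of the requested executor resources have registered,
// or after at most 30s of waiting, instead of blocking for the full allocation.
val conf = new SparkConf()
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")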
I needed to add a sequence number to each row within every partition: the first 5,000,000 rows all get 1, and the rows after that get their row number. My first attempt was:
val structType = StructType(Array(
  StructField("batchNum", IntegerType),
  StructField("carNum", IntegerType),
  StructField("idx", IntegerType))
)
val rddRows: RDD[Row] = spark.read.parquet("hdfs://HD501/data/apply/")
  .rdd.mapPartitions(itr => {
    var i = 0;
    itr.map(row => {
      i = i + 1
      val buffer = Row.unapplySeq(row).get.toBuffer
      if (i < 5000000) {
        buffer.append(1)
      } else {
        buffer.append(i)
      }
      val schema: StructType = row.schema.add("idx", IntegerType)
      val newRow: Row = new GenericRowWithSchema(buffer.toArray, schema)
      newRow
    })
  })
spark.createDataFrame(rddRows, structType)
Changing it to the following worked:
val structType = StructType(Array(
  StructField("batchNum", IntegerType),
  StructField("carNum", StringType),
  StructField("idx", IntegerType))
)
val rddRows: RDD[Row] = spark.read.parquet("hdfs://HDFS16501/data/apply/")
  .rdd.mapPartitions(itr => {
    var i = 0;
    itr.map(row => {
      i = i + 1
      var num = i;
      if (i < 5000000) {
        num = 1
      }
      Row.fromSeq(Seq(
        row.getAs[Int]("batchNum"),
        row.getAs[String]("carNum"),
        num
      ))
    })
  })
spark.createDataFrame(rddRows, structType)
at org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2261)
at org.sparkproject.guava.cache.LocalCache.get(LocalCache.java:4000)
at org.sparkproject.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:155)
at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:249)
at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:288)
at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:278)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:221)
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.2</version>
</dependency>
at org.apache.spark.SparkContext.withScope(SparkContext.scala:786)
at org.apache.spark.SparkContext.parallelize(SparkContext.scala:803)
at org.apache.spark.api.java.JavaSparkContext.parallelize(JavaSparkContext.scala:136)
at org.apache.hudi.client.common.HoodieSparkEngineContext.map(HoodieSparkEngineContext.java:100)
at org.apache.hudi.metadata.FileSystemBackedTableMetadata.getAllPartitionPaths(FileSystemBackedTableMetadata.java:81)
at org.apache.hudi.common.fs.FSUtils.getAllPartitionPaths(FSUtils.java:291)
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>${jackson.version}</version>
</dependency>
<jackson.version>2.6.5</jackson.version>