赞
踩
参考:
Flink 源码
Error:(.., ..) could not find implicit value for evidence parameter of type
...
首先,这句错误信息代表的含义是:Scala无法找到隐式转换的参数,也就是他说的evidence parameter。在Flink Scala API中,通常有两种情况会造成这类报错。
第一种非常简单,属于上手Flink第一课的内容,获取运行环境StreamExecutionEnvironment.getExecutionEnvironment
:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment //如果是这么引用运行环境,大概率会报错
object TestTypeInformation {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[(String, Long)] = env.fromCollection(Seq(("A", 1L))) //这里就会报错了
//...
}
}
错误信息如下:
Error:(9, 68) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[(String, Long)]
val dataStream: DataStream[(String, Long)] = env.fromCollection(source)
造成他的问题就是上面所说,我们需要一个TypeInformation[(String, Long)],但是我们没有提供,也找不到任何的隐式值。所有创建流的API都需要这样的TypeInformation,不仅如此,Flink中的泛型T
都需要被包装成TypeInformation[T]
。Flink通过TypeInformation中的信息,来决定如何高效的序列化和反序列化这个泛型的对象。因为写泛型的人并不知道最终泛型会是那个类,所以我们需要另外去提供这个泛型所匹配的类的信息。
官方直接给出了解决方法:把StreamExecutionEnvironment
所在的整个包引入。也就是把:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
换成
import org.apache.flink.streaming.api.scala._
官方的说法是产生TypeInformation
的代码没有被引入。那这个代码到底在哪里?
实际上创建我们要找的代码在包对象中。也就是org.apache.flink.streaming.api
包下面的package.scala
。第一行代码就是了。这是一个隐式转换函数,输出就是一个TypeInformation。就是这个函数为我们提供了之前缺少的TypeInformation。Flink利用这个scala的macro,让代码在被编译的时候,就能够获得所有的参数和返回值是什么类型,然后包装进TypeInformation。
package object scala {
// We have this here so that we always have generated TypeInformationS when
// using the Scala API
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
//...
}
我们只用引入这个函数,就不会再出现这种错误了,不需要引入整个包。
import org.apache.flink.streaming.api.scala.{createTypeInformation, StreamExecutionEnvironment}
object TestTypeInformation {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream: DataStream[(String, Long)] = env.fromCollection(Seq(("A", 1L))) //这里不再会报错
//...
}
}
那为什么要引入整个包?我个人猜测是为了方便记,不需要记忆具体是那个方法,而且这个包内的API很常用,之后不需要再引入包中其他的类。
除了这个最基本的情况会报这个错误,另外一种情况是:我们自己定义的泛型被传入了flink中需要TypeInformation的泛型的地方。
假如我们自己定义了一个带泛型的方法,它所做的就是调用map方法,把每个tuple2中的第一个元素取出。map的API的泛型规定了它的输出是一个TypeInformation。
def map[R: TypeInformation](fun: T => R): DataStream[R] = {
而我们自己定义的泛型并没有规定输出他是一个TypeInformation。
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
object TestTypeInformation {
def selectFirst[T](input: DataStream[(T, T)]): DataStream[T] = {
input.map(_._1)
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = env.fromCollection(Seq((1L, 3L), (2L, 4L)))
val result = selectFirst(dataStream)
result.print()
env.execute()
}
}
错误信息就产生于map函数:
Error:(7, 14) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[T]
input.map(_._1)
意思跟第一种情况类似,就是map需要输出TypeInformation,但是我们找不到。这时我们只需要将我们自己的泛型限制为TypeInformation,就能够触发隐式转换了。
import org.apache.flink.api.common.typeinfo.TypeInformation
def selectFirst[T: TypeInformation](input: DataStream[(T, Long)]): DataStream[T] = {
input.map(_._1)
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。