当前位置:   article > 正文

Flink开发问题汇总 (持续更新20210219)_cannot instantiate user function

cannot instantiate user function

采用scala语言

mac系统下通过brew安装时,本地默认安装地址 /usr/local/Cellar/apache-flink/1.5.1

一、flink里面能调用图算法吗?

在dataSet时可以调用,在dataStream时需要自己写方法,可以自己用scala实现,参考源码即可

官网链接

二、Cannot instantiate user function

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:346)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of com.meituan.flink.demo.local.testTopic$$anonfun$2 to field org.apache.flink.streaming.api.scala.DataStream$$anon$4.cleanFun$3 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$4
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

是commons-collections包冲突,使用正常的pom文件,exclude掉相关包就行

三、本地idea开发flink程序并本地运行读取HDFS文件

添加依赖:

参考博客

       <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

读写hdfs数据代码:

参考博客

DataSet<String> hdfslines=env.readTextFile("your hdfs path")
hdfslines.writeAsText("your hdfs path")
  • 1
  • 2

Flink 的 HDFS Connector
参考博客
这个Connector提供了一个sink来写分区文件到任何Hadoop FileSystem支持的任何文件系统中,为了使用这个Connector,请将下面的依赖添加到你的工程中:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.10</artifactId>
  <version>1.3.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

四、The assigned slot container_e08_1539148828017_15937_01_003564_0 was removed

此类异常,通过查看日志,一般就是某一个Flink App内存占用大,导致TaskManager(在Yarn上就是Container)被Kill掉。如果代码写的没问题,就确实是资源不够了,其实1G Slot跑多个Task(Slot Group Share)其实挺容易出现的。因此有两种选择。可以根据具体情况,权衡选择一个。

  1. 将该Flink App调度在Per Slot内存更大的集群上。
  2. 通过slotSharingGroup(“xxx”),减少Slot中共享Task的个数

参考博客1
参考博客2

五、java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e03_1553154249286_347659_01_000011 timed out.

TaskManager 心跳超时了,JobManager 的内存调大一点

六、could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[Int]

解决办法: import org.apache.flink.api.scala._

参考博客1
参考博客2

七、flink中使用event事件以及process处理状态数据

具体内容可以参考:http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/

其中需要注意的是真实场景的数据时乱序的,所以需要用

//1、使用windowFunction要注意使用scala的api
import org.apache.flink.streaming.api.scala.function.WindowFunction


//2、设定处理时间为事件发生的时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//3、准备好对应的带时间戳数据后,设置以下乱序水印,其中时间戳必须是毫秒级时间戳
.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[originalData](Time.seconds(10)) {
        override def extractTimestamp(t: originalData): Long = {
          t.timestamp * 1000
        }
      })

//4、在重写keyedProcess里面的processElement和onTimer函数时,scala参数写法如下,奇怪的写法
    override def processElement(input: IpDomainUrlCount,
                                context: KeyedProcessFunction[Tuple, IpDomainUrlCount, String]#Context, 
                                collector: Collector[String]): Unit = {
    }
    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Tuple, IpDomainUrlCount, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      //如果上一步是以windowEnd为keyby的,则在这里直接获取对应的key,对比传入参数里面的timestamp:1563789420001
      //所以如果使用该timestamp时要记得减1
            val curKeyWindowEnd = ctx.getCurrentKey.getField(0).toString.toLong  //156378942000
    }

// 用于存储ipDomain数据的状态,待收齐同一个窗口的数据后,再触发信息熵计算,注意如何内部使用配置的参数,通过parameters.getLong
    private var itemState: ListState[IpDomainUrlCount] = null
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      curWindowSizeMinute = parameters.getLong("window.size.minute", 0L)
      //状态的注册
      val itemsStateDesc = new ListStateDescriptor("itemState-state", classOf[IpDomainUrlCount])
      itemState = getRuntimeContext.getListState(itemsStateDesc)
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

代码格式参考:
代码格式1
代码格式2

八、flink读取多个权限topic(>=1)

写个union的流程即可,包括生成消费各个topic的实例,再进行合并流量

九、flink内部使用一个外部类调用其接口报错未初始化

scala代码时,一般是三个原因:

1、该外部类在main函数开头进行了全局初始化,但是必须保证该类继承了序列化
2、该类可以在rich function里面的open函数里面加载,这样可不必继承序列化
3、重新定义一个object,在其内部定义一个函数,在函数里面初始化该外部类,这样flink内部调用该object时,因为每次初始化了外部类,所以可以不必保证继承序列化

使用gson时的序列化问题

十、The end timestamp of an event-time window cannot become earlier than the current watermark by merging

相关问题

public class LateEventFilter extends ProcessFunction<PageView, PageView> {
    @Override
    public void processElement(PageView value, Context ctx, Collector<PageView> out) throws Exception {
        if(ctx.timestamp() > ctx.timerService().currentWatermark()){
            out.collect(value);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

十一、map、filter和flatMap算子实例详解,重写函数

http://www.ksbst.com/article/2288563.html

十二、flink延迟处理数据,解决期间查询外部数据不及时问题

用process函数中的onTimer函数
https://blog.csdn.net/qq_27657429/article/details/109569645
https://www.zhihu.com/question/332577514/answer/840181621

十三、flink加载hdfs文件并转换为list,仅测试情况下可以使用

其实我原本的需求是,spark任务定期更新数据到HDFS中,然后想在flink的窗口处理中使用最新的HDFS数据,所以需要flink加载HDFS并变成常量,遗憾的是只在本地测试时可以行得通,线上无法转换成功,只能通过第三方存储实现,比如阿里云或者亚马逊云,或者tair或者redis
1提到使用DataStreamUtils.collect,同时注意java和scala引入的包名称
2使用DataStreamUtils.collect,但是注意在sc.excute后面执行
3真实工程需求,一个hdfs文件每天更新,需要在flink作业中使用
4flink读取kafka数据并写入HDFS
5官方文档读取file

十四、flink中DataStream.addSink方法示例

https://vimsky.com/zh-tw/examples/detail/java-method-org.apache.flink.streaming.api.datastream.DataStream.addSink.html

十五、flink中数据去重

  1. MapState 方式去重
  2. SQL 方式去重
  3. HyperLogLog 方式去重
  4. HyperLogLog 去重优化
  5. bitmap 精确去重

https://www.cnblogs.com/pucheung/
https://blog.csdn.net/valada/article/details/104367378
https://gitbook.cn/books/5e4a6cc338499d0e64018856/index.html

十六、flink中广播流的使用

假设有个这样的需求,一些动态定时变化配置会被使用在流的处理中,此时有两种方法,一种方法是将查询的配置写成sdk,在flink的map或者filter阶段调用SDK,第二种方法是使用广播流进行配置查询,然后与处理流进行合并处理
https://www.jianshu.com/p/a09827b1d6da 比较好的例子
https://segmentfault.com/a/1190000022787007
https://segmentfault.com/a/1190000022755892
https://www.cnblogs.com/felixzh/p/10216454.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/615129
推荐阅读
相关标签
  

闽ICP备14008679号