赞
踩
采用scala语言
mac系统下通过brew安装时,本地默认安装地址 /usr/local/Cellar/apache-flink/1.5.1
在dataSet时可以调用,在dataStream时需要自己写方法,可以自己用scala实现,参考源码即可
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:346) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282) at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassCastException: cannot assign instance of com.meituan.flink.demo.local.testTopic$$anonfun$2 to field org.apache.flink.streaming.api.scala.DataStream$$anon$4.cleanFun$3 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$4 at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248) at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
是commons-collections包冲突,使用正常的pom文件,exclude掉相关包就行
添加依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
读写hdfs数据代码:
DataSet<String> hdfslines=env.readTextFile("your hdfs path")
hdfslines.writeAsText("your hdfs path")
Flink 的 HDFS Connector
参考博客
这个Connector提供了一个sink来写分区文件到任何Hadoop FileSystem支持的任何文件系统中,为了使用这个Connector,请将下面的依赖添加到你的工程中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.10</artifactId>
<version>1.3.0</version>
</dependency>
此类异常,通过查看日志,一般就是某一个Flink App内存占用大,导致TaskManager(在Yarn上就是Container)被Kill掉。如果代码写的没问题,就确实是资源不够了,其实1G Slot跑多个Task(Slot Group Share)其实挺容易出现的。因此有两种选择。可以根据具体情况,权衡选择一个。
TaskManager 心跳超时了,JobManager 的内存调大一点
解决办法: import org.apache.flink.api.scala._
具体内容可以参考:http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/
其中需要注意的是真实场景的数据时乱序的,所以需要用
//1、使用windowFunction要注意使用scala的api import org.apache.flink.streaming.api.scala.function.WindowFunction //2、设定处理时间为事件发生的时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //3、准备好对应的带时间戳数据后,设置以下乱序水印,其中时间戳必须是毫秒级时间戳 .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[originalData](Time.seconds(10)) { override def extractTimestamp(t: originalData): Long = { t.timestamp * 1000 } }) //4、在重写keyedProcess里面的processElement和onTimer函数时,scala参数写法如下,奇怪的写法 override def processElement(input: IpDomainUrlCount, context: KeyedProcessFunction[Tuple, IpDomainUrlCount, String]#Context, collector: Collector[String]): Unit = { } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, IpDomainUrlCount, String]#OnTimerContext, out: Collector[String]): Unit = { //如果上一步是以windowEnd为keyby的,则在这里直接获取对应的key,对比传入参数里面的timestamp:1563789420001 //所以如果使用该timestamp时要记得减1 val curKeyWindowEnd = ctx.getCurrentKey.getField(0).toString.toLong //156378942000 } // 用于存储ipDomain数据的状态,待收齐同一个窗口的数据后,再触发信息熵计算,注意如何内部使用配置的参数,通过parameters.getLong private var itemState: ListState[IpDomainUrlCount] = null override def open(parameters: Configuration): Unit = { super.open(parameters) curWindowSizeMinute = parameters.getLong("window.size.minute", 0L) //状态的注册 val itemsStateDesc = new ListStateDescriptor("itemState-state", classOf[IpDomainUrlCount]) itemState = getRuntimeContext.getListState(itemsStateDesc) }
写个union的流程即可,包括生成消费各个topic的实例,再进行合并流量
scala代码时,一般是三个原因:
1、该外部类在main函数开头进行了全局初始化,但是必须保证该类继承了序列化
2、该类可以在rich function里面的open函数里面加载,这样可不必继承序列化
3、重新定义一个object,在其内部定义一个函数,在函数里面初始化该外部类,这样flink内部调用该object时,因为每次初始化了外部类,所以可以不必保证继承序列化
public class LateEventFilter extends ProcessFunction<PageView, PageView> {
@Override
public void processElement(PageView value, Context ctx, Collector<PageView> out) throws Exception {
if(ctx.timestamp() > ctx.timerService().currentWatermark()){
out.collect(value);
}
}
}
http://www.ksbst.com/article/2288563.html
用process函数中的onTimer函数
https://blog.csdn.net/qq_27657429/article/details/109569645
https://www.zhihu.com/question/332577514/answer/840181621
其实我原本的需求是,spark任务定期更新数据到HDFS中,然后想在flink的窗口处理中使用最新的HDFS数据,所以需要flink加载HDFS并变成常量,遗憾的是只在本地测试时可以行得通,线上无法转换成功,只能通过第三方存储实现,比如阿里云或者亚马逊云,或者tair或者redis
1提到使用DataStreamUtils.collect,同时注意java和scala引入的包名称
2使用DataStreamUtils.collect,但是注意在sc.excute后面执行
3真实工程需求,一个hdfs文件每天更新,需要在flink作业中使用
4flink读取kafka数据并写入HDFS
5官方文档读取file
https://www.cnblogs.com/pucheung/
https://blog.csdn.net/valada/article/details/104367378
https://gitbook.cn/books/5e4a6cc338499d0e64018856/index.html
假设有个这样的需求,一些动态定时变化配置会被使用在流的处理中,此时有两种方法,一种方法是将查询的配置写成sdk,在flink的map或者filter阶段调用SDK,第二种方法是使用广播流进行配置查询,然后与处理流进行合并处理
https://www.jianshu.com/p/a09827b1d6da 比较好的例子
https://segmentfault.com/a/1190000022787007
https://segmentfault.com/a/1190000022755892
https://www.cnblogs.com/felixzh/p/10216454.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。