赞
踩
flink源码以及样例等:https://github.com/apache/flink
flink各个版本开发文档:https://nightlies.apache.org/flink/
这StreamExecutionEnvironment是所有 Flink 程序的基础(创建批处理请使用ExecutionEnvironment)。创建一个执行环境,表示当前执行程序的上下文,类似于SparkContext.
1:StreamExecutionEnvironment的API解释
protected static void envSetConfig( LocalStreamEnvironment env) { env.setBufferTimeout(1000); env.setMaxParallelism(10);//设置最大并行度 env.setParallelism(8);//设置并行度 //设置重启策略 env.enableCheckpointing(5000);//设置重启策略必须先开启enableCheckpointing env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(5, Time.milliseconds(100),Time.milliseconds(100))); //设置状态后端的存储位置。包括内存,文件系统等 //1:内存 env.setStateBackend(new MemoryStateBackend()); //2:文件系统时必须指定checkpoint存储路径 env.setStateBackend(new FsStateBackend("C:\\Users\\Administrator.SC-201905261418\\Desktop\\testData\\flink")); //设置时间特性,用于窗口和水印使用 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); }
为了保证task任务或者算子执行过程中的失败能够恢复,启用检查点存储算子的执行状态(快照方式)。失败时从最新的快照进行恢复。
相关的配置参数如下:
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置checkpoint的最小间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 设置一次checkpoint完成的最长时间,超过取消此次checkpoint
env.getCheckpointConfig().setCheckpointTimeout(60000);
//只允许一个检查点同时进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 启用作业取消后保留的外部化检查点
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
在Java/Scala API上可以通过 stream.keyBy(…) 得到 KeyedStream,在Python API上可以通过 stream.key_by(…) 得到 KeyedStream。
所有支持的状态类型如下所示:
ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。
ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。
MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。
状态必须通过RichFunction函数才可以创建,创建StateDescriptor,才能得到对应的状态句柄。 这保存了状态名称、状态所持有值的类型。
使用示例:
stream.keyBy(s -> s).flatMap(new RichFlatMapFunction<Integer, Object>() { //状态作为实例变量进行定义 private ValueState<Integer> vlaueSateTest; @Override //open方法只执行一次 public void open(Configuration parameters) throws Exception { //声明状态:StateDescriptor 包含状态名称和有关状态所存值的类型 //1:声明valuestate ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>( "vlaueSateTest", //状态的名字。必须保证唯一性,后续通过name获取 Integer.class); //存储值的类型。 //可以设置状态的过期时间等。 //StateTtlConfig用于配置状态相关参数 StateTtlConfig build = StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.minutes(1)).cleanupFullSnapshot().build(); stateDescriptor.enableTimeToLive(build); //获取状态必须从getRuntimeContext进行get this.vlaueSateTest = getRuntimeContext().getState(stateDescriptor); //2:声明 ListState<IntValue> listStateTest = getRuntimeContext().getListState(new ListStateDescriptor<IntValue>("listStateTest", ValueTypeInfo.INT_VALUE_TYPE_INFO)); } @Override public void flatMap(Integer integer, Collector<Object> collector) throws Exception { //做其他的基于状态的计算,比如更新状态存的值 vlaueSateTest.update(integer); } @Override public void close() throws Exception { } }).print();
每个算子状态绑定到一个并行算子实例,作用范围限定为算子任务,同一并行任务的状态是共享的,并行处理的所有数据都可以访问到相同的状态。Kafka Connector就是使用算子状态的很好的一个例子,Kafka consumer的每个并行实例都维护一个主题分区和偏移,作为算子状态。当并行性发生变化时,算子状态接口支持在并行运算符实例之间重新分配状态。可以有不同的方案来进行这种再分配。
因为同一个并行任务处理的所有数据都可以访问到当前的状态,所以就相当于本地变量
默认情况下,状态保存在TaskManagers的内存中,检查点存储在JobManager的内 存中。检查点的保存位置由配置的配置状态后台存储位置决定。
Flink 为 state 提供了三种开箱即用的后端存储方式(state backend):
Memory State Backend
File System (FS) State Backend
RocksDB State Backend
三种状态下的checkpoint存储位置如下:
设置状态后台
StreamExecutionEnvironment.setStateBackend(…)
MemoryStateBackend 将工作状态数据保存在 taskmanager 的 java 内存中。key/value 状态和 window 算子使用哈希表存储数值和触发器。进行快照时(checkpointing),生成的快照数据将和 checkpoint ACK 消息一起发送给 jobmanager,jobmanager 将收到的所有快照保存在 java 内存中。
MemoryStateBackend 现在被默认配置成异步的,这样避免阻塞主线程的 pipline 处理。
MemoryStateBackend 的状态存取的速度都非常快,但是不适合在生产环境中使用。
每个 state 的默认大小被限制为 5 MB(这个值可以通过MemoryStateBackend 构造函数设置)
每个 task 的所有 state 数据 (一个 task 可能包含一个 pipline 中的多个 Operator) 大小不能超过 RPC 系统的帧大小(akka.framesize,默认 10MB)
jobmanager 收到的 state 数据总和不能超过 jobmanager 内存
下图表示了状态存储位置
FsStateBackend 需要配置一个 checkpoint 路径,例如“hdfs://namenode:40010/flink/checkpoints” 或者 “file:///data/flink/checkpoints”,我们一般配置为 hdfs 目录
FsStateBackend 将工作状态数据保存在 taskmanager 的 java 内存中。进行快照时,再将快照数据写入上面配置的路径,然后将写入的文件路径告知 jobmanager。jobmanager 中保存所有状态的元数据信息(在 HA 模式下,元数据会写入 checkpoint 目录)。
FsStateBackend 默认使用异步方式进行快照,防止阻塞主线程的 pipline 处理。可以通过 FsStateBackend 构造函数取消该模式:
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new FsStateBackend("hdfs://hadoop162:8020/flink/checkpoints/fs"));
RocksDBStateBackend 也需要配置一个 checkpoint 路径,例如:“hdfs://namenode:40010/flink/checkpoints” 或者 “file:///data/flink/checkpoints”,一般配置为 hdfs 路径。
RocksDB 是一种可嵌入的持久型的 key-value 存储引擎,提供 ACID 支持。由 Facebook 基于 levelDB 开发,使用 LSM 存储引擎,是内存和磁盘混合存储。
RocksDBStateBackend 将工作状态保存在 taskmanager 的 RocksDB 数据库中;checkpoint 时,RocksDB 中的所有数据会被传输到配置的文件目录,少量元数据信息保存在 jobmanager 内存中( HA 模式下,会保存在 checkpoint 目录)。
RocksDBStateBackend 使用异步方式进行快照。
RocksDBStateBackend 的限制:
由于 RocksDB 的 JNI bridge API 是基于 byte[] 的,RocksDBStateBackend 支持的每个 key 或者每个 value 的最大值不超过 2^31 bytes((2GB))。
要注意的是,有 merge 操作的状态(例如 ListState),可能会在运行过程中超过 2^31 bytes,导致程序失败。
RocksDBStateBackend 适用于以下场景:
超大状态、超长窗口(天)、大键值状态的作业
适合高可用模式
使用 RocksDBStateBackend 时,能够限制状态大小的是 taskmanager 磁盘空间(相对于 FsStateBackend 状态大小限制于 taskmanager 内存 )。这也导致 RocksDBStateBackend 的吞吐比其他两个要低一些。因为 RocksDB 的状态数据的读写都要经过反序列化/序列化。
RocksDBStateBackend 是目前三者中唯一支持增量 checkpoint 的。
Flink DataStream API让用户灵活且高效编写Flink流式程序。主要分为DataSource模块、Transformation模块以及DataSink模块。
Lambda 表达式是 JDK8 的一个新特性,可以取代大部分的匿名内部类,可以用于在对接口更方便的操作。Lambda 只能对接口中使用一个实现方法时才能使用。
语法形式为 () -> {}
其中 () 用来描述参数列表,{} 用来描述方法体,-> 为 lambda运算符
所有的匿名函数都有rich版本,它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能。比正常函数多了一个初始化的过程
public static class MyRichMapFunction extends RichMapFunction<Integer, Integer> { // 默认生命周期方法, 初始化方法, 在每个并行度上只会被调用一次 @Override public void open(Configuration parameters) throws Exception { System.out.println("open ... 执行一次"); } // 默认生命周期方法, 最后一个方法, 做一些清理工作, 在每个并行度上只调用一次 @Override public void close() throws Exception { System.out.println("close ... 执行一次"); } @Override public Integer map(Integer value) throws Exception { System.out.println("map ... 一个元素执行一次"); return value * value; } }
getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态.
source在flink中用于加载数据源。
// fromElements元素集合转换 val elementDataStream = env.fromElements( Tuple2('aa', 1L),Tuple2('bb', 2L) ) // fromCollection数组转换(Java) String[] collections = new String[] { "aa", "bb" }; DataStream<String> collectionDatastream = env.fromCollection( Arrays.asList(collections) ); // List列表转换(Java) List<String> arrays = new ArrayList<>(); arrays.add("aa") arrays.add("bb") DataStream<String> arrayDataStream = env.fromCollection(arrays)
DataStream<String> socketTextStream = env.socketTextStream("localhost", 9000, "\n");
transformation算子:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/operators/overview/
采用一个数据元并生成一个数据元.如数据值+1
参数:lambda表达式或MapFunction实现类
source数据
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ArrayList<Integer> data = new ArrayList<>();
data.add(4);
data.add(2);
data.add(3);
data.add(1);
data.add(1);
//fromCollection加载数据源:source1:从collection接口
DataStreamSource<Integer> dataStreamSource = env.fromCollection(
data
);
map函数处理对每个值+1
1、lambda表达式:SingleOutputStreamOperator<Integer> map3 = dataStreamSource.map(s-> s+1); 2、MapFunction实现类之匿名内部类: private static void soutMap(DataStreamSource<Integer> dataStreamSource) { SingleOutputStreamOperator<Integer> map = dataStreamSource.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer integer) throws Exception { return integer + 1; } }); map.print(); } 3、MapFunction实现类: private class mapFunction implements MapFunction<Integer,Integer>{ @Override public Integer map(Integer o) throws Exception { return 0+1; } }
消费一个元素并产生零个或多个元素,
flatmap等于是map处理后加flat扁平化处理。对于1个值的处理上都是相同的,但是flatmap可以对map进来的值进行更多的处理,比如扁平化处理。
flatmap和map的区别:
1:输入:相同点都是输入是1个参数
2:输出:flatmap返回个数1对0或多,map只能1对1
3:返回值:可以看到map是有返回值的但是flatmap可以没有,需要返回的数据需要通过collector去返回。
source数据用上面map样例的数据,此处可以看做使用方式相同
private static SingleOutputStreamOperator<Object> soutFlatMap(DataStreamSource<Integer> dataStreamSource) { SingleOutputStreamOperator<Object> stream= dataStreamSource.flatMap(new FlatMapFunction<Integer, Object>() { @Override public void flatMap(Integer integer, Collector<Object> collector) throws Exception { collector.collect(integer+1); } }); return stream; } private static void soutMap(DataStreamSource<Integer> dataStreamSource) { SingleOutputStreamOperator<Integer> map = dataStreamSource.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer integer) throws Exception { return integer + 1; } }); map.print(); }
从下面一行数据来对比map和flatmap的区别,可以发现flatmap使用比map更便捷了,对map直接加了扁平化处理。比如对字符串的split用flatmap更好。
ArrayList<String> data2 = new ArrayList<>(); data2.add("2 3 4"); data2.add("3 4 2"); data2.add("1 2 1"); data2.add("1 1 1"); DataStreamSource<String> dataStreamSource2 = env.fromCollection( data2 ).setParallelism(1); soutFlatMap2(dataStreamSource2); SingleOutputStreamOperator<String[]> map2 = dataStreamSource2.map(new MapFunction<String, String[]>() { @Override public String[] map(String s) throws Exception { String[] split = s.split(" "); return split; } }); SingleOutputStreamOperator<Object> process = map2.process(new ProcessFunction<String[], Object>() { @Override public void processElement(String[] strings, Context context, Collector<Object> collector) throws Exception { for (String string : strings) { collector.collect(string); } } }); process.print(); } private static void soutFlatMap2(DataStreamSource<String> dataStreamSource2) { SingleOutputStreamOperator<Object> flatMapStream2 = dataStreamSource2.flatMap(new FlatMapFunction<String, Object>() { @Override public void flatMap(String s, Collector<Object> collector) throws Exception { for (String s1 : s.split(" ")) { collector.collect(s1); } } }); flatMapStream2.print(); }
把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中.一个分区中可以有多重不同的key.
在内部是使用的是key的hash分区来实现的.
Tuple2<String, Integer> integerIntegerTuple1 = new Tuple2<>("1", 2); Tuple2<String, Integer> integerIntegerTuple2 = new Tuple2<>("2", 3); Tuple2<String, Integer> integerIntegerTuple3 = new Tuple2<>("1", 2); Tuple2<String, Integer> integerIntegerTuple4= new Tuple2<>("1", 2); ArrayList<Tuple2<String, Integer>> objects = new ArrayList<>(); objects.add(integerIntegerTuple2); objects.add(integerIntegerTuple1); objects.add(integerIntegerTuple3); objects.add(integerIntegerTuple4); DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromCollection( objects ).setParallelism(1); /* KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream = dataStreamSource.keyBy(0); tuple2TupleKeyedStream.sum(1).print();*/ dataStreamSource.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t2, Tuple2<String, Integer> t1) throws Exception { //System.out.println(t1); return Tuple2.of(t2.f0,t1.f1+t2.f1); } }).print();
结果:对相同key的value进行相加。
上面的运行和注释掉的内容sum会产生相同的结果。
(2,3)
(1,2)
(1,4)
(1,6)
Filter DataStream→DataStream 描述:计算每个数据元的布尔函数,并保存函数返回true的数据元。
dataStreamSource.filter(s ->s>2).print();
[4, 2, 3, 1, 1]
结果
4
3
对相同key的value操作,一般用在keyedStream之后。
Window KeyedStream→WindowedStream 描述:可以在已经分区的KeyedStream上定义Windows。 Windows根据某些特征 (例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组。 有关窗口的完 整说明,请参见windows。示例如下: // Last 5 seconds of data dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seco nds(5))); WindowAll DataStream→AllWindowedStream 描述:Windows可以在常规DataStream上定义。 Windows根据某些特征(例如, 在最后5秒内到达的数据)对所有流事件进行分组。 有关窗口的完整说明,请参见 windows。警告:在许多情况下,这是非并行转换。 所有记录将收集在windowAll 算子的一个任务中。 dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5) )); Window Apply WindowedStream→DataStream AllWindowedStream→DataStream 描述:将一般函数应用于整个窗口。 Window Reduce WindowedStream→DataStream 描述:将函数缩减函数应用于窗口并返回缩小的值。 Window Fold WindowedStream→DataStream 算子描述:将函数折叠函数应用于窗口并返回折叠值。示例函数应用于序列 (1,2,3,4,5)时,将序列折叠为字符串“start-1-2-3-4-5”: 广播 DataStream→DataStream 描述:向每个分区广播数据元。dataStream.broadcast()
比如利用TumblingProcessingTimeWindows窗口函数统计5分钟内的数据量
Stream .map(new MapFunction<String, Tuple2<JSONObject, Long>>() {
@Override
public Tuple2<JSONObject, Long> map(String text) throws Exception {
return Tuple2.of(JSONObject.parseObject(text), 1L);
}
}).name("转text")
.keyBy(s -> s.f0.getJSONObject("data"))
.window(TumblingProcessingTimeWindows.of(Time.seconds(300)))
.sum(1).name("统计5分钟数据量")
.process(*)
Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作。
flink内置了一些sink也可以自定义。
.addSink(new FlinkKafkaProducer<String>("hadoop102:9092", "topic_sensor", new SimpleStringSchema()));
.addSink(new ElasticsearchSink.Builder<WaterSensor>(esHosts, new ElasticsearchSinkFunction<WaterSensor>() {
@Override
public void process(WaterSensor element, RuntimeContext ctx, RequestIndexer indexer) {
// 1. 创建es写入请求
IndexRequest request = Requests
.indexRequest("sensor")
.type("_doc")
.id(element.getId())
.source(JSON.toJSONString(element), XContentType.JSON);
// 2. 写入到es
indexer.add(request);
}
}).build());
.addSink(new RichSinkFunction<WaterSensor>() { private PreparedStatement ps; private Connection conn; @Override public void open(Configuration parameters) throws Exception { conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test?useSSL=false", "root", "aaaaaa"); ps = conn.prepareStatement("insert into sensor values(?, ?, ?)"); } @Override public void close() throws Exception { ps.close(); conn.close(); } @Override public void invoke(WaterSensor value, Context context) throws Exception { ps.setString(1, value.getId()); ps.setLong(2, value.getTs()); ps.setInt(3, value.getVc()); ps.execute(); } });
window必须在keyby分区后的数据才能使用
WindowAll 对于dataStream可以直接使用,适用于所有数据。
水印是用来保证一段时间内数据的顺序的
水印就是衡量数据处理进度的一个时间戳。可以通过水印保证水印前数据的有序性。
每当元素进入到Flink,会根据其事件时间,生成一个新的时间戳,即水印。对于t时间的水印,意味着Flink不会再接收t之前的数据,那么t之前的数据就可以进行排序产出顺序流了。
对于水印中迟到数据的处理,flink允许对迟到数据的处理。
1:创建水印
// 创建水印生产策略
WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 最大容忍的延迟时间
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
// 指定时间戳位事件时间即数据中的time
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
return element.getTs() * 1000;
}
});
2:使用水印
// 指定水印和时间戳
stream.assignTimestampsAndWatermarks(wms )
水印+窗口+获取运行信息
stream
.assignTimestampsAndWatermarks(wms) // 指定水印和时间戳
.keyBy(WaterSensor: :getId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
//context可以获取运行信息
String msg = "当前key: " + key
+ "窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd()/1000 + ") 一共有 "
+ elements.spliterator().estimateSize() + "条数据 ";
out.collect(msg);
}
})
.print();
在流式数据中,数据是连续的,通常是无限的,对流中的所有元素进行计数是不可能的,所以在流上的聚合需要由window来划定范围,例如过去五分钟内用户浏览量的计算或者最后100个元素的和。window就是一种可以把无限数据切割为有限数据块的手段。
窗口可以由时间或者数量来做区分
按照窗口聚合的种类可以大致分为:
窗口的时间可以通过下面的几种时间单位来定义:
毫秒,Time.milliseconds(n)
秒,Time.seconds(n)
分钟,Time.minutes(n)
小时,Time.hours(n)
天,Time.days(n)
示例
//滚动窗口 stream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(2)))
//滑动窗口 stream.keyBy(0).window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(4)))
//会话窗口 stream.keyBy(0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
//全局窗口 stream.keyBy(0).window(GlobalWindows.create()) //如果不加这个程序是启动不起来的
.trigger(CountTrigger.of(3)) .sum(1) .print();
由ExecutionEnvironment.ge tExecutionEnvironment()获取上下文环境进行加载数据
wordcount示例:
public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> text = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?"); DataSet<Tuple2<String, Integer>> wordCounts = text.flatMap(new LineSplitter()).groupBy(0).sum(1); wordCounts.print(); } public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) { for (String word : line.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } }
明明flink的相关依赖已经加到pom文件中,依赖也已经导入,但是执行main函数还是报相关的类没有。查看pom.xml发现问题
解决:
在项目的pom.xml中注释掉
<scope>provided</scope>
去掉即可。注释为<!--<scope>provided</scope>-->
,因为provided表明该包只在编译和测试的时候用
WindowedStream<Tuple2<String, Integer>, Integer, TimeWindow> window = map.keyBy(obj -> obj.f1).window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); Dstream.assignTimestampsAndWatermarks
这种错误多使用在创建tuple后导包错误了。使用了错误的API引用。
应该import org.apache.flink.api.java.tuple.Tuple2,结果import scala.Tuple2。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。