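My machine has 8 threads. When I run the code below, it prints results, but when I comment out the parallelism setting, it prints nothing. Why is that? The input data is the same, so shouldn't the watermarks be the same too?
Input data: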
1585721697000,xiao,8
1585721700000,xiao,10
1585721705000,xiao,4
1585721715000,xiao,9
```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

case class Line(id: Long, name: String, age: Int)

object EventTimeWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)
    // Use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val value: DataStream[String] = env.socketTextStream("localhost", 9999)
    // Assign timestamps and watermarks (3 s out-of-orderness bound) directly on the source stream
    val value1: DataStream[String] = value.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(3)) {
        override def extractTimestamp(t: String): Long = t.split(",")(0).trim.toLong
      })
    val value5: DataStream[Line] = value1.map(line => {
      val arr: Array[String] = line.split(",")
      Line(arr(0).toLong, arr(1), arr(2).toInt)
    })
    // Key by name
    val value2: KeyedStream[Line, String] = value5.keyBy(_.name)
    // 10-second tumbling window
    val value3: WindowedStream[Line, String, TimeWindow] = value2.timeWindow(Time.seconds(10))
    // Sum the age field (position 2)
    val value4: DataStream[Line] = value3.sum(2)
    value4.print("sum age")
    env.execute()
  }
}
```
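With a single watermark assigner, the expected output can be worked out by hand. The following is a pure-Scala sketch with no Flink dependency; the names `WindowSimulation`, `windowStart` and `run` are hypothetical. It assumes epoch-aligned 10 s tumbling windows, a watermark equal to the largest timestamp seen minus 3 s, and (as a simplification of Flink's periodic emission) that the watermark is updated after every record. A window `[start, end)` fires once the watermark reaches `end - 1`.

```scala
object WindowSimulation {
  final case class Line(id: Long, name: String, age: Int)

  val WindowSize = 10000L        // 10 s tumbling window
  val MaxOutOfOrderness = 3000L  // 3 s bound, as in the extractor

  // Start of the epoch-aligned tumbling window (offset 0, non-negative timestamps)
  def windowStart(ts: Long): Long = ts - ts % WindowSize

  def parse(s: String): Line = {
    val arr = s.split(",")
    Line(arr(0).trim.toLong, arr(1), arr(2).toInt)
  }

  // Returns (windowStart, sum of age) for each window, in firing order.
  // Simplification: late elements are not dropped and the watermark is updated after every record.
  def run(input: Seq[String]): Seq[(Long, Int)] = {
    var maxTs = Long.MinValue + MaxOutOfOrderness // avoids underflow before the first record
    val open = scala.collection.mutable.TreeMap.empty[Long, Int]
    val fired = scala.collection.mutable.ArrayBuffer.empty[(Long, Int)]
    for (line <- input.map(parse)) {
      val start = windowStart(line.id)
      open(start) = open.getOrElse(start, 0) + line.age
      maxTs = math.max(maxTs, line.id)
      val watermark = maxTs - MaxOutOfOrderness
      // Fire every open window whose last timestamp (end - 1) is covered by the watermark
      val ready = open.keys.filter(s => s + WindowSize - 1 <= watermark).toList
      for (s <- ready) {
        fired += ((s, open(s)))
        open.remove(s)
      }
    }
    fired.toList
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      "1585721697000,xiao,8",
      "1585721700000,xiao,10",
      "1585721705000,xiao,4",
      "1585721715000,xiao,9")
    run(input).foreach { case (s, sum) =>
      println(s"window [$s, ${s + WindowSize}) sum age = $sum")
    }
  }
}
```

For the four input records, `run` returns the window starting at 1585721690000 with sum 8 (fired when the record at 1585721705000 pushes the watermark to 1585721702000) and the window starting at 1585721700000 with sum 14 (fired when the watermark reaches 1585721712000).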
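In the end I found the cause: after reading from the socket, you must not apply `map` first. Assign timestamps and watermarks first, and the results are then output correctly. The reason is that `assignTimestampsAndWatermarks` runs with the same parallelism as the operator it is applied to. Applied directly to the socket source, which always has parallelism 1, there is a single assigner whose watermark advances with every record. If `map` comes first, it runs at the default parallelism of 8, so the assigner also gets 8 subtasks and the records are distributed among them round-robin. With only four records, at least four subtasks never see an element and their watermark never advances. Because the window operator uses the minimum watermark across all of its input channels, the windows never fire. `env.setParallelism(1)` avoids the problem for the same reason.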
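The effect of parallelism on the watermark can be reproduced without Flink. The sketch below is a simplified model, not Flink's implementation: `ParallelWatermarkSimulation` and `combinedWatermark` are hypothetical names, records are assumed to be dealt round-robin starting at subtask 0 (Flink may start at a different channel), and each assigner subtask keeps its own maximum timestamp. The downstream watermark is the minimum of the per-subtask watermarks.

```scala
object ParallelWatermarkSimulation {
  val MaxOutOfOrderness = 3000L
  // Initial "max timestamp", chosen so that subtracting the bound cannot underflow
  val InitialMax = Long.MinValue + MaxOutOfOrderness

  // Watermark seen by the downstream window operator after all records are processed,
  // assuming the records are spread round-robin over `parallelism` assigner subtasks.
  def combinedWatermark(timestamps: Seq[Long], parallelism: Int): Long = {
    val maxPerSubtask = Array.fill(parallelism)(InitialMax)
    timestamps.zipWithIndex.foreach { case (ts, i) =>
      val subtask = i % parallelism // round-robin, starting at subtask 0 (assumption)
      maxPerSubtask(subtask) = math.max(maxPerSubtask(subtask), ts)
    }
    // Each subtask's watermark is its max timestamp minus the bound;
    // the window operator takes the minimum over all input channels.
    maxPerSubtask.map(_ - MaxOutOfOrderness).min
  }

  def main(args: Array[String]): Unit = {
    val ts = Seq(1585721697000L, 1585721700000L, 1585721705000L, 1585721715000L)
    for (p <- Seq(1, 8)) println(s"parallelism $p -> watermark ${combinedWatermark(ts, p)}")
  }
}
```

With parallelism 1 the combined watermark is 1585721712000, which is past the end of both windows. With parallelism 8, four subtasks never receive a record, so the combined watermark stays at `Long.MinValue` and no window can fire.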