赞
踩
1、lag 的修复(FIX)代码位于 LeadLagAggFunction。可以在下面的测试文件中编写 SQL 测试代码进行验证:
/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala
/**
 * Registers `TestData.tupleData5` as `MyTable`, runs a query that applies LEAD and LAG
 * over a window partitioned by `a` and ordered by `b`, and prints the sorted sink output.
 *
 * NOTE(review): this test only prints the collected rows and asserts nothing, so it
 * can only fail if the job itself throws. Replace the `println` with an assertion once
 * the expected rows are confirmed.
 */
@Test
def testProcTimeBoundedPartitionedRowsOver2(): Unit = {
  // Build the input table; the last field is declared as a processing-time attribute.
  val source = failingDataSource(TestData.tupleData5)
  val inputTable = source.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
  tEnv.registerTable("MyTable", inputTable)

  val sqlQuery = "SELECT a, b, lead(b, 2, 3) over (partition by a order by b), lag(b, 1, 3) over (partition " +
    "by a order by b) FROM MyTable "

  // Route the query result into an append-only test sink.
  val sink = new TestingAppendSink
  val resultTable = tEnv.sqlQuery(sqlQuery)
  resultTable.toAppendStream[Row].addSink(sink)

  env.execute()

  // Output is sorted before printing so that repeated runs are comparable by eye.
  val sortedRows = sink.getAppendResults.sorted
  println(sortedRows)
}
2、单元测试代码:
相关测试可以在 OverWindowITCase.scala 中找到,例如:
/**
 * Checks LAG over `Table6`, partitioned by `a` and ordered by `b`, for five
 * offset/default combinations:
 *
 *  1. constant positive offset with a constant default,
 *  2. constant negative offset with a constant default,
 *  3. constant negative offset with a column (`b`) as the default,
 *  4. column (`a`) as the offset,
 *  5. expression (`a-1`) as the offset.
 *
 * Each scenario is verified with `checkResult` against a fixed table of expected rows.
 */
@Test
def testLagFunc(): Unit = {
  // Offset 2: each row expects the value two rows earlier in its partition;
  // the first two rows of every partition expect the default 3.0.
  val expectedPositiveOffset = Seq(
    row(1, 1.1, 3.0),
    row(5, -5.9, 3.0),
    row(5, -2.8, 3.0),
    row(5, 0.7, -5.9),
    row(5, 2.71, -2.8),
    row(5, 3.9, 0.7),
    row(4, 3.14, 3.0),
    row(4, 3.14, 3.0),
    row(4, 3.15, 3.14),
    row(4, 3.16, 3.14),
    row(2, -2.4, 3.0),
    row(2, 2.5, 3.0),
    row(3, -9.77, 3.0),
    row(3, 0.0, 3.0),
    row(3, 0.08, -9.77))
  checkResult(
    "SELECT a, b, lag(b, 2, 3) over (partition by a order by b) FROM Table6",
    expectedPositiveOffset)

  // Offset -2: each row expects the value two rows later in its partition;
  // the last two rows of every partition expect the default 3.0.
  val expectedNegativeOffset = Seq(
    row(1, 1.1, 3.0),
    row(5, -5.9, 0.7),
    row(5, -2.8, 2.71),
    row(5, 0.7, 3.9),
    row(5, 2.71, 3.0),
    row(5, 3.9, 3.0),
    row(4, 3.14, 3.15),
    row(4, 3.14, 3.16),
    row(4, 3.15, 3.0),
    row(4, 3.16, 3.0),
    row(2, -2.4, 3.0),
    row(2, 2.5, 3.0),
    row(3, -9.77, 0.08),
    row(3, 0.0, 3.0),
    row(3, 0.08, 3.0))
  checkResult(
    "SELECT a, b, lag(b, -2, 3) over (partition by a order by b) FROM Table6",
    expectedNegativeOffset)

  // Offset -2 with column `b` as the default: rows without a row two positions
  // later expect their own `b` value.
  val expectedColumnDefault = Seq(
    row(1, 1.1, 1.1),
    row(5, -5.9, 0.7),
    row(5, -2.8, 2.71),
    row(5, 0.7, 3.9),
    row(5, 2.71, 2.71),
    row(5, 3.9, 3.9),
    row(4, 3.14, 3.15),
    row(4, 3.14, 3.16),
    row(4, 3.15, 3.15),
    row(4, 3.16, 3.16),
    row(2, -2.4, -2.4),
    row(2, 2.5, 2.5),
    row(3, -9.77, 0.08),
    row(3, 0.0, 0.0),
    row(3, 0.08, 0.08))
  checkResult(
    "SELECT a, b, lag(b, -2, b) over (partition by a order by b) FROM Table6",
    expectedColumnDefault)

  // Column `a` as the offset: every row is expected to yield the default 3.0.
  val expectedColumnOffset = Seq(
    row(1, 1.1, 3.0),
    row(5, -5.9, 3.0),
    row(5, -2.8, 3.0),
    row(5, 0.7, 3.0),
    row(5, 2.71, 3.0),
    row(5, 3.9, 3.0),
    row(4, 3.14, 3.0),
    row(4, 3.14, 3.0),
    row(4, 3.15, 3.0),
    row(4, 3.16, 3.0),
    row(2, -2.4, 3.0),
    row(2, 2.5, 3.0),
    row(3, -9.77, 3.0),
    row(3, 0.0, 3.0),
    row(3, 0.08, 3.0))
  checkResult(
    "SELECT a, b, lag(b, a, 3) over (partition by a order by b) FROM Table6",
    expectedColumnOffset)

  // Expression `a-1` as the offset: the a=1 row (offset 0) expects its own value, and
  // only the last row of each other partition expects a non-default value.
  val expectedExpressionOffset = Seq(
    row(0, 1.1, 1.1),
    row(4, -5.9, 3.0),
    row(4, -2.8, 3.0),
    row(4, 0.7, 3.0),
    row(4, 2.71, 3.0),
    row(4, 3.9, -5.9),
    row(3, 3.14, 3.0),
    row(3, 3.14, 3.0),
    row(3, 3.15, 3.0),
    row(3, 3.16, 3.14),
    row(1, -2.4, 3.0),
    row(1, 2.5, -2.4),
    row(2, -9.77, 3.0),
    row(2, 0.0, 3.0),
    row(2, 0.08, -9.77))
  checkResult(
    "SELECT a-1, b, lag(b, a-1, 3) over (partition by a order by b) FROM Table6",
    expectedExpressionOffset)
}
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。