当前位置:   article > 正文

flink 聚合函数代码研究_flink sql lag

flink sql lag

1、lag 的 FIX 相关代码:LeadLagAggFunction。可以编写 SQL 测试代码来验证。

 

/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverWindowITCase.scala

  /**
   * Exploratory test for `lead` / `lag` in an OVER window on a streaming table.
   *
   * Registers `TestData.tupleData5` as "MyTable", runs a query that computes
   * `lead(b, 2, 3)` and `lag(b, 1, 3)` partitioned by `a` and ordered by `b`,
   * and prints the sorted append results. The test makes no assertions; the
   * output is only meant to be inspected manually.
   */
  @Test
  def testProcTimeBoundedPartitionedRowsOver2(): Unit = {
    // Build the source table and expose it to SQL under the name "MyTable".
    val source = failingDataSource(TestData.tupleData5)
    val myTable = source.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
    tEnv.registerTable("MyTable", myTable)

    // Both window functions use the same partitioning and ordering.
    val leadLagQuery =
      "SELECT a, b, lead(b, 2, 3) over (partition by a order by b), lag(b, 1, 3) over (partition " +
        "by a order by b) FROM MyTable "

    // Collect the query output as an append stream into a testing sink.
    val appendSink = new TestingAppendSink
    val resultStream = tEnv.sqlQuery(leadLagQuery).toAppendStream[Row]
    resultStream.addSink(appendSink)
    env.execute()

    // Results are printed rather than asserted.
    println(appendSink.getAppendResults.sorted)
  }

 

 

2、单元测试代码:

可以在 OverWindowITCase.scala 中查找相关测试。

 

  /**
   * Checks `lag` over `Table6`, partitioned by `a` and ordered by `b`, for five
   * combinations of offset and default value:
   *
   *  - a positive literal offset with a literal default,
   *  - a negative literal offset with a literal default,
   *  - a negative literal offset with a column (`b`) as default,
   *  - a column (`a`) as offset,
   *  - an expression (`a-1`) as offset.
   *
   * Each expected result lists the rows partition by partition, one partition
   * per source line, in the order they are passed to `checkResult`.
   */
  @Test
  def testLagFunc(): Unit = {
    // Offset 2, default 3: the expected rows carry the value of `b` from two rows
    // earlier in the partition, or 3.0 when there are fewer than two preceding rows.
    val lagTwoBack = Seq(
      row(1, 1.1, 3.0),
      row(5, -5.9, 3.0), row(5, -2.8, 3.0), row(5, 0.7, -5.9), row(5, 2.71, -2.8), row(5, 3.9, 0.7),
      row(4, 3.14, 3.0), row(4, 3.14, 3.0), row(4, 3.15, 3.14), row(4, 3.16, 3.14),
      row(2, -2.4, 3.0), row(2, 2.5, 3.0),
      row(3, -9.77, 3.0), row(3, 0.0, 3.0), row(3, 0.08, -9.77))
    checkResult(
      "SELECT a, b, lag(b, 2, 3) over (partition by a order by b) FROM Table6",
      lagTwoBack)

    // Offset -2, default 3: the expected rows carry the value of `b` from two rows
    // later in the partition, or 3.0 when there are fewer than two following rows.
    val lagTwoAhead = Seq(
      row(1, 1.1, 3.0),
      row(5, -5.9, 0.7), row(5, -2.8, 2.71), row(5, 0.7, 3.9), row(5, 2.71, 3.0), row(5, 3.9, 3.0),
      row(4, 3.14, 3.15), row(4, 3.14, 3.16), row(4, 3.15, 3.0), row(4, 3.16, 3.0),
      row(2, -2.4, 3.0), row(2, 2.5, 3.0),
      row(3, -9.77, 0.08), row(3, 0.0, 3.0), row(3, 0.08, 3.0))
    checkResult(
      "SELECT a, b, lag(b, -2, 3) over (partition by a order by b) FROM Table6",
      lagTwoAhead)

    // Offset -2, default `b`: same as above, except that rows without a row two
    // positions later fall back to their own `b`.
    val lagTwoAheadDefaultB = Seq(
      row(1, 1.1, 1.1),
      row(5, -5.9, 0.7), row(5, -2.8, 2.71), row(5, 0.7, 3.9), row(5, 2.71, 2.71), row(5, 3.9, 3.9),
      row(4, 3.14, 3.15), row(4, 3.14, 3.16), row(4, 3.15, 3.15), row(4, 3.16, 3.16),
      row(2, -2.4, -2.4), row(2, 2.5, 2.5),
      row(3, -9.77, 0.08), row(3, 0.0, 0.0), row(3, 0.08, 0.08))
    checkResult(
      "SELECT a, b, lag(b, -2, b) over (partition by a order by b) FROM Table6",
      lagTwoAheadDefaultB)

    // Offset taken from column `a`: in the expected data every partition has
    // exactly `a` rows, so the offset never reaches a row and all results are 3.0.
    val lagByColumnA = Seq(
      row(1, 1.1, 3.0),
      row(5, -5.9, 3.0), row(5, -2.8, 3.0), row(5, 0.7, 3.0), row(5, 2.71, 3.0), row(5, 3.9, 3.0),
      row(4, 3.14, 3.0), row(4, 3.14, 3.0), row(4, 3.15, 3.0), row(4, 3.16, 3.0),
      row(2, -2.4, 3.0), row(2, 2.5, 3.0),
      row(3, -9.77, 3.0), row(3, 0.0, 3.0), row(3, 0.08, 3.0))
    checkResult(
      "SELECT a, b, lag(b, a, 3) over (partition by a order by b) FROM Table6",
      lagByColumnA)

    // Offset `a-1`: only the last row of each partition reaches back to the first
    // row; offset 0 (a = 1) yields the row's own `b`. The first column is `a-1`.
    val lagByAMinusOne = Seq(
      row(0, 1.1, 1.1),
      row(4, -5.9, 3.0), row(4, -2.8, 3.0), row(4, 0.7, 3.0), row(4, 2.71, 3.0), row(4, 3.9, -5.9),
      row(3, 3.14, 3.0), row(3, 3.14, 3.0), row(3, 3.15, 3.0), row(3, 3.16, 3.14),
      row(1, -2.4, 3.0), row(1, 2.5, -2.4),
      row(2, -9.77, 3.0), row(2, 0.0, 3.0), row(2, 0.08, -9.77))
    checkResult(
      "SELECT a-1, b, lag(b, a-1, 3) over (partition by a order by b) FROM Table6",
      lagByAMinusOne)
  }

 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号