赞
踩
我在flink1.8中使用了flinksqlapi。我有两个流表Table1和Table2。
如果我们将receivedTime定义为表中接收数据的时间,那么我希望 joinTable1和Table2(在某个id上),并且只保留Table1.receivedTime>Table2.receivedTime的行。
首先,我尝试使用Flink SQL CURRENT_TIMESTAMP执行此操作:
NEW_TABLE1 : SELECT *, CURRENT_TIMESTAMP as receivedTime FROM TABLE1
NEW_TABLE2 : SELECT *, CURRENT_TIMESTAMP as receivedTime FROM TABLE2
RESULT : SELECT * FROM NEW_TABLE1 JOIN NEW_TABLE2
WHERE NEW_TABLE1.id = NEW_TABLE2.id
AND NEW_TABLE1.receivedTime > NEW_TABLE2.receivedTime
但是当前的时间戳似乎总是返回 query被计算的时间戳。(看起来CURRENT_TIMESTAMP在此时被替换为当前日期,而不是一个动态值)。我觉得这种行为很奇怪,正常吗?
我尝试的第二个解决方案是使用Flink的处理时间:
NEW_TABLE1 : SELECT *, proctime as receivedTime FROM TABLE1
NEW_TABLE2 : SELECT *, proctime as receivedTime FROM TABLE2
RESULT : SELECT * FROM NEW_TABLE1 JOIN NEW_TABLE2
WHERE NEW_TABLE1.id = NEW_TABLE2.id
AND NEW_TABLE1.receivedTime > NEW_TABLE2.receivedTime
但在本例中,处理时间似乎是在执行 query时计算的。然后,在我的JOIN query中,两个处理时间总是相等的。
做我想做的事的正确方法是什么?
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。