现在我们在Flink中使用带有花式窗口的SQL,我试图将“衰减移动平均值”称为“未来Flink版本中Table API和SQL的可能性。”从他们的SQL roadmap/preview 2017-03 post:
table
.window(Slide over 1.hour every 1.second as 'w)
.groupBy('productId, 'w)
.select(
'w.end,
'productId,
('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)
这是我的尝试(受到the calcite decaying example的启发):
SELECT
lb_index one_key,
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,
SUM(Y *
EXP(
proctime -
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
))
FROM write_position
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
时间是处理时间,我们通过从AppendStream表创建write_position作为proctime获得:
tEnv.registerTable(
"write_position",
tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))
我收到这个错误:
Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'.
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'
我已经尝试将proctime转换为我所知道的其他任何类型(试图达到NUMERIC承诺的土地),而我却找不到如何使它工作。
我错过了什么吗? proctime是一些非常特殊的'系统变更号'时间,你无法转换?如果是这样,仍然必须有一些方法将它与HOP_START(proctime,...)值进行比较。
您可以使用timestampDiff减去两个时间点(请参阅docs)。你这样使用它
TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)
其中timepointunit可以是SECOND,MINUTE,HOUR,DAY,MONTH或YEAR。
我没有用处理时间来尝试这个,但它确实适用于事件时间字段,所以希望它会。