Flink SQL中跳跃窗口的指数衰减移动平均值:转换时间

问题描述 投票:0回答:1

现在我们在Flink中使用带有花式窗口的SQL,我试图将“衰减移动平均值”称为“未来Flink版本中Table API和SQL的可能性。”从他们的SQL roadmap/preview 2017-03 post

table
  .window(Slide over 1.hour every 1.second as 'w)
  .groupBy('productId, 'w)
  .select(
    'w.end,
    'productId,
    ('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)

这是我的尝试(受到the calcite decaying example的启发):

SELECT                                                                              
  lb_index one_key,                                                           
  HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,  
  SUM(Y * 
      EXP(
        proctime - 
        HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
      ))                                                             
FROM write_position                                                                
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)

时间是处理时间,我们通过从AppendStream表创建write_position作为proctime获得:

tEnv.registerTable(
    "write_position", 
    tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))

我收到这个错误:

Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'. 
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'

我已经尝试将proctime转换为我所知道的其他任何类型(试图达到NUMERIC承诺的土地),而我却找不到如何使它工作。

我错过了什么吗? proctime是一些非常特殊的'系统变更号'时间,你无法转换?如果是这样,仍然必须有一些方法将它与HOP_START(proctime,...)值进行比较。

apache-flink flink-streaming windowing apache-calcite flink-sql
1个回答
1
投票

您可以使用timestampDiff减去两个时间点(请参阅docs)。你这样使用它

TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)

其中timepointunit可以是SECOND,MINUTE,HOUR,DAY,MONTH或YEAR。

我没有用处理时间来尝试这个,但它确实适用于事件时间字段,所以希望它会。

© www.soinside.com 2019 - 2024. All rights reserved.