我有一个有趣的用例,我想用Flink测试。我有一个Message
流,它是PASS
或FAIL
。现在,如果消息是FAIL
类型,我有一个下游ProcessFunction
,它保存Message
状态,然后将pause
命令发送到依赖于此的所有内容。当我收到与之前收到的PASS
相关联的FAIL
消息(通过消息ID键入)时,我将resume
命令发送到我之前暂停的所有内容。
现在我计划使用状态TTL使存储的FAIL
状态到期并在一定超时后恢复所有内容,即使我没有收到具有相同消息ID的PASS
消息。这可以单独用Flink完成,还是需要一些外部定时器来向我的程序发送超时消息?
为了让它在Flink工作,我想到了这样的事情:
对于每个Message
,添加时间戳并将其传递给流程函数,该函数在发送之前一直等到current_ts - timestamp == timeout
以恢复模块暂停的所有内容。有没有更好的方法,或者你们认为这样可以吗?
似乎使用计时器使状态到期(通过在onTimer方法中调用state.clear())而不是使用状态TTL会更直接。相同的onTimer方法也可以安排事情同时恢复。