我已将flinkkafkaconsumer作为源添加到我的流执行环境。我想在特定时间内没有收到新消息时关闭/停止flink消耗数据(类似于kafka polltime)。当前,它无限期运行,并且阻止执行继续进行下一步(验证消息)。请建议是否有任何解决方法。
注意:我尝试使用反序列化的endofstream,由于stream实际上是不确定的,因此它无法工作。
提前感谢。
如果这是用于测试,则一种方法是创建自己的“包装” FlinkKafkaConsumer
的自定义源。您的源的run()
方法将从线程中调用Kafka源的run()
方法,传入包装实际收集器的收集器,并在收集到任何内容时更新“最后收集时间”。然后,在源的run()
方法中对此进行轮询,并在时间过多时调用Kakfa源的cancel()
方法,然后也退出。
说了这么多,通常对于单元测试,您想使用模拟源,让您可以精确控制正在生成的内容以及何时生成,而不是旋转Kafka系统。