Flink Kafka:在没有收到消息的时间间隔后,正常关闭来自kafka源的消耗flink的消息

问题描述 投票:0回答:1

我已将flinkkafkaconsumer作为源添加到我的流执行环境。我想在特定时间内没有收到新消息时关闭/停止flink消耗数据(类似于kafka polltime)。当前,它无限期运行,并且阻止执行继续进行下一步(验证消息)。请建议是否有任何解决方法。

注意:我尝试使用反序列化的endofstream,由于stream实际上是不确定的,因此它无法工作。

提前感谢。

apache-kafka apache-flink kafka-consumer-api flink-streaming
1个回答
1
投票

如果这是用于测试,则一种方法是创建自己的“包装” FlinkKafkaConsumer的自定义源。您的源的run()方法将从线程中调用Kafka源的run()方法,传入包装实际收集器的收集器,并在收集到任何内容时更新“最后收集时间”。然后,在源的run()方法中对此进行轮询,并在时间过多时调用Kakfa源的cancel()方法,然后也退出。

说了这么多,通常对于单元测试,您想使用模拟源,让您可以精确控制正在生成的内容以及何时生成,而不是旋转Kafka系统。

© www.soinside.com 2019 - 2024. All rights reserved.