我有一个 Flink 作业,它从 Kafka 主题(有 6 个分区)读取数据,通过窗口(当前为 24 小时窗口)处理每个事件,然后将窗口事件接收到 Blob 存储容器。
我使用批量模式下的 Flink FileSink 来写入 parquet 文件。零件文件已创建,但从未最终确定。我已将状态后端设置为使用 RocksDB,并且我的检查点文件存储是另一个 Blob 存储容器。
部件文件已创建,但从未在 Blob 存储上最终确定,即使在读取 Kafka 上的所有消息之后也是如此。所有检查点都失败了。
检查点失败的原因是什么?
以并行度 1 运行作业时,将读取 Kafka 消息、处理窗口(我有日志条目需要确认)并创建部分文件,但从未最终确定。
检查点失败,并显示一条消息,表明检查点在完成之前已过期。
我尝试过增加超时(增加到一个小时),将并发检查点限制为 1 并将检查点之间的暂停设置为 30 分钟(作为示例),但似乎没有任何帮助。
我也尝试过增加并行度,但这没有帮助。
该作业在一个任务管理器上运行,我在日志中没有看到任何可能指向错误的内容(我启用了调试模式)。
这可能是因为接收器提交者提交的时间比默认检查点 10 分钟超时间隔长。 参数是“execution.checkpointing.timeout”。 或 env.getCheckpointConfig().setCheckpointTimeout(n); 但是,我会分析为什么接收器无法处理此负载并尝试对其进行调整。 流中的接收器及其逻辑是什么,如果是 RDBMS,请考虑是否可以调整 JDBC 驱动程序以更有效地加载数据或调整数据库和受影响的表。