HA 模式下 JobManager 故障转移后 Flink 作业两次处理 Kafka 消息

问题描述 投票:0回答:1

我正在使用 Beam 的

KafkaIO.Read
函数创建流处理管道,并将其作为 Flink 作业运行。简单来说,该作业从 Kafka 接收消息并将其插入数据库。 通常,它运行没有任何问题,但在特定条件下,我观察到异常行为。

我们在高可用性(HA)模式下使用 Flink 和多个 JobManager。我们想要确认当领导者 JobManager 崩溃时作业是否继续处理而不会出现问题。由于我们是在 Kubernetes 上操作,所以我们使用了 delete pod 命令来杀死 Leader JobManager。

之后,我们确认备用JobManager正常恢复并重新执行作业。到目前为止,一切都很好。然而,恢复后,由于某种原因,Kafka 消息开始被处理两次。

这不是由于检查点和 Kafka 偏移进度之间的差异而导致的部分重复处理。恢复后添加的所有新 Kafka 消息均被处理两次。

经过进一步调查,我们发现虽然JobManager上只运行了一个作业,但TaskManager似乎执行了两次该作业。 (Flink UI 中只显示了一个作业,并且它正在以应用程序模式运行。)

我们还确认,在问题发生期间重新启动TaskManager可以恢复正常运行。

我的假设是,在 JobManager 关闭之前,它需要停止在 TaskManager 上运行的作业进程,但由于它在崩溃之前无法做到这一点,所以出现了问题。原因是,当我们通过停止接收器目标 PostgreSQL 进行测试时,JobManager 上的作业失败,即使 JobManager 崩溃,也不会出现相同的问题。因此,看起来当关闭不出意外时,它正确地停止了 TaskManager 上运行的作业进程。

基于上述假设,我认为如果我优雅地关闭JobManager,问题可能会得到解决。但是,由于我了解 Flink 基本上已正常关闭,所以我不确定如何修复来解决它。

最后,我想确认以下几点:

  1. 出现此问题的原因是什么?
  2. 可以采取什么措施来解决这个问题? 如果有人熟悉这个问题,如果您能分享您的见解,我将不胜感激。

以下是 KafkaIO 部分和 Flink/Beam 配置的代码片段。

Beam KafkaIO 部分

pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read()
                        .withBootstrapServers(options.getBootstrapServerList())
                        .withTopic("test-topic")
                        .withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "test_group"))
                        .withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"))
                        .commitOffsetsInFinalize()
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .withoutMetadata())

梁构建.gradle

dependencies {
    // App dependencies.
    implementation "org.apache.beam:beam-sdks-java-core:2.56.0"
    implementation "org.apache.beam:beam-runners-direct-java:2.56.0"
    implementation "org.slf4j:slf4j-jdk14:1.7.32"
    implementation "org.apache.kafka:kafka-clients:2.8.1"
    implementation "org.apache.beam:beam-sdks-java-io-kafka:2.56.0"
    implementation 'org.apache.beam:beam-sdks-java-io-jdbc:2.56.0'
    implementation 'org.apache.beam:beam-runners-flink-1.17:2.56.0'
    runtimeOnly "org.postgresql:postgresql:42.2.27"
    runtimeOnly "org.hamcrest:hamcrest:2.2"

    implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-jaxb-annotations', version: '2.12.3'
    implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.12.3'

    // Tests dependencies.
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

flink 配置清单文件

flink-conf.yaml: |+
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    kubernetes.namespace: test-namespace
    kubernetes.service-account: flink-service-account
    high-availability.type: kubernetes
    high-availability.storageDir: hdfs://xx.xx.xx.xx:9000/flink/blue/ha
    kubernetes.cluster-id: cluster20
    jobmanager.execution.failover-strategy: full
    restart-strategy.type: fixed-delay
    restart-strategy.fixed-delay.delay: 5 s
    restart-strategy.fixed-delay.attempts: 10
    execution.checkpointing.interval: 10s
    state.checkpoints.dir: hdfs://xx.xx.xx.xx:9000/flink/app_mode/checkpoints
    state.checkpoint-storage: filesystem
    state.checkpoints.num-retained: 3
    parallelism.default: 6
    taskmanager.numberOfTaskSlots: 6
    jobmanager.memory.process.size: 2048m
    taskmanager.memory.process.size: 2048m

(IP 地址、命名空间名称等被屏蔽) (flink版本为1.17.2)

apache-flink apache-beam
1个回答
0
投票

我对 Apache-Beam 不熟悉,但我相信这个问题可能与 Kafka 没有及时提交偏移量有关。将数据写入数据库后,偏移量可能不会立即提交。 Kafka 会等待批次中的所有消息(由于 max.poll.records 设置,默认为 500 条消息)都被处理完毕。如果系统在提交偏移量之前失败或重新启动,这可能会导致重新处理。

要减少失败时处理的重复项数量,您可以:

  1. 减少 max.poll.records:这将限制每个批次中的消息数量,这意味着如果批次中出现问题,将会有更少的消息面临重复的风险。
  2. 手动提交偏移量:每次数据库写入后,手动提交Kafka偏移量,以确保消息在重新平衡后不会被重新处理。

有关更多信息,您可以参考这篇文章,它解释得很好偏移管理

© www.soinside.com 2019 - 2024. All rights reserved.