我正在使用 Beam 的
KafkaIO.Read
函数创建流处理管道,并将其作为 Flink 作业运行。简单来说,该作业从 Kafka 接收消息并将其插入数据库。
通常,它运行没有任何问题,但在特定条件下,我观察到异常行为。
我们在高可用性(HA)模式下使用 Flink 和多个 JobManager。我们想要确认当领导者 JobManager 崩溃时作业是否继续处理而不会出现问题。由于我们是在 Kubernetes 上操作,所以我们使用了 delete pod 命令来杀死 Leader JobManager。
之后,我们确认备用JobManager正常恢复并重新执行作业。到目前为止,一切都很好。然而,恢复后,由于某种原因,Kafka 消息开始被处理两次。
这不是由于检查点和 Kafka 偏移进度之间的差异而导致的部分重复处理。恢复后添加的所有新 Kafka 消息均被处理两次。
经过进一步调查,我们发现虽然JobManager上只运行了一个作业,但TaskManager似乎执行了两次该作业。 (Flink UI 中只显示了一个作业,并且它正在以应用程序模式运行。)
我们还确认,在问题发生期间重新启动TaskManager可以恢复正常运行。
我的假设是,在 JobManager 关闭之前,它需要停止在 TaskManager 上运行的作业进程,但由于它在崩溃之前无法做到这一点,所以出现了问题。原因是,当我们通过停止接收器目标 PostgreSQL 进行测试时,JobManager 上的作业失败,即使 JobManager 崩溃,也不会出现相同的问题。因此,看起来当关闭不出意外时,它正确地停止了 TaskManager 上运行的作业进程。
基于上述假设,我认为如果我优雅地关闭JobManager,问题可能会得到解决。但是,由于我了解 Flink 基本上已正常关闭,所以我不确定如何修复来解决它。
最后,我想确认以下几点:
以下是 KafkaIO 部分和 Flink/Beam 配置的代码片段。
Beam KafkaIO 部分
pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read()
.withBootstrapServers(options.getBootstrapServerList())
.withTopic("test-topic")
.withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "test_group"))
.withConsumerConfigUpdates(ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"))
.commitOffsetsInFinalize()
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata())
梁构建.gradle
dependencies {
// App dependencies.
implementation "org.apache.beam:beam-sdks-java-core:2.56.0"
implementation "org.apache.beam:beam-runners-direct-java:2.56.0"
implementation "org.slf4j:slf4j-jdk14:1.7.32"
implementation "org.apache.kafka:kafka-clients:2.8.1"
implementation "org.apache.beam:beam-sdks-java-io-kafka:2.56.0"
implementation 'org.apache.beam:beam-sdks-java-io-jdbc:2.56.0'
implementation 'org.apache.beam:beam-runners-flink-1.17:2.56.0'
runtimeOnly "org.postgresql:postgresql:42.2.27"
runtimeOnly "org.hamcrest:hamcrest:2.2"
implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-jaxb-annotations', version: '2.12.3'
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.12.3'
// Tests dependencies.
testImplementation "junit:junit:4.13.2"
testImplementation 'org.hamcrest:hamcrest:2.2'
}
flink 配置清单文件
flink-conf.yaml: |+
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
kubernetes.namespace: test-namespace
kubernetes.service-account: flink-service-account
high-availability.type: kubernetes
high-availability.storageDir: hdfs://xx.xx.xx.xx:9000/flink/blue/ha
kubernetes.cluster-id: cluster20
jobmanager.execution.failover-strategy: full
restart-strategy.type: fixed-delay
restart-strategy.fixed-delay.delay: 5 s
restart-strategy.fixed-delay.attempts: 10
execution.checkpointing.interval: 10s
state.checkpoints.dir: hdfs://xx.xx.xx.xx:9000/flink/app_mode/checkpoints
state.checkpoint-storage: filesystem
state.checkpoints.num-retained: 3
parallelism.default: 6
taskmanager.numberOfTaskSlots: 6
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 2048m
(IP 地址、命名空间名称等被屏蔽) (flink版本为1.17.2)
我对 Apache-Beam 不熟悉,但我相信这个问题可能与 Kafka 没有及时提交偏移量有关。将数据写入数据库后,偏移量可能不会立即提交。 Kafka 会等待批次中的所有消息(由于 max.poll.records 设置,默认为 500 条消息)都被处理完毕。如果系统在提交偏移量之前失败或重新启动,这可能会导致重新处理。
要减少失败时处理的重复项数量,您可以:
有关更多信息,您可以参考这篇文章,它解释得很好偏移管理