在 apache beam ReadFromKafka 中的一个主题中可以确定分区位置之前的超时

问题描述 投票:0回答:1

我正在从事 Google 数据流工作,我正在使用 apache beam

ReadFromKafka
来消费主题消息。我正在消耗 4 个主题。管道曾经工作正常,在我们向 kafka 集群添加一个新代理并触发重新平衡后,消费者开始在一个特定主题上失败,但成功地继续为该主题工作 其他 3 个主题。

The error is: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition topic_name-0 could be determined 

这是我的代码:

(
            pcoll
            | f"Read From Kafka {self.transf_label}"
            >> ReadFromKafka(
                consumer_config=self.kafka_consumer_config,
                topics=_topic_list,
                with_metadata=True,
            )
            | f"Extract KVT {self.transf_label}" >> Map(extract_key_value_topic)
            | f"Decode messages {self.transf_label}" >> ParDo(DecodeKafkaMessage())
        )

在日志中我可以看到

[Consumer clientId=consumer-group-dummy-4, groupId=consumer-group-dummy] Subscribed to partition(s): topic_name-0

但几秒钟后,它因超时错误而失败,同时不断拉取其他主题的消息

这是配置:

{
        "bootstrap.servers": BROKERS,
        "security.protocol": SECURITY_PROTOCOL,
        "sasl.mechanism": SASL_MECHANISM,
        "group.id": "consumer-group-dummy",
        "session.timeout.ms": "60000",
        "sasl.jaas.config": f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{SASL_USERNAME}" password="{SASL_PASSWORD}";',
        "auto.offset.reset": "latest",

    }

需要考虑的一件事是我们的其他 Kafka 消费者应用程序没有抛出任何问题,这种行为只有在 apache beam pipeline 中才会被注意到。此外,当我尝试使用 Kafka 汇合库和简单的 python 脚本在本地触发具有相同配置和相同消费者组的消费者时,它似乎工作正常并拉取消息。

我在 Apache flink 中发现了这个 bug,在 Java 的 Beam Kafka 连接器中是否也一样?

python java apache-kafka google-cloud-dataflow apache-beam
1个回答
0
投票

我的问题是新添加的代理没有在内部路由,因此应用程序很难到达持有导致超时的分区的代理。

请注意,此问题可能是由其他几个因素引起的,以下是我在搜索时遇到的一些因素:

其他因素:

过时的元数据:

控制节点负责更新主题元数据。如果主题分区的领导层发生了移动,如果消费者不知道哪个代理是该分区的当前领导层,则消费者将无法读取它。过时的元数据意味着控制器节点没有完成其工作,应该被替换。 zkCli.sh 删除 /controller 以强制选举新控制器

集群缩放/除垢:

详情这里

© www.soinside.com 2019 - 2024. All rights reserved.