Getting DEADLINE_EXCEEDED on a Flink job when using the GCP Pub/Sub source


When I use the Pub/Sub source in a Flink job I'm working on, the following error repeats every 15 seconds. The connector is the one documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pubsub/

Source: pub-sub-source -> filter -> (Sink: Docstore Sink, Map -> Sink: pinot-kafka-sink) (2/2)#3 (b7790c8ab117377fb8d85b1af23b1d11) switched from RUNNING to FAILED.
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 14.999620292s. [remote_addr=pubsub.googleapis.com/123.123.12.12:443]
    at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
    at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
    at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
    at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.pull(SubscriberGrpc.java:1641)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:73)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:77)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:77)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:77)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:67)
    at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.run(PubSubSource.java:128)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)

I create the PubSubSource like this:

PubSubSource pubSubSource =
        PubSubSource.newBuilder()
            .withDeserializationSchema(new SomeDeserializer())
            .withProjectName("some-project")
            .withSubscriptionName("some-subscription")
            .withCredentials(GoogleCredentials.fromStream(file))
            .build();

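Is there any configuration I need to set so that this doesn't happen when there are no messages available in the Pub/Sub subscription?

I have already changed the checkpoint interval to be shorter than the acknowledgement deadline of the Pub/Sub subscription. It still happens.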

apache-flink flink-streaming google-cloud-pubsub
1 Answer

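The deadline you're hitting appears to be the pullTimeout, which is set to 15 seconds on the default subscriber (the one used when no explicit factory is defined):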

pubSubSubscriberFactory = new DefaultPubSubSubscriberFactory(
    // subscriptionName
    ProjectSubscriptionName.format(projectName, subscriptionName),
    // retries
    3,
    // pullTimeout
    Duration.ofSeconds(15),
    // messagesPerPull
    100
);

You may want to consider defining a custom factory for your PubSubSource so that you can tune these timeouts, using the withPubSubSubscriberFactory() function on the source:

PubSubSource.newBuilder()
    .withPubSubSubscriberFactory(...)
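
As a rough sketch of what that tuning could look like for the source in the question: to my knowledge the builder also has a three-argument overload, withPubSubSubscriberFactory(maxMessagesPerPull, perRequestTimeout, retries), that adjusts the default gRPC subscriber without writing your own factory. Treat that overload, its argument order, the 60-second value, and the class and method names below as assumptions, and verify them against the connector version you actually run. SimpleStringSchema stands in for SomeDeserializer only so the snippet has no undefined types.

```java
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;

import com.google.auth.oauth2.GoogleCredentials;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

// Hypothetical helper class, not part of the question's code.
public class TunedPubSubSource {

    // Builds a source whose pull requests time out after 60 s instead of 15 s.
    static PubSubSource<String> build(InputStream credentialsFile) throws IOException {
        return PubSubSource.newBuilder()
            .withDeserializationSchema(new SimpleStringSchema())
            .withProjectName("some-project")
            .withSubscriptionName("some-subscription")
            .withCredentials(GoogleCredentials.fromStream(credentialsFile))
            // Assumed overload: (maxMessagesPerPull, perRequestTimeout, retries).
            // 100 and 3 mirror the defaults shown above; only the timeout changes.
            .withPubSubSubscriberFactory(100, Duration.ofSeconds(60), 3)
            .build();
    }
}
```

If that overload isn't available in your version, the other route is to pass your own PubSubSubscriberFactory implementation to withPubSubSubscriberFactory(...).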

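Per the source code, you'll likely also want to make sure that checkpoints are taken at an interval significantly shorter than the Pub/Sub-related timeouts, to avoid issues on that front:

"PubSubSource requires checkpointing to be enabled and the checkpointing frequency must be MUCH lower than the PubSub timeout for it to retry a message."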

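You might try increasing pullTimeout to a larger value (e.g. 30 seconds, 1 minute) while setting the checkpoint interval to some fraction of it (half, a quarter, etc.), and see whether that helps with the timeouts.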
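
To make the "fraction of the timeout" arithmetic concrete, here is a minimal sketch using only java.time. The class name, the helper checkpointIntervalFor, and the 60-second pull timeout are all hypothetical, chosen purely for illustration. It is not part of the Flink connector.

```java
import java.time.Duration;

public class CheckpointIntervalSketch {

    // Hypothetical helper: derives a checkpoint interval as pullTimeout / divisor,
    // e.g. divisor 2 for "half" or 4 for "a quarter" of the pull timeout.
    static Duration checkpointIntervalFor(Duration pullTimeout, int divisor) {
        if (divisor < 2) {
            throw new IllegalArgumentException("divisor must be at least 2");
        }
        return pullTimeout.dividedBy(divisor);
    }

    public static void main(String[] args) {
        // Assumed value: a pull timeout raised from the default 15 s to 60 s.
        Duration pullTimeout = Duration.ofSeconds(60);

        Duration half = checkpointIntervalFor(pullTimeout, 2);
        Duration quarter = checkpointIntervalFor(pullTimeout, 4);

        System.out.println("pullTimeout = " + pullTimeout.toMillis() + " ms");
        System.out.println("half        = " + half.toMillis() + " ms");
        System.out.println("quarter     = " + quarter.toMillis() + " ms");
    }
}
```

The millisecond value from toMillis() is what you would then pass to StreamExecutionEnvironment.enableCheckpointing(long), which takes the interval in milliseconds.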
