When I use the Pub/Sub source (https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pubsub/) in my Flink job, the following error repeats every 15 seconds:
Source: pub-sub-source -> filter -> (Sink: Docstore Sink, Map -> Sink: pinot-kafka-sink) (2/2)#3 (b7790c8ab117377fb8d85b1af23b1d11) switched from RUNNING to FAILED.
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 14.999620292s. [remote_addr=pubsub.googleapis.com/123.123.12.12:443]
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.pull(SubscriberGrpc.java:1641)
at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:73)
at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:77)
at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:77)
at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:77)
at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.pull(BlockingGrpcPubSubSubscriber.java:67)
at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.run(PubSubSource.java:128)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
The PubSubSource I'm creating looks like this:
PubSubSource pubSubSource =
        PubSubSource.newBuilder()
                .withDeserializationSchema(new SomeDeserializer())
                .withProjectName("some-project")
                .withSubscriptionName("some-subscription")
                .withCredentials(GoogleCredentials.fromStream(file))
                .build();
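Is there any configuration I need to set so that this doesn't happen when no messages are available on the Pub/Sub subscription?
I have already set the checkpoint interval to be shorter than the Pub/Sub subscription's acknowledgement deadline, but it still happens.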
The deadline you are hitting appears to be pullTimeout, which the default subscriber (used when no explicit factory is defined) sets to 15 seconds:
pubSubSubscriberFactory = new DefaultPubSubSubscriberFactory(
        // subscriptionName
        ProjectSubscriptionName.format(projectName, subscriptionName),
        // retries
        3,
        // pullTimeout
        Duration.ofSeconds(15),
        // messagesPerPull
        100
);
You may want to define a custom factory for your PubSubSource configuration, so that you can tune these timeouts through the builder's withPubSubSubscriberFactory() method:
PubSubSource.newBuilder()
        // ...withDeserializationSchema(), withProjectName(), withSubscriptionName()...
        .withPubSubSubscriberFactory(...)
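A minimal sketch of what that could look like, assuming your connector version's builder exposes the withPubSubSubscriberFactory(int maxMessagesPerPull, Duration perRequestTimeout, int retries) overload, which tunes the default gRPC subscriber rather than replacing it. It requires the flink-connector-gcp-pubsub dependency. To keep it self-contained, the question's SomeDeserializer is swapped for Flink's SimpleStringSchema. The class name, the "credentials.json" path, the 60-second pull timeout and the 15-second checkpoint interval are all illustrative placeholders, not recommendations.

```java
import java.io.FileInputStream;
import java.time.Duration;

import com.google.auth.oauth2.GoogleCredentials;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

// Hypothetical example class; names and values are placeholders.
public class PubSubTimeoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // PubSubSource acknowledges messages on checkpoints, so checkpointing must be enabled.
        // Illustrative: checkpoint every 15 s, well below the 60 s pull timeout set below.
        env.enableCheckpointing(Duration.ofSeconds(15).toMillis());

        PubSubSource<String> source;
        // "credentials.json" is a placeholder path to a service-account key file.
        try (FileInputStream keyFile = new FileInputStream("credentials.json")) {
            source = PubSubSource.newBuilder()
                    .withDeserializationSchema(new SimpleStringSchema())
                    .withProjectName("some-project")
                    .withSubscriptionName("some-subscription")
                    .withCredentials(GoogleCredentials.fromStream(keyFile))
                    // Assumed overload: tune the default subscriber with
                    // 100 messages per pull, a 60 s per-request timeout, 3 retries.
                    .withPubSubSubscriberFactory(100, Duration.ofSeconds(60), 3)
                    .build();
        }

        env.addSource(source).print();
        env.execute("pubsub-timeout-example");
    }
}
```

If your Flink version lacks this overload, the alternative is to pass your own PubSubSubscriberFactory implementation to the single-argument withPubSubSubscriberFactory(...).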
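Judging by the source code, you may also want to make sure checkpoints happen much more often than the PubSub-related timeouts (that is, keep the checkpoint interval well below them) to avoid problems here:
"PubSubSource requires checkpointing to be enabled, and the checkpointing frequency must be much lower than the PubSub timeout for it to retry messages."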
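You might try increasing pullTimeout to a larger value (e.g. 30 seconds or 1 minute) while setting the checkpoint interval to a fraction of it (half, a quarter, etc.), and see whether that resolves the timeouts.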
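The sizing rule above is simple arithmetic. This stdlib-only sketch uses a hypothetical helper, checkpointIntervalFor (not part of Flink), that derives a checkpoint interval as a fraction of a chosen pull timeout and rejects divisors that would not keep the interval below the timeout. The resulting millisecond value is what StreamExecutionEnvironment.enableCheckpointing(long) takes.

```java
import java.time.Duration;

public class CheckpointSizing {
    /**
     * Hypothetical helper: returns pullTimeout / divisor, e.g. divisor 2 for "half",
     * 4 for "a quarter". Requiring divisor >= 2 keeps the checkpoint interval
     * strictly below the pull timeout.
     */
    static Duration checkpointIntervalFor(Duration pullTimeout, int divisor) {
        if (divisor < 2) {
            throw new IllegalArgumentException(
                    "divisor must be >= 2 so checkpoints run well inside the pull timeout");
        }
        return pullTimeout.dividedBy(divisor);
    }

    public static void main(String[] args) {
        Duration pullTimeout = Duration.ofSeconds(60);
        Duration half = checkpointIntervalFor(pullTimeout, 2);    // 30 s
        Duration quarter = checkpointIntervalFor(pullTimeout, 4); // 15 s

        // Millisecond values, as enableCheckpointing(long) expects.
        System.out.println(half.toMillis());    // 30000
        System.out.println(quarter.toMillis()); // 15000
    }
}
```

Which fraction to pick is a trade-off: shorter intervals acknowledge messages sooner but add checkpointing overhead.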