Getting http2 exceptions in a Flink job with a GCP PubSub source

Question (0 votes, 2 answers)

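I have a Flink job that uses GCP PubSub as its source. Although I am able to process the messages received on the PubSub topic, I see two problems:

  1. Messages that were already processed show up again (which suggests they were not acknowledged)
  2. I see gRPC http2 exceptions in my Flink job, alongside the checkpoint logs

Exception stack trace: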

io.grpc.StatusRuntimeException: INTERNAL: http2 exception
    at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
    at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
    at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
    at com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.acknowledge(SubscriberGrpc.java:1628)
    at org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.acknowledge(BlockingGrpcPubSubSubscriber.java:99)
    at org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint.notifyCheckpointComplete(AcknowledgeOnCheckpoint.java:84)
    at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.notifyCheckpointComplete(PubSubSource.java:208)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:330)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1092)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1057)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1080)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception: Header size exceeded max allowed size (10240)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.connectionError(Http2Exception.java:103)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil.headerListSizeExceeded(Http2CodecUtil.java:245)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$HeadersBlockBuilder.headerSizeExceeded(DefaultHttp2FrameReader.java:694)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$HeadersBlockBuilder.addFragment(DefaultHttp2FrameReader.java:710)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$2.processFragment(DefaultHttp2FrameReader.java:481)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:491)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:254)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1526)
    at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1275)
    at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1322)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
    at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
    at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

The following appears to be the root cause:

Caused by: io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception: Header size exceeded max allowed size (10240)

This is how I configure the PubSubSource object:

    PubSubSource pubSubSource =
        PubSubSource.newBuilder()
            .withDeserializationSchema(new SomeDeserializer())
            .withProjectName("some-project")
            .withSubscriptionName("some-subscription")
            .withCredentials(GoogleCredentials.fromStream(file))
            .withPubSubSubscriberFactory(100, Duration.ofSeconds(90), 3)
            .build();

The checkpoint interval is 60 seconds (shorter than the ack deadline).

What could be causing these two problems?

apache-flink flink-streaming google-cloud-pubsub
2 Answers
1 vote

I increased maxInboundMetadataSize further, to 1 MB, and then it started working.

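import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.SubscriberGrpc;
import io.grpc.ManagedChannel;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import java.io.IOException;
import java.time.Duration;
import org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;

// Subscriber factory that builds its own gRPC channel so the inbound
// metadata (header) size limit can be raised.
public class CustomPubSubSubscriberFactory implements PubSubSubscriberFactory {
  private final int retries;
  private final Duration timeout;
  private final int maxMessagesPerPull;
  private final String projectSubscriptionName;

  public CustomPubSubSubscriberFactory(
      String projectSubscriptionName, int retries, Duration pullTimeout, int maxMessagesPerPull) {
    this.retries = retries;
    this.timeout = pullTimeout;
    this.maxMessagesPerPull = maxMessagesPerPull;
    this.projectSubscriptionName = projectSubscriptionName;
  }

  @Override
  public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
    ManagedChannel channel =
        NettyChannelBuilder.forTarget(SubscriberStubSettings.getDefaultEndpoint())
            .negotiationType(NegotiationType.TLS)
            .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
            // Raise the inbound header limit to 1 MB (1048576 bytes).
            .maxInboundMetadataSize(1048576)
            .build();

    PullRequest pullRequest =
        PullRequest.newBuilder()
            .setMaxMessages(maxMessagesPerPull)
            .setSubscription(projectSubscriptionName)
            .build();
    SubscriberGrpc.SubscriberBlockingStub stub =
        SubscriberGrpc.newBlockingStub(channel)
            .withCallCredentials(MoreCallCredentials.from(credentials));
    return new BlockingGrpcPubSubSubscriber(
        projectSubscriptionName, channel, stub, pullRequest, retries, timeout);
  }
}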

It is used like this:

PubSubSource pubSubSource =
    PubSubSource.newBuilder()
        .withDeserializationSchema(new SomeDeserializer())
        .withProjectName("some-project")
        .withSubscriptionName("some-subscription")
        .withCredentials(GoogleCredentials.fromStream(file))
        // Argument order of the custom constructor:
        // subscription name, retries, pull timeout, max messages per pull.
        .withPubSubSubscriberFactory(
            new CustomPubSubSubscriberFactory(
                ProjectSubscriptionName.format(
                    "some-project", "some-subscription"),
                3,
                Duration.ofSeconds(90),
                100))
        .build();
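
The stack trace in the question shows the exception being thrown from acknowledge(), called from notifyCheckpointComplete(). So the first symptom (messages reappearing) is plausibly a consequence of the second: if the acknowledge call fails, Pub/Sub never learns that those messages were processed, and it redelivers them once the ack deadline passes. The toy model below illustrates that reasoning in plain Java. It is not the connector's code: AckOnCheckpointModel, Acknowledger, and the ack IDs are hypothetical names invented for this sketch, and the real connector may react differently to a failed call.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of acknowledging messages only when a checkpoint completes. */
public class AckOnCheckpointModel {

    /** Hypothetical stand-in for the acknowledge RPC; it may throw, as in the stack trace. */
    public interface Acknowledger {
        void acknowledge(List<String> ackIds);
    }

    private final List<String> pendingAckIds = new ArrayList<>();
    private final Acknowledger acknowledger;

    public AckOnCheckpointModel(Acknowledger acknowledger) {
        this.acknowledger = acknowledger;
    }

    /** Records a message that has been processed but not yet acknowledged. */
    public void messageProcessed(String ackId) {
        pendingAckIds.add(ackId);
    }

    /** Tries to acknowledge everything pending; IDs are cleared only on success. */
    public boolean onCheckpointComplete() {
        try {
            acknowledger.acknowledge(new ArrayList<>(pendingAckIds));
            pendingAckIds.clear();
            return true;
        } catch (RuntimeException e) {
            // The ack never took effect, so these messages stay unacknowledged
            // and become eligible for redelivery after the ack deadline.
            return false;
        }
    }

    /** Returns a copy of the IDs that are still waiting for a successful ack. */
    public List<String> unacknowledged() {
        return new ArrayList<>(pendingAckIds);
    }

    public static void main(String[] args) {
        AckOnCheckpointModel failing = new AckOnCheckpointModel(ids -> {
            throw new IllegalStateException("INTERNAL: http2 exception");
        });
        failing.messageProcessed("ack-1");
        failing.messageProcessed("ack-2");
        failing.onCheckpointComplete();
        System.out.println("still unacknowledged: " + failing.unacknowledged());
    }
}
```

In this model, once the acknowledge call stops throwing (for example after the header limit is raised), the pending IDs are cleared on the next checkpoint and nothing is left to redeliver.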


0 votes

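Adding more detail to @Amrith M's answer, to help community members who run into a similar problem in the future.

The error above may have one of the following causes:

  • The gRPC header size limit was exceeded.

    -> Apparently, the client can raise the gRPC header size limit with maxInboundMetadataSize().

  • The limit used to be higher, but the gRPC client configuration lowered it to 10240.

    -> The Flink connector may have changed it indirectly.

  • Some odd network issue.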
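
To make the first cause more concrete: the HTTP/2 specification (RFC 7540, section 6.5.2) measures a header list as the sum, over all fields, of the name length plus the value length plus 32 bytes of overhead. The sketch below applies that rule and compares the result with the 10240 limit from the exception and the 1 MB value from the other answer. It is only an illustration: the class name, the header names, and the oversized value are made up, and Netty's actual enforcement may differ in detail.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative header list size accounting per RFC 7540, section 6.5.2. */
public class HeaderListSize {
    /** Per-field overhead defined by the HTTP/2 specification. */
    public static final int ENTRY_OVERHEAD_BYTES = 32;
    /** Limit reported in the exception message in the question. */
    public static final long LIMIT_FROM_EXCEPTION = 10240L;
    /** Limit passed to maxInboundMetadataSize in the other answer (1 MB). */
    public static final long RAISED_LIMIT = 1048576L;

    /** Sums name length + value length + 32 bytes over every header field. */
    public static long headerListSize(Map<String, String> headers) {
        long total = 0;
        for (Map.Entry<String, String> e : headers.entrySet()) {
            total += e.getKey().getBytes(StandardCharsets.UTF_8).length
                    + e.getValue().getBytes(StandardCharsets.UTF_8).length
                    + ENTRY_OVERHEAD_BYTES;
        }
        return total;
    }

    /** Returns true when the header list is larger than the given limit. */
    public static boolean exceeds(Map<String, String> headers, long limit) {
        return headerListSize(headers) > limit;
    }

    /** Builds a string of 'x' characters of the given length (test data only). */
    public static String filler(int length) {
        char[] chars = new char[length];
        Arrays.fill(chars, 'x');
        return new String(chars);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("grpc-status", "3");
        // Hypothetical oversized value, invented purely for illustration.
        headers.put("grpc-message", filler(12000));
        System.out.println("header list size: " + headerListSize(headers));
        System.out.println("exceeds 10240: " + exceeds(headers, LIMIT_FROM_EXCEPTION));
        System.out.println("exceeds 1 MB: " + exceeds(headers, RAISED_LIMIT));
    }
}
```

The same header list that trips the 10240 limit fits comfortably under 1 MB, which is consistent with the fix reported in the other answer.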

For more information, see link1, link2, and link3.
