Apache Flink AsyncRetryStrategy 与 RichAsyncFunction

问题描述 投票:0回答:1
AsyncRetryStrategy asyncRetryStrategy =
    new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
        .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
        .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
        .build();

// apply the async I/O transformation with retry
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);

这是来自 Flink 官方docs

的示例

假设我们在 AsyncDatabaseRequest asyncInvoke 方法中进行了 5 个异步 API 调用,flink 是否针对输出流元素测试 AsyncRetryStrategy 中的谓词 在本例中,其类型为 Tuple2,并重试整个 asyncInvoke 方法,或者它是否会针对 5 个异步 API 调用中每个调用的输出测试谓词,并重试 5 个中失败的那些?

apache-flink flink-streaming
1个回答
0
投票

重试是基于每条记录的。参见Flink的

AsyncWaitOperator.processElement()
,每个元素调用一次。如果启用重试,则它使用
RetryableResultHandlerDelegator
,它实现
ResultFuture
API,特别是
completeExceptionally()
调用,它将触发对失败的单个记录进行重试。

© www.soinside.com 2019 - 2024. All rights reserved.