AsyncRetryStrategy asyncRetryStrategy =
new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
.ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
.ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
.build();
// apply the async I/O transformation with retry
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);
这是来自 Flink 官方docs
的示例假设我们在 AsyncDatabaseRequest asyncInvoke 方法中进行了 5 个异步 API 调用,flink 是否针对输出流元素测试 AsyncRetryStrategy 中的谓词
在本例中,其类型为 Tuple2
重试是基于每条记录的。参见Flink的
AsyncWaitOperator.processElement()
,每个元素调用一次。如果启用重试,则它使用 RetryableResultHandlerDelegator
,它实现 ResultFuture
API,特别是 completeExceptionally()
调用,它将触发对失败的单个记录进行重试。