如何捕获最大重启次数后RestartSource的错误? 我想在源失败最大次数后做一些事情。 我可以在日志中看到源重新启动。 我尝试添加 withAttributes 但它从未被调用。
return RestartSource.onFailuresWithBackoff(restartSettings, () -> Consumer
.committableSource(getConsumerSettings(), topics)
.log("error on receiver topic")
.mapMaterializedValue(ctrl -> {
control = ctrl;
return NotUsed.getInstance();
})
.withAttributes(ActorAttributes.withSupervisionStrategy(e -> {
log.error("Stream has failed", e);
return (Supervision.Directive) Supervision.stop();
})));
如有任何建议,我们将不胜感激。
好的,我成功了。在这种情况下,属性似乎从未被调用,但这会导致整个流失败,因此结果将在附加的流和接收器完成上。就我而言,我有类似的事情:
streamCompletion =
createSource()
...
runWith(Sink.ignore(), actorSystem);
然后我们可以在streamCompletion上得到失败信息,如下所示:
streamCompletion.whenComplete((done, throwable) -> {
if (throwable != null) {
log.error("Some error occurred {} {}", done, throwable.getMessage(), throwable);
}
});