关闭scala.concurrent.Future的onFailure延续不能按预期工作

问题描述 投票:0回答:1

我遇到了一个问题,我有两个方法,第一个在循环中调用第二个,第二个创建Future,如下所示:

public class WorkersCoordinator {
    private static Logger LOGGER = 
        LoggerFactory.getLogger(WorkersCoordinator.class);

    private final Timeout timeout;

    private final ActorSystem system;

    private final List<Class<? extends BaseWorker>> workers;

    private final Map<Class, ActorRef> refMap;

    private final WorkResultPackageQueue pkgQueue;

    private final ActorFactory actorFactory;

    @Autowired
    public WorkersCoordinator(final ApplicationConfiguration config,
                             final ActorSystem system,
                             final ActorFactory actorFactory, 
                             final WorkerFactory workerFactory,
                             final WorkResultPackageQueue pkgQueue) {
        timeout = new Timeout(config.getTimeoutInMilliseconds(), 
                              TimeUnit.MILLISECONDS);

        this.system = system;
        this.actorFactory = actorFactory;
        this.pkgQueue = pkgQueue;

        refMap = Map.newHashMap();
        workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
    }

    public void delegateWorkers() {
        for (Class<? extends BaseWorker> worker : workers) {
            if (refMap.containsKey(worker) continue;
            sendWork(worker);
        }
    }

    private void sendWork(Class<? extends BaseWorker> worker) {
        // GetDataActor extends AbstractActor
        ActorRef actorRef = actorFactory.create(GetDataActor.class);
        Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);

        responseRef.onFailure(new OnFailure() {
            @Override
            public void onFailure(Throwable failure) throws Throwable {
                LOGGER.error("Worker {} encountered a problem - cancelling.", 
                             worker.getSimpleName());
                if (refMap.containsKey(worker)) {
                    refMap.remove(worker);
                }
            }
        }, system.dispatcher());

        responseRef.onSuccess(new OnSuccess<Object>() {
            @Override
            public void onSuccess(Object msg) throws Throwable {
                if (msg instanceof WorkResultPackage) {
                    final WorkResultPackage reportPackage = (WorkResultPackage) msg;
                    LOGGER.info(
                        "Received AssetDataPackage from {}, containing {} items",
                        reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                        reportPackage.getPkg().getData().size());

                    pkgQueue.enqueue(reportPackage);
                } else {
                    LOGGER.eror(
                        "Expected to receive WorkResultPackage Object but received: {}",
                        msg.getClass());
                        throw new UnknownObjectReceived(msg);
                }
            }
        }, system.dispatcher());

        refMap.put(worker, actorRef);
    }
}

问题是,我认为responseRef.onFailure制造的封闭不会像我期望的那样发挥作用。如果我用3个工作程序调用它,其中一个我配置为失败,则处理失败但是日志记录是不确定的,即使它应该始终是我标记为失败的那个工作程序将报告失败。我是这个技术堆栈(Java,Scala Futures和AKKA)的新手,以及我找到这个已建立的模式的已建立的代码库,所以我不知道我是否忽视了Java / Scala期货中的某些东西或误解了闭包。这里的关键点是我需要报告哪个工人失败并将其从refMap中移除,以便不再考虑进行中。更奇怪的是,即使错误的工人被报告为失败,所有工人似乎都要完成并从refMap中移除。

更新:在没有得到答案为什么关闭不起作用后,我做了一些调查,发现另一篇文章回答Java 8是否支持闭包:

Does Java 8 Support Closures?

简短的回答,我相信它。然而,它谈到了final或有效的final变量。因此,我更新了我的代码如下。希望这能让理解闭包的人们帮助我理解他们为什么不能像我以前那样工作(在C#和JavaScript中)。我只是发布了sendWork(...)的更新,以突出我试图无济于事的努力。

private void sendWork(Class<? extends BaseWorker> worker) {
    // GetDataActor extends AbstractActor
    ActorRef actorRef = actorFactory.create(GetDataActor.class);
    Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);

    Consumer<Throwable> onFailureClosure = (ex) -> {
            LOGGER.error("Worker {} encountered a problem - cancelling.", 
                         worker.getSimpleName());
            if (refMap.containsKey(worker)) {
                refMap.remove(worker);
            }
    }

    responseRef.onFailure(new OnFailure() {
        @Override
        public void onFailure(Throwable failure) throws Throwable {
            onFailureClosure.accept(failure);
        }
    }, system.dispatcher());

    responseRef.onSuccess(new OnSuccess<Object>() {
        @Override
        public void onSuccess(Object msg) throws Throwable {
            if (msg instanceof WorkResultPackage) {
                final WorkResultPackage reportPackage = (WorkResultPackage) msg;
                LOGGER.info(
                    "Received AssetDataPackage from {}, containing {} items",
                    reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                    reportPackage.getPkg().getData().size());

                pkgQueue.enqueue(reportPackage);
            } else {
                LOGGER.eror(
                    "Expected to receive WorkResultPackage Object but received: {}",
                    msg.getClass());
                    throw new UnknownObjectReceived(msg);
            }
        }
    }, system.dispatcher());

    refMap.put(worker, actorRef);
}
java concurrency closures akka future
1个回答
0
投票

代码存在一个基本问题,可能导致您所看到的行为:代码在并发环境中改变数据,而没有对该数据的任何保护措施。未来的回调可以随时执行,并且可以并行运行。有多个未来的回调检查和改变相同的数据可能会导致奇怪的行为。

Java中处理可变数据并发访问的典型方法是使用同步和锁。幸运的是,既然您正在使用Akka,那么有一种更好的方法。基本上,重构WorkersCoordinator是一个演员,并使用连续消息处理的演员行为到safely handle the mutable state

为了进一步简化问题,你可以在这种情况下放弃使用ask,而是使用tell在actor之间进行通信。我猜这里使用期货试图捕捉错误,但更好的处理错误的方法是Akka的supervisor strategy。也就是说,如果WorkersCoordinator是演员,如果每个GetDataActor实例都是WorkersCoordinator的孩子,那么后者可以决定如何处理前者出现的错误。例如,如果在GetDataActor中抛出异常,协调器可以决定记录错误,然后停止子进程。

以下是包含上述想法的替代WorkersCoordinator

public class WorkersCoordinator extends AbstractActor {
  private static Logger LOGGER = ...
  private final List<Class<? extends BaseWorker>> workers;
  private final Map<ActorRef, Class> refMap;
  private final WorkResultPackageQueue pkgQueue;

  public WorkersCoordinator(final WorkerFactory workerFactory,
                            final WorkResultPackageQueue pkgQueue) {
    this.pkgQueue = pkgQueue;
    this.refMap = Map.newHashMap();
    this.workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
  }

  static Props props(WorkerFactory factory, WorkResultPackageQueue queue) {
    return Props.create(WorkersCoordinator.class, factory, queue);
  }

  static public class Delegate {}

  private static SupervisorStrategy strategy =
    new OneForOneStrategy(10, Duration.create(1, TimeUnit.MINUTES), DeciderBuilder.
      matchAny(t -> {
         ActorRef failedChild = getSender();
         LOGGER.error("This child failed: {}", failedChild);
         refMap.remove(failedChild);
         stop();
      })
      .build());

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }

  @Override
  public void preStart() {
    for (worker : workers) {
       ActorRef child = getContext().actorOf(Props.create(GetDataActor.class));
       refMap.put(child, worker);
    }
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(Delegate.class, d -> {
        refMap.forEach((actor, msg) -> actor.tell(msg, getSelf()));
      })
      .match(WorkResultPackage.class, p -> {
        LOGGER.info("Received AssetDataPackage from {}, containing {} items",
                    reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                    reportPackage.getPkg().getData().size());

        pkgQueue.enqueue(p);
        ActorRef dataActor = getSender();
        refMap.remove(dataActor);
      })
      .matchAny(
        msg -> LOGGER.eror("Expected to receive WorkResultPackage Object but received: {}", msg)
      )
      .build();
  }
}

关于上述代码的一些注意事项:

  • 而不是使用ActorFactory,它似乎是一些自定义类,而是使用Props
  • refMap是倒置的,所以ActorRef现在是关键,而工作的类类型就是价值。这允许我们基于refMapActorRef中删除一个条目,无论是在儿童演员成功响应的情况下还是在孩子抛出异常的情况下。
  • 为简单起见,我删除了@Autowired注释。有关演员依赖注入的更多信息,请访问here
  • 要创建并启动WorkersCoordinator,请调用其props方法。为了启动工作,演员需要一个自定义的Delegate消息:一旦演员收到此消息,它将遍历refMap,向地图中的每个孩子发送与该孩子相关联的工作单元。
WorkerFactory factory = ...
WorkResultPackageQueue queue = ...
ActorRef coordinator = actorSystem.actorOf(WorkersCoordinator.props(factory, queue));

Delegate doWork = new Delegate();
coordinator.tell(doWork, ActorRef.noSender());
© www.soinside.com 2019 - 2024. All rights reserved.