关于Future.firstCompletedOf和垃圾收集机制

问题描述 投票:9回答:1

我在现实生活中的项目中遇到了这个问题,并通过测试代码和分析器进行了证明。我没有粘贴“ tl; dr”代码,而是向您显示图片,然后对其进行描述。enter image description here

简而言之,我正在使用Future.firstCompletedOf从2个Future中获取结果,这两个都没有共享的内容并且互不关心。即使,这是我要解决的问题,垃圾收集器在两个Result都完成之前无法回收第一个Future对象

所以,我对此背后的机制真的很好奇。有人可以从较低的层次上解释它,还是提供一些提示让我研究。

谢谢!

PS:是因为它们共享相同的ExecutionContext吗?

**更新**根据要求粘贴测试代码

object Main extends App{
  println("Test start")

  val timeout = 30000

  trait Result {
    val id: Int
    val str = "I'm short"
  }
  class BigObject(val id: Int) extends Result{
    override val str = "really big str"
  }

  def guardian = Future({
    Thread.sleep(timeout)
    new Result { val id = 99999 }
  })

  def worker(i: Int) = Future({
    Thread.sleep(100)
    new BigObject(i)
  })

  for (i <- Range(1, 1000)){
    println("round " + i)
    Thread.sleep(20)
    Future.firstCompletedOf(Seq(
      guardian,
      worker(i)
    )).map( r => println("result" + r.id))
  }

  while (true){
    Thread.sleep(2000)
  }
}
scala concurrency garbage-collection jvm future
1个回答
10
投票

让我们看看如何实现firstCompletedOf

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val completeFirst: Try[T] => Unit = p tryComplete _
  futures foreach { _ onComplete completeFirst }
  p.future
}

{ futures foreach { _ onComplete completeFirst }执行时,功能completeFirst保存在某处通过ExecutionContext.execute。该函数的确切存储位置无关紧要,我们只知道它必须存储在某个位置以便稍后可以选择它,并在线程可用时在线程池上执行。仅在将来完成时,才不再需要引用completeFirst

因为completeFirst结束于p,只要还有一个未来(来自futures)等待完成,便会引用p来防止对其进行垃圾收集(即使那样很有可能firstCompletedOf已经返回,从堆栈中删除了p

当第一个未来完成时,它将结果保存到承诺中(通过调用p.tryComplete)。因为承诺p保留了结果,所以只要p可以达到,结果就可以达到,并且只要p的至少一个未来还没有完成,就可以看到futures是可以达到的。这就是为什么在所有期货都未完成之前就无法收集结果的原因。

UPDATE:现在的问题是:它可以解决吗?我认为可以。我们要做的就是确保第一个将来以线程安全的方式完成对p的引用的“清除”操作,这可以通过使用AtomicReference进行示例来完成。像这样的东西:

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val pref = new java.util.concurrent.atomic.AtomicReference(p)
  val completeFirst: Try[T] => Unit = { result: Try[T] =>
    val promise = pref.getAndSet(null)
    if (promise != null) {
      promise.tryComplete(result)
    }
  }
  futures foreach { _ onComplete completeFirst }
  p.future
}

我已经对其进行了测试,并且正如预期的那样,它确实允许在第一个将来完成时立即对结果进行垃圾收集。它在所有其他方面的行为都应相同。

© www.soinside.com 2019 - 2024. All rights reserved.