如何从可观察的monix中获取下一个元素?

问题描述 投票:1回答:2

我有一个随意记录的数据流,但是在程序的某个状态下,我需要从该数据流中获取数据,而不是到目前为止所观察到的最新数据(我可以做到这一点),而是随后的最新数据:

val dataStream : Observable[Data] = ...
dataStream
 .doOnNext(logger.trace(_))
 .subscribe()

// ...
// at some state of program later:
// ...
def awaitOneElement(Observable[Data]): Data = ???
val expecting: Data = awaitOneElement(dataStream.filter(...))

如何实现def awaitOneElement(Observable[Data]): Data = ???

我知道这在习惯上可能是不正确的,但是肮脏的同步等待正是我所需要的。我也同意Observable[Data] => Future[Data],在下一步中将以Await包装。

scala stream observable monix
2个回答
0
投票

如果仅需要一个元素,则可以使用Consumer.headConsumer.headOption

val first: Task[Option[Int]] =
  Observable.empty[Int].consumeWith(Consumer.headOption)]

现在您可以将任务转换为将来的任务。如果需要所有元素,则可以将foldLeft与累加器一起使用:

Consumer.foldLeft[T,Vector[T]](Vector.empty[T])((vec, t) => vec :+ t )

或实施自定义使用者并在需要时触发回调。来自文档:https://monix.io/docs/3x/reactive/consumer.html


0
投票

我的解决方案:

  implicit val scheduler                     = mx
  val pendingPromise: AtomicReference[Option[Promise[A]]] = new AtomicReference(None)
  val lastSubject: ConcurrentSubject[A, A]   = ConcurrentSubject.publish[A]

  o.doOnNext(a => if (pendingPromise.get().nonEmpty) lastSubject.onNext(a))
    .subscribe()

  lastSubject
    .filter(filter)
    .doOnNext(a => pendingPromise.getAndSet(None).map(_.success(a)))
    .subscribe()

  def getNext(duration: Duration): Try[A] = {
    val promise = Promise[A]()
    pendingPromise.set(Some(promise))
    Try(Await.result(promise.future, duration))
  }
}

当调用getNext时,将创建一个Promise并返回future。当“可观察”中发生所需事件时,将兑现并删除承诺。

© www.soinside.com 2019 - 2024. All rights reserved.