我有一个随意记录的数据流,但是在程序的某个状态下,我需要从该数据流中获取数据,而不是到目前为止所观察到的最新数据(我可以做到这一点),而是随后的最新数据:
val dataStream : Observable[Data] = ...
dataStream
.doOnNext(logger.trace(_))
.subscribe()
// ...
// at some state of program later:
// ...
def awaitOneElement(Observable[Data]): Data = ???
val expecting: Data = awaitOneElement(dataStream.filter(...))
如何实现def awaitOneElement(Observable[Data]): Data = ???
?
我知道这在习惯上可能是不正确的,但是肮脏的同步等待正是我所需要的。我也同意Observable[Data] => Future[Data]
,在下一步中将以Await
包装。
如果仅需要一个元素,则可以使用Consumer.head
或Consumer.headOption
:
val first: Task[Option[Int]] =
Observable.empty[Int].consumeWith(Consumer.headOption)]
现在您可以将任务转换为将来的任务。如果需要所有元素,则可以将foldLeft与累加器一起使用:
Consumer.foldLeft[T,Vector[T]](Vector.empty[T])((vec, t) => vec :+ t )
或实施自定义使用者并在需要时触发回调。来自文档:https://monix.io/docs/3x/reactive/consumer.html
我的解决方案:
implicit val scheduler = mx
val pendingPromise: AtomicReference[Option[Promise[A]]] = new AtomicReference(None)
val lastSubject: ConcurrentSubject[A, A] = ConcurrentSubject.publish[A]
o.doOnNext(a => if (pendingPromise.get().nonEmpty) lastSubject.onNext(a))
.subscribe()
lastSubject
.filter(filter)
.doOnNext(a => pendingPromise.getAndSet(None).map(_.success(a)))
.subscribe()
def getNext(duration: Duration): Try[A] = {
val promise = Promise[A]()
pendingPromise.set(Some(promise))
Try(Await.result(promise.future, duration))
}
}
当调用getNext时,将创建一个Promise
并返回future。当“可观察”中发生所需事件时,将兑现并删除承诺。