在Node.js中,你可以设置一个服务器,只要服务器还活着,并且在事件循环中处于活跃状态,这个进程就不会终止。我想知道是否可以使用反应式扩展来做这样的事情?我知道如何设置一个命名管道,并利用它与另一个进程中的Node服务器进行通信,但我不知道如何通过等待按键以外的其他方式来防止程序终止。我想把该服务器作为反应式管道的一部分来实现。能否在流水线完成之前阻塞主线程?
open System
open System.Threading
open FSharp.Control.Reactive
module Observable =
/// Blocks the thread until the observable completes.
let waitUnit on_next (o : IObservable<_>) =
use t = new ManualResetEventSlim()
use __ = Observable.subscribeWithCompletion on_next (fun _ -> t.Set()) o
t.Wait()
Observable.interval (TimeSpan.FromSeconds(1.0))
|> Observable.take 3
|> Observable.waitUnit (printfn "%i...")
printfn "Done."
以上是使用 F# Rx绑定但C#版本也会类似。该库本身有 Observable.wait
属于 IObservable<'a> -> 'a
但这个方法的缺点是会在空序列上抛出一个异常,并且会在内存中保留最新的值,直到观测值被处理掉。如果最终值不重要的话,这个有正确的语义。
我看了一下引擎盖下的内容,发现 wait
用途 ManualResetEventSlim
来阻止它所在的线程,所以这应该没问题。
很抱歉,我不能在F#中给出答案,但在C#中这很简单。
如果我有这个例子:
Observable
.Range(0, 10)
.Subscribe(x => Console.WriteLine(x));
...而我想等到它完成,我可以这样改写:
Observable
.Range(0, 10)
.Do(x => Console.WriteLine(x))
.ToArray()
.Wait();
...或者
await Observable
.Range(0, 10)
.Do(x => Console.WriteLine(x))
.ToArray();
正如Marko在评论中指出的那样,这种方法会创建一个可能相当大的数组。为了解决这个问题,你可以将 .ToArray()
与 .LastAsync()
(该名称是在普遍使用 Async
以示 Task
- 在这种情况下,它只是将观测到的最后一个元素作为一个 IObservable<T>
).
该 .Wait()
确实抛出了一个空的观测值,但是 await
版本没有。