[我已经生活在岩石下几年了,所以我还没有接触过RxJS,我很好奇它如何处理如下情况。
此用例是一种情况,我正在打开一个连接并等待其建立,然后再处理任何传入的块。
const {Readable, Writable} = require('stream')
class DelayedWritable extends Writable {
constructor(options = {}){
options.objectMode = true;
super(options);
this.promise = new Promise((resolve, reject) => {
console.log('...waiting...')
setTimeout( function() {
console.log('...promise resolved...')
resolve(/* with client */)
}, 2000)
})
}
_write = function(chunk, encoding, done){
var output = `_write : ${chunk}`
console.time(output)
this.promise.then((client) => {
console.timeEnd(output)
done()
});
}
_final = function(done){
console.log('_final')
}
}
var readable = Readable.from(['one', 'two', 'three'])
var test = new DelayedWritable();
readable.pipe(test)
似乎更实用的方法是最佳选择。 RxJS似乎还提供了许多开箱即用的流控制,这将是很好的。
谢谢!
如果您已经熟悉Node.js流,我认为您不会发现很难在RxJs中掌握相同的概念:随时间而来的数据收集。
此用例是我在处理任何传入块之前打开连接并等待其建立的场景。
可以这样转换成RxJ:
interval(500)
.pipe(
take(5),
bufferTime(2000), // It takes 2s to establish the connection
)
.subscribe(console.log)
$
后缀用于表示src$
不是一个简单的变量,它是一个[[observable(随着时间的推移会发出值)。]interval
将根据指定的时间间隔发出值,并且bufferTime
将收集发出的值,直到通过指定的bufferTime
。发生这种情况时,您将以数组形式获取收集的值。
我特别喜欢RxJ的地方是,它带有许多内置的运算符,可让您操纵传入的数据。
ms
const src$ = from(['one', 'two', 'tree']);
src$
.pipe(
filter(v => v !== 'one'),
map(v => v.toUpperCase()),
toArray() // Group the values in an array once the source completes
)
.subscribe(val => console.log(val));
。
上面的例子似乎微不足道,我同意。您将不可避免地遇到更复杂的情况,并且很容易了解from(promise).subscribe()
。