是否有使用RxJS的此可写流的更多功能实现,它在处理之前等待一个诺言得以解决?

问题描述 投票:0回答:1

[我已经生活在岩石下几年了,所以我还没有接触过RxJS,我很好奇它如何处理如下情况。

此用例是一种情况,我正在打开一个连接并等待其建立,然后再处理任何传入的块。

const {Readable, Writable} = require('stream')

class DelayedWritable extends Writable {
  constructor(options = {}){
    options.objectMode = true;
    super(options);

    this.promise = new Promise((resolve, reject) => {
      console.log('...waiting...')
      setTimeout( function() {
        console.log('...promise resolved...')
        resolve(/* with client */)
      }, 2000) 
    }) 
  }
  _write = function(chunk, encoding, done){
    var output = `_write : ${chunk}`
    console.time(output)
    this.promise.then((client) => {
      console.timeEnd(output)
      done() 
    });
  }
  _final = function(done){
    console.log('_final')
  }

}

var readable = Readable.from(['one', 'two', 'three'])
var test = new DelayedWritable();
readable.pipe(test)

似乎更实用的方法是最佳选择。 RxJS似乎还提供了许多开箱即用的流控制,这将是很好的。

谢谢!

javascript node.js design-patterns rxjs stream
1个回答
1
投票

如果您已经熟悉Node.js流,我认为您不会发现很难在RxJs中掌握相同的概念:随时间而来的数据收集

此用例是我在处理任何传入块之前打开连接并等待其建立的场景。

可以这样转换成RxJ:

interval(500)
  .pipe(
    take(5),
    bufferTime(2000), // It takes 2s to establish the connection
  )
  .subscribe(console.log)

$后缀用于表示src$不是一个简单的变量,它是一个[[observable(随着时间的推移会发出值)。]interval将根据指定的时间间隔发出值,并且bufferTime将收集发出的值,直到通过指定的bufferTime。发生这种情况时,您将以数组形式获取收集的值。

我特别喜欢RxJ的地方是,它带有许多内置的运算符,可让您操纵传入的数据。

ms

const src$ = from(['one', 'two', 'tree']);

src$
  .pipe(
    filter(v => v !== 'one'),
    map(v => v.toUpperCase()),
    toArray() // Group the values in an array once the source completes
  )
  .subscribe(val => console.log(val));


将承诺转换为可观察的过程很简单:

StackBlitz


上面的例子似乎微不足道,我同意。您将不可避免地遇到更复杂的情况,并且很容易了解from(promise).subscribe()
© www.soinside.com 2019 - 2024. All rights reserved.