rxjs:将值动态添加到可观察流的最佳方法是什么?

问题描述 投票:0回答:1

作为学习rxjs的一部分,我一直在使用from,interval等创建方法来测试节流阀和deboucne,等等,这些方法都是使用fromevent创建流。

现在我有一个实际的用例,我需要将值动态添加到一个空的可观察流中。我找不到任何方法的最佳使用方法,而不是使用上面的创建方法。目前,我正在使用BehaviourSubject使用next()将项目动态添加到流中。这是动态地向流中添加新项目的最佳/首选方法吗?

例如

import { BehaviorSubject, timer } from 'rxjs';
import { tap, mapTo, concatMap, } from 'rxjs/operators';

const subject = new BehaviorSubject(1);
const example = subject.pipe(
  concatMap(ev => timer(200).pipe(mapTo(ev))),
  tap((ev) => console.log(ev))
)
example.subscribe();

// add a flurry of values dynamically
subject.next(2);
subject.next(3);
subject.next(4);

// some time later add some more
setTimeout(function(){ 
  subject.next(5);
  subject.next(6);
  subject.next(7);
}, 5000);

https://stackblitz.com/edit/rxjs-behaviorsubject-simpleexample-gyrtw8?file=index.ts

谢谢

rxjs stream
1个回答
0
投票

如果您具有添加自定义变量的原始自定义逻辑,则可以创建自己的逻辑(而不是使用fromEvent, of, from, ...:]]


const myObservable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);

  setTimeout(() => {
    subscriber.next(4);
    subscriber.next(5);


    setTimeout(() => {
       subscriber.next(6);
    }, 2000);
  }, 1000);
});

但是,rxjs的创建功能应满足您99%的需求。上面的代码也可以这样写:

concat(
  of(1,2,3),
  of(4,5).pipe(
    delay(1000)
  ),
  of(6).pipe(
    delay(2000)
  )
)

UPD:关于主题

主题也是可观察的,因此在您的情况下,使用主题是适用的,但可能不是最佳选择。主题的想法是可以有一个以上的订阅者(使用主题值的订阅者),但是我不确定这是您的情况(顺便说一句-您可以提供真实的示例来帮助我们)了解您想要实现的目标)

© www.soinside.com 2019 - 2024. All rights reserved.