我正在寻找一种以编程方式将转换器添加到已经在收听的现有流中的方法。
示例:
Stream numbers = new Stream.fromIterable([0,1,2,3]);
numbers.listen((number) => print(number));
现在响应某些UI事件,我想通过添加映射转换器来修改此流,就像我最初写的一样:
numbers.where((number) => number % 2 == 0);
所有现有的监听器从现在开始应该只接收偶数,而不会受到干扰。如何做到这一点?
而不是像“如何动态地将转换器插入流中那样思考”,一种可能的方式是像“如何动态地控制已经注入的转换器”那样思考。
这是使用StreamTransformer
的示例:
var onlySendEvenNumbers = false; // controlled by some UI event handler
var originalStream = makeStreamOfStuff();
originalStream = originalStream.transform(new StreamTransformer.fromHandlers(
handleData: (int value, EventSink<int> sink) {
if (onlySendEvenNumber) {
if (value.isEven) {
sink.add(value);
}
} else {
sink.add(value);
}
}));
originalStream.listen(print); // listen on events like normal
我想到的一种方法是使用调用另一个函数的函数过滤Stream
:
var filter = (n) => true;
Stream numbers = new String.fromIterable([0, 1, 2, 3]).where((n) => filter(n));
然后,当您想更改过滤条件时:
filter = (n) => n % 2 == 0;
一个具体示例:
import 'dart:async';
main() {
var filter = (n) => true;
Stream numbers = new Stream.periodic(new Duration(seconds: 1), (n) => n)
.where((n) => filter(n));
numbers.listen((n) => print(n));
new Future.delayed(new Duration(seconds: 4)).then((_) {
filter = (n) => n % 2 == 0;
});
}
这将打印:
0
1
2
3
4
6
8
10
12
依此类推,仅对于偶数,在4秒钟后。
rxdart的combineLatest2
呢?
它合并两个流,并在改变两个流时每次发射。
您可以使用Switch类在有条件的情况下打开/关闭。
class XsBloc {
Api _api = Api();
BehaviorSubject<List<X>> _xs = BehaviorSubject();
BehaviorSubject<Switcher> _switcher =
BehaviorSubject<Switcher>.seeded(Switcher(false, []));
XsBloc() {
Observable.combineLatest2<List<X>, Switcher, List<X>>(
_api.xs(), _switcher, (xs, s) {
if (s.isOn == true) {
return xs.where((x) => s.conditions.contains(x.id)).toList();
} else {
return xs;
}
}).listen((x) => _xs.add(x));
}
Stream<List<X>> get xs => _xs;
ValueObservable<Switcher> get switcher =>
_switcher.stream;
Function(Switcher) get setSwitcher => _switcher.sink.add;
}
class Switcher {
final bool isOn;
final List<String> conditions;
Switcher(this.isOn, this.conditions);
}
var bloc = XsBloc();
bloc.setSwitcher(true, ['A', 'B']);
bloc.setSwitcher(false, []);
bloc.setSwitcher(true, []);