我有一个要缓冲的传入数据流,因此,当我实际上要处理数据时,我可以基于critera从缓冲区中选择最合适的数据。
如果缓冲区为空,我想等待流中的第一个可用数据,然后返回。
在此示例中,我的标准是max:
class Repo {
final Stream<int> input;
List<int> buffer = [];
Repo(this.input) {
input.listen((num) => buffer.add(num));
}
Future<int> getNext() async {
if (buffer.isEmpty) {
await input.isEmpty;
}
var max_num = buffer.reduce(max);
buffer.remove(max_num);
return max_num;
}
}
此解决方案的我的问题:
getNext
呼叫正在等待input.isEmpty
,则所有呼叫将继续并且它们会过度使用缓冲区。buffer.remove
和buffer.add
可能“同时”发生,导致缓冲区不一致。什么是更好的解决方案?
我不认为您的第二个问题实际上是书面所写的问题。每个Dart隔离都是单线程的。当执行流的调用buffer.add
的侦听器回调时,由于它不会随await
一起产生,因此无法被getNext
中断。同样,一旦getNext
准备好执行buffer.remove
,就不能被buffer.add
回调中断。
关于第一个问题,我思考您可以使所有getNext
调用都在循环中等待:
Future<int> getNext() async {
while (buffer.isEmpty) {
await input.isEmpty;
}
var max_num = buffer.reduce(max);
buffer.remove(max_num);
return max_num;
}
[将事件添加到Stream
时,您的侦听器将向buffer
添加一个元素,并且应导致第一个未完成的getNext
调用之一跳出循环并处理buffer
。然后,将处理其他未完成的getNext
呼叫,看到buffer
(再次)为空,然后继续等待。