使用 Dart Futures 的生产者-消费者:有没有办法让 Dart“任务”在等待数据结构上的活动时挂起?

问题描述 投票:0回答:1

我正在尝试了解 Dart 的异步/等待如何工作的一些想法/直觉。

这是一个简单的 Dart 程序,其中有:

  • a Producer 函数将字符串添加到
    pipe
    (a
    List<String>
    )。
  • a consumer 函数每 100 毫秒读取一次
    pipe
    ,删除所有字符串。

通过“未来化”(正确的词是什么?)它们,这两个函数都可以作为“事件循环上的任务”同时运行。无需同步对

pipe
的访问,但 consumer 函数必须调用
Future.delayed()
来让出 CPU,以便 Producer 能够运行。

有没有办法执行等待

pipe
直到它不为空,而不是每 100 毫秒检查一次?我应该使用 Dart
stream
来实现吗?

import 'dart:async';
import 'dart:math';

final pipe = <String>[];
final rand = Random();

String now() {
  return '${DateTime.now()}';
}

Future producer() async {
  print('Producer starts at ${now()}');
  for (int i = 0; i < 5; i++) {
    int delay_ms = 500 + rand.nextInt(3000);
    print('Producer delaying by $delay_ms ms ...');
    await Future.delayed(Duration(milliseconds: delay_ms));
    final payload = 'payload $i';
    pipe.add(payload);
    print("Producer produced '$payload' at ${now()}");
  }
  pipe.add('DONE');
  print('Producer ends at ${now()}');
}

Future consumer() async {
  print('Consumer starts at ${now()}');
  String? received;
  while (received != 'DONE') {
    // do nothing for 100ms, giving the producer CPU time
    // --- but is there a better way? ---
    await Future.delayed(const Duration(milliseconds: 100));
    received = null;
    while (pipe.isNotEmpty && received != 'DONE') {
      received = pipe.removeAt(0);
      print("Consumer received '$received' at ${now()}");
    }
  }
  print('Consumer ends at ${now()}');
}


Future<void> main() async {
  print('main(): Starting');

  final Future producerFuture = producer().then((_) {
    print('Producer finished');
  });

  print('main(): Producer is running');

  final Future consumerFuture = consumer().then((_) {
    print('Consumer finished');
  });

  print('main(): Consumer is running');

  final Future commonFuture = Future.wait([producerFuture, consumerFuture]);

  await commonFuture;

  print('main(): Done');
}

运行示例可能是:

main(): Starting
Producer starts at 2024-05-28 15:39:38.021731
Producer delaying by 2092 ms ...
main(): Producer is running
Consumer starts at 2024-05-28 15:39:38.040345
main(): Consumer is running
Producer produced 'payload 0' at 2024-05-28 15:39:40.125837
Producer delaying by 2526 ms ...
Consumer received 'payload 0' at 2024-05-28 15:39:40.170401
Producer produced 'payload 1' at 2024-05-28 15:39:42.652816
Producer delaying by 2167 ms ...
Consumer received 'payload 1' at 2024-05-28 15:39:42.695814
Producer produced 'payload 2' at 2024-05-28 15:39:44.820820
Producer delaying by 3091 ms ...
Consumer received 'payload 2' at 2024-05-28 15:39:44.917827
Producer produced 'payload 3' at 2024-05-28 15:39:47.912795
Producer delaying by 2285 ms ...
Consumer received 'payload 3' at 2024-05-28 15:39:47.947775
Producer produced 'payload 4' at 2024-05-28 15:39:50.198882
Producer ends at 2024-05-28 15:39:50.199027
Producer finished
Consumer received 'payload 4' at 2024-05-28 15:39:50.271304
Consumer received 'DONE' at 2024-05-28 15:39:50.271424
Consumer ends at 2024-05-28 15:39:50.271494
Consumer finished
main(): Done
dart async-await producer-consumer
1个回答
0
投票

流非常适合执行您所描述的任务。很容易调整函数

producer
以返回
Stream<String>
:

Stream<String> streamProducer() async* {
  print('Stream starts at ${now()}');
  for (int i = 0; i < 5; i++) {
    int delay_ms = 500 + rand.nextInt(3000);
    print('Stream delaying by $delay_ms ms ...');
    await Future.delayed(Duration(milliseconds: delay_ms));
    final payload = 'payload $i';
    yield payload;
  }
  print('Stream ends at ${now()}');
}

注意关键字

yield
(而不是 return)和
async
后面的星号。 在您的函数
main
中,您将拥有例如:

final stream = streamProducer();
stream.listen(print);

下面列出了示例控制台输出:

$ dart bin/stream_example.dart 
main(): Starting
Stream starts at 2024-05-28 17:25:09.912208
Stream delaying by 1679 ms ...
payload 0
Stream delaying by 1766 ms ...
payload 1
Stream delaying by 893 ms ...
payload 2
Stream delaying by 1574 ms ...
payload 3
Stream delaying by 1281 ms ...
payload 4
Stream ends at 2024-05-28 17:25:17.139823

有关更多详细信息,请参阅在 Dart 中创建流

© www.soinside.com 2019 - 2024. All rights reserved.