我的目标是用 Dart 创建一种网络爬虫。为此,我想维护一个任务队列,其中存储需要爬网的元素(例如 URL)。这些元素在爬行函数中爬行,该函数返回需要处理的更多元素的列表。因此,这些元素被添加到队列中。示例代码:
import "dart:collection";
final queue = Queue<String>();
main() async{
queue
..add("...")
..add("...")
..add("...");
while (queue.isNotEmpty) {
results = await crawl(queue.removeFirst());
queue.addAll(results);
}
}
Future<List<String>> crawl(String x) async {
...
res = await http.get(x)
...
return results;
}
这段粗略的代码一次只处理一个元素。不过,我想要一个工作池(例如 5 个),它们从队列中取出元素并同时处理它们,然后将结果添加回队列。由于瓶颈是 HTTP 请求,我认为对多个工作线程进行 Future.wait() 调用可以加快执行速度。但是我不想让服务器超载,因此我也想限制工作人员的数量。
这可以用基本的异步原语和信号量来实现吗?我希望尽可能避免隔离,以使解决方案尽可能简单。
我不知道是否已经有一个包提供了此功能,但由于编写自己的逻辑并不那么复杂,我做了以下示例:
import 'dart:async';
import 'dart:collection';
import 'dart:math';
class TaskRunner<A, B> {
final Queue<A> _input = Queue();
final StreamController<B> _streamController = StreamController();
final Future<B> Function(A) task;
final int maxConcurrentTasks;
int runningTasks = 0;
TaskRunner(this.task, {this.maxConcurrentTasks = 5});
Stream<B> get stream => _streamController.stream;
void add(A value) {
_input.add(value);
_startExecution();
}
void addAll(Iterable<A> iterable) {
_input.addAll(iterable);
_startExecution();
}
void _startExecution() {
if (runningTasks == maxConcurrentTasks || _input.isEmpty) {
return;
}
while (_input.isNotEmpty && runningTasks < maxConcurrentTasks) {
runningTasks++;
print('Concurrent workers: $runningTasks');
task(_input.removeFirst()).then((value) async {
_streamController.add(value);
while (_input.isNotEmpty) {
_streamController.add(await task(_input.removeFirst()));
}
runningTasks--;
print('Concurrent workers: $runningTasks');
});
}
}
}
Random _rnd = Random();
Future<List<String>> crawl(String x) =>
Future.delayed(Duration(seconds: _rnd.nextInt(5)), () => x.split('-'));
void main() {
final runner = TaskRunner(crawl, maxConcurrentTasks: 3);
runner.stream.forEach((listOfString) {
if (listOfString.length == 1) {
print('DONE: ${listOfString.first}');
} else {
print('PUTTING STRINGS ON QUEUE: $listOfString');
runner.addAll(listOfString);
}
});
runner.addAll(['1-2-3-4-5-6-7-8-9', '10-20-30-40-50-60-70-80-90']);
}
哪个输出:
Concurrent workers: 1
Concurrent workers: 2
Concurrent workers: 1
PUTTING STRINGS ON QUEUE: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Concurrent workers: 2
Concurrent workers: 3
Concurrent workers: 4
PUTTING STRINGS ON QUEUE: [10, 20, 30, 40, 50, 60, 70, 80, 90]
DONE: 3
DONE: 5
DONE: 1
DONE: 2
DONE: 7
DONE: 4
DONE: 6
DONE: 10
DONE: 8
DONE: 9
DONE: 30
DONE: 20
DONE: 40
DONE: 50
Concurrent workers: 3
DONE: 90
Concurrent workers: 2
DONE: 60
Concurrent workers: 1
DONE: 80
Concurrent workers: 0
DONE: 70
我确信课程的可用性可以提高,但我认为核心概念很容易理解。这些概念是我们定义一个
Queue
,每次向这个Queue
添加内容时,我们都会检查是否可以开始执行新的异步任务。否则我们就跳过它,因为我们确保每个当前运行的异步任务都会在“关闭”之前检查Queue
上的更多内容。
结果由
Stream
返回,您可以订阅它,例如根据我在示例中显示的结果,向 TaskRunner
添加更多内容。返回数据的顺序是基于它们完成的顺序。
重要的是,这不是在多线程中运行任务的方法。所有代码都在单个 Dart 隔离线程中运行,但由于 HTTP 请求是 IO 延迟的,因此尝试生成多个
Future
并等待结果是有必要的。
类似于js中的p-limit包。 Dart 中现在有一个 p_limit 包。 (免责声明,我是在 Dart 中实现它的作者)。 查看文档
只需使用您想要的并发任务数量来实例化它,然后创建一个 Future 列表,每个 Future 都用下面生成的
limit
函数包装。
import 'package:p_limit/p_limit.dart';
void main() async {
// Example concurrency of 3 futures at once
final limit = PLimit<http.Response>(3);
final queue = Queue<String>();
queue
..add("http://www.exampleone.com/")
..add("http://www.exampletwo.com/")
..add("http://www.examplethree.com/")
..add("http://www.examplefour.com/");
final futures = queue.map((url) {
// wrap the function we are calling in the limit function we defined above
return limit(() => http.get(Uri.parse(url)));
});
// Only three futures are run at once (as defined above)
final results = await Future.wait(futures);
print(results);
}