我有一个带有近100个日志文件的目录,每个文件的重量为10~15 GB。要求是逐行读取每个文件(顺序根本不重要),清理行json并将其转储到后端elasticsearch存储以进行索引。
这是我的工作人员做这项工作
# file = worker.php
echo " -- New PHP Worker Started -- "; // to get how many times gnu-parallel initiated the worker
$dataSet = [];
while (false !== ($line = fgets(STDIN))) {
// convert line text to json
$l = json_decode($line);
$dataSet[] = $l;
if(sizeof($dataSet) >= 1000) {
//index json to elasticsearch
$elasticsearch->bulkIndex($dataSet);
$dataSet = [];
}
}
在here和here的答案的帮助下,我几乎在那里,它正在工作(有点),但只需要确保它实际上正在做我认为它正在做的事情。
只有一个文件我可以处理如下
parallel --pipepart -a 10GB_input_file.txt --round-robin php worker.php
这很好用。添加--round-robin确保php worker进程只启动一次,然后它只是继续接收数据作为管道(穷人的队列)。
因此对于4CPU机器,它会激活4个php工作人员并快速处理所有数据。
要对所有文件执行相同操作,请参阅此处
find /data/directory -maxdepth 1 -type f | parallel cat | parallel --pipe -N10000 --round-robin php worker.php
哪种看起来像工作但我有一种直觉,这是一种错误的并行方式为所有文件嵌套。
其次,因为它不能使用--ppartpart,我认为它更慢。
第三,一旦工作完成,我看到在4cpu机器上,只有4名工人被启动并完成了工作。这是正确的行为吗?不应该为每个文件启动4个工作人员吗?只是想确保我没有错过任何数据。
知道如何以更好的方式完成这项工作吗?
如果它们的大小大致相同,为什么不简单地给每个文件分配一个文件:
find /data/directory -maxdepth 1 -type f |
parallel php worker.php '<' {}
另一种方法是在每个上使用--pipepart
:
do_one() {
parallel --pipepart -a "$1" --block -1 php worker.php
}
export -f do_one
find /data/directory -maxdepth 1 -type f | parallel -j1 do_one
如果启动php worker.php
不需要很长时间,那么最后一个可能更好,因为如果文件的大小非常不同,它会更均匀地分布,因此如果最后一个文件很大,你最终不会等待一个完成处理的过程。