为什么不能在最后一个产量受损的输出之外的管道组件上调用communication()?

问题描述 投票:0回答:1

我想计算一个大型.fastq文件(5900万行)中大约500个模式的出现。这些模式都正好是20个字符长。

在Unix中,这很简单:

grep -F -o -f patterns.txt big_file.fastq | sort | uniq -c

但是,我希望避免编写临时模式文件,所以我使用python的子进程库创建了一个管道:

from subprocess import Popen, PIPE, STDOUT

p1 = Popen(["grep", "-F", "-o", "-f", "-", "big_file.fastq"], shell = False, stdin = PIPE, stdout = PIPE, stderr= STDOUT)
p2 = Popen(["sort"], shell = False, stdin = p1.stdout, stdout = PIPE, stderr = STDOUT)
p3 = Popen(["uniq", "-c"], shell = False, stdin = p2.stdout, stdout = PIPE, stderr = STDOUT)

然后我在此上调用communication(),提供一个编码的类似于io.StringIO文件的对象作为输入(我使用'-'将其传递给grep命令:]

import io

patterns_file = io.StringIO("\n".join(patterns_list))
p3.communicate(input = patterns_file.read().encode('utf-8'))[0]

当我像这样在uniq上调用communication()时,这可以正常工作。

但是,在测试时,我在管道的第一部分错误地将其称为:

p1.communicate(input = patterns_file.read().encode('utf-8'))[0]

这给了我完全错误的输出,包括比预期的20个字符短或长的匹配。

我不明白为什么会这样。不会在p1上仅调用管道的该部分而忽略其余部分吗?删除p2和p3会使p1正确grep。我觉得我缺少Popen的工作方式。

感谢您的任何帮助。

python grep subprocess pipe popen
1个回答
0
投票

实例化Popen对象时,它们所引用的子过程将立即启动。因此,即使仅在communicate()上调用p1p2p3也正在运行。

为什么这很重要?因为p2仍将其stdin连接到p1正在向其写入输出的FIFO!

[如果sort上的p2操作仍在读取内容,而您又要求Python程序直接读取相同的内容,则最终会导致p1的输出在它们之间分配。可以预料会出现热闹:在两个程序之间进行拆分的唯一方法would n't会导致明显损坏的数据是p2communicate()都在读取20字节倍数的块(同时足够小,以至于操作系统不会将它们分成多个系统调用);但是,用于无缓冲读取的典型块大小为4096的倍数,每次读取块时都会从记录边界产生4个字节的偏移量。


顺便说一下,对于many程序,这没有那么严重的影响,因为FIFO具有相对较小的缓冲区。一个为读取的每一行输入写一行输出的程序最终会很快被输出阻塞,因此将停止读取其他输入,直到其输出至少部分刷新为止。 sort是一个例外,因为它需要先读取所有输入,然后才能知道其第一行输出!

© www.soinside.com 2019 - 2024. All rights reserved.