我想计算一个大型.fastq文件(5900万行)中大约500个模式的出现。这些模式都正好是20个字符长。
在Unix中,这很简单:
grep -F -o -f patterns.txt big_file.fastq | sort | uniq -c
但是,我希望避免编写临时模式文件,所以我使用python的子进程库创建了一个管道:
from subprocess import Popen, PIPE, STDOUT
p1 = Popen(["grep", "-F", "-o", "-f", "-", "big_file.fastq"], shell = False, stdin = PIPE, stdout = PIPE, stderr= STDOUT)
p2 = Popen(["sort"], shell = False, stdin = p1.stdout, stdout = PIPE, stderr = STDOUT)
p3 = Popen(["uniq", "-c"], shell = False, stdin = p2.stdout, stdout = PIPE, stderr = STDOUT)
然后我在此上调用communication(),提供一个编码的类似于io.StringIO文件的对象作为输入(我使用'-'将其传递给grep命令:]
import io
patterns_file = io.StringIO("\n".join(patterns_list))
p3.communicate(input = patterns_file.read().encode('utf-8'))[0]
当我像这样在uniq上调用communication()时,这可以正常工作。
但是,在测试时,我在管道的第一部分错误地将其称为:
p1.communicate(input = patterns_file.read().encode('utf-8'))[0]
这给了我完全错误的输出,包括比预期的20个字符短或长的匹配。
我不明白为什么会这样。不会在p1上仅调用管道的该部分而忽略其余部分吗?删除p2和p3会使p1正确grep。我觉得我缺少Popen的工作方式。
感谢您的任何帮助。
实例化Popen
对象时,它们所引用的子过程将立即启动。因此,即使仅在communicate()
上调用p1
,p2
和p3
也正在运行。
为什么这很重要?因为p2
仍将其stdin连接到p1
正在向其写入输出的FIFO!
[如果sort
上的p2
操作仍在读取内容,而您又要求Python程序直接读取相同的内容,则最终会导致p1
的输出在它们之间分配。可以预料会出现热闹:在两个程序之间进行拆分的唯一方法would n't会导致明显损坏的数据是p2
和communicate()
都在读取20字节倍数的块(同时足够小,以至于操作系统不会将它们分成多个系统调用);但是,用于无缓冲读取的典型块大小为4096的倍数,每次读取块时都会从记录边界产生4个字节的偏移量。
顺便说一下,对于many程序,这没有那么严重的影响,因为FIFO具有相对较小的缓冲区。一个为读取的每一行输入写一行输出的程序最终会很快被输出阻塞,因此将停止读取其他输入,直到其输出至少部分刷新为止。 sort
是一个例外,因为它需要先读取所有输入,然后才能知道其第一行输出!