我想将
STDOUT
的 subprocess.Popen.communicate
写入 pd.DataFrame
。我采取了一些 SO 线程并将它们组合到此代码中:
import subprocess
import io
import pandas as pd
strings = ['Hello\tWorld!', 'This\tis', 'a\tTest!']
string = '\n'.join(strings)
cmd_grep = ['grep', 's']
process_grep = subprocess.Popen(cmd_grep, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
grep_stdout = process_grep.communicate(input=string.encode('utf-8'))[0].decode('utf-8')
grep_csv = io.StringIO()
for line in grep_stdout:
grep_csv.write(line)
grep_csv.seek(0)
grep_results = pd.read_csv(grep_csv,
sep='\t',
header=None,
names=['Word1', 'Word2'])
grep_csv.close()
grep_results
这适用于简单的输出。但如果我想过滤这些行,就像这样
if line.startswith('This'):
grep_csv.write(line)
它不再起作用了。这是因为我的
for line in grep_stdout:
不迭代行而是字符(您可以通过添加 print(line)
来看到这一点。知道吗,如何将行保持在一起?
communicate
可以防止 stdin
的写入以及 stdout
和 stderr
的读取发生死锁,代价是将整个输出流作为 bytes
对象引入。您可以使用自己的线程执行与 communicate
相同的操作来调整算法。
就您而言,您没有重定向
stderr
并且您希望如果可能的话逐行阅读 stdout
。由于这是 UTF8 编码,其中换行符仍然只是单个字符,因此您可以使用 stdout
上已有的阅读器来完成这项工作。委托写入后台线程并使用主线程作为标准输出。
import subprocess
import io
import pandas as pd
import threading
def stream_writer(data, stream):
"""write and close a stream"""
try:
while data:
count = stream.write(data)
data = data[count:]
finally:
stream.close()
strings = ['Hello\tWorld!', 'This\tis', 'a\tTest!']
string = '\n'.join(strings)
cmd_grep = ['grep', 's']
process_grep = subprocess.Popen(cmd_grep, stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
writer = threading.Thread(target=stream_writer, args=(string.encode('utf-8'),
process_grep.stdin))
writer.start()
# read and filter result then create dataframe
with io.StringIO() as grep_csv:
for bline in process_grep.stdout:
line = bline.decode("utf-8")
if not line.startswith('This'):
grep_csv.write(line)
grep_csv.seek(0)
writer.join()
process_grep.wait()
grep_results = pd.read_csv(grep_csv,
sep='\t',
header=None,
names=['Word1', 'Word2'])
print(grep_results)