I'm trying to use Python's subprocess module to communicate with a process that reads standard input and writes standard output in a streaming fashion. I want the subprocess to read lines from an iterator that produces the input, and then to read the output lines from the subprocess. There may not be a one-to-one correspondence between input and output lines. How can I feed a subprocess from an arbitrary iterator that yields strings?

Here is some example code that gives a simple test case, along with some approaches I've tried that don't work for one reason or another:
#!/usr/bin/python
from subprocess import *
# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))
# I thought that stdin could be any iterable, but it actually wants a
# filehandle, so this fails with an error.
subproc = Popen("cat", stdin=input_iterator, stdout=PIPE)
# This works, but it first sends *all* the input at once, then returns
# *all* the output as a string, rather than giving me an iterator over
# the output. This uses up all my memory, because the input is several
# hundred million lines.
subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
output, error = subproc.communicate("".join(input_iterator))
output_lines = output.split("\n")
So how can I make my subprocess read line by line from an iterator while I read line by line from its stdout?

The easiest way seemed to be to fork and feed the input handle from the child process. Can anyone elaborate on possible downsides of doing this? Or are there Python modules that make it simpler and safer?
#!/usr/bin/python
from subprocess import *
import os

def fork_and_input(input, handle):
    """Send input to handle in a child process."""
    # Make sure input is iterable before forking
    input = iter(input)
    if os.fork():
        # Parent
        handle.close()
    else:
        # Child
        try:
            handle.writelines(input)
            handle.close()
        # An IOError here means some *other* part of the program
        # crashed, so don't complain here.
        except IOError:
            pass
        os._exit(0)

# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
fork_and_input(input_iterator, subproc.stdin)
for line in subproc.stdout:
    print line,
To feed a subprocess's standard input from a Python iterator:
#!/usr/bin/env python3
from subprocess import Popen, PIPE

with Popen("sink", stdin=PIPE, bufsize=-1) as process:
    for chunk in input_iterator:
        process.stdin.write(chunk)
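The `sink` command above is a placeholder for your own program. As a runnable sketch of the same write-loop pattern (assuming a POSIX `wc` is available), here the child only emits its output after stdin is closed, so writing all the input before reading is safe:

```python
#!/usr/bin/env python3
from subprocess import Popen, PIPE

input_iterator = (b"hello %d\n" % x for x in range(100))

# "wc -l" consumes its entire input before printing a single line,
# so feeding everything and then reading cannot deadlock here.
with Popen(["wc", "-l"], stdin=PIPE, stdout=PIPE) as process:
    for chunk in input_iterator:
        process.stdin.write(chunk)
    process.stdin.close()  # EOF lets wc produce its count
    result = process.stdout.read()
```

This pattern only works when the child buffers all its output until EOF; a child that streams output as it reads would eventually block on a full stdout pipe.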
If you want to read the output at the same time, then you need threads or asyncio:
#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def writelines(writer, lines):
    # NOTE: can't use writer.writelines(lines) here because it tries to write
    # all at once
    with closing(writer):
        for line in lines:
            writer.write(line)
            await writer.drain()

async def main():
    input_iterator = (b"hello %d\n" % x for x in range(100000000))
    process = await asyncio.create_subprocess_exec("cat", stdin=PIPE, stdout=PIPE)
    asyncio.ensure_future(writelines(process.stdin, input_iterator))
    async for line in process.stdout:
        sys.stdout.buffer.write(line)
    return await process.wait()

if sys.platform == 'win32':
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
with closing(loop):
    sys.exit(loop.run_until_complete(main()))
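The thread-based route mentioned above can be sketched like this (a sketch assuming a POSIX `cat`): a background thread feeds the child's stdin while the main thread consumes its stdout, so neither pipe fills up and deadlocks.

```python
#!/usr/bin/env python3
import threading
from subprocess import Popen, PIPE

def feed(stdin, lines):
    # Runs in a background thread: write each line, then close the pipe
    # so the child sees EOF even if the iterator raises partway through.
    try:
        for line in lines:
            stdin.write(line)
    finally:
        stdin.close()

input_iterator = (b"hello %d\n" % x for x in range(1000))
output_lines = []

with Popen(["cat"], stdin=PIPE, stdout=PIPE) as process:
    writer = threading.Thread(target=feed, args=(process.stdin, input_iterator))
    writer.start()
    for line in process.stdout:
        output_lines.append(line)
    writer.join()
```

Because the writer and reader run concurrently, this works even when the child streams output as it reads, which is exactly the case where a single-threaded write-then-read approach deadlocks.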
Follow this recipe; it is an add-on to subprocess that supports asynchronous I/O. However, it still requires your subprocess to respond to each input line, or group of lines, with a portion of its output.
There is https://github.com/uktrade/iterable-subprocess (full disclosure: created by me) that can do this. For example:
from iterable_subprocess import iterable_subprocess

input_iterator = (("hello %s\n" % x).encode() for x in range(100000000))

with iterable_subprocess(['cat'], input_iterator) as output:
    for chunk in output:
        print(chunk)
This doesn't output lines of strings, however, but chunks of bytes that aren't necessarily split along line boundaries. To make an iterable of lines, you can integrate a variant of the answer at https://stackoverflow.com/a/70639580/1319998:

import io
from iterable_subprocess import iterable_subprocess

input_iterator = (("hello %s\n" % x).encode() for x in range(100000000))

class FileLikeObject(io.IOBase):
    def __init__(self, it):
        self.it = iter(it)
    def readable(self):
        return True
    def read(self, _):
        return next(self.it, b'')

with iterable_subprocess(['cat'], input_iterator) as output:
    for line in io.TextIOWrapper(FileLikeObject(output), newline="", encoding="utf-8"):
        print(line)
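The FileLikeObject adapter doesn't depend on iterable_subprocess at all; here is a minimal self-contained sketch showing how it turns an arbitrary iterator of byte chunks, split at arbitrary points, back into lines:

```python
import io

class FileLikeObject(io.IOBase):
    # Each read() call hands TextIOWrapper the next chunk; returning b''
    # once the iterator is exhausted signals EOF.
    def __init__(self, it):
        self.it = iter(it)
    def readable(self):
        return True
    def read(self, _):
        return next(self.it, b'')

# Chunks deliberately split mid-line, not at newlines
chunks = [b"hel", b"lo 0\nhello ", b"1\nhello 2\n"]
lines = list(io.TextIOWrapper(FileLikeObject(chunks), newline="", encoding="utf-8"))
```

TextIOWrapper does the re-chunking: it buffers reads internally and yields one decoded line per iteration, regardless of where the chunk boundaries fell.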