How do I feed a subprocess's standard input from a Python iterator?

0 votes · 4 answers

I'm trying to use Python's subprocess module to communicate with a process that reads standard input and writes standard output in a streaming fashion. I want the subprocess to read lines from an iterator that produces the input, and then to read output lines back from the subprocess. There may not be a one-to-one correspondence between input and output lines. How can I feed a subprocess from an arbitrary iterator that yields strings?

Here is some example code providing a simple test case, along with some approaches I've tried that don't work for one reason or another:

#!/usr/bin/python
from subprocess import *
# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

# I thought that stdin could be any iterable, but it actually wants a
# filehandle, so this fails with an error.
subproc = Popen("cat", stdin=input_iterator, stdout=PIPE)

# This works, but it first sends *all* the input at once, then returns
# *all* the output as a string, rather than giving me an iterator over
# the output. This uses up all my memory, because the input is several
# hundred million lines.
subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
output, error = subproc.communicate("".join(input_iterator))
output_lines = output.split("\n")

So how do I get my subprocess to read from the iterator line by line while I read from its stdout line by line?

python io subprocess
4 Answers
5 votes

The easiest approach seems to be to fork and feed the input handle from a child process. Can anyone elaborate on any possible downsides of doing this? Or is there a Python module that makes it simpler and safer?

#!/usr/bin/python
from subprocess import *
import os

def fork_and_input(input, handle):
    """Send input to handle in a child process."""
    # Make sure input is iterable before forking
    input = iter(input)
    if os.fork():
        # Parent
        handle.close()
    else:
        # Child
        try:
            handle.writelines(input)
            handle.close()
        # An IOError here means some *other* part of the program
        # crashed, so don't complain here.
        except IOError:
            pass
        os._exit(0)

# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
fork_and_input(input_iterator, subproc.stdin)

for line in subproc.stdout:
    print line,

4 votes

To feed a subprocess's standard input from a Python iterator:

#!/usr/bin/env python3 
from subprocess import Popen, PIPE

with Popen("sink", stdin=PIPE, bufsize=-1) as process:  # "sink" is a placeholder for your command
    for chunk in input_iterator:
        process.stdin.write(chunk)

If you want to read the output at the same time, you need threads or asyncio. An asyncio version is below; a thread-based sketch follows it:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def writelines(writer, lines):
    # NOTE: can't use writer.writelines(lines) here because it tries to write
    # all at once
    with closing(writer):
        for line in lines:
            writer.write(line)
            await writer.drain()

async def main():
    input_iterator = (b"hello %d\n" % x for x in range(100000000))
    process = await asyncio.create_subprocess_exec("cat", stdin=PIPE, stdout=PIPE)
    asyncio.ensure_future(writelines(process.stdin, input_iterator))
    async for line in process.stdout:
        sys.stdout.buffer.write(line)
    return await process.wait()

if sys.platform == 'win32':
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
with closing(loop):
    sys.exit(loop.run_until_complete(main()))
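
For completeness, here is a thread-based variant of the same idea (a sketch of my own, not part of the original answer): a background thread drains the iterator into the pipe while the main thread iterates over stdout.

#!/usr/bin/env python3
# Thread-based sketch (an alternative, not from the answer above): a writer
# thread feeds stdin from the iterator while the main thread reads stdout.
import sys
import threading
from subprocess import Popen, PIPE

input_iterator = (b"hello %d\n" % x for x in range(100000000))

def feed(stdin, lines):
    try:
        for line in lines:
            stdin.write(line)
    except BrokenPipeError:
        pass  # the child exited early; nothing more to feed
    finally:
        stdin.close()  # close stdin so the child sees EOF and can finish

with Popen(["cat"], stdin=PIPE, stdout=PIPE) as process:
    writer = threading.Thread(target=feed, args=(process.stdin, input_iterator))
    writer.start()
    for line in process.stdout:
        sys.stdout.buffer.write(line)
    writer.join()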

0 votes

Follow this recipe; it is an add-on to the subprocess module that supports asynchronous I/O. However, it still requires that your subprocess respond to each input line, or group of lines, with a portion of its output.
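
(Not the recipe itself, just a stdlib sketch.) Assuming a line-oriented filter such as cat, the lock-step pattern that constraint implies looks roughly like this; note it can block if the child buffers its output or does not answer every line:

#!/usr/bin/env python3
# Rough sketch of the lock-step "one line in, one line out" pattern (an
# illustration, not the recipe above). Assumes the child answers every input
# line promptly; if it buffers its output, readline() can block forever.
from subprocess import Popen, PIPE

input_iterator = ("hello %s\n" % x for x in range(100000))  # smaller range for the sketch

with Popen(["cat"], stdin=PIPE, stdout=PIPE, text=True, bufsize=1) as process:
    for line in input_iterator:
        process.stdin.write(line)                   # send one request line
        process.stdin.flush()
        print(process.stdout.readline(), end="")    # read the matching response line
    process.stdin.close()                           # EOF lets the child exit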


0 votes

https://github.com/uktrade/iterable-subprocess (full disclosure: created by me) can do this. For example:

from iterable_subprocess import iterable_subprocess

input_iterator = (("hello %s\n" % x).encode() for x in range(100000000))

with iterable_subprocess(['cat'], input_iterator) as output:
    for chunk in output:
        print(chunk)

This doesn't yield lines of strings, though; it yields chunks of bytes that are not necessarily split on line boundaries. To get an iterable of lines, you can integrate a variant of the answer at https://stackoverflow.com/a/70639580/1319998:

import io
from iterable_subprocess import iterable_subprocess

input_iterator = (("hello %s\n" % x).encode() for x in range(100000000))

class FileLikeObject(io.IOBase):
    """Wrap an iterable of byte chunks so io.TextIOWrapper can read from it."""
    def __init__(self, it):
        self.it = iter(it)
    def readable(self):
        return True
    def read(self, _):
        return next(self.it, b'')

with iterable_subprocess(['cat'], input_iterator) as output:
    for line in io.TextIOWrapper(FileLikeObject(output), newline="", encoding="utf-8"):
        print(line)