我试图找出如何以非阻塞的方式简单地启动一些长时间运行的shell命令,并按照它们完成的顺序异步处理它们的输出,即使这是另一个订单而不是它们开始,使用Python 3.4中提供的asyncio python库并转发。
我找不到这样做的简单例子,即使在asyncio documentation itself中,它也似乎是相当低级的。
使用get_lines()
协同程序,以异步方式获取shell命令输出并将协同程序传递给asyncio.as_completed()
,以按照它们完成的顺序获得结果:
#!/usr/bin/env python3.5
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT
async def get_lines(shell_command):
p = await asyncio.create_subprocess_shell(shell_command,
stdin=PIPE, stdout=PIPE, stderr=STDOUT)
return (await p.communicate())[0].splitlines()
async def main():
# get commands output concurrently
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
.format(i=i, e=sys.executable))
for i in reversed(range(5))]
for f in asyncio.as_completed(coros): # print in the order they finish
print(await f)
if sys.platform.startswith('win'):
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
create_subprocess_shell
正是您要找的。它将返回一个Process
实例,你可以wait()
,或communicate()
与。
我和你的情况完全一样。在我的情况下,我在几个repo目录中运行多个git fetch
命令。
在第一次试验中,代码看起来像这样(并且cmds
是['git', 'fetch']
):
async def run_async(path: str, cmds: List[str]):
process = await asyncio.create_subprocess_exec(*cmds, cwd=path)
await process.wait()
此函数在一个repo上运行,调用者为多个repos创建任务并运行事件loop
来完成它们。
虽然程序运行并且磁盘上的结果是正确的,但来自不同存储库的fetch
输出是交错的。原因是await process.wait()
可以在任何时候IO阻塞(文件,网络等)时将控制权交还给调用者(循环调度程序)。
一个简单的更改修复它:
async def run_async(path: str, cmds: List[str]):
"""
Run `cmds` asynchronously in `path` directory
"""
process = await asyncio.create_subprocess_exec(
*cmds, stdout=asyncio.subprocess.PIPE, cwd=path)
stdout, _ = await process.communicate()
stdout and print(stdout.decode())
这里的理由是重新定向stdout
,使其在一个地方。就我而言,我只是打印出来。如果您需要输出,可以最后返回。
此外,打印顺序可能与启动顺序不同,在我的情况下这很好。
源代码是here on github。为了给出一些上下文,该项目是一个命令行工具,用于管理多个git repos,它从任何工作目录委派git命令执行。只有不到200行代码,应该很容易阅读。