Swift:并行读取进程的 standardOutput 和 standardError 而不阻塞

问题描述 投票:0回答:2

我想在 Swift 5 中运行 `Process()`,并在不阻塞的情况下读取它的 `standardOutput` 和 `standardError`,以便对它们进行解析。

在下面的示例代码中,一旦执行到 `for try await line in errorPipe.fileHandleForReading.bytes.lines` 这一行,程序就会被阻塞,标准输出的读取任务也随之停止打印。


import Foundation

// Child process under test: /sbin/ping against google.com.
let process = Process()
process.executableURL = URL(fileURLWithPath: "/sbin/ping")
process.arguments = ["google.com"]

// One pipe per stream so stdout and stderr can be consumed independently.
let outputPipe = Pipe()
let errorPipe = Pipe()
process.standardOutput = outputPipe
process.standardError = errorPipe

// Launch the child; `try?` discards any launch error.
try? process.run()

/// Prints five one-second "ticks", then echoes every line read from the
/// child's standard output until the line sequence ends or throws.
func processStdOut() async {
  // Warm-up phase: five progress messages, one second apart.
  var tick = 0
  while tick < 5 {
    print("processStdOut X ", tick)
    try? await Task.sleep(nanoseconds: 1_000_000_000)
    tick += 1
  }

  // Reading phase: line-by-line over the async byte stream of the pipe.
  let stdoutLines = outputPipe.fileHandleForReading.bytes.lines
  do {
    for try await text in stdoutLines {
      print("stdout Line: \(text)")
    }
  } catch {
    NSLog("processStdOut Error \(error.localizedDescription)")
  }

  NSLog("processStdOut finished")
}

/// Prints five two-second "ticks", then echoes every line read from the
/// child's standard error until the line sequence ends or throws.
func processStdErr() async {
  // Warm-up phase: five progress messages, two seconds apart.
  var tick = 0
  while tick < 5 {
    print("processStdErr X ", tick)
    try? await Task.sleep(nanoseconds: 2_000_000_000)
    tick += 1
  }

  // Reading phase: line-by-line over the async byte stream of the pipe.
  let stderrLines = errorPipe.fileHandleForReading.bytes.lines
  do {
    for try await text in stderrLines {
      print("stderr Line: \(text)")
    }
  } catch {
    NSLog("processStdErr Error \(error.localizedDescription)")
  }

  NSLog("processStdErr finished")
}

// Run both readers and the blocking wait as sibling child tasks; the group
// returns only after all three have completed.
await withTaskGroup(of: Void.self) { tasks in
  tasks.addTask { await processStdErr() }
  tasks.addTask { await processStdOut() }
  tasks.addTask { process.waitUntilExit() }
}

请注意,如果通过断开 wifi 或网络强制数据进入 standardError,则 standardOutput 会再次解锁。

我还应该尝试什么吗?

swift multithreading macos asynchronous concurrency
2个回答
1
投票

大多数程序使用默认的缓冲策略,而您无法控制 `/sbin/ping` 如何处理其输出,因此其中一个管道可能会阻塞 `FileHandle.AsyncBytes` 的实现(具体原因不确定)。为避免阻塞,我改为在两个管道上同时调用 `.availableData` 来读取数据。

import Foundation

// One pipe per stream so stdout and stderr can be drained independently.
let outputPipe = Pipe()
let errorPipe = Pipe()

let process = Process()

process.executableURL = URL(fileURLWithPath: "/sbin/ping")
// "-c 10": the sample run below reports 10 packets transmitted.
process.arguments = ["-c", "10", "diariosur.es"]
process.standardOutput = outputPipe
process.standardError = errorPipe

// Report a launch failure instead of silently discarding it: nothing after
// this point is meaningful if the child never started.
do {
    try process.run()
} catch {
    FileHandle.standardError.write(Data("failed to launch process: \(error.localizedDescription)\n".utf8))
    exit(EXIT_FAILURE)
}

/// Prints everything the child writes to standard output, chunk by chunk,
/// until the pipe reaches end-of-file.
///
/// The loop is driven by end-of-file rather than by `process.isRunning`:
/// output that is still buffered in the pipe when the child exits is drained
/// instead of being dropped, and the loop cannot spin on empty reads while
/// the process object still reports itself as running.
///
/// Assumes `process` was launched successfully. Launching closes the parent's
/// copy of the pipe's write end, which is what allows end-of-file to arrive.
func processStdOut() async {
    print("stdout start")

    let handle = outputPipe.fileHandleForReading
    while true {
        // Blocks until data is available; an empty result means end-of-file.
        let chunk = handle.availableData
        if chunk.isEmpty { break }

        // Lossy decoding never fails, so a chunk with invalid UTF-8
        // (e.g. a multi-byte character split across reads) is still printed.
        let text = String(decoding: chunk, as: UTF8.self)
            .trimmingCharacters(in: .whitespacesAndNewlines)
        print("stdout data: \(text)")
    }

    print("stdout finished")
}

/// Prints everything the child writes to standard error, chunk by chunk,
/// until the pipe reaches end-of-file.
///
/// The loop is driven by end-of-file rather than by `process.isRunning`:
/// output that is still buffered in the pipe when the child exits is drained
/// instead of being dropped, and the loop cannot spin on empty reads while
/// the process object still reports itself as running.
///
/// Assumes `process` was launched successfully. Launching closes the parent's
/// copy of the pipe's write end, which is what allows end-of-file to arrive.
func processStdErr() async {
    print("stderr start")

    let handle = errorPipe.fileHandleForReading
    while true {
        // Blocks until data is available; an empty result means end-of-file.
        let chunk = handle.availableData
        if chunk.isEmpty { break }

        // Lossy decoding never fails, so a chunk with invalid UTF-8
        // (e.g. a multi-byte character split across reads) is still printed.
        let text = String(decoding: chunk, as: UTF8.self)
            .trimmingCharacters(in: .whitespacesAndNewlines)
        print("stderr data: \(text)")
    }

    print("stderr finished")
}
    
    // Drain stderr and stdout as sibling child tasks; the group returns once
    // both reader functions have returned.
    await withTaskGroup(of: Void.self) { readers in
        readers.addTask { await processStdErr() }
        readers.addTask { await processStdOut() }
    }

    // Block until the child process has exited.
    process.waitUntilExit()
    

然后我得到以下(修剪后的)输出:

stderr start
stdout start
stdout data: PING diariosur.es (23.213.41.6): 56 data bytes
64 bytes from 23.213.41.6: icmp_seq=0 ttl=57 time=7.060 ms
stdout data: 64 bytes from 23.213.41.6: icmp_seq=1 ttl=57 time=6.562 ms
...
stdout data: 64 bytes from 23.213.41.6: icmp_seq=9 ttl=57 time=7.904 ms
stdout data: --- diariosur.es ping statistics ---
10 packets transmitted, 10 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 6.562/7.327/9.439/0.783 ms
stdout finished
stderr finished

我还用默认输出到 `stderr` 的 `curl` 做了测试,它同样不会阻塞:

// Same experiment with curl as the child process instead of ping.
process.executableURL = URL(fileURLWithPath: "/usr/bin/curl")
process.arguments = [
    "-N",
    "--output", "test.zsync",
    "http://ubuntu.mirror.digitalpacific.com.au/releases/23.04/ubuntu-23.04-desktop-amd64.iso.zsync",
]

编辑:

使用以下 C 程序进行测试:

#include <stdio.h>
#include <unistd.h>

/*
 * Test child: writes "stdout: N" for N = 1..100 to stdout, one line every
 * 100 ms, and additionally "stderr: N" to stderr for every tenth N.
 * Each write is flushed immediately.
 */
int main() {
    int n = 1;

    while (n <= 100) {
        printf("stdout: %d\n", n);
        fflush(stdout);

        const int is_tenth = (n % 10 == 0);
        if (is_tenth) {
            fprintf(stderr, "stderr: %d\n", n);
            fflush(stderr);
        }

        usleep(100000); /* 100 ms between iterations */
        n++;
    }

    return 0;
}

它返回以下输出:

stdout start
stderr start
stdout data: stdout: 1
stdout data: stdout: 2
stdout data: stdout: 3
stdout data: stdout: 4
stdout data: stdout: 5
stdout data: stdout: 6
stdout data: stdout: 7
stdout data: stdout: 8
stdout data: stdout: 9
stderr data: stderr: 10
stdout data: stdout: 10
stdout data: stdout: 11
stdout data: stdout: 12
stdout data: stdout: 13
stdout data: stdout: 14
stdout data: stdout: 15
stdout data: stdout: 16
stdout data: stdout: 17
stdout data: stdout: 18
stdout data: stdout: 19
stderr data: stderr: 20

0
投票

是的,在 `standardOutput` 和 `standardError` 上同时使用 `bytes` 时,标准的 `bytes` 实现似乎确实会发生阻塞。

这是一个简单的

bytes
实现,不会阻塞,因为它利用了
readabilityHandler
:

extension Pipe {
    /// An asynchronous sequence of the bytes arriving on a pipe's read end.
    ///
    /// Bytes are delivered through the read handle's `readabilityHandler`
    /// rather than by a blocking read. The sequence ends when the pipe
    /// reaches end-of-file, so `for await` loops over it terminate.
    struct AsyncBytes: AsyncSequence {
        typealias Element = UInt8

        /// The pipe whose read end is consumed.
        let pipe: Pipe

        /// Installs a readability handler on the pipe's read handle and
        /// returns an iterator over the bytes it receives.
        func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
            AsyncStream { continuation in
                pipe.fileHandleForReading.readabilityHandler = { handle in
                    let data = handle.availableData

                    // Empty data signals end-of-file: remove the handler so it
                    // is not invoked again, and finish the stream so consumers
                    // stop waiting.
                    guard !data.isEmpty else {
                        handle.readabilityHandler = nil
                        continuation.finish()
                        return
                    }

                    for byte in data {
                        continuation.yield(byte)
                    }
                }

                // Stop observing the handle once the consumer finishes or is cancelled.
                continuation.onTermination = { _ in
                    pipe.fileHandleForReading.readabilityHandler = nil
                }
            }.makeAsyncIterator()
        }
    }

    /// The bytes arriving on this pipe's read end, as an asynchronous sequence.
    var bytes: AsyncBytes { AsyncBytes(pipe: self) }
}

因此,在同时处理 `standardOutput` 和 `standardError` 时,下面的代码不会遇到同样的问题:

// One pipe per standard stream of the child process.
let outputPipe = Pipe()
let errorPipe = Pipe()
let inputPipe = Pipe()

let process = Process()
process.executableURL = URL(fileURLWithPath: …)
process.standardOutput = outputPipe
process.standardError = errorPipe
process.standardInput = inputPipe

// Launch the child; a launch failure propagates as a thrown error.
try process.run()

/// Consumes the child's standard output line by line via `outputPipe.bytes.lines`.
func processStandardOutput() async throws {
    for try await line in outputPipe.bytes.lines {
        …
    }
}

/// Consumes the child's standard error line by line via `errorPipe.bytes.lines`.
func processStandardError() async throws {
    for try await line in errorPipe.bytes.lines {
        …
    }
}

// When the child terminates, exit this program with the child's termination status.
process.terminationHandler = { process in
    exit(process.terminationStatus)
}

// Run both readers as child tasks of one group; `try?` discards any error
// thrown out of the group.
try? await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask {
        try await processStandardOutput()
    }
    
    group.addTask {
        try await processStandardError()
    }
    
    …
}
© www.soinside.com 2019 - 2024. All rights reserved.