我混合了运行“遗留”代码(进程管道和回调块)的异步/等待(Actor)。我有一个与此类似的情况,我的演员的函数返回一个值,同时在完成某些工作后还存储一个延迟回调(演员在等待工作完成时不应阻塞,演员的目的只是支持 id 生成和内部需要线程安全的更新)。理想情况下,我想将回调构造为符合人体工程学的“异步”变体 我正在使用演员来保证线程安全。其中一个异步函数存储一个回调,一旦工作完成或发生超时,就会调用该回调。 目前陷入僵局以及“丑陋”的嵌套。
typealias Block = (String?) async -> Void // do I need the async keyword here necessarily?
actor Processor {
let id = Int
var callbacks = [Int:Block]()
let process : Process // external process which communicates of stdin/stdout
init(processPath: String) async throws {
process = Process()
process.launchPath = processPath
process.standardInput = Pipe()
process.standardOutput = Pipe()
(process.standardOutput as! Pipe).fileHandleForReading.readabilityHandler = {[weak self] handler in
// does not support async/await directly, wrapping in a task, perhaps better mechanism?
Task {[weak self] in
await self?.handleOutput(data: handler.availableData)
}
}
}
struct Result: Codable {
id: Int ,
output: String
}
func handleOutput(data: Data) {
if let decoded = try? JSONDecoder().decode(Result.self, from: data),
let id = decoded.id ,
let callback = pop(id: id) {
await callback(decoded.output) // call the block, this time with real value vs when there was a timeout
}
}
func work(text: String, completion: @escaping Block) async -> WorkArgs {
id += 1 // increment the running id
// store the callback
callbacks[id] = completion
// timeout if the result doesn't come back in a reasonable amount of time.
// I will perform a check after 1 second, and pop the callback if needed
// problematic here..., how to wait while also not blocking this function
Task { [weak self] in
// invalidate if needed
try await Task.sleep(nanoseconds: 1_000_000_000)
// dead lock here:
if let bl = await self?.pop(id: id) {
print("invalidated the task \(id)")
await bl(nil)
}
}
// call out to the external process with this convention, it will run async and print a result with the same id
let command = "\(id) \(text)\n"
let d = command(using: .utf8)
try! (process.standardInput as! Pipe).fileHandleForWriting.write(contentsOf: d)
}
// pop this callback
func pop(id: Int) -> Block? {
return callbacks.removeValue(forKey: id)
}
}
struct WorkArgs {
let id: Int
let date: Date
}
actor IdGen {
private var id : Int64 = 0
func next() -> Int64 {
id += 1
return id
}
}
actor CallbackActor {
var pendingCallbacks = [Int: (String) -> Void]()
func push(_ key: Int, block: @escaping (String) -> Void) {
pendingCallbacks[key] = block
}
func pop(_ key: Int64) -> AutoCompleteBlock? {
return pendingCallbacks.removeValue(forKey: key)
}
}
在讨论超时问题之前,我们可能应该讨论一下如何在 Swift 并发中包装
Process
。
一种模式是使用
AsyncSequence
(即 AsyncStream
)来表示 stdout
:
actor ProcessWithStream {
private let process = Process()
private let stdin = Pipe()
private let stdout = Pipe()
private let stderr = Pipe()
private var buffer = Data()
init(url: URL) {
process.standardInput = stdin
process.standardOutput = stdout
process.standardError = stderr
process.executableURL = url
}
func start() throws {
try process.run()
}
func terminate() {
process.terminate()
}
func send(_ string: String) {
guard let data = "\(string)\n".data(using: .utf8) else { return }
stdin.fileHandleForWriting.write(data)
}
func stream() -> AsyncStream<Data> {
AsyncStream(Data.self) { continuation in
stdout.fileHandleForReading.readabilityHandler = { handler in
continuation.yield(handler.availableData)
}
process.terminationHandler = { handler in
continuation.finish()
}
}
}
}
然后您可以
for await
直播:
let process = ProcessWithStream(url: url)
override func viewDidLoad() {
super.viewDidLoad()
Task {
try await startStream()
print("done")
}
}
@IBAction func didTapSend(_ sender: Any) {
let string = textField.stringValue
Task {
await process.send(string)
}
textField.stringValue = ""
}
func startStream() async throws {
try await process.start()
let stream = await process.stream()
for await data in stream {
if let string = String(data: data, encoding: .utf8) {
print(string, terminator: "")
}
}
}
这是一个简单的方法。它看起来不错(因为我打印的响应没有终止符)。
但是需要小心,因为
readabilityHandler
并不总是会用某些特定输出的完整 Data
来调用。它可能会被分解或分成对 readabilityHandler
的单独调用。
另一种模式是使用
lines
,这可以避免 readabilityHandler
对于给定的输出可能被多次调用的问题:
actor ProcessWithLines {
private let process = Process()
private let stdin = Pipe()
private let stdout = Pipe()
private let stderr = Pipe()
private var buffer = Data()
private(set) var lines: AsyncLineSequence<FileHandle.AsyncBytes>?
init(url: URL) {
process.standardInput = stdin
process.standardOutput = stdout
process.standardError = stderr
process.executableURL = url
}
func start() throws {
lines = stdout.fileHandleForReading.bytes.lines
try process.run()
}
func terminate() {
process.terminate()
}
func send(_ string: String) {
guard let data = "\(string)\n".data(using: .utf8) else { return }
stdin.fileHandleForWriting.write(data)
}
}
然后你可以这样做:
let process = ProcessWithLines(url: url)
override func viewDidLoad() {
super.viewDidLoad()
Task {
try await startStream()
print("done")
}
}
@IBAction func didTapSend(_ sender: Any) {
let string = textField.stringValue
Task {
await process.send(string)
}
textField.stringValue = ""
}
func startStream() async throws {
try await process.start()
guard let lines = await process.lines else { return }
for try await line in lines {
print(line)
}
}
这可以避免响应中线中断。
你问:
如何支持... Swift Actor 超时
该模式是将请求包装在
Task
中,然后启动一个单独的任务,该任务将在 Task.sleep
间隔后取消先前的任务。
但是在这种情况下,这将变得非常复杂,因为您必须将其与单独的
Process
协调,否则仍将继续进行,而不知道 Task
已被取消。从理论上讲,这可能会导致问题(例如,流程积压等)。
我建议将超时逻辑集成到由
Process
调用的应用程序中,而不是尝试让调用者处理它。这是可以完成的(例如,可以编写流程应用程序来捕获和处理 SIGINT
,然后调用者可以在 interrupt
上调用
Process
)。但它会很复杂,而且很可能很脆弱。