如何在 swift actor 中支持带超时的异步回调

问题描述 投票:0回答:1

我混合了运行“遗留”代码(进程管道和回调块)的异步/等待(Actor)。我有一个与此类似的情况,我的演员的函数返回一个值,同时在完成某些工作后还存储一个延迟回调(演员在等待工作完成时不应阻塞,演员的目的只是支持 id 生成和内部需要线程安全的更新)。理想情况下,我想将回调构造为符合人体工程学的“异步”变体 我正在使用演员来保证线程安全。其中一个异步函数存储一个回调,一旦工作完成或发生超时,就会调用该回调。 目前陷入僵局以及“丑陋”的嵌套。

typealias Block = (String?) async -> Void // do I need the async keyword here necessarily?

actor Processor {
  let id = Int
  var callbacks = [Int:Block]()
  let process : Process // external process which communicates of stdin/stdout
  init(processPath: String) async throws {
    process = Process()
    process.launchPath = processPath
    process.standardInput = Pipe()
    process.standardOutput = Pipe()
    (process.standardOutput as! Pipe).fileHandleForReading.readabilityHandler = {[weak self] handler in
       // does not support async/await directly, wrapping in a task, perhaps better mechanism?
       Task {[weak self] in
          await self?.handleOutput(data: handler.availableData)
       }
    }
  }

  struct Result: Codable { 
     id: Int ,
     output: String
  }
  func handleOutput(data: Data) {
     if let decoded = try? JSONDecoder().decode(Result.self, from: data),
            let id = decoded.id ,
            let callback = pop(id: id) {
            await callback(decoded.output) // call the block, this time with real value vs when there was a timeout
        }
  }
  func work(text: String, completion: @escaping Block) async -> WorkArgs {
     id += 1 // increment the running id
     // store the callback
     callbacks[id] = completion
     // timeout if the result doesn't come back in a reasonable amount of time. 
     // I will perform a check after 1 second, and pop the callback if needed
     // problematic here..., how to wait while also not blocking this function
     Task { [weak self] in
       // invalidate if needed
       try await Task.sleep(nanoseconds: 1_000_000_000)
       // dead lock here:
       if let bl = await self?.pop(id: id) {
            print("invalidated the task \(id)")
            await bl(nil)
       }
     }
     // call out to the external process with this convention, it will run async and print a result with the same id
     let command = "\(id) \(text)\n"
     let d = command(using: .utf8)

     try! (process.standardInput as! Pipe).fileHandleForWriting.write(contentsOf: d)
     
  }
  // pop this callback
  func pop(id: Int) -> Block? {
        return callbacks.removeValue(forKey: id)
  }
}

struct WorkArgs {
  let id: Int
  let date: Date
}

actor IdGen {
    private var id : Int64 = 0
    func next() -> Int64 {
        id += 1
        return id
    }
}

actor CallbackActor {
var pendingCallbacks = [Int: (String) -> Void]()
    func push(_ key: Int, block: @escaping  (String) -> Void) {
        pendingCallbacks[key] = block
    }
    func pop(_ key: Int64) -> AutoCompleteBlock? {
        return pendingCallbacks.removeValue(forKey: key)
    }
}
swift async-await actor swift-concurrency
1个回答
5
投票

在讨论超时问题之前,我们可能应该讨论一下如何在 Swift 并发中包装

Process

  • 一种模式是使用

    AsyncSequence
    (即
    AsyncStream
    )来表示
    stdout

    actor ProcessWithStream {
        private let process = Process()
        private let stdin = Pipe()
        private let stdout = Pipe()
        private let stderr = Pipe()
        private var buffer = Data()
    
        init(url: URL) {
            process.standardInput = stdin
            process.standardOutput = stdout
            process.standardError = stderr
            process.executableURL = url
        }
    
        func start() throws {
            try process.run()
        }
    
        func terminate() {
            process.terminate()
        }
    
        func send(_ string: String) {
            guard let data = "\(string)\n".data(using: .utf8) else { return }
            stdin.fileHandleForWriting.write(data)
        }
    
        func stream() -> AsyncStream<Data> {
            AsyncStream(Data.self) { continuation in
                stdout.fileHandleForReading.readabilityHandler = { handler in
                    continuation.yield(handler.availableData)
                }
                process.terminationHandler = { handler in
                    continuation.finish()
                }
            }
        }
    }
    

    然后您可以

    for await
    直播:

    let process = ProcessWithStream(url: url)
    
    override func viewDidLoad() {
        super.viewDidLoad()
    
        Task {
            try await startStream()
            print("done")
        }
    }
    
    @IBAction func didTapSend(_ sender: Any) {
        let string = textField.stringValue
        Task {
            await process.send(string)
        }
        textField.stringValue = ""
    }
    
    func startStream() async throws {
        try await process.start()
        let stream = await process.stream()
    
        for await data in stream {
            if let string = String(data: data, encoding: .utf8) {
                print(string, terminator: "")
            }
        }
    }
    

    这是一个简单的方法。它看起来不错(因为我打印的响应没有终止符)。

    但是需要小心,因为

    readabilityHandler
    并不总是会用某些特定输出的完整
    Data
    来调用。它可能会被分解或分成对
    readabilityHandler
    的单独调用。

  • 另一种模式是使用

    lines
    ,这可以避免
    readabilityHandler
    对于给定的输出可能被多次调用的问题:

    actor ProcessWithLines {
        private let process = Process()
        private let stdin = Pipe()
        private let stdout = Pipe()
        private let stderr = Pipe()
        private var buffer = Data()
        private(set) var lines: AsyncLineSequence<FileHandle.AsyncBytes>?
    
        init(url: URL) {
            process.standardInput = stdin
            process.standardOutput = stdout
            process.standardError = stderr
            process.executableURL = url
        }
    
        func start() throws {
            lines = stdout.fileHandleForReading.bytes.lines
            try process.run()
        }
    
        func terminate() {
            process.terminate()
        }
    
        func send(_ string: String) {
            guard let data = "\(string)\n".data(using: .utf8) else { return }
            stdin.fileHandleForWriting.write(data)
        }
    }
    

    然后你可以这样做:

    let process = ProcessWithLines(url: url)
    
    override func viewDidLoad() {
        super.viewDidLoad()
    
        Task {
            try await startStream()
            print("done")
        }
    }
    
    @IBAction func didTapSend(_ sender: Any) {
        let string = textField.stringValue
        Task {
            await process.send(string)
        }
        textField.stringValue = ""
    }
    
    func startStream() async throws {
        try await process.start()
        guard let lines = await process.lines else { return }
    
        for try await line in lines {
            print(line)
        }
    }
    

    这可以避免响应中线中断。


你问:

如何支持... Swift Actor 超时

该模式是将请求包装在

Task
中,然后启动一个单独的任务,该任务将在
Task.sleep
间隔后取消先前的任务。

但是在这种情况下,这将变得非常复杂,因为您必须将其与单独的

Process
协调,否则仍将继续进行,而不知道
Task
已被取消。从理论上讲,这可能会导致问题(例如,流程积压等)。

我建议将超时逻辑集成到由

Process
调用的应用程序中,而不是尝试让调用者处理它。这是可以完成的(例如,可以编写流程应用程序来捕获和处理
SIGINT
,然后调用者可以在 interrupt
 上调用 
Process
)。但它会很复杂,而且很可能很脆弱。

© www.soinside.com 2019 - 2024. All rights reserved.