我可以使用 suspendCoroutine 或 suspendCancellableCoroutine 来处理回调(将回调转换为挂起函数):但是如何多次发送结果。使用流量也不起作用。
override suspend fun foo(p0: SomeType): String = suspendCancellableCoroutine { continuation ->
val process = object: CallbackFunction() {
override fun onData(bytes:Long) {
// how to post onData which is called multiple times
}
override fun onComplete() {
continuation.resumeWith(Result.success())
}
override fun onFailure(exp:Exception) {
continuation.resumeWithException(exp)
}
}
}
如果我在 onData 上发布延续,它将会崩溃,因为它已经恢复了。
请尝试帮助解决这个问题。
挂起函数只能返回一项,因此需要使用流。
在我看来,您正在接收一系列长字节,并且需要将它们合并到一个字符串中。因此,您可以创建一个返回
Flow<Long>
的函数,然后在挂起函数中创建并使用此 Flow。
private fun getFooBytes(p0: SomeType): Flow<Long> = callbackFlow {
val callback = object: CallbackFunction() {
override fun onData(bytes:Long) {
trySend(bytes)
}
override fun onComplete() {
channel.close() // complete the flow
}
override fun onFailure(exp:Exception) {
cancel(CancellationException("Callback failed", exp)) // end flow prematurely
}
}
withContext(Dispatchers.Main) { // only if registerCallback() isn't thread-safe
someApi.registerCallback(callback)
}
awaitClose {
withContext(Dispatchers.Main) { // only if unregisterCallback() isn't thread-safe
someApi.unregisterCallback(callback)
}
}
}.buffer(Channel.UNLIMITED) // ensures trySend() will always succeed
然后在挂起函数内消耗流量:
override suspend fun foo(p0: SomeType): String {
val longs = getFooBytes(p0)
.toList() // this collects the flow under the hood
val byteBuffer = ByteBuffer.allocate(longs.size * Long.SIZE_BYTES)
with(byteBuffer.asLongBuffer()) { longs.forEach(::put) }
return String(
byteBuffer.array(),
Charsets.UTF_8 // for example
)
}