将回调转换为协程,并在主线程上进行初始调用

问题描述 投票:0回答:1

我想将回调转为协程,SDK声明API调用需要在主线程进行。该解决方案有效,但我不确定原则上是否正确。

大致长这样:

   override suspend fun registerUser(email: String): ResultHandler<Profile, Exception> {

        return suspendCancellableCoroutine { continuation ->

            val observer =
                object : Observer<RegisterResponse<Profile?>> {
               fun onNext(t: Profile) {
                 continuation.resume(Success(t))
           }

     CoroutineScope(Dispatchers.Main).launch {
         userManager.register(email, observer)
       }
}
}

在我看来,SDK 想要在主线程上调用观察者回调,但我的进程是在 IO 线程上的视图模型范围内触发的(以避免阻塞主线程)。所以我猜观察者实际上是在 IO 线程上运行。

关于如何解决这个问题的想法?

kotlin callback kotlin-coroutines dispatcher
1个回答
0
投票

只是为了解决这个问题,如果这个库为你提供了一个 ObservableSource 引用而不是让你传递一个观察者,你可以在上面使用

awaitFirst()
,这当然比你自己实现更简单。


避免这样做:

CoroutineScope(Dispatchers.Main).launch
这与使用
GlobalScope.launch(Dispatchers.Main)
本质上没有什么不同。它创建了一个未绑定(从未取消)的范围,这是内存泄漏的常见来源。如果调用这个挂起函数的协程被取消,你启动的另一个协程将不会被通知和取消,因为它不是孩子。

其次,其他协程不等待它——内部协程的事件可以在未来某个时间到来。

为确保您在主线程上注册您的 API,请围绕整个函数使用

withContext(Dispatchers.Main)
调用。然后,
suspendCancellableCoroutine
lambda 块将在主线程上运行,因此您将在主线程上调用 API 注册函数。

关于实现这个的其他一些要点:

  • Observer 有一个
    onSubscribe
    功能,可以给你一个 Disposable,你可以用它来提前取消。您需要这样做才能支持取消。
  • 多次调用
    continuation.resume()
    会使协程崩溃,因此您需要一些保护措施,以防 API 让您感到意外并发出多个项目。
  • 我为可能的订阅结束而没有发出任何东西的情况添加了另一个保护措施。
  • onError
    中,我还检查了
    continuation.isActive
    以避免在发射单个项目后在订阅结束前发生错误的可能情况下多次恢复崩溃。

由于 Kotlin 协同程序库是开源的,您可以看到他们是如何实现的

Observable.await
在这里以获取如何正确执行此类操作的示例。

解决方案应该类似于:

override suspend fun registerUser(email: String): ResultHandler<Profile, Exception> = withContext(Dispatchers.Main) {
    suspendCancellableCoroutine { continuation ->
        val observer = object : Observer<RegisterResponse<Profile?>> {
            lateinit var subscription: Disposable
            var seenValue = false

            override fun onSubscribe(disposable: Disposable) {
                this.disposable = disposable
                continuation.invokeOnCancellation { disposable.dispose() }
            }

            override fun onNext(t: Profile) {
                 if (!seenValue) {
                     seenValue = true
                     continuation.resume(Success(t))
                     subscription.dispose()
                 }
            }

            override fun onComplete() {
                if (continuation.isActive && !seenValue) {
                    continuation.resume(Error(NoSuchElementException("Observer completed without emitting any value.")))
                }
            }

            override fun onError(throwable: Throwable) {
                if (continuation.isActive) continuation.resume(Error(throwable))
            }
        }
        userManager.register(email, observer)
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.