我正在开发一个项目,其中 Android 应用程序通过服务器发送事件 (SSE) 连接到 Ktor 服务器。每当 Arduino 传感器发出新数据时,服务器就会不断发送更新。但是,我的 Android 客户端仅收集一次发射,然后停止监听。我在邮递员上测试了后端的端点,它工作正常这是我的设置: 服务器端代码(Ktor):
fun Application.configureRouting() {
val dataFlow = MutableSharedFlow<WhetherModule>(replay = 1, extraBufferCapacity = Int.MAX_VALUE)
routing {
route("/whether") {
post {
val receive = call.receive<WhetherModule>()
val insert: WhetherModule? = whetherDaoImpl.insertData(receive)
if (insert != null) {
dataFlow.emit(insert)
call.respond(HttpStatusCode.Created, insert)
} else {
call.respond(HttpStatusCode.BadRequest, "Something went wrong")
}
}
}
sse("/events") {
try {
dataFlow.collect { data ->
send(ServerSentEvent(data = Json.encodeToString(data), event = "data-event"))
}
} catch (e: Exception) {
log.error("Error during SSE collection: ${e.message}")
}
}
}
}
客户端代码
class WhetherServer @Inject constructor(private val httpClient: HttpClient) {
fun getServerEvent() = flow<Resource<WhetherModule>> {
Log.d(
"receiveSSE",
"outside the fun block->" + Thread.currentThread().name
) //DefaultDispatcher-worker-3
httpClient.sse(
port = 8080,
path = "events"
) {
incoming.collect { event: ServerSentEvent ->
Log.d(
"receiveSSE",
"inside the fun block->" + Thread.currentThread().name
)
Log.d("ali osman", event.toString())
if (event.data != null) {
val decodeFromString = Json.decodeFromString<WhetherModule>(event.data!!)
emit(Resource.Success(decodeFromString))
}
}
}
}
.onCompletion { cause ->
val log : String = cause?.message ?: "goodbye"
Log.d("onCompletion", log)
}.catch { cause: Throwable ->
emit(Resource.Error(cause))
}
viewModel 代码
@HiltViewModel
class PostViewModel @Inject constructor(val postServer: PostServer) : ViewModel() {
private var _wheatherState = MutableStateFlow<Resource<WhetherModule>?>(null)
val _wheatherState = _wsValue.asStateFlow()
init {
viewModelScope.launch {
Log.d("viewModelScope", coroutineContext.toString())
// receivePost()
//receiveData()
receiveSSE ()
}
}
private suspend fun receiveSSE (){
postServer.getServerEvent2().collect { e->
_wsValue.value = e
}
}
}
在客户端,我尝试使用
channelFlow
来处理SSE收集逻辑,但没有解决问题。我怀疑这是因为服务器上的 sse()
函数在不同的协程作用域中运行,导致 ViewModel
的协程在第一次发射后终止 Flow
集合。我希望 Android 客户端能够在每次发出新数据时不断从服务器收集和接收数据。
有什么建议吗:)
如果您的 Android 客户端仅从 Ktor 服务器发送事件 (SSE) 服务器接收单个发射,则该问题可能源于与服务器配置或客户端实现相关的各种原因。