Kotlin coroutines + OpenTelemetry: losing context and getting an NPE


I am trying to understand how to use Spring WebFlux and OpenTelemetry together with Kotlin coroutines.

When I try to log inside a coroutine, I get an NPE at kotlin.coroutines.jvm.internal.ContinuationImpl.getContext. However, if I log from runBlocking, or without any coroutine, everything works fine.

I tried passing different contexts to the coroutine, such as MDCContext() and Context.current().asContextElement(), but that did not solve my problem.
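
For readers unfamiliar with these two context elements, the attempt described above might look roughly like the sketch below. It assumes kotlinx-coroutines-slf4j and opentelemetry-extension-kotlin are on the classpath (both appear in the build file further down) and that a real SLF4J backend such as Logback is active. withTelemetryContext is a hypothetical helper name, not something from the question, and the question reports that this kind of propagation did not remove the NPE.

```kotlin
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.slf4j.MDCContext
import kotlinx.coroutines.withContext
import org.slf4j.MDC

// Hypothetical helper: runs `block` on another dispatcher while carrying over
// the SLF4J MDC map and the OpenTelemetry Context captured on the calling thread.
suspend fun <T> withTelemetryContext(block: suspend () -> T): T =
    withContext(Dispatchers.Default + MDCContext() + Context.current().asContextElement()) {
        block()
    }

fun main() = runBlocking {
    // "customId" is the MDC key that shows up in the log line of the stack trace.
    MDC.put("customId", "demo")
    // Read the MDC value from inside the switched context.
    val seen = withTelemetryContext { MDC.get("customId") }
    println(seen)
}
```

MDCContext() snapshots the MDC map when it is constructed, so it has to be created on the thread that already holds the values to be propagated.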

Stack trace

2024-09-28 16:46:05.531 [reactor-http-nio-2] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [05106c0b-1]  500 Server Error for HTTP GET "/hello" [traceId=82be7888152bd850658435ee9fc73b97, customId=]
java.lang.NullPointerException: null
    at kotlin.coroutines.jvm.internal.ContinuationImpl.getContext(ContinuationImpl.kt:105)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoLiftFuseable] :
    reactor.core.publisher.Mono.flatMap(Mono.java:3179)
    org.springframework.web.reactive.result.method.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:185)
Error has been observed at the following site(s):
    *________Mono.flatMap ⇢ at org.springframework.web.reactive.result.method.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:185)
    *__________Mono.defer ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:260)
    *___________Mono.then ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:260)
    |_      Mono.doOnNext ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:261)
    |_ Mono.onErrorResume ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:262)
    |_ Mono.onErrorResume ⇢ at org.springframework.web.reactive.DispatcherHandler.handleResultMono(DispatcherHandler.java:168)
    |_       Mono.flatMap ⇢ at org.springframework.web.reactive.DispatcherHandler.handleResultMono(DispatcherHandler.java:172)
    *__________Mono.error ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handleException(RequestMappingHandlerAdapter.java:317)
    *__________Mono.error ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handleException(RequestMappingHandlerAdapter.java:317)
    *________Mono.flatMap ⇢ at org.springframework.web.reactive.DispatcherHandler.handle(DispatcherHandler.java:154)
    *__________Mono.defer ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:106)
    *__________checkpoint ⇢ io.opentelemetry.instrumentation.spring.webflux.v5_3.TelemetryProducingWebFilter [DefaultWebFilterChain]
    *__________Mono.defer ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:106)
    |_     Mono.doOnError ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:84)
    |_ Mono.onErrorResume ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:85)
    |_     Mono.doOnError ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:84)
    *__________Mono.error ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler$CheckpointInsertingHandler.handle(ExceptionHandlingWebHandler.java:106)
    |_         checkpoint ⇢ HTTP GET "/hello" [ExceptionHandlingWebHandler]
Original Stack Trace:
        at kotlin.coroutines.jvm.internal.ContinuationImpl.getContext(ContinuationImpl.kt:105)
        at kotlinx.coroutines.CoroutineScopeKt.coroutineScope(CoroutineScope.kt:260)
        at service.app.MyReactiveController.hello$suspendImpl(MyReactiveController.kt:31)
        at service.app.MyReactiveController.hello(MyReactiveController.kt)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:198)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
        at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297)
        at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
        at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:470)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
        at reactor.core.publisher.MonoZip$ZipCoordinator.request(MonoZip.java:220)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:135)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
        at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:129)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onComplete(TracingSubscriber.java:92)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onComplete(FluxHide.java:147)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:189)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onComplete(TracingSubscriber.java:92)
        at reactor.core.publisher.Operators.complete(Operators.java:137)
        at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:121)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:259)
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:865)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2331)
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:339)
        at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
        at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onSubscribe(FluxConcatMapNoPrefetch.java:164)
        at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
        at io.opentelemetry.instrumentation.spring.webflux.v5_3.TelemetryProducingWebFilter$TelemetryWrappedMono.subscribe(TelemetryProducingWebFilter.java:70)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
        at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
        at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:1176)
        at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:715)
        at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:481)
        at reactor.netty.http.server.HttpServerOperations.handleDefaultHttpRequest(HttpServerOperations.java:829)
        at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:774)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:1583)
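
In the original stack trace, hello is entered directly from java.lang.reflect.Method.invoke, and the failure is in ContinuationImpl.getContext, called from coroutineScope. One plausible explanation is that the suspend function was called through plain reflection without a real Continuation, leaving the compiler-generated state machine with no coroutine context to return. The sketch below tries to reproduce that situation outside Spring. Demo and invokeWithoutContinuation are hypothetical names, and whether this is what actually happens in the application is an assumption, not something the trace proves on its own.

```kotlin
import java.lang.reflect.InvocationTargetException
import kotlin.coroutines.Continuation
import kotlinx.coroutines.coroutineScope

// Hypothetical stand-in for the controller: a suspend function with a nested scope.
class Demo {
    suspend fun hello(): String {
        coroutineScope { }
        return "Hello World"
    }
}

// Calls the compiled method hello(Continuation) via Java reflection, passing null
// where a coroutine-aware caller would pass a real Continuation.
// Returns the exception thrown inside hello, or null if the call succeeded.
fun invokeWithoutContinuation(): Throwable? {
    val method = Demo::class.java.getMethod("hello", Continuation::class.java)
    return try {
        method.invoke(Demo(), *arrayOf<Any?>(null))
        null
    } catch (e: InvocationTargetException) {
        e.targetException
    }
}

fun main() {
    println(invokeWithoutContinuation())
}
```

If this reading is right, the call should fail in the same place as the trace above, which would also fit the first log statement in the controller succeeding: it never touches the continuation.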

Here is my configuration and the minimal code that reproduces the problem.

Controller

import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.coroutineScope
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController

@RestController
class MyReactiveController  {

    private val logger = KotlinLogging.logger {}

    @GetMapping("/hello")
    suspend fun hello(): String {
        logger.info { "Log from controller" }

        // NPE here
        coroutineScope {
            logger.info { "Log from another coroutine" }
        }
        return "Hello World"
    }
}

build.gradle.kts

plugins {
    kotlin("jvm") version "2.0.20"
    id("org.springframework.boot") version "3.3.3"
    id("io.spring.dependency-management") version "1.1.6"
    kotlin("plugin.spring") version "2.0.20"
}

group = "org.example"
version = "1.0-SNAPSHOT"

repositories {
    mavenLocal()
    mavenCentral()
}

dependencyManagement {
    imports {
        mavenBom("io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom:2.6.0")
    }
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-webflux")

    implementation("io.github.oshai:kotlin-logging:7.0.0")

    implementation("io.opentelemetry.instrumentation:opentelemetry-spring-boot-starter")
    runtimeOnly("io.opentelemetry.instrumentation:opentelemetry-logback-mdc-1.0:2.6.0-alpha")

    implementation("io.micrometer:micrometer-tracing-bridge-otel")
    implementation("io.micrometer:micrometer-registry-prometheus")
    implementation("io.opentelemetry:opentelemetry-exporter-zipkin")
    implementation("org.springframework.boot:spring-boot-starter-actuator")


    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.9.0")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-slf4j:1.9.0")
    implementation("io.opentelemetry:opentelemetry-extension-kotlin")
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    
    testImplementation(kotlin("test"))
}

tasks.test {
    useJUnitPlatform()
}
kotlin {
    jvmToolchain(21)
}

Tags: spring-webflux kotlin-coroutines project-reactor open-telemetry otel

1 Answer

I believe the right approach is to wrap the response in kotlinx.coroutines.reactor.mono.

import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.reactor.mono
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController

@RestController
class MyReactiveController  {

    private val logger = KotlinLogging.logger {}

    @GetMapping("/hello")
    fun hello() = mono {
        logger.info { "Log from controller" }

        // NPE is now gone
        coroutineScope {
            logger.info { "Log from another coroutine" }
        }

        "Hello World"
    }
}

I have verified this code on my machine.
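
As a rough illustration of what the mono builder does in the answer's code: it turns the suspending block into a Mono and starts the coroutine itself on subscription, so the handler becomes an ordinary (non-suspend) function and its caller never has to supply a Continuation. The sketch assumes kotlinx-coroutines-reactor is on the classpath. helloMono is a hypothetical name, and block() is used only so the snippet can run outside WebFlux; it should not be called on a Netty event-loop thread.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.reactor.mono
import reactor.core.publisher.Mono

// Hypothetical stand-alone version of the handler body from the answer.
fun helloMono(): Mono<String> = mono {
    // The nested scope inherits the coroutine context created by mono { }.
    coroutineScope { }
    "Hello World"
}

fun main() {
    // Subscribing is what actually runs the coroutine; block() waits for the result.
    println(helloMono().block())
}
```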
