我正在尝试了解如何使用 spring webflux 和 opentelemetry 与 kotlin 协程一起工作。
当我尝试在协程中执行一些日志时,我在
kotlin.coroutines.jvm.internal.ContinuationImpl.getContext
上收到 NPE。但是,如果我从 runBlocking 或没有协程执行日志 - 一切都会很好。
我尝试将不同的范围传递给协程,例如
MDCContext()
,Context.current().asContextElement()
,但它没有解决我的问题。
堆栈跟踪
2024-09-28 16:46:05.531 [reactor-http-nio-2] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [05106c0b-1] 500 Server Error for HTTP GET "/hello" [traceId=82be7888152bd850658435ee9fc73b97, customId=]
java.lang.NullPointerException: null
at kotlin.coroutines.jvm.internal.ContinuationImpl.getContext(ContinuationImpl.kt:105)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoLiftFuseable] :
reactor.core.publisher.Mono.flatMap(Mono.java:3179)
org.springframework.web.reactive.result.method.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:185)
Error has been observed at the following site(s):
*________Mono.flatMap ⇢ at org.springframework.web.reactive.result.method.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:185)
*__________Mono.defer ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:260)
*___________Mono.then ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:260)
|_ Mono.doOnNext ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:261)
|_ Mono.onErrorResume ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handle(RequestMappingHandlerAdapter.java:262)
|_ Mono.onErrorResume ⇢ at org.springframework.web.reactive.DispatcherHandler.handleResultMono(DispatcherHandler.java:168)
|_ Mono.flatMap ⇢ at org.springframework.web.reactive.DispatcherHandler.handleResultMono(DispatcherHandler.java:172)
*__________Mono.error ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handleException(RequestMappingHandlerAdapter.java:317)
*__________Mono.error ⇢ at org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerAdapter.handleException(RequestMappingHandlerAdapter.java:317)
*________Mono.flatMap ⇢ at org.springframework.web.reactive.DispatcherHandler.handle(DispatcherHandler.java:154)
*__________Mono.defer ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:106)
*__________checkpoint ⇢ io.opentelemetry.instrumentation.spring.webflux.v5_3.TelemetryProducingWebFilter [DefaultWebFilterChain]
*__________Mono.defer ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:106)
|_ Mono.doOnError ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:84)
|_ Mono.onErrorResume ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:85)
|_ Mono.doOnError ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:84)
*__________Mono.error ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler$CheckpointInsertingHandler.handle(ExceptionHandlingWebHandler.java:106)
|_ checkpoint ⇢ HTTP GET "/hello" [ExceptionHandlingWebHandler]
Original Stack Trace:
at kotlin.coroutines.jvm.internal.ContinuationImpl.getContext(ContinuationImpl.kt:105)
at kotlinx.coroutines.CoroutineScopeKt.coroutineScope(CoroutineScope.kt:260)
at service.app.MyReactiveController.hello$suspendImpl(MyReactiveController.kt:31)
at service.app.MyReactiveController.hello(MyReactiveController.kt)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:198)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297)
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:470)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoZip$ZipCoordinator.request(MonoZip.java:220)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:135)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:129)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onComplete(TracingSubscriber.java:92)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onComplete(FluxHide.java:147)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:189)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onComplete(TracingSubscriber.java:92)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:121)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:259)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:865)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onNext(TracingSubscriber.java:68)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2331)
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:339)
at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onSubscribe(FluxConcatMapNoPrefetch.java:164)
at io.opentelemetry.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:61)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at io.opentelemetry.instrumentation.spring.webflux.v5_3.TelemetryProducingWebFilter$TelemetryWrappedMono.subscribe(TelemetryProducingWebFilter.java:70)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:1176)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:715)
at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:481)
at reactor.netty.http.server.HttpServerOperations.handleDefaultHttpRequest(HttpServerOperations.java:829)
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:774)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)
所以,这是我的配置和解决此问题的最低代码。
控制器
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.coroutineScope
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController
@RestController
class MyReactiveController {
private val logger = KotlinLogging.logger {}
@GetMapping("/hello")
suspend fun hello(): String {
logger.info { "Log from controller" }
// NPE here
coroutineScope {
logger.info { "Log from another coroutine" }
}
return "Hello World"
}
}
构建.gradle.kts
plugins {
kotlin("jvm") version "2.0.20"
id("org.springframework.boot") version "3.3.3"
id("io.spring.dependency-management") version "1.1.6"
kotlin("plugin.spring") version "2.0.20"
}
group = "org.example"
version = "1.0-SNAPSHOT"
repositories {
mavenLocal()
mavenCentral()
}
dependencyManagement {
imports {
mavenBom("io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom:2.6.0")
}
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("io.github.oshai:kotlin-logging:7.0.0")
implementation("io.opentelemetry.instrumentation:opentelemetry-spring-boot-starter")
runtimeOnly("io.opentelemetry.instrumentation:opentelemetry-logback-mdc-1.0:2.6.0-alpha")
implementation("io.micrometer:micrometer-tracing-bridge-otel")
implementation("io.micrometer:micrometer-registry-prometheus")
implementation("io.opentelemetry:opentelemetry-exporter-zipkin")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.9.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-slf4j:1.9.0")
implementation("io.opentelemetry:opentelemetry-extension-kotlin")
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
testImplementation(kotlin("test"))
}
tasks.test {
useJUnitPlatform()
}
kotlin {
jvmToolchain(21)
}
我相信正确的方法是将响应包装在
kotlinx.coroutines.reactor.mono
中。
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.reactor.mono
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController
@RestController
class MyReactiveController {
private val logger = KotlinLogging.logger {}
@GetMapping("/hello")
fun hello() = mono {
logger.info { "Log from controller" }
// NPE is now gone
coroutineScope {
logger.info { "Log from another coroutine" }
}
"Hello World"
}
}
代码已在我的电脑上验证。