我目前正在开发一个 Spring Boot 项目。我已连接到 Couchbase,并且想要将文档更新插入其中。在我的存储库层中,我使用 upsert() 方法。以下是我的存储库层:
import com.couchbase.client.java.ReactiveCluster
import com.couchbase.client.java.ReactiveCollection
import com.trendyol.productstockapi.entity.ProductStock
import org.springframework.stereotype.Repository
@Repository
class ProductStockRepository (
private val cluster: ReactiveCluster,
private val productStockCollection: ReactiveCollection
){
fun upsertProductStock(productStock: ProductStock){
val result = productStockCollection.upsert(
productStock.stockId,
productStock
)
}
fun deleteProductStock(productStockId: String) {
val result = productStockCollection.remove(productStockId)
}
}
以下是 Couchbase 配置:
import com.couchbase.client.core.cnc.tracing.NoopRequestTracer
import com.couchbase.client.core.env.CompressionConfig
import com.couchbase.client.core.env.IoEnvironment
import com.couchbase.client.core.env.OrphanReporterConfig
import com.couchbase.client.core.env.TimeoutConfig
import com.couchbase.client.java.ClusterOptions
import com.couchbase.client.java.ReactiveCluster
import com.couchbase.client.java.ReactiveCollection
import com.couchbase.client.java.codec.JacksonJsonSerializer
import com.couchbase.client.java.env.ClusterEnvironment
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.time.Duration
@Configuration
@EnableConfigurationProperties(CouchbaseConfigurationProperties::class)
class CouchbaseConfiguration(
private val couchbaseConfigurationProperties: CouchbaseConfigurationProperties,
private val objectMapper: ObjectMapper?,
) {
@Bean
fun clusterEnvironment(): ClusterEnvironment {
return ClusterEnvironment
.builder()
.jsonSerializer(JacksonJsonSerializer.create(objectMapper))
.ioEnvironment(IoEnvironment.builder().eventLoopThreadCount(Runtime.getRuntime().availableProcessors()))
.compressionConfig(CompressionConfig.builder().enable(true))
.requestTracer(NoopRequestTracer.INSTANCE)
.orphanReporterConfig(OrphanReporterConfig.builder().emitInterval(Duration.ofSeconds(60)))
.timeoutConfig(
TimeoutConfig.builder()
.kvTimeout(couchbaseConfigurationProperties.connection.kvTimeout)
.connectTimeout(couchbaseConfigurationProperties.connection.connectTimeout)
.queryTimeout(couchbaseConfigurationProperties.connection.queryTimeout)
)
.build()
}
@Bean
fun cluster(clusterEnvironment: ClusterEnvironment): ReactiveCluster {
val clusterOptions = ClusterOptions
.clusterOptions(couchbaseConfigurationProperties.secrets.cbUsername, couchbaseConfigurationProperties.secrets.cbPassword)
.environment(clusterEnvironment)
return ReactiveCluster.connect(couchbaseConfigurationProperties.hosts.joinToString(","), clusterOptions)
}
@Bean
fun productStockCollection(cluster: ReactiveCluster): ReactiveCollection {
return cluster.bucket(couchbaseConfigurationProperties.productContentBucket).collection("stock")
}
}
问题是当我将鼠标悬停在upsert()或remove()方法上时,我收到一条警告
Value is never used as Publisher
我的 Couchbase 版本是
com.couchbase.client:java-client:3.2.4
我一直无法想出任何解决方案。
ReactiveCollection.upsert()
返回冷Mono
。在发生任何事情之前,您需要订阅 Mono。 IntelliJ 警告您,您没有订阅 Mono(这是一种发布者)。
让代码正常工作的最简单方法是调用
Mono.block()
,它订阅 Mono,并阻塞当前线程,直到 Mono 发出一个值:
fun upsertProductStock(productStock: ProductStock){
val result = productStockCollection.upsert(
productStock.stockId,
productStock
).block()
}
但是,这样的阻塞效率不高。 (如果你愿意阻塞当前线程,你不妨使用 Couchbase SDK 的阻塞 API,而不是反应式 API。)由于你使用的是 Kotlin,你可以将
upsertProductStock
变成一个挂起函数,然后挂起Mono 工作时的阻塞。
对于这个技巧,您需要添加 kotlinx-coroutines-reactive 作为项目的依赖项。
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactive</artifactId>
<version>${kotlin.coroutines.version}</version>
</dependency>
然后你可以写:
suspend fun upsertProductStock(productStock: ProductStock) {
productStockCollection.upsert(
productStock.stockId,
productStock
).awaitSingle()
}
最后,除非您出于某种原因需要使用 Couchbase Java SDK(例如您可能正在使用 Spring Data Couchbase 并且想要共享相同的连接),请考虑使用 Couchbase Kotlin SDK 而不是 Java SDK。 Couchbase Kotlin SDK 的函数自然就是挂起函数。使用 Kotlin SDK,您可以编写:
suspend fun upsertProductStock(productStock: ProductStock) {
// In the Kotlin SDK, `upsert` is a suspend function
productStockCollection.upsert(
productStock.stockId,
productStock
)
}