“价值永远不会被用作发布者”

问题描述 投票:0回答:1

我目前正在开发一个 Spring Boot 项目。我已连接到 Couchbase,并且想要将文档更新插入其中。在我的存储库层中,我使用 upsert() 方法。以下是我的存储库层:

import com.couchbase.client.java.ReactiveCluster
import com.couchbase.client.java.ReactiveCollection
import com.trendyol.productstockapi.entity.ProductStock
import org.springframework.stereotype.Repository

@Repository
class ProductStockRepository (
    private val cluster: ReactiveCluster,
    private val productStockCollection: ReactiveCollection
){

    fun upsertProductStock(productStock: ProductStock){
        val result = productStockCollection.upsert(
            productStock.stockId,
            productStock
        )
    }

    fun deleteProductStock(productStockId: String) {
        val result = productStockCollection.remove(productStockId)
    }

}

以下是 Couchbase 配置:

import com.couchbase.client.core.cnc.tracing.NoopRequestTracer
import com.couchbase.client.core.env.CompressionConfig
import com.couchbase.client.core.env.IoEnvironment
import com.couchbase.client.core.env.OrphanReporterConfig
import com.couchbase.client.core.env.TimeoutConfig
import com.couchbase.client.java.ClusterOptions
import com.couchbase.client.java.ReactiveCluster
import com.couchbase.client.java.ReactiveCollection
import com.couchbase.client.java.codec.JacksonJsonSerializer
import com.couchbase.client.java.env.ClusterEnvironment
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.time.Duration

@Configuration
@EnableConfigurationProperties(CouchbaseConfigurationProperties::class)
class CouchbaseConfiguration(
    private val couchbaseConfigurationProperties: CouchbaseConfigurationProperties,
    private val objectMapper: ObjectMapper?,
) {

    @Bean
    fun clusterEnvironment(): ClusterEnvironment {
        return ClusterEnvironment
            .builder()
            .jsonSerializer(JacksonJsonSerializer.create(objectMapper))
            .ioEnvironment(IoEnvironment.builder().eventLoopThreadCount(Runtime.getRuntime().availableProcessors()))
            .compressionConfig(CompressionConfig.builder().enable(true))
            .requestTracer(NoopRequestTracer.INSTANCE)
            .orphanReporterConfig(OrphanReporterConfig.builder().emitInterval(Duration.ofSeconds(60)))
            .timeoutConfig(
                TimeoutConfig.builder()
                    .kvTimeout(couchbaseConfigurationProperties.connection.kvTimeout)
                    .connectTimeout(couchbaseConfigurationProperties.connection.connectTimeout)
                    .queryTimeout(couchbaseConfigurationProperties.connection.queryTimeout)
            )
            .build()
    }

    @Bean
    fun cluster(clusterEnvironment: ClusterEnvironment): ReactiveCluster {
        val clusterOptions = ClusterOptions
            .clusterOptions(couchbaseConfigurationProperties.secrets.cbUsername, couchbaseConfigurationProperties.secrets.cbPassword)
            .environment(clusterEnvironment)

        return ReactiveCluster.connect(couchbaseConfigurationProperties.hosts.joinToString(","), clusterOptions)
    }

    @Bean
    fun productStockCollection(cluster: ReactiveCluster): ReactiveCollection {
        return cluster.bucket(couchbaseConfigurationProperties.productContentBucket).collection("stock")

    }


}

问题是当我将鼠标悬停在upsert()或remove()方法上时,我收到一条警告

Value is never used as Publisher

我的 Couchbase 版本是

com.couchbase.client:java-client:3.2.4

我一直无法想出任何解决方案。

spring-boot kotlin couchbase
1个回答
10
投票

ReactiveCollection.upsert()
返回
Mono
。在发生任何事情之前,您需要订阅 Mono。 IntelliJ 警告您,您没有订阅 Mono(这是一种发布者)。

让代码正常工作的最简单方法是调用

Mono.block()
,它订阅 Mono,并阻塞当前线程,直到 Mono 发出一个值:

fun upsertProductStock(productStock: ProductStock){
    val result = productStockCollection.upsert(
        productStock.stockId,
        productStock
    ).block()
}

但是,这样的阻塞效率不高。 (如果你愿意阻塞当前线程,你不妨使用 Couchbase SDK 的阻塞 API,而不是反应式 API。)由于你使用的是 Kotlin,你可以将

upsertProductStock
变成一个挂起函数,然后挂起Mono 工作时的阻塞。

对于这个技巧,您需要添加 kotlinx-coroutines-reactive 作为项目的依赖项。

<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-reactive</artifactId>
    <version>${kotlin.coroutines.version}</version>
</dependency>

然后你可以写:

suspend fun upsertProductStock(productStock: ProductStock) {
    productStockCollection.upsert(
        productStock.stockId,
        productStock
    ).awaitSingle()
}

最后,除非您出于某种原因需要使用 Couchbase Java SDK(例如您可能正在使用 Spring Data Couchbase 并且想要共享相同的连接),请考虑使用 Couchbase Kotlin SDK 而不是 Java SDK。 Couchbase Kotlin SDK 的函数自然就是挂起函数。使用 Kotlin SDK,您可以编写:

suspend fun upsertProductStock(productStock: ProductStock) {
    // In the Kotlin SDK, `upsert` is a suspend function
    productStockCollection.upsert(
        productStock.stockId,
        productStock
    )
}
© www.soinside.com 2019 - 2024. All rights reserved.