Spring Webflux 和 @Cacheable - 缓存 Mono / Flux 类型结果的正确方法

问题描述 投票:0回答:4

我正在学习 Spring WebFlux,在编写示例应用程序期间,我发现了与响应式类型(Mono/Flux)与 Spring Cache 相结合相关的问题。

考虑以下代码片段(Kotlin 中):

@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>

@Service
class TaskService(val taskRepository: TaskRepository) {

    @Cacheable("tasks")
    fun get(id: String): Mono<Task> = taskRepository.findById(id)
}

这种缓存方法调用返回 Mono 或 Flux 的方法有效且安全吗?也许还有其他一些原则可以做到这一点?

以下代码适用于 SimpleCacheResolver,但默认情况下无法使用 Redis,因为 Mono 不可序列化。为了使它们工作,例如需要使用 Kryo 序列化器。

spring-boot spring-cache spring-webflux
4个回答
45
投票

黑客之路

目前,

@Cacheable
与 Reactor 3 还没有流畅的集成。 但是,您可以通过添加
.cache()
运算符来返回
Mono

来绕过这个问题
@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>

@Service
class TaskService(val taskRepository: TaskRepository) {

    @Cacheable("tasks")
    fun get(id: String): Mono<Task> = taskRepository.findById(id).cache()
}

hack缓存并共享从

taskRepository
数据返回的内容。反过来,spring cacheable 将缓存返回的
Mono
的引用,然后返回该引用。换句话说,它是一个保存着缓存的 mono 的缓存:)。

Reactor 插件方式

Reactor 3 有一个 addition,它允许与现代内存缓存(如 caffeinejcache 等)流畅集成。使用该技术,您将能够轻松缓存数据:

@Repository
interface TaskRepository : ReactiveMongoRepository<Task, String>

@Service
class TaskService(val taskRepository: TaskRepository) {

    @Autowire
    CacheManager manager;


    fun get(id: String): Mono<Task> = CacheMono.lookup(reader(), id)
                                               .onCacheMissResume(() -> taskRepository.findById(id))
                                               .andWriteWith(writer());

    fun reader(): CacheMono.MonoCacheReader<String, Task> = key -> Mono.<Signal<Task>>justOrEmpty((Signal) manager.getCache("tasks").get(key).get())
    fun writer(): CacheMono.MonoCacheWriter<String, Task> = (key, value) -> Mono.fromRunnable(() -> manager.getCache("tasks").put(key, value));
} 

注意:Reactor 插件缓存自己的抽象,即

Signal<T>
,所以,不用担心这一点并遵循该约定


3
投票

我使用了 Oleh Dokuka 的 hacky 解决方案,效果很好,但有一个问题。您必须在 Flux 缓存中使用比 Cachable 缓存生存时间值更大的持续时间。如果你不使用 Flux 缓存的持续时间,它不会使其失效(Flux 文档说“将此 Flux 变成热源并缓存最后发出的信号以供进一步的订阅者使用。”)。 因此,将 Flux 缓存设置为 2 分钟,将生存时间设置为 30 秒可以是有效的配置。如果首先发生 ehcahce 超时,则会生成新的 Flux 缓存引用并使用它。


0
投票

截至今天,

Mono<T>
Flux<T>
的缓存按预期工作。请参阅spring缓存注释文档

这会按预期缓存给定键的返回任务:

@Cacheable("tasks")
public Mono<Task> getTask(String id) {...}

-2
投票

// 在外观中:

public Mono<HybrisResponse> getProducts(HybrisRequest request) {
    return Mono.just(HybrisResponse.builder().build());
}

// 在服务层:

@Cacheable(cacheNames = "embarkations")
public HybrisResponse cacheable(HybrisRequest request) {
    LOGGER.info("executing cacheable");
    return null;
}

@CachePut(cacheNames = "embarkations")
public HybrisResponse cachePut(HybrisRequest request) {
    LOGGER.info("executing cachePut");
    return hybrisFacade.getProducts(request).block();
}

// 在控制器中:

HybrisResponse hybrisResponse = null;

try {
   // get from cache
   hybrisResponse = productFeederService.cacheable(request);

} catch (Throwable e) {
   // if not in cache then cache it
   hybrisResponse = productFeederService.cachePut(request);
}

return Mono.just(hybrisResponse)
    .map(result -> ResponseBody.<HybrisResponse>builder()
        .payload(result).build())
    .map(ResponseEntity::ok);
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.