反应性管道的书写方式

问题描述 投票:0回答:1

我正在为返回诺言的方法编写方面。考虑以下方法:

public Mono<Stream> publishToKafka(Stream s) {
    //publishToKafka is asynchronous
    return Mono.just(s).flatMap(worker::publishToKafka);
}

我想缓存发布是否成功。由于这是一个贯穿各领域的问题,因此外观似乎是最好的设计。这是我的观点。

@Around("@annotation....")
public Object cache() {
    //get the data to cache from the annotation
    Object result = pjp.proceed();
    cache.cache("key","data");
    return result;
}

现在,由于publishToKafka是异步的,因此只要发生线程切换并调用cache.cache(),目标方法就会返回。这不是我想要的。我想要的是,如果事件已成功发布到Kafka,则应缓存结果。以下建议有效。

@Around("@annotation....")
public <T extends Stream<T>> Mono<T> cache() {
    //get the data to cache from the annotation
    return ((Mono<T>)pjp.proceed()).doOnNext(a -> cache.cache(key, data));
}

我想了解这里的情况。这会在管道的组装时间内发生吗?还是在我的建议中添加了pjp.proceed()运算符的执行时间(doOnNext返回一个Promise)?

在此示例的上下文中,我需要了解汇编与执行时间。

java rx-java aop spring-aop project-reactor
1个回答
0
投票

Spring AOP和AspectJ方面总是在与被拦截的连接点相同的线程中同步执行。因此,如果您截获的方法立即返回并且返回值类似于promise,future或没有(void)再加上回调,则您不能期望在方面的建议中神奇地获得异步结果。您确实需要使方面意识到异步情况。

© www.soinside.com 2019 - 2024. All rights reserved.