是否可以将 Reactor Context 传递给从 Sinks 创建的 Flux?

问题描述 投票:0回答:2

我们当前正在使用 ReactiveSecurityContextHolder,它可以获取正确的身份验证详细信息并在 Flux 流中使用。

现在我们想要解除耦合。第一次迭代是我们使用接收器作为中间“事件中心”。因此,我们从端点生成一些项目到 Sinks.Many。

侦听器正在使用此接收器中的事件并完成繁重的工作。现在,在这个消费者中,我想使用生产站点上可用的上下文。 我知道可以通过

deferContextual
将当前上下文传递给另一个 Flux。但是是否可以将上下文从 Sinks 传递给生成的 Flux?

提前致谢。

亚历克斯

java project-reactor
2个回答
1
投票

目前没有 API 可以在任意情况下公开该功能

Sinks
Sinks
的挑战在于,它们中的很多都多播到多个
Subscriber
,并且
Context
是在每个
Subscriber
上定义的。

不过有一个技巧:

Sinks.Many<T>
Scannable
,并且大多数具体实现应该通过
Stream<Scannable> inners()
方法公开其当前的订阅者集合。对于单播接收器,
scan(Attr.ACTUAL)
也可以工作。

两大警告:

  • 这些API仅公开
    Scannable
    ,不允许直接访问
    Context
  • 如果实现的内部订阅者不是
    Scannable
    ,它将在流中被
    Scannable#NOT_SCANNABLE
    常量替换

大多数(如果不是全部)

reactor-core
CoreSubscribers
Scannable
,但如果您连接不可扫描的自定义订阅者,即使它有
Context
,您也将无法看到它.

reactor-core 中的多播接收器倾向于将下游订阅者包装在它们自己的内部 Scannable 内部跟踪器中,这将使这种方法发挥作用。

单播接收器有点不同,因为它们直接连接到下游

Subscriber
。因此,如果它是
CoreSubscriber
但不知为何不是
Scannable
,您将无法将其视为
CoreSubscriber
并访问其
Context

总结一下方法:

  1. 致电
    sink.inners()
    获取
    Stream<Scannable>
  2. 确保值是
    CoreSubscriber
    的实例(这是可能出错的部分)
  3. 将值转换为
    CoreSubscriber
    并调用
    currentContext()
  4. 以某种方式协调您必须提取相关键值对的各种
    Context

0
投票

好问题 Alex Wouda 和好答案 Simon Baslé,这个答案纯粹希望提供更多“如果我们不能做到这一点,那么我们应该如何处理这个”的帮助。

我正在开发一个类似的多租户应用程序,我们使用“当前租户”的概念来决定连接到哪个数据库。大多数情况下,这工作得很好,如果它是一个 Web 请求等,我们会从 JWT 获取当前租户。但是,如果我们想从一堆不同的租户中提取数据,例如由计划生成的每日报告,该怎么办?跑步者?我们就是这样做的:

我们有一项服务将返回特定租户的所有用户和团队数据,它还有一个与之关联的 JWT 令牌(发送者)。 NB 请注意任何客户端可能需要的辅助上下文方法:

public interface TenantServiceReactive
{
  // service methods -----------------------------------------------------------
  
  public Mono<TenantDataResource> getTenant(String tenantId);
  
  // context helper methods ----------------------------------------------------
  
  public Context scheduler(String tenantId);
  
  public Context sender(TenantDataResource tenant);
}

然后,当我们使用它时,我们确保我们

flatMap*
Mono
或使用
Flux
contextWrite
,例如以下将报告每个租户有多少(聊天)房间:

  @PostMapping(path = { REPORT })
  public Flux<Tuple2<String, Long>> report() {
    
    // note the room counter pipe needs a "current tenant" we set later
    Mono<Long> roomCounter = this.roomRepository.count();
    
    // my wish list of tenants
    String[] ids = new String[] { "aws", "azure", "gcloud" };
    
    return Flux.fromArray(ids)
        
        .flatMap((id1) -> this.tenantService.getTenant(id1))
        
        .flatMap((t) -> {
          var ctx = this.tenantService.sender(t);
          return Mono.zip(Mono.just(id), roomCounter.contextWrite(ctx)); // NB
        });
  }

当然,这里没有显示的是我必须为我们使用的 Mongo 和 MySQL 数据库执行的配置,这些配置根据当前租户上下文切换数据库。请发表评论,如果需要,我很乐意扩展这个答案。

© www.soinside.com 2019 - 2024. All rights reserved.