迭代方法中的通量内容,直到找到非空列表。 Spring WebFLux

问题描述 投票:0回答:1

我很难提出这个问题,因为我没有找到与我类似的用例,我刚刚开始使用WebFLux。 我有一个方法

getIdEntrevista
返回带有“id”列表的 Flux。 方法:

public Flux<IdEntrevista> getIdEntrevista(String perfil){
        return this.webClient.baseUrl("http://localhost:8081").build()
                .get()
                .uri("/api/orquestador/v1/entrevistador/public/entrevista_muestra_id?perfil="+perfil)
                .retrieve()
                .bodyToFlux(IdEntrevista.class);
    }

结果:

[
{
  "id": "6639711af44f9905dbdcd889"
},
{
  "id": "663971fdd44ffdd5dbdcd88b"
},
...(29 elements more)
]

我有另一种方法

getPreguntas
,一个从存储库函数返回Flux的方法
interviewTestDao.getPreguntas
,这个方法有一个id和限制作为属性,id是重要的东西。

    public Flux<SoloPreguntaImp> getPreguntas(String perfil, int limit) {
        //logica para obtener el idEntrevista
        //return this.interviewTestDao.getPreguntas("id",limit);
        return interviewTestDao.getPreguntas("6639711af44f9905dbdcd889",3);
    }

this.interviewTestDao.getPreguntas 的结果:

[
  {
    "pregunta": "¿Cuál es tu experiencia trabajando con tecnologías de Java y React en proyectos de desarrollo de software?"
  },
  {
    "pregunta": "¿Has liderado el diseño e implementación de aplicaciones de software complejas utilizando JavaScript, ReactJS y Java?"
  },
  {
    "pregunta": "¿Cómo garantizas la calidad del código, la mantenibilidad y escalabilidad de las aplicaciones de software en las que trabajas?"
  }
]

这是我的情况:我想迭代从

getIdEntrevista
获得的通量以获取 id 并在存储库方法中使用它,直到找到非空列表。这是因为有些 id 没有相关问题并返回空 []。 它必须尊重非阻塞原则。

我尝试了其他选择,但他们总是给我一个错误。

public Flux<SoloPreguntaImp> getPreguntas(String perfil, int limit) {
    return getIdEntrevista(perfil)
            .flatMap(idEntrevista ->  interviewTestDao.getPreguntas(idEntrevista.getId(), limit)
                    .collectList()
                    .flatMapMany(list -> {
                        if (list.isEmpty()) {
                            // Si el flujo está vacío, intenta nuevamente después de un período de tiempo
                            return Mono.error(new RuntimeException("Lista vacía"));
                        } else {
                            // Si el flujo no está vacío, emite la lista de preguntas
                            return Flux.fromIterable(list);
                        }
                    }))
            .repeatWhenEmpty(repeat -> repeat.delayElements(Duration.ofSeconds(5)))
      

提前谢谢您

java spring spring-webflux flux
1个回答
0
投票

您可以使用

reactor.core.publisher.Flux#next
,其文档如下:

仅将由此

Flux
发出的第一个项目发射到新的
Mono
中。如果在空的
Flux
上调用,则会发出空的
Mono

我做了这个小测试:

@RestController
public class SO78479713 {

  @GetMapping(path = "/78479713")
  public Mono<SoloPreguntaImp> getPreguntas(@RequestParam String perfil) {
    return getIdEntrevista(perfil)
        .flatMap(idEntrevista -> getPreguntasFromEntrevista(idEntrevista.id()))
        .next();
  }

  public Flux<IdEntrevista> getIdEntrevista(String perfil) {
    return Flux.just(new IdEntrevista("E_1_%s".formatted(perfil)),
                     new IdEntrevista("E_2_%s".formatted(perfil)),
                     new IdEntrevista("E_3_%s".formatted(perfil)),
                     new IdEntrevista("E_4_%s".formatted(perfil)),
                     new IdEntrevista("E_5_%s".formatted(perfil)),
                     new IdEntrevista("E_6_%s".formatted(perfil)),
                     new IdEntrevista("E_7_%s".formatted(perfil)));
  }

  public Flux<SoloPreguntaImp> getPreguntasFromEntrevista(String entrevistaId) {
    // The first few elements are empty, then we get two valid
    // pregunta, then the last one is empty as well.
    return switch (entrevistaId) {
      case "E_5_test" -> Flux.just(new SoloPreguntaImp("FOUND"));
      case "E_6_test" -> Flux.just(new SoloPreguntaImp("SHOULD NOT BE RETURNED"));
      default -> Flux.empty();
    };
  }

  record SoloPreguntaImp(String id) {

  }

  record IdEntrevista(String id) {

  }

}

它似乎按您的预期工作:

➜  ~ curl --silent -X GET --location "http://localhost:8080/78479713?perfil=test" | jq
{
  "id": "FOUND"
}
© www.soinside.com 2019 - 2024. All rights reserved.