Execute multiple Monos in parallel and combine the results without blocking


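I am writing a microservice that needs to call several other microservices. Some of the services being called are independent, while others depend on the response of a previous service (so I think we need chaining).

What I have so far is: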

public class ServiceA {

    public Mono<DataBuilder> getDatabuilder() {

        // Services 1 to 4 do not depend on each other
        Mono<Result1> result1 = getService1();
        Mono<Result2> result2 = getService2();
        Mono<Result3> result3 = getService3();
        Mono<Result4> result4 = getService4();

        // Services 5 to 7 each depend on the previous response
        return result4.flatMap(result -> {
            Mono<Result5> result5 = getService5(result);
            if (!exists(id, result)) {
                result5 = getService5(id);
            }

            Mono<Result6> result6 = result5.flatMap(r -> getService6(r));

            Mono<Result7> result7 = result6.flatMap(rx -> getService7(rx));

            DataBuilder dataBuilder = DataBuilder.builder();

            return Flux.merge(result1, result2, result3,
                              result4, result5, result6, result7)
                       .collect(() -> dataBuilder, (builder, obj) ->
                               createBuilder(builder, obj))
                       .flatMap(b -> Mono.just(b.build()));
        });
    }
}

// The createBuilder method looks like this

private DataBuilder createBuilder(DataBuilder builder, Object obj) {

    // Route each result to the matching builder setter based on its type
    if (obj instanceof Result1) {
        builder.result1((Result1) obj);
    } else if (obj instanceof Result2) {
        builder.result2((Result2) obj);
    } else if (obj instanceof Result3) {
        builder.result3((Result3) obj);
    } else if (obj instanceof Result4) {
        builder.result4((Result4) obj);
    } else if (obj instanceof Result5) {
        builder.result5((Result5) obj);
    } else if (obj instanceof Result6) {
        builder.result6((Result6) obj);
    } else if (obj instanceof Result7) {
        builder.result7((Result7) obj);
    }

    return builder;
}

In my main class I call:

public class Main {

    public static void main(String[] args) {
        ServiceA a = new ServiceA();
        DataBuilder builder = a.getDatabuilder().block();
    }
}

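In this case I use a blocking call to read the DataBuilder once it is ready, but I don't see a better way to do this, because the first 3 services are independent and are called in parallel. After that, services 4 through 7 all depend on another service or on each other.

The DataBuilder object is then passed through a mapper and sent downstream.

Apart from the blocking call, I don't see any way to get the output of all the Monos without blocking.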

reactive-programming project-reactor
1 Answer

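A good way to chain the requests is to keep using the data builder. You can also replace the wall of if/else with a design pattern or a Map, as shown below: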

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ServiceA {

    private final Map<Class<?>, BiConsumer<DataBuilder, Object>> resultHandlers;

    public ServiceA() {
        resultHandlers = new HashMap<>();
        resultHandlers.put(Result1.class, (builder, obj) -> builder.result1((Result1) obj));
        resultHandlers.put(Result2.class, (builder, obj) -> builder.result2((Result2) obj));
        resultHandlers.put(Result3.class, (builder, obj) -> builder.result3((Result3) obj));
        resultHandlers.put(Result4.class, (builder, obj) -> builder.result4((Result4) obj));
        resultHandlers.put(Result5.class, (builder, obj) -> builder.result5((Result5) obj));
        resultHandlers.put(Result6.class, (builder, obj) -> builder.result6((Result6) obj));
        resultHandlers.put(Result7.class, (builder, obj) -> builder.result7((Result7) obj));
    }

    public Mono<DataBuilder> getDataBuilder() {

        Mono<Result1> result1 = getService1();
        Mono<Result2> result2 = getService2();
        Mono<Result3> result3 = getService3();
        Mono<Result4> result4 = getService4();

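        // Services 4 to 7 depend on each other, so they are chained with flatMap.
        return result4.flatMap(r4 -> {
            // id is not defined in the question; it is assumed to be in scope here.
            Mono<Result5> result5 = exists(id, r4) ? getService5(r4) : getService5(id);

            return result5.flatMap(r5 -> {
                Mono<Result6> result6 = getService6(r5);
                return result6.flatMap(r6 -> {
                    Mono<Result7> result7 = getService7(r6);
                    return result7.flatMap(r7 -> {
                        DataBuilder dataBuilder = DataBuilder.builder();
                        // Merge the already resolved values r4..r7 instead of result4..result7:
                        // merging those Monos again would subscribe to them a second time,
                        // which repeats the service calls if they are cold.
                        return Flux.merge(result1, result2, result3,
                                        Mono.just(r4), Mono.just(r5), Mono.just(r6), Mono.just(r7))
                                .collect(() -> dataBuilder, (builder, obj) -> createBuilder(builder, obj))
                                .flatMap(b -> Mono.just(b.build()));
                    });
                });
            });
        });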
    }

    private DataBuilder createBuilder(DataBuilder builder, Object obj) {
        BiConsumer<DataBuilder, Object> handler = resultHandlers.get(obj.getClass());
        if (handler != null) {
            handler.accept(builder, obj);
        }
        return builder;
    }

    // Stub methods for the service calls
    private Mono<Result1> getService1() {
        //service 1
    }

    private Mono<Result2> getService2() {
         //service 2
    }

    private Mono<Result3> getService3() {
        //service 3
    }

    private Mono<Result4> getService4() {
        //service 4
    }

    private Mono<Result5> getService5(Result4 result) {
        //service 5
    }

    private Mono<Result5> getService5(String id) {
        //service 5 call with id
    }

    private Mono<Result6> getService6(Result5 result) {
        //service 6
    }

    private Mono<Result7> getService7(Result6 result) {
        //service 7
    }

    private boolean exists(String id, Result4 result) {
        //check existence
    }
}
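
One thing to keep in mind with the map-based createBuilder: the lookup uses obj.getClass(), so it matches only the exact runtime class, whereas instanceof also matches subclasses. Below is a minimal sketch of the same dispatch that runs without Reactor. Result1, Result2 and SimpleBuilder are hypothetical stand-ins invented for this example, not the types from the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class HandlerMapSketch {

    // Hypothetical stand-ins for the Result types in the question.
    static final class Result1 {
        final String value;
        Result1(String value) { this.value = value; }
    }

    static final class Result2 {
        final int value;
        Result2(int value) { this.value = value; }
    }

    // Hypothetical, much smaller stand-in for DataBuilder.
    static final class SimpleBuilder {
        String first;
        Integer second;

        SimpleBuilder result1(Result1 r) { this.first = r.value; return this; }
        SimpleBuilder result2(Result2 r) { this.second = r.value; return this; }
    }

    // One handler per result class replaces the if/else chain.
    private static final Map<Class<?>, BiConsumer<SimpleBuilder, Object>> HANDLERS = new HashMap<>();

    static {
        HANDLERS.put(Result1.class, (b, o) -> b.result1((Result1) o));
        HANDLERS.put(Result2.class, (b, o) -> b.result2((Result2) o));
    }

    // Looks up the handler by exact runtime class; unregistered types are ignored.
    static SimpleBuilder apply(SimpleBuilder builder, Object obj) {
        BiConsumer<SimpleBuilder, Object> handler = HANDLERS.get(obj.getClass());
        if (handler != null) {
            handler.accept(builder, obj);
        }
        return builder;
    }

    public static void main(String[] args) {
        SimpleBuilder b = new SimpleBuilder();
        apply(b, new Result1("a"));
        apply(b, new Result2(2));
        apply(b, "not a result"); // no handler registered, so nothing changes
        System.out.println(b.first + " " + b.second); // prints "a 2"
    }
}
```

Silently ignoring unknown types mirrors the answer's null check. Whether that is preferable to failing fast on an unexpected type depends on the service.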

public class Main {

    public static void main(String[] args) {
        ServiceA a = new ServiceA();
        DataBuilder builder = a.getDataBuilder().block();
        // Use the builder as needed
    }
}
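
Note that Main still ends in block(), so the original "without blocking" part is not really addressed. The shape the question asks for is: start the independent calls together, chain the dependent ones, combine everything once, and never wait inside the pipeline. The sketch below illustrates that shape with the JDK's CompletableFuture, only because it runs without extra dependencies. It is an analogy, not Reactor code, and the service methods and String results are hypothetical stand-ins. In Reactor the corresponding operators would be Mono.zip for the independent results and flatMap for the chain, with the resulting Mono returned to the caller (or subscribed to) instead of calling block().

```java
import java.util.concurrent.CompletableFuture;

public class ParallelThenChainSketch {

    // Hypothetical stand-ins for remote calls; each returns a future right away.
    static CompletableFuture<String> service1() { return CompletableFuture.supplyAsync(() -> "r1"); }
    static CompletableFuture<String> service2() { return CompletableFuture.supplyAsync(() -> "r2"); }
    static CompletableFuture<String> service3() { return CompletableFuture.supplyAsync(() -> "r3"); }
    static CompletableFuture<String> service4() { return CompletableFuture.supplyAsync(() -> "r4"); }

    // Depends on the response of service 4.
    static CompletableFuture<String> service5(String r4) {
        return CompletableFuture.supplyAsync(() -> r4 + ">r5");
    }

    // Builds the whole pipeline without waiting on any result.
    static CompletableFuture<String> combined() {
        // Independent calls: all started up front, so they can run concurrently.
        CompletableFuture<String> f1 = service1();
        CompletableFuture<String> f2 = service2();
        CompletableFuture<String> f3 = service3();

        // Dependent calls: service 5 starts only after service 4 has completed.
        CompletableFuture<String> chain = service4().thenCompose(ParallelThenChainSketch::service5);

        // Combine once everything is done; every service is called exactly once.
        return f1.thenCombine(f2, (a, b) -> a + "," + b)
                 .thenCombine(f3, (ab, c) -> ab + "," + c)
                 .thenCombine(chain, (abc, d) -> abc + "," + d);
    }

    public static void main(String[] args) {
        // join() is here only so the demo does not exit before the async work finishes.
        // A real service would hand the future (or Mono) back to its caller instead.
        combined().thenAccept(System.out::println).join(); // prints "r1,r2,r3,r4>r5"
    }
}
```

Because the independent futures are created before the chain is composed, they overlap with services 4 and 5 rather than waiting for them, and no result is requested twice.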