Webflux:如何拆分事件,在每个组上应用不同的地图,然后将它们合并回来

问题描述 投票:0回答:1

我有一个项目列表,其中有 2 种类型的 id。我需要将它们分成 2 组,然后为一组调用另一种方法,该方法将丰富 id 并提供不同的 id,而另一组则返回相同的值。在执行上述操作后,我需要将两个组合并为单个流。但是当我应用

map
时,它会返回
Flux<Object>
并丢失类型信息。

Flux.fromIterable(events)
         .map(event -> serializer.deserialize(event, Payload.class))
         .filter(Optional::isPresent)
         .map(Optional::get)   //Flux<Event>
         .groupBy(event -> event.getIdType()) //There are only 2 types
         .concatMap(group -> group.map(event -> {
             if(group.key().equals(IdType.SEC_ID.getValue())) {
                 //Enrich and use
                 return enricher.enrich(List.of(event)); //return Flux<Event>
             }
             else {
                 //Just use them as such
                 return event;
             }
         })) //Returns Flux<Object> why is not returning Flux<Event>

现在如何合并它们?

grouping spring-webflux project-reactor flux
1个回答
0
投票

简短回答: 你得到

Flux<Object>
,因为从
group.map
你返回
Event
Flux<Event>
,它们的共同父类是 Object。快速解决方法是
group.flatMap
+
Mono.just()
:

     .concatMap(group -> group.flatMap(event -> {
         if(group.key().equals(IdType.SEC_ID.getValue())) {
             //Enrich and use
             return enricher.enrich(List.of(event)); //return Flux<Event>
         }
         else {
             //Just use them as such
             return Mono.just(event);
         }
     }))

但是,既然你要一一丰富活动(

List.of(event)
),你真的需要分组和
concatMap
吗?您可以更轻松地做到这一点:

    Flux.fromIterable(events)
            .map(event -> serializer.deserialize(event, Payload.class))
            .filter(Optional::isPresent)
            .map(Optional::get)   //Flux<Event>
            .flatMap(event -> {
                if(event.getIdType().equals(IdType.SEC_ID.getValue())) {
                    return enricher.enrich(List.of(event)); //return Flux<Event>
                }
                else {
                    //Just use them as such
                    return Mono.just(event);
                }
            }); //Returns Flux<Event>

如果您想将其减少为单个

enricher
调用提供完整的事件列表,那么您可以这样做:

            Flux.fromIterable(List.of(new Event()))
                    .map(event -> serializer.deserialize(event, Payload.class))
                    .filter(Optional::isPresent)
                    .map(Optional::get)   //Flux<Event>
                    .groupBy(Event::getIdType)
                    .concatMap(group -> {
                        if (group.key().equals(IdType.SEC_ID.getValue())) {
                            //Enrich and use
                            return group.collectList()
                                    .flatMapMany(enricher::enrich);
                        } else {
                            //Just use them as such
                            return group;
                        }
                    });

请小心,

SEC_ID
事件列表将被收集在内存中。

© www.soinside.com 2019 - 2024. All rights reserved.