我有一个项目列表,其中有 2 种类型的 id。我需要将它们分成 2 组,然后为一组调用另一种方法,该方法将丰富 id 并提供不同的 id,而另一组则返回相同的值。在执行上述操作后,我需要将两个组合并为单个流。但是当我应用
map
时,它会返回 Flux<Object>
并丢失类型信息。
Flux.fromIterable(events)
.map(event -> serializer.deserialize(event, Payload.class))
.filter(Optional::isPresent)
.map(Optional::get) //Flux<Event>
.groupBy(event -> event.getIdType()) //There are only 2 types
.concatMap(group -> group.map(event -> {
if(group.key().equals(IdType.SEC_ID.getValue())) {
//Enrich and use
return enricher.enrich(List.of(event)); //return Flux<Event>
}
else {
//Just use them as such
return event;
}
})) //Returns Flux<Object> why is not returning Flux<Event>
现在如何合并它们?
简短回答: 你得到
Flux<Object>
,因为从 group.map
你返回 Event
或 Flux<Event>
,它们的共同父类是 Object。快速解决方法是 group.flatMap
+ Mono.just()
:
.concatMap(group -> group.flatMap(event -> {
if(group.key().equals(IdType.SEC_ID.getValue())) {
//Enrich and use
return enricher.enrich(List.of(event)); //return Flux<Event>
}
else {
//Just use them as such
return Mono.just(event);
}
}))
但是,既然你要一一丰富活动(
List.of(event)
),你真的需要分组和concatMap
吗?您可以更轻松地做到这一点:
Flux.fromIterable(events)
.map(event -> serializer.deserialize(event, Payload.class))
.filter(Optional::isPresent)
.map(Optional::get) //Flux<Event>
.flatMap(event -> {
if(event.getIdType().equals(IdType.SEC_ID.getValue())) {
return enricher.enrich(List.of(event)); //return Flux<Event>
}
else {
//Just use them as such
return Mono.just(event);
}
}); //Returns Flux<Event>
如果您想将其减少为单个
enricher
调用提供完整的事件列表,那么您可以这样做:
Flux.fromIterable(List.of(new Event()))
.map(event -> serializer.deserialize(event, Payload.class))
.filter(Optional::isPresent)
.map(Optional::get) //Flux<Event>
.groupBy(Event::getIdType)
.concatMap(group -> {
if (group.key().equals(IdType.SEC_ID.getValue())) {
//Enrich and use
return group.collectList()
.flatMapMany(enricher::enrich);
} else {
//Just use them as such
return group;
}
});
请小心,
SEC_ID
事件列表将被收集在内存中。