合并两个不同类型的 Flux 实例

问题描述 投票:0回答:3

使用 SpringBoot 2 和 Poi 类(兴趣点):

public class Poi {
public Poi(String poidId, Double price, Double latitude, Double longitude) {...}
private String poidId;
private Double latitude;
private Double longitude;
private Double price;
//And Getters and Setters
}

我有 2 个 Poi 通量:

Flux<Poi> availablePoisFlux;
Flux<Poi> poiFlux;

第一个元素availablePoisFlux包含Pois:

  • 一个 poidId
  • 没有纬度信息
  • 没有经度信息
  • 价格信息

第二个元素 poiFlux 包含 Pois 与:

  • poidId
  • 纬度
  • 经度
  • 没有价格信息

(poidId 是一个 Poi 的标识符)。

我想从两个 Flux(poiFlux 和 availablePoisFlux)创建一个带有 Pois(带有 poidId、价格、经度和纬度)的新 Flux resultPoisFlux

poidId属性是两个Flux(poiFlux和availablePoisFlux)之间的关键

实施示例:

我想我可以使用 zipWith 运算符来做到这一点,但我需要一些关于反应运算符(和过滤器?)的信息和建议

我想从第一个 Flux 开始迭代,并使用 poidId 标识符从第二个 flux 获取信息(价格),并使用正确的值更新价格属性。

示例输入值:

poiFlux = Poi(poidId=poiId0, price=null, name=name0, latitude=2.2222, longitude=14.222)
poiFlux = Poi(poidId=poiId1, price=null, name=name1, latitude=3.2222, longitude=15.222)
poiFlux = Poi(poidId=poiId2, price=null, name=name2, latitude=4.2222, longitude=16.222)
poiFlux = Poi(poidId=poiId3, price=null, name=name3, latitude=5.2222, longitude=17.222)
poiFlux = Poi(poidId=poiId4, price=null, name=name4, latitude=6.2222, longitude=18.222)
poiFlux = Poi(poidId=poiId5, price=null, name=name5, latitude=7.2222, longitude=19.222)
poiFlux = Poi(poidId=poiId6, price=null, name=name6, latitude=8.2222, longitude=20.222)
poiFlux = Poi(poidId=poiId7, price=null, name=name7, latitude=9.2222, longitude=21.222)
poiFlux = Poi(poidId=poiId8, price=null, name=name8, latitude=10.2222, longitude=22.222)
poiFlux = Poi(poidId=poiId9, price=null, name=name9,  latitude=11.2222, longitude=23.222)

availablePoisFlux = Poi(poidId=poiId0, price=120.0, name=name0, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId1, price=120.0, name=name1, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId2, price=120.0, name=name2, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId3, price=120.0, name=name3, latitude=null, longitude=null)
availablePoisFlux = Poi(poidId=poiId4, price=120.0, name=name4, latitude=null, longitude=null)

预期结果:

resultPoisFlux = Poi(poidId=poiId0, price=120.0, name=name0, latitude=2.2222, longitude=14.222)
resultPoisFlux = Poi(poidId=poiId1, price=120.0, name=name1, latitude=3.2222, longitude=15.222)
resultPoisFlux = Poi(poidId=poiId2, price=120.0, name=name2, latitude=4.2222, longitude=16.222)
resultPoisFlux = Poi(poidId=poiId3, price=120.0, name=name3, latitude=5.2222, longitude=17.222)
resultPoisFlux = Poi(poidId=poiId4, price=120.0, name=name4, latitude=6.2222, longitude=18.222)

类似的东西:

Flux<Poi> resultPoisFlux = availablePoisFlux.zipWith(poiFlux, (a, b) -> new Poi(a.getPoidId(), a.getPrice(), getLatitudeFromPoiFluxByPoidId(a.getPoidId()), getLongitudeFromPoiFluxByPoidId(a.getPoidId())))....

谢谢你的帮助。

reactive-programming project-reactor
3个回答
5
投票

zip/zipWith
,但它只将两个来源成对组合......

...只要它有足够的元素来配对。因此,只有当您 保证 元素在两个源中的顺序相同,并且每一侧的

poiIds
中没有差异时,它才对您的情况有用。在您的示例中就是这种情况,因为即使第二个源只有 4 个元素,这些元素与第一个源的开头相同。

poiFlux.zipWith(availablePoisFlux, (a, b) -> new Poi(a.getPoiId(), 
    b.getPrice(),
    a.getLatitude(),
    a.getLongitude(),
    a.getName()));

更通用的解决方案,更少的反应性

如果没有这样的保证,则需要以某种方式组合两个无序且不相交的序列。如果不收集其中一个来源中的所有元素(最好是

availablePoisFlux
),你就不能这样做,这意味着它将延迟另一个来源的处理,直到所述来源完成。

组合的一种方法是将所有值收集到由

poiId
键控的映射中,然后“迭代”第二个源。由于地图中可能找不到某些元素,因此您需要
handle
才能“跳过”这些元素:

availablePoisFlux.collectMap(Poi::getId, Poi::getPrice)
    .flatMapMany(knownPrices -> poiFlux.handle((poi, sink) -> {
        String poiId = poi.getPoiId();
        if (knownPrices.containsKey(poiId) {
            Double price = knownPrices.get(poiId);
            Poi complete = new Poi(poiId, price, poi.getLatitude(),
                poi.getLongitude(), poi.getName());
            sink.next(complete);
        } //else do nothing and let handle skip that poi
    }));

0
投票

当尝试将两个 Flux-es 合并为一个时,您的需求也很有可能会请求创建处理器类,它将订阅 booth Flux-es,在内部状态(地图、树、集合、... ) 并发布单个输出通量。像这样的东西:

// source data
public final Sinks.Many<SimpleTreeInfoEvent> treeInfoEventSink = Sinks.many().multicast().onBackpressureBuffer();
public final Sinks.Many<TrainArrivalDataEvent> trainArrivalDataEventEventSink = Sinks.many().multicast().onBackpressureBuffer();

// processing class example:
public class TrainAndWeatherProcessor {
    private final Map<String, String> internalState = new HashMap<>();

    public final Sinks.Many<WetTrainInfoEvent> wetTrainInfoEventSink = Sinks.many().multicast().onBackpressureBuffer();
    public TrainAndWeatherProcessor(
            Sinks.Many<SimpleTreeInfoEvent> treeInfoEventSink,
            Sinks.Many<TrainArrivalDataEvent> trainArrivalDataEventEventSink
    ) {
        treeInfoEventSink.asFlux()
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(treeInfo -> {
                    // some heavy processing according your business
                    final boolean somethingHasChanged = internalState.get("" + treeInfo.toString()).isEmpty();
                    if(somethingHasChanged) {
                        wetTrainInfoEventSink.tryEmitNext(new WetTrainInfoEvent());
                    }
                });

        trainArrivalDataEventEventSink.asFlux()
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe(treeInfo -> {
                    // some heavy processing according your business
                    final boolean somethingHasChanged = internalState.get("" + treeInfo.toString()).isEmpty();
                    if(somethingHasChanged) {
                        wetTrainInfoEventSink.tryEmitNext(new WetTrainInfoEvent());
                    }
                });
    }

    record SimpleTreeInfoEvent(){}
    record TrainArrivalDataEvent(){}
    record WetTrainInfoEvent(){}
}

此示例处理类将采用两个流(SimpleTreeInfoEvent 和 TrainArrivalDataEvent 实例进入)并生成单个 WetTrainInfoEvent 实例流。


0
投票

首先 zip/zipWith 将停止发射其中一个蒸汽末端。所以当我们需要使用两个元素数量不相等的列表时,这不是一个理想的选择。 Flux.concatWith 可以在这种情况下使用,如果是 id 或任何东西来对元素进行分组。

final Flux<Poi> available = availablePoisFlux;
final Flux<Poi> poiFlux = priceOnlyPoisFlux;

 return availablePoisFlux.concatWith(priceOnlyPoisFlux) //Flux<Poi>
.groupBy(Poi::getPoiId) //Flux<GroupedFlux<String,Poi>>
.onBackpressureBuffer(Integer.MAX_VALUE) //if more groups groupBy hangs
.flatMap(groupedPois -> groupedPois.reduce((p1,p1)-> {
   Poi pricePoi = Objects.nonNull(p1.getPrice()) ? p1 : p2;
   Poi latitudePoi =  = Objects.isNull(p1.getPrice()) ? p1 : p2;
   return new Poi(pricePoi.getPoiId(),
            pricePoi.getPrice(),
            latitudePoi.getLatitude(),
            latitudePoi.getLongitude());

} //Flux<Poi>
.filter(poi -> Objects.nonNull(poi.getPrice()) && Objects.nonNull(poi.getLongitude())); //Flux<Poi>

final filter 调用将确保从最终结果中跳过任何没有一对的 poi。

下面一步一步

  1. 结合两种助焊剂
  2. 用 poiId 将它们分组
  3. flatmap -> 只有一对存在这将来到 flatMap 并执行里面提到的操作 loic。这里是 reduce
  4. filter -> 如果存在对,则进入第 4 步,如果不存在,直接跳过第 3 步,在第 4 步可用 无论收到的对象是什么,我们都会检查价格和纬度字段是否存在。如果存在,它将在最后可用。
© www.soinside.com 2019 - 2024. All rights reserved.