高效加入排序的通量生产商

问题描述 投票:0回答:1

我需要合并可用的大数据流,每个数据流包含一个时间戳和一个值。如果时间戳匹配,则需要对这些值求和。 Flux中的数据按照时间戳升序排列。

对于较小的流,我会使用

groupBy
函数,但由于通量包含许多条目,因此效率不高。

我想利用通量中的条目是有序的这一事实,但我找不到正确的构造。有什么工具可以实现这样的目标。下面是一些关于我想做的事情的 sudo 代码:

    var flux1 = Flux.just(
            new Data(ZonedDateTime.parse("2025-01-01T00:00:00"), 1.0),
            new Data(ZonedDateTime.parse("2025-03-01T00:00:00"), 1.0)
    );


    var flux2 = Flux.just(
            new Data(ZonedDateTime.parse("2025-02-01T00:00:00"), 2.0),
            new Data(ZonedDateTime.parse("2025-03-01T00:00:00"), 2.0),
            new Data(ZonedDateTime.parse("2025-04-01T00:00:00"), 2.0)
    );

    var flux3 = Flux.just(
            new Data(ZonedDateTime.parse("2025-02-01T00:00:00"), 5.0)
    );

    var input = List.of(flux1, flux2, flux3);


    var output = Flux.create(sink -> {
        List<ZonedDateTime> nextEntries = input.stream().map(Flux::next).toList();

        do {
            ZonedDateTime nextTimestamp = nextEntries.stream().map(Data::getTimestamp).min(ZonedDateTime::compareTo).get();
            List<Integer> affectedStreams = IntStream.range(0, input.size()).filter(i -> nextTimestamp == nextEntries[i].getTimestamp()).toList();
            double nextOutput = affectedStreams.stream().mapToDouble(i -> nextEntries[i].getValue()).sum();
            sink.next(new Data(nextTimestamp, nextOutput));
            affectedStreams.forEach(i -> nextEntries[i] = input.get(i).next());
        } while (!allFluxAreConsumed);
    });

    // expected output:
    // [
    //      Data(ZonedDateTime.parse("2025-01-01T00:00:00"), 1.0),
    //      Data(ZonedDateTime.parse("2025-02-01T00:00:00"), 7.0),
    //      Data(ZonedDateTime.parse("2025-03-01T00:00:00"), 3.0),
    //      Data(ZonedDateTime.parse("2025-05-01T00:00:00"), 2.0)
    // ]
java spring-webflux flux
1个回答
0
投票

通过堆叠以下运算符即可得到预期的结果:

  1. 使用 mergeComparing 运算符组合输入通量,保留总体日期排序。警告:如果输入流量缓慢,则会减慢整个下游管道
  2. 使用 windowUntilChanged 对使用相同时间戳的相邻记录进行分组
  3. 在每个窗口上使用
    reduce
    可根据需要合并记录。

这给出了这样的东西:

Flux.mergeComparing(Comparator.comparing(Data::datetime), flux1, flux2, flux3)
    .windowUntilChanged(Data::datetime)
    .flatMap(window -> window.reduce((d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));

您可以使用单元测试来测试它,如下所示:

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.LocalDateTime;
import java.util.Comparator;

public class MergeSorted {

    record Data(LocalDateTime datetime, Integer value) {}

    @Test
    public void test() {

        var flux1 = Flux.just(
                new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1)
        );


        var flux2 = Flux.just(
                new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 2),
                new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 2),
                new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2)
        );

        var flux3 = Flux.just(
                new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 5)
        );

        var mergeSum = Flux.mergeComparing(Comparator.comparing(Data::datetime), flux1, flux2, flux3)
                           .windowUntilChanged(Data::datetime)
                           .flatMap(window -> window.reduce((d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));

        StepVerifier.create(mergeSum)
                .expectNext(
                              new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                              new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 7),
                              new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 3),
                              new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2)
                )
                .verifyComplete();
    }
}
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.