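I need to merge several large data streams in which each element holds a timestamp and a value. Whenever timestamps match, the values must be summed. The data in each Flux is sorted by timestamp in ascending order.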
For smaller streams I would use groupBy, but since the fluxes contain many entries, that is not efficient.
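I would like to take advantage of the fact that the entries in each flux are ordered, but I can't find the right construct. Is there an operator that achieves this? Below is a rough sketch of what I want to do: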
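import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.IntStream;
import reactor.core.publisher.Flux;

record Data(LocalDateTime timestamp, double value) {}

var flux1 = Flux.just(
    new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1.0),
    new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1.0)
);
var flux2 = Flux.just(
    new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 2.0),
    new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 2.0),
    new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2.0)
);
var flux3 = Flux.just(
    new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 5.0)
);
var input = List.of(flux1, flux2, flux3);
var output = Flux.<Data>create(sink -> {
    // toIterable() blocks; it only stands in for "pull the next element of each Flux"
    List<Iterator<Data>> iterators = input.stream().map(f -> f.toIterable().iterator()).toList();
    // Current head of each stream, or null once that stream is exhausted
    Data[] nextEntries = new Data[input.size()];
    for (int i = 0; i < input.size(); i++) {
        nextEntries[i] = iterators.get(i).hasNext() ? iterators.get(i).next() : null;
    }
    while (Arrays.stream(nextEntries).anyMatch(Objects::nonNull)) {
        // Smallest timestamp among the current heads
        LocalDateTime nextTimestamp = Arrays.stream(nextEntries).filter(Objects::nonNull)
            .map(Data::timestamp).min(Comparator.naturalOrder()).get();
        // Streams whose head carries that timestamp (compared with equals, not ==)
        List<Integer> affectedStreams = IntStream.range(0, input.size())
            .filter(i -> nextEntries[i] != null && nextEntries[i].timestamp().equals(nextTimestamp))
            .boxed().toList();
        double nextOutput = affectedStreams.stream().mapToDouble(i -> nextEntries[i].value()).sum();
        sink.next(new Data(nextTimestamp, nextOutput));
        // Advance only the streams that contributed to this output
        affectedStreams.forEach(i -> nextEntries[i] = iterators.get(i).hasNext() ? iterators.get(i).next() : null);
    }
    sink.complete();
});
// expected output:
// [
//   Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1.0),
//   Data(LocalDateTime.parse("2025-02-01T00:00:00"), 7.0),
//   Data(LocalDateTime.parse("2025-03-01T00:00:00"), 3.0),
//   Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2.0)
// ]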
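You can get the expected result by stacking the following operators: mergeComparing to merge the sorted fluxes into one sorted flux, windowUntilChanged to group consecutive records that share a timestamp, and reduce to combine the records of each group as needed. This gives something like this: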
Flux.mergeComparing(Comparator.comparing(Data::datetime), flux1, flux2, flux3)
    .windowUntilChanged(Data::datetime)
    .flatMap(window -> window.reduce((d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));
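As an illustration of what windowUntilChanged followed by reduce computes, here is a plain-Java sketch (no Reactor) that works on an already merged, timestamp-ordered list. The class ConsecutiveSum and its method sumRuns are hypothetical names used only for this example, and the code is not how Reactor implements these operators.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

class ConsecutiveSum {
    // Same shape as the Data record used in the answer
    record Data(LocalDateTime datetime, Integer value) {}

    // Hypothetical helper: collapses each run of consecutive records with an equal
    // datetime into one record whose value is the sum of that run.
    static List<Data> sumRuns(List<Data> sorted) {
        List<Data> out = new ArrayList<>();
        Data current = null; // accumulator for the current run
        for (Data d : sorted) {
            if (current != null && current.datetime().equals(d.datetime())) {
                // Same key as the previous record: fold it into the running sum
                current = new Data(current.datetime(), current.value() + d.value());
            } else {
                // Key changed: close the previous run and start a new one
                if (current != null) out.add(current);
                current = d;
            }
        }
        if (current != null) out.add(current); // flush the last run
        return out;
    }

    public static void main(String[] args) {
        // Merged, timestamp-ordered sequence built from flux1, flux2 and flux3
        List<Data> merged = List.of(
            new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
            new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 2),
            new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 5),
            new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1),
            new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 2),
            new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2));
        sumRuns(merged).forEach(System.out::println);
    }
}
```

Like windowUntilChanged, the sketch only combines adjacent records with equal keys, so two records with the same timestamp that are separated by a different timestamp stay separate. That is why the sources have to be merged in timestamp order first.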
You can check it with a unit test like this:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.time.LocalDateTime;
import java.util.Comparator;

public class MergeSorted {
    record Data(LocalDateTime datetime, Integer value) {}

    @Test
    public void test() {
        var flux1 = Flux.just(
            new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
            new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1)
        );
        var flux2 = Flux.just(
            new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 2),
            new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 2),
            new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2)
        );
        var flux3 = Flux.just(
            new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 5)
        );
        var mergeSum = Flux.mergeComparing(Comparator.comparing(Data::datetime), flux1, flux2, flux3)
            .windowUntilChanged(Data::datetime)
            .flatMap(window -> window.reduce((d1, d2) -> new Data(d1.datetime(), d1.value() + d2.value())));
        StepVerifier.create(mergeSum)
            .expectNext(
                new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
                new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 7),
                new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 3),
                new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2)
            )
            .verifyComplete();
    }
}
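mergeComparing relies on every source already being sorted by the given comparator; it repeatedly picks the smallest of the sources' pending values rather than sorting the whole sequence. The same idea can be sketched in plain Java as a classic k-way merge over sorted lists using a PriorityQueue, with the summing step folded in. KWayMergeSum and mergeAndSum are hypothetical names, the input is assumed to be in-memory lists each sorted by datetime, and this shows the general technique rather than Reactor's actual implementation.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class KWayMergeSum {
    record Data(LocalDateTime datetime, int value) {}

    // Heap entry: the pending element of one source plus the iterator over the rest of it
    private record Head(Data data, Iterator<Data> rest) {}

    // Hypothetical helper: merges sources that are each sorted by datetime and
    // sums the values of all records sharing a datetime.
    static List<Data> mergeAndSum(List<List<Data>> sources) {
        PriorityQueue<Head> heap =
            new PriorityQueue<>(Comparator.comparing((Head h) -> h.data().datetime()));
        for (List<Data> source : sources) {
            Iterator<Data> it = source.iterator();
            if (it.hasNext()) heap.add(new Head(it.next(), it));
        }
        List<Data> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            LocalDateTime key = heap.peek().data().datetime();
            int sum = 0;
            // Consume every pending element with this datetime. Because each source is
            // sorted, nothing with this key can appear once the smallest head moves past it.
            while (!heap.isEmpty() && heap.peek().data().datetime().equals(key)) {
                Head h = heap.poll();
                sum += h.data().value();
                if (h.rest().hasNext()) heap.add(new Head(h.rest().next(), h.rest()));
            }
            out.add(new Data(key, sum));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Data> source1 = List.of(
            new Data(LocalDateTime.parse("2025-01-01T00:00:00"), 1),
            new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 1));
        List<Data> source2 = List.of(
            new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 2),
            new Data(LocalDateTime.parse("2025-03-01T00:00:00"), 2),
            new Data(LocalDateTime.parse("2025-05-01T00:00:00"), 2));
        List<Data> source3 = List.of(new Data(LocalDateTime.parse("2025-02-01T00:00:00"), 5));
        // Yields one record each for January (1), February (7), March (3) and May (2)
        mergeAndSum(List.of(source1, source2, source3)).forEach(System.out::println);
    }
}
```

The heap holds at most one pending element per source, so the working set grows with the number of sources rather than with the number of records.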