Why does ListView perform better than ArrayList?


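I ran into a problem while writing a custom UDAF in Flink (version 1.20). I want to implement a UDAF that computes the median, and I tried the following two approaches: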

public class MedianUDAF2 extends AggregateFunction<Double, MedianUDAF2.State> {

    public static class State {

        public int scale = 2;

        public ListView<Double> numbers;

        public State() {}
    }

    @Override
    public State createAccumulator() {
        State state = new State();
        state.numbers = new ListView<>();
        return state;
    }

    public void accumulate(State acc, Double val, Integer scale) throws Exception {
        acc.numbers.add(val);
        if (scale != null && scale > 0) acc.scale = scale;
    }

    public void merge(State acc, Iterable<State> it) throws Exception {
        for (State a : it) {
            acc.numbers.addAll(a.numbers.getList());
        }
    }

    @Override
    public Double getValue(State acc) {
        try {
            List<Double> numbers = acc.numbers.getList();
            numbers.sort(Double::compareTo);
            double n = numbers.size() - 1;
            double index = n * 0.5;

            int low = (int) Math.floor(index);
            int high = (int) Math.ceil(index);

            // Odd count: low == high, take the middle element; even count: average the two middle elements.
            double value = low == high ? numbers.get(low) : (numbers.get(low) + numbers.get(high)) * 0.5;
            BigDecimal decimal = new BigDecimal(value);
            return decimal.setScale(acc.scale, BigDecimal.ROUND_HALF_UP).doubleValue();
        } catch (Exception ignored) {
        }
        return 0.0;
    }
}

public class MedianUDAF extends AggregateFunction<Double, MedianUDAF.State> {

    public static class State {

        public int scale = 2;

        @DataTypeHint(value = "ARRAY<DOUBLE>")
        public ArrayList<Double> numbers;

        public State() {}
    }

    @Override
    public State createAccumulator() {
        State state = new State();
        state.numbers = new ArrayList<>();
        return state;
    }

    public void accumulate(State acc, Double val, Integer scale) throws Exception {
        acc.numbers.add(val);
        if (scale != null && scale > 0) acc.scale = scale;
    }

    public void merge(State acc, Iterable<State> it) throws Exception {
        for (State a : it) {
            acc.numbers.addAll(a.numbers);
        }
    }

    @Override
    public Double getValue(State acc) {
        try {
            List<Double> numbers = acc.numbers;
            numbers.sort(Double::compareTo);
            double n = numbers.size() - 1;
            double index = n * 0.5;

            int low = (int) Math.floor(index);
            int high = (int) Math.ceil(index);

            // Odd count: low == high, take the middle element; even count: average the two middle elements.
            double value = low == high ? numbers.get(low) : (numbers.get(low) + numbers.get(high)) * 0.5;
            BigDecimal decimal = new BigDecimal(value);
            return decimal.setScale(acc.scale, BigDecimal.ROUND_HALF_UP).doubleValue();
        } catch (Exception ignored) {
        }
        return 0.0;
    }
}

tableEnvironment.createTemporarySystemFunction("median", new MedianUDAF()); // Or new MedianUDAF2()
Table table = tableEnvironment.sqlQuery("select median(l_linenumber, 2) from lineitem");

The only difference between them is that one keeps an ArrayList in State and the other a ListView, yet the performance gap is very large. Why?

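The comment on ListView states that it will use the state backend when there is a large amount of data. Before flink-table-planner 1.14, this conversion could be seen in addAccumulatorDataViews in AggregationCodeGenerator, but it is no longer visible in version 1.20. I tried to debug the conversion in the AggsHandlerCodeGenerator class, but without success. Where does this conversion happen, and how can I observe it? Thanks!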

java apache-flink
1 answer

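ListView looks like an ordinary Java collection class, but it acts more like an interface to Flink's state. Whenever possible, it goes directly against Flink's state backend.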

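This improves the performance of get requests, especially on large lists. Instead of touching the whole list every time accumulate() is called, list elements are accessed lazily. Deserialization therefore also happens lazily, and only for the elements that are actually accessed, not for the complete list. An ArrayList field, by contrast, is an ordinary part of the accumulator, so the whole list is handled as a single value each time the accumulator is read or written.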
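The effect described above can be illustrated with a toy model that does not use Flink at all. It is only a sketch under stated assumptions: the state is assumed to be kept in serialized form, and the class ListStateSketch and its methods are hypothetical names invented for this illustration, not Flink APIs or Flink's actual implementation. The first method stores the whole list as one value and rewrites it on every add; the second appends each element on its own. Both count how many elements pass through (de)serialization.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model (NOT Flink code) of two ways to keep a growing list in serialized state. */
public class ListStateSketch {

    /** Number of doubles that went through serialize() or deserialize(). */
    static long elementsTouched = 0;

    static byte[] serialize(List<Double> values) {
        ByteBuffer buffer = ByteBuffer.allocate(values.size() * Double.BYTES);
        for (double v : values) {
            buffer.putDouble(v);
            elementsTouched++;
        }
        return buffer.array();
    }

    static List<Double> deserialize(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        List<Double> values = new ArrayList<>();
        while (buffer.hasRemaining()) {
            values.add(buffer.getDouble());
            elementsTouched++;
        }
        return values;
    }

    /** Whole list stored as ONE value: every add reads and rewrites all elements. */
    public static long addAsSingleValue(int n) {
        elementsTouched = 0;
        byte[] state = new byte[0];
        for (int i = 0; i < n; i++) {
            List<Double> list = deserialize(state);
            list.add((double) i);
            state = serialize(list);
        }
        return elementsTouched;
    }

    /** List stored element by element: every add serializes only the new element. */
    public static long addAsAppendOnlyList(int n) {
        elementsTouched = 0;
        List<byte[]> state = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            state.add(serialize(Collections.singletonList((double) i)));
        }
        return elementsTouched;
    }

    public static void main(String[] args) {
        int n = 1_000;
        System.out.println("single value: " + addAsSingleValue(n));    // n * n elements touched
        System.out.println("append-only:  " + addAsAppendOnlyList(n)); // n elements touched
    }
}
```

In this model the single-value variant touches n * n elements for n inserts, while the append-only variant touches n. How much of this applies to a real job depends on the state backend and on how the planner handles the accumulator, so the numbers should be read as an intuition only, not as a measurement of Flink.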
