我认为并行度 16 应该在所有子任务之间严格分配

问题描述 投票:0回答:1

我有一个 flink 应用程序,我在其中使用 TumblingEventTimeWindows 和处理函数

DataStream<Map<String, Object>> processedEvents = rawEvents
    .keyBy(eventMap -> {
        return calculateKey(eventMap.get("id").toString());
    })
    .window(TumblingEventTimeWindows.of(Duration.ofSeconds(30)))
    .allowedLateness(Duration.ofSeconds(3))
    .process(new CustomProcessWindowFunction())
    .setParallelism(16);

calculateKey - 始终返回 0 到 16 之间的数字。我认为并行度 16 应该在所有子任务之间严格分配,但相反,我得到 5 个空闲任务和几个超载任务

尝试添加分区、更改哈希 - 这些方法都没有给出结果

有没有办法让每个子任务进程不超过一个key? 谢谢!

java apache-flink flink-streaming
1个回答
0
投票

Flink 使用一个稍微复杂的过程将键映射到子任务索引,该过程首先计算键的“键组”,然后将其映射到运算符索引(子任务索引)。所有这些代码都位于

KeyGroupRangeAssignment
类内部。

因此,您的 0...15 个键值将转换为一定数量的唯一子任务索引,并且这不会是一对一的映射。

您可以编写使用

KeyGroupRangeAssignment
类中的方法来生成映射到子任务索引的整数键值的代码。您基本上为每个目标子任务索引预先计算 Flink 将映射到该子任务的整数键值。然后您的
calculateKey
函数只需执行此查找即可返回适当的键值。

预计算只是循环遍历整数值,直到找到一个有效的值,因此效率不高,这就是为什么您想要执行一次,而不是按密钥生成请求。

代码看起来像这样:

    /**
     * Return an integer value that will get partitioned to the target <operatorIndex>, given the workflow's
     * <maxParallelism> (for key groups) and the operator <parallelism>.
     * 
     * @param maxParallelism
     * @param parallelism
     * @param operatorIndex
     * @return Integer suitable for use in a record as the key.
     */
    public static Integer makeKeyForOperatorIndex(int maxParallelism, int parallelism,
            int operatorIndex) {

        for (int i = 0; i < maxParallelism * 2; i++) {
            Integer key = new Integer(i);
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
            int index = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism,
                    parallelism, keyGroup);
            if (index == operatorIndex) {
                return key;
            }
        }

        throw new RuntimeException(String.format(
                "Unable to find key for target operator index %d (max parallelism = %d, parallelism = %d",
                operatorIndex, maxParallelism, parallelism));
    }

© www.soinside.com 2019 - 2024. All rights reserved.