我有一个 flink 应用程序,我在其中使用 TumblingEventTimeWindows 和处理函数
DataStream<Map<String, Object>> processedEvents = rawEvents
.keyBy(eventMap -> {
return calculateKey(eventMap.get("id").toString());
})
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(30)))
.allowedLateness(Duration.ofSeconds(3))
.process(new CustomProcessWindowFunction())
.setParallelism(16);
calculateKey - 始终返回 0 到 16 之间的数字。我认为并行度 16 应该在所有子任务之间严格分配,但相反,我得到 5 个空闲任务和几个超载任务
尝试添加分区、更改哈希 - 这些方法都没有给出结果
有没有办法让每个子任务进程不超过一个key? 谢谢!
Flink 使用一个稍微复杂的过程将键映射到子任务索引,该过程首先计算键的“键组”,然后将其映射到运算符索引(子任务索引)。所有这些代码都位于
KeyGroupRangeAssignment
类内部。
因此,您的 0...15 个键值将转换为一定数量的唯一子任务索引,并且这不会是一对一的映射。
您可以编写使用
KeyGroupRangeAssignment
类中的方法来生成映射到子任务索引的整数键值的代码。您基本上为每个目标子任务索引预先计算 Flink 将映射到该子任务的整数键值。然后您的 calculateKey
函数只需执行此查找即可返回适当的键值。
预计算只是循环遍历整数值,直到找到一个有效的值,因此效率不高,这就是为什么您想要执行一次,而不是按密钥生成请求。
代码看起来像这样:
/**
* Return an integer value that will get partitioned to the target <operatorIndex>, given the workflow's
* <maxParallelism> (for key groups) and the operator <parallelism>.
*
* @param maxParallelism
* @param parallelism
* @param operatorIndex
* @return Integer suitable for use in a record as the key.
*/
public static Integer makeKeyForOperatorIndex(int maxParallelism, int parallelism,
int operatorIndex) {
for (int i = 0; i < maxParallelism * 2; i++) {
Integer key = new Integer(i);
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
int index = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism,
parallelism, keyGroup);
if (index == operatorIndex) {
return key;
}
}
throw new RuntimeException(String.format(
"Unable to find key for target operator index %d (max parallelism = %d, parallelism = %d",
operatorIndex, maxParallelism, parallelism));
}