在源头的flink中对整个dataStream进行分区,并保持分区直到接收器

问题描述 投票:0回答:1

我正在使用队列(Apache Pulsar)中的跟踪日志。我使用了5个keyedPrcoessFunction,最后将有效负载下沉到Postgres Db。我需要为每个keyedProcessFunction订购每个customerId。现在,我通过

实现
Datasource.keyBy(fooKeyFunction).process(processA).keyBy(fooKeyFunction).process(processB).keyBy(fooKeyFunction).process(processC).keyBy(fooKeyFunction).process(processE).keyBy(fooKeyFunction).sink(fooSink).

processFunctionC非常耗时,最坏情况下需要30秒才能完成。这导致背压。我尝试为processFunctionC分配更多的插槽,但是吞吐量始终保持不变。它大部分保持每秒<4条消息。

每个进程的当前插槽功能为

processFunctionA: 3
processFunctionB: 30
processFunctionc: 80
processFunctionD: 10
processFunctionC: 10

在Flink UI中,它显示了从processB开始的背压,这意味着C非常慢。有没有一种方法可以在源代码本身使用应用分区逻辑,并为每个processFunction分配每个任务相同的插槽。例如:

dataSoruce.magicKeyBy(fooKeyFunction).setParallelism(80).process(processA).process(processB).process(processC).process(processE).sink(fooSink).

这将导致背压仅在少数几个任务中发生,并且不会使由多个KeyBy引起的背压偏斜。

我可以想到的另一种方法是将我所有的processFunction和接收器组合到单个processFunction中,并在接收器本身中应用所有这些逻辑。

apache-flink flink-streaming flink-cep flink-sql flink-batch
1个回答
0
投票

我认为不存在像这样的东西。最接近的是DataStreamUtils.reinterpretAsKeyedStream,它重新创建KeyedStream,而实际上没有在运算符之间发送任何数据,因为它使用的是仅在本地转发数据的分区程序。这或多或少是您想要的,但是它仍然添加了分区运算符,并且在幕后重新创建了KeyedStream,但是它应该更简单,更快,并且也许可以解决您面临的问题。

如果这不能解决问题,那么我认为最好的解决方案是对运算符进行分组,以便按照您的建议将背压最小化,即将所有运算符合并为一个更大的运算符,这应该使背压最小化。

© www.soinside.com 2019 - 2024. All rights reserved.