我正在使用队列(Apache Pulsar)中的跟踪日志。我使用了5个keyedPrcoessFunction,最后将有效负载下沉到Postgres Db。我需要为每个keyedProcessFunction订购每个customerId。现在,我通过
实现Datasource.keyBy(fooKeyFunction).process(processA).keyBy(fooKeyFunction).process(processB).keyBy(fooKeyFunction).process(processC).keyBy(fooKeyFunction).process(processE).keyBy(fooKeyFunction).sink(fooSink).
processFunctionC非常耗时,最坏情况下需要30秒才能完成。这导致背压。我尝试为processFunctionC分配更多的插槽,但是吞吐量始终保持不变。它大部分保持每秒<4条消息。
每个进程的当前插槽功能为
processFunctionA: 3
processFunctionB: 30
processFunctionc: 80
processFunctionD: 10
processFunctionC: 10
在Flink UI中,它显示了从processB开始的背压,这意味着C非常慢。有没有一种方法可以在源代码本身使用应用分区逻辑,并为每个processFunction分配每个任务相同的插槽。例如:
dataSoruce.magicKeyBy(fooKeyFunction).setParallelism(80).process(processA).process(processB).process(processC).process(processE).sink(fooSink).
这将导致背压仅在少数几个任务中发生,并且不会使由多个KeyBy引起的背压偏斜。
我可以想到的另一种方法是将我所有的processFunction和接收器组合到单个processFunction中,并在接收器本身中应用所有这些逻辑。
我认为不存在像这样的东西。最接近的是DataStreamUtils.reinterpretAsKeyedStream
,它重新创建KeyedStream
,而实际上没有在运算符之间发送任何数据,因为它使用的是仅在本地转发数据的分区程序。这或多或少是您想要的,但是它仍然添加了分区运算符,并且在幕后重新创建了KeyedStream
,但是它应该更简单,更快,并且也许可以解决您面临的问题。
如果这不能解决问题,那么我认为最好的解决方案是对运算符进行分组,以便按照您的建议将背压最小化,即将所有运算符合并为一个更大的运算符,这应该使背压最小化。