我正在开发一个 kafka 流应用程序,该应用程序从具有三个主题的消费者组进行消费。其中一个主题有 20 个分区,另外一个主题有 10 个分区,最后一个主题有 5 个分区。因此,该消费者组总共有 35 个分区。
streams 应用程序在 kubernetes 环境中运行,并在单个部署中将应用程序的多个实例作为 pod 进行扩展。目标是能够扩展到 35 个 pod(即 35 个消费者),并将每个分区分配给单个消费者以实现最大并行度。
但是,我看到的行为是在应用程序扩展时分配分区时的共同分区。因此,一个消费者将拥有来自所有三个主题的分区 0,另一个消费者将拥有来自所有三个主题的分区 1,等等。这使得我可以实现的最大并行度为 20。如果我有 35 个消费者,则只有 20 个消费者处于活动状态。
据我了解,我无法摆脱与kafka流的共同分区行为,因为分区分配策略是不可更改的。这是我不想要或不需要的行为。我考虑过一些解决方案,但我不确定哪种方法最好,我正在寻找有关如何继续的一些方向。
接受此应用程序的最大并行度将是消费者组中分区计数最高的主题。如果主题的滞后性很高,这将导致一些消费者处理大量数据,而另一些则不处理太多数据。
让每个主题由其自己的消费者组中的单独流使用。这是一个问题,因为消费者组独立运行,并且默认情况下无法确保跨多个消费者组为 35 个消费者以 1:1 的比例分配 35 个分区。很可能仍然会有闲置的消费者。
与上述类似的解决方案,每个主题都有自己的消费者组/流,但消费者将随着 pod 的上下移动而动态分配到消费者组以确保平衡。这可以通过使用 Kafka 管理 API 和 kubernetes API 来强制执行,但实施和维护会很复杂/耗时。
让消费者组中的所有主题具有相同数量的分区。例如,所有三个主题都有 20 个分区。确保有 20 个消费者,所有 20 个消费者都被分配了分区。缺点是我使用的是融合云,因此这会增加成本,但这是迄今为止最简单的解决方案。
我倾向于#1或#4作为解决方案,但很好奇我的理解是否很差或者是否有更简单/更好的解决方案。
谢谢!
您所观察到的是设计使然。最后,Kafka Streams 不是通过分区进行扩展,而是通过tasks 进行扩展。因此,您需要将程序分成更小的独立“部分”以获得更多任务。
鉴于您正在阅读三个输入主题,我假设您正在使用类似这样的内容:
streamsBuilder.stream("t1, "t2", "t3").map(...)...
该程序有效地将三个输入主题合并为一个
KStream
,因此您只得到一个子拓扑,它将扩展到N个任务,如您所观察到的N = max(t1.partitions, t2.partitions, t3.partitions)
。
但是,您可以将程序重新编写为三个独立的部分,从而为每个输入主题创建一个子拓扑:
streamsBuilder.stream("t1).map(...)...
streamsBuilder.stream("t2).map(...)...
streamsBuilder.stream("t3).map(...)...
(为了避免样板代码,您可以提取一个辅助方法,该方法以
KStream
作为输入并应用实际业务逻辑,并为您为每个主题创建的每个 KStream
调用帮助三次)。
如果您有三个子拓扑,每个子拓扑都有自己的一组任务,因此您应该能够在设置中扩展到 35 个任务,因为每个子拓扑可扩展到其 N 个分区的 N 个任务现在输入相应的主题。
要了解程序的结构,您可以检查从
TopologyDescription
获得的 Topology#describe()
。