我们有一个用例,我们需要使用基于参与者可用性的 akka kafka 连接器来控制来自 kafka 队列的消息轮询,以便处理消息。我们有 2 个消费者,每个消费者有 5 个参与者来处理消息,我们需要根据两个消费者的参与者可用性来轮询消息。
我们观察到akka Consumer1不断地从kafka轮询消息并将它们存储在akka流中,而consumer2闲置,但我们需要控制这种行为,只要我们有可用的处理actor,那么我们只需要消费者轮询消息。
根据有关背压的 akka 文档,我们尝试添加 Sink.actorRefWithBackPressure() 来控制轮询,但背压发生在 Actor 流上,而不是消费者轮询上。
我们如何根据演员池中我处理演员的可用性来控制消费者轮询。
下面是我为特定主题创建和注册消费者的代码:
public void createAndRegisterConsumer(String groupId, ActorRef actor, SourceType sourceType, String... topics) {
RestartSource.onFailuresWithBackoff(Duration.ofSeconds(3), Duration.ofSeconds(20), 0.2,
() -> createRawConsumer(groupId, sourceType, topics).mapMaterializedValue(c -> {
topicMapping.put(topicKey(topics), c);
return c;
})).map(ConsumerRecord::value).runWith(Sink.actorRefWithBackpressure(actor, new StreamInit(), Ack.INSTANCE, new StreamComplete(), (e) -> {
LOGGER.error("Stream has failed", e);
return new StreamFailed(e);
}), materializer);
}
public Source<ConsumerRecord<byte[], String>, Consumer.Control> createRawConsumer(String groupId, SourceType sourceType, String... topics) {
if (topics == null || topics.length == 0) {
throw new RuntimeException("Must provide at least one topic to consume");
}
final ConsumerSettings<byte[], String> consumerSettings = getConsumerSettings(groupId, this.kafkaConfig, sourceType == SourceType.PLAIN);
final Subscription subscription = Subscriptions.topics(topics);
switch (sourceType) {
case PLAIN:
return Consumer.plainSource(consumerSettings, subscription);
case AT_MOST_ONCE:
return Consumer.atMostOnceSource(consumerSettings, subscription);
default:
throw new UnsupportedOperationException("The source type " + sourceType + " is not supported");
}
}
完全做到这一点是不可能的:即使没有需求,消费者参与者也会定期进行轮询。部分原因是旧 Kafka 的遗留问题,不进行轮询可能会导致消费者重新平衡。
可以通过
ConsumerSettings
: 将计划轮询之间的间隔设置为任意长
consumerSettings.withPollInterval(Duration.ofHours(1))