寻找另一个实例来处理kafka消息

问题描述 投票:0回答:1

我在AWS中部署了2个SpringBoot应用程序,APP1正在管理汽车,对于每辆新添加的汽车,它将创建2个仅用于该汽车的kafka主题(事件APP1的主题A - > APP2和事件APP2的主题B - > 应用程序1).

创建后,APP1 将向 APP2 发送一个事件,以开始处理该汽车(连续异步任务),并且 APP2 时不时地将有关该汽车主题的一些事件发送回 APP1,以向该汽车上的一些事件发出信号。由于在某些时候 APP2 将耗尽容量,我将拥有一个额外的实例,但我不确定管理 APP1 -> APP2 与多个具有容量的 APP2 实例的通信的最佳方法是什么(因为在某些时候非常拥挤的实例可能会变得不那么拥挤)。

因为如果我将所有实例都放在同一个消费者组中,那么只有一个实例会收到开始处理汽车的消息,但如果它没有容量,我如何找到另一个实例来执行任务?

我想到的另一个想法是,当我从 APP1 发送事件时,让 APP2 的实例位于不同的消费者组中,这样每个实例都会收到该事件来启动处理车,但如何确保只有其中一个是处理吗?

java spring-boot apache-kafka architecture spring-kafka
1个回答
0
投票

处理群体事件时需要负载均衡策略。

您的情况:

// In APP1 when sending a new car processing request
{
// First, check available capacity
CompletableFuture<String> targetInstanceId = findAvailableInstance();

// Then send the processing request with the selected instance
CarProcessingRequest request = new CarProcessingRequest(
    carId,
    targetInstanceId,
    timestamp
);
kafkaTemplate.send(carTopicA, request);
}

// In APP2 instances
@KafkaListener(topics = "#{carTopicA}")
public void handleCarProcessing(CarProcessingRequest request) {
// Each instance checks if it's the target
if (!request.targetInstanceId.equals(instanceId)) {
    return; // Ignore if not targeted
}

if (!hasCapacity()) {
    // Publish capacity issue
    publishCapacityUpdate();
    return;
}

// Process the car
startCarProcessing(request.carId);
}

// Periodic capacity reporting in APP2
@Scheduled(fixedRate = 30000) // Every 30 seconds
public void reportCapacity() {
InstanceCapacity capacity = new InstanceCapacity(
    instanceId,
    getCurrentLoad(),
    getAvailableSlots(),
    timestamp
);
kafkaTemplate.send("app2-capacity-topic", capacity);

}

© www.soinside.com 2019 - 2024. All rights reserved.