我在AWS中部署了2个SpringBoot应用程序,APP1正在管理汽车,对于每辆新添加的汽车,它将创建2个仅用于该汽车的kafka主题(事件APP1的主题A - > APP2和事件APP2的主题B - > 应用程序1).
创建后,APP1 将向 APP2 发送一个事件,以开始处理该汽车(连续异步任务),并且 APP2 时不时地将有关该汽车主题的一些事件发送回 APP1,以向该汽车上的一些事件发出信号。由于在某些时候 APP2 将耗尽容量,我将拥有一个额外的实例,但我不确定管理 APP1 -> APP2 与多个具有容量的 APP2 实例的通信的最佳方法是什么(因为在某些时候非常拥挤的实例可能会变得不那么拥挤)。
因为如果我将所有实例都放在同一个消费者组中,那么只有一个实例会收到开始处理汽车的消息,但如果它没有容量,我如何找到另一个实例来执行任务?
我想到的另一个想法是,当我从 APP1 发送事件时,让 APP2 的实例位于不同的消费者组中,这样每个实例都会收到该事件来启动处理车,但如何确保只有其中一个是处理吗?
处理群体事件时需要负载均衡策略。
您的情况:
// In APP1 when sending a new car processing request
{
// First, check available capacity
CompletableFuture<String> targetInstanceId = findAvailableInstance();
// Then send the processing request with the selected instance
CarProcessingRequest request = new CarProcessingRequest(
carId,
targetInstanceId,
timestamp
);
kafkaTemplate.send(carTopicA, request);
}
// In APP2 instances
@KafkaListener(topics = "#{carTopicA}")
public void handleCarProcessing(CarProcessingRequest request) {
// Each instance checks if it's the target
if (!request.targetInstanceId.equals(instanceId)) {
return; // Ignore if not targeted
}
if (!hasCapacity()) {
// Publish capacity issue
publishCapacityUpdate();
return;
}
// Process the car
startCarProcessing(request.carId);
}
// Periodic capacity reporting in APP2
@Scheduled(fixedRate = 30000) // Every 30 seconds
public void reportCapacity() {
InstanceCapacity capacity = new InstanceCapacity(
instanceId,
getCurrentLoad(),
getAvailableSlots(),
timestamp
);
kafkaTemplate.send("app2-capacity-topic", capacity);
}