从不再需要分布式缓存的应用程序中删除 HazelCast。
但是我仍然需要保持对类似于 IMap 的 Map 的同步访问。
我首先想用Map和ConcurrentHashMap替换IMap。
现在集合只是一个简单的Java Map,但我仍然需要控制对其的异步读/写。
ConcurrentHashMap 是否足以解决这个问题,或者我是否需要其他锁定解决方案/机制。
该应用程序使用 RabbitMQ,有 3 个队列。
使用 HazelCast,我有一个带有域对象的 IMap,我在访问它的每个 MQ 线程中执行了锁定。
在批处理作业的初始化期间,它会创建新的域对象,并将其放置在地图上。
在初始化结束时,它会在 3 个不同的消息队列上为 Map 中的每个对象发送请求。
只有 1 个 MQ 线程应该能够从 Map 获取对象、更新它并将其放回到 Map 上。
ConcurrentHashMap 可以处理这个问题吗?我需要在 Map.get 和 Map.put 之间实现一些锁定/解锁吗?
使用 HazelCast,这是通过锁定地图来完成的。
IMap<Long, MyObject> myCollection;
public void processMessage1(final MyResponse1 reponse) { // MQ-Thread1
final Long myObjectId = response.getObjectId();
myCollection.lock(myObjectId);
try {
final MyObject myObject = myCollection.get(myObjectId);
updateMyObject(myObject, response);
myCollection.put(myObjectId, myObject)
if (myObject.isCompleted()) {
repository.save(myObject);
}
} finally {
myCollection.unlock(myObjectId);
}
}
public void processMessage2(final MyResponse2 reponse) { // MQ-Thread2
// Similar with locking as processMessage1
}
public void processMessage3(final MyResponse3 reponse) { // MQ-Thread3
// Similar with locking as processMessage1
}
将 HazelCast 和 IMap 锁定更改为 ConcurrentHashMap 后,我的代码现在如何。
@Data
@Component
public class MyCache {
private final Map<Long, Producer> producers = new HashMap<>();
private final ConcurrentMap<Long, Customer> customers = new ConcurrentHashMap<>();
}
@Service
public class CustomerService {
private final MyCache myCache;
public CustomerService(final MyCache myCache) {
this.myCache = myCache;
}
@Transaction
public void startProcessing() {
getProducers();
deleteCustomers();
myCache.getProducers().keySet().forEach(id -> startProcessingProducer(id));
}
public void startProcessingProducer(Long producerId) {
log.info("Started processing for producerId={}", producerId);
initialize(producerId);
}
public void initialize(Long producerId) {
log.info("Initialize start {}", producerId);
final Customer customer = new Customer();
sendRabbitMq1(producerId);
sendRabbitMq2(producerId);
sendRabbitMq3(producerId);
myCache.getCustomers().put(producerId, customer);
log.info("Initialize end {}", producerId);
}
}
@Service
public class MessageService {
private final MyCache myCache;
private final StorageService storageService;
public MessageService(final MyCache myCache, final StorageService storageService) {
this.myCache = myCache;
this.storageService = storageService;
}
@Bean
@Transactional
public Consumer<MyResponse1> response1() {
return this::processResponse1;
}
@Bean
@Transactional
public Consumer<MyResponse2> response2() {
return this::processResponse2;
}
@Bean
@Transactional
public Consumer<MyResponse3> response3() {
return this::processResponse3;
}
private void processResponse1(final MyResponse1 response) {
myCache.getCustomers().computeIfPresent(response.getProducerId(),
(producerId, customer) -> updateFromResponse1(response, customer));
}
private Customer updateFromResponse1(final MyResponse1 response, final Customer customer) {
log.debug("Got Response 1 {}", response);
// Updating Customer with Response
process(customer, storageService::saveCustomer);
return customer;
}
public void process(final Customer customer, final Consumer<Customer> customerUpdateCallback) {
// Processing Customer
customerUpdateCallback.accept(customer);
}
}
@Service
public class StorageService {
private final CustomerRepository customerRepository;
public StorageService(final CustomerRepository customerRepository) {
this.customerRepository = customerRepository;
}
public void saveCustomer(final Customer customer) {
customerRepository.save(customer);
}
}
ConcurrentHashMap 可以使用以下方法之一解决键锁定问题:
关键信息是 javadoc 指出:
整个方法调用都是原子执行的。每次调用此方法时,所提供的函数都会被调用一次。
如果我们调整您的代码示例以使用 ConcurrentHashMap 进行更新,它将给出如下所示的内容:
final Map<Long, MyObject> myCollection = new ConcurrentHashMap<>();
public void processMessage1(final MyResponse1 reponse) { // MQ-Thread1
final Long myObjectId = response.getObjectId();
myCollection.computeIfPresent(myObjectId, (key, oldValue) -> {
var newValue = updateMyObject(oldValue, response);
if (newValue.isCompleted()) {
repository.save(newValue);
}
return newValue;
});
}