使用Java OrbitzWorld领事客户端,我正在尝试通过acquireLock
方法同步我的Java应用程序的多个实例。
到目前为止我的代码:将应用注册为领事服务:
private void registerService(Config config) {
String serviceId = config.getService().getId();
String serviceName = config.getService().getName();
long ttl = config.getService().getTtl();
AgentClient agentClient = client.agentClient();
Registration service = ImmutableRegistration.builder()
.id(serviceId)
.name(serviceName)
.check(Registration.RegCheck.ttl(ttl))
.build();
agentClient.register(service);
new HeartBeater(agentClient, serviceId, ttl).start();
}
HeartBeater:
@Override
public void run() {
while(true) {
try {
client.pass(serviceId);
Thread.sleep((Math.max(ttl / 2, 1)));
} catch (NotRegisteredException | InterruptedException e) {}
}
}
上面的代码可以正常工作,并且服务会在领事中成功刷新。现在我想知道锁定的实现。
我到目前为止所写的内容:
public boolean amILeader() {
// return if current java app is leader
}
private String createSession() {
final Session session = ImmutableSession.builder().name(config.getService().getName()).build();
return client.sessionClient().createSession(session).getId();
}
private void watchLeaderLockStateChange() {
KeyValueClient keyValueClient = client.keyValueClient();
KVCache kvCache = KVCache.newCache(keyValueClient, Constants.LEADER_LOCK_KEY, config.getService().getWatchKey());
kvCache.addListener(map -> {
Value value = map.get(Constants.LEADER_LOCK_KEY);
if(!value.getSession().isPresent()) {
keyValueClient.acquireLock(Constants.LEADER_LOCK_KEY, ???); //create new session here ???
}
});
kvCache.start();
}
我被困在这里,因为我不了解理论,在文档中没有发现任何有用的东西。
我的问题:
acquireLock
方法进行同步是否需要会话?您能否提供一些代码示例或填写我的实现?感谢您的任何回复:]
我想我现在明白了。
理论是这样的:
public class SessionFacade {
private String leaderLock;
private String sessionId;
private Consul client;
private Config config;
public SessionFacade(Consul client, Config config) {
this.client = client;
this.config = config;
this.leaderLock = "service/" + config.getService().getName() + "/leader";
this.sessionId = createSession();
new SessionHeartBeater(client, sessionId, config.getService().getSessionTtl()).start();
watchLeaderLockStateChange(sessionId);
client.keyValueClient().acquireLock(leaderLock, sessionId);
}
public boolean doIPossesLeaderLock() {
Optional<Value> leaderValue = client.keyValueClient().getValue(leaderLock);
if(leaderValue.isPresent()) {
Optional<String> session = leaderValue.get().getSession();
return session.isPresent() && session.get().equals(sessionId);
}
return false;
}
private String createSession() {
int sessionTtl = config.getService().getSessionTtl();
final Session session = ImmutableSession.builder()
.name(config.getService().getName())
.ttl(sessionTtl + "s")
.build();
return client.sessionClient().createSession(session).getId();
}
private void watchLeaderLockStateChange(String sessionId) {
KeyValueClient keyValueClient = client.keyValueClient();
KVCache kvCache = KVCache.newCache(keyValueClient, leaderLock, config.getService().getWatchLockEach());
kvCache.addListener(map -> {
Value value = map.get(leaderLock);
if(!value.getSession().isPresent()) {
keyValueClient.acquireLock(leaderLock, sessionId);
}
});
kvCache.start();
}
}
请注意,由于我尚未对其进行全面测试,因此该代码可能有错误。
您是否已阅读https://learn.hashicorp.com/consul/developer-configuration/elections?它在使用Consul进行领导者选举的应用程序级别遍历此场景。