我正在尝试编写负载均衡器的内存中实现,该实现将有一个 get() 方法以循环模式返回实例,我需要它在并发环境中正常运行,但是,即使在设置之后加上线程安全数据结构的锁和同步,我无法使测试通过。
我的负载均衡器:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class LoadBalancer1 {
private final List<String> instanceList = new ArrayList<>();
private static final int MAX_SIZE = 10;
private final LoadBalancerStrategy strategy;
private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private static final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private static final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
public LoadBalancer1(LoadBalancerStrategy strategy) {
this.strategy = strategy;
}
public boolean register(String instance) {
writeLock.lock();
try {
if (size() == MAX_SIZE) {
return false;
}
if (instanceList.contains(instance)) {
return false;
}
instanceList.add(instance);
return true;
} finally {
writeLock.unlock();
}
}
public String get() {
readLock.lock();
try {
return this.strategy.get(this.instanceList);
}finally {
readLock.unlock();
}
}
public int size() {
readLock.lock();
try {
return this.instanceList.size();
} finally {
readLock.unlock();
}
}
}
interface LoadBalancerStrategy {
String get(List<String> instances);
}
class RandomStrategy implements LoadBalancerStrategy {
@Override
public String get(List<String> instances) {
if (instances.isEmpty()) {
return null;
}
return instances.get(ThreadLocalRandom.current().nextInt(instances.size()));
}
}
class RoundRobinStrategy implements LoadBalancerStrategy {
private final AtomicInteger currentIndex = new AtomicInteger(0);
@Override
public String get(List<String> instances) { // Synchronize access
if (instances.isEmpty()) {
return null;
}
int index = currentIndex.getAndUpdate(i -> (i + 1) % instances.size());
return instances.get(index);
}
}
我的测试:
@RepeatedTest(100)
void should_keep_round_robin_pattern_in_concurrent_environment() throws InterruptedException, ExecutionException {
// given
loadBalancerRoundRobin.register("1");
loadBalancerRoundRobin.register("2");
loadBalancerRoundRobin.register("3");
Queue<Future<String>> objects = new ArrayBlockingQueue<>(30);
int threads = Runtime.getRuntime().availableProcessors();
var executorService = Executors.newScheduledThreadPool(threads);
try {
for (int i = 0; i < 30; i++) {
Future<String> submit = executorService.submit(() -> loadBalancerRoundRobin.get());
objects.add(submit);
}
} finally {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
for (int i = 0; i < objects.size() - 3; i += 3) {
assertEquals("1", objects.remove().get());
assertEquals("2", objects.remove().get());
assertEquals("3", objects.remove().get());
}
}
对于每个测试,我都会创建新对象:
@BeforeEach
void setUp() {
randomStrategy = new RandomStrategy();
roundRobin = new RoundRobinStrategy();
loadBalancerRandomStrategy = new LoadBalancer1(randomStrategy);
loadBalancerRoundRobin = new LoadBalancer1(roundRobin);
}
我慢慢地没有办法找出问题所在,我相信
currentIndex
正在以不安全的方式进行修改,我不知道如何找到原因。或者也许 size()
方法有问题,但由于它已锁定,我不知道我还能做什么。如果有人能解释发生了什么,我将非常感激。
您不是在测试
RoundRobinStrategy
的行为,而是测试 ExecutorService
的怪癖,正如@BenManes 在他的评论中指出的那样。相反,您需要一种机制来记录检索到的“实例”,该机制独立于检索它的线程,并确保维持检索顺序。
由于
LoadBalancerStrategy
是一个接口,我们可以在它前面放置一个委托器/装饰器,以确保检索和记录的序列化。以下示例使用简单的 Semaphore
来确保检索和记录保持同步。
class RoundRobinStrategyTest {
private Delegator delegator;
private LoadBalancer1 loadBalancer;
/**
* Sits in front of the real strategy and records the order of
* invocation, assuring that retrieving a strategy instance and
* putting in the queue is serialized.
*/
static class Delegator implements LoadBalancerStrategy {
private final LoadBalancerStrategy delegate;
Queue<String> queue = new LinkedList<>();
private Semaphore guard = new Semaphore(1);
public Delegator(LoadBalancerStrategy delegate) {
this.delegate = delegate;
}
@Override
public String get(List<String> instances) {
try {
guard.acquire();
try {
var s = delegate.get(instances);
queue.add(s);
return s;
} finally {
guard.release();
}
} catch(InterruptedException e) {
throw new RuntimeException("screwed up");
}
}
}
@BeforeEach
void setUp() throws Exception {
delegator = new Delegator(new RoundRobinStrategy());
loadBalancer = new LoadBalancer1(delegator);
}
@RepeatedTest(100)
void should_keep_round_robin_pattern_in_concurrent_environment() throws InterruptedException, ExecutionException {
// given
loadBalancer.register("1");
loadBalancer.register("2");
loadBalancer.register("3");
int threads = Runtime.getRuntime().availableProcessors();
var executorService = Executors.newScheduledThreadPool(threads);
try {
for (int i = 0; i < 30; i++) {
executorService.submit(() -> loadBalancer.get());
}
} finally {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
var q = delegator.queue;
var limit = q.size() - 3;
for (int i = 0; i < limit; i += 3) {
assertEquals("1", q.remove());
assertEquals("2", q.remove());
assertEquals("3", q.remove());
}
}
}