并发环境下的轮询(Round-Robin)实现失败

问题描述 投票:0回答:1

我正在尝试编写一个负载均衡器的内存实现,它提供一个 get() 方法,以轮询(round-robin)方式依次返回实例。我需要它在并发环境中正常工作,但是,即使加上了锁、同步以及线程安全的数据结构,我仍然无法让测试通过。

我的负载均衡器:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * In-memory load balancer holding up to {@code MAX_SIZE} distinct instance
 * names; the choice of which instance to hand out is delegated to a
 * {@link LoadBalancerStrategy}.
 *
 * <p>The instance list is guarded by a read/write lock owned by this
 * balancer: {@link #register(String)} takes the write lock, while
 * {@link #get()} and {@link #size()} take the read lock.
 */
public class LoadBalancer1 {

    private static final int MAX_SIZE = 10;

    private final List<String> instanceList = new ArrayList<>();
    private final LoadBalancerStrategy strategy;

    // Deliberately NOT static: the lock protects this balancer's own list, so
    // two independent balancers must never contend on (or block) each other.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

    /**
     * Creates an empty balancer.
     *
     * @param strategy strategy consulted by {@link #get()} to pick an instance
     */
    public LoadBalancer1(LoadBalancerStrategy strategy) {
        this.strategy = strategy;
    }

    /**
     * Adds an instance unless the balancer is full or already contains it.
     *
     * @param instance instance name to add
     * @return {@code true} if the instance was added; {@code false} if the
     *         balancer already holds {@code MAX_SIZE} instances or already
     *         contains {@code instance}
     */
    public boolean register(String instance) {
        writeLock.lock();
        try {
            // The write lock is already held, so the list can be inspected
            // directly; ">=" keeps the capacity check safe even if the bound
            // were ever exceeded.
            if (instanceList.size() >= MAX_SIZE || instanceList.contains(instance)) {
                return false;
            }
            instanceList.add(instance);
            return true;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Asks the strategy to pick one of the registered instances.
     *
     * @return whatever the strategy returns for the current instance list
     */
    public String get() {
        // Read lock only: several threads may run the strategy concurrently,
        // but never while register() is modifying the list.
        readLock.lock();
        try {
            return strategy.get(instanceList);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * @return the number of registered instances
     */
    public int size() {
        readLock.lock();
        try {
            return instanceList.size();
        } finally {
            readLock.unlock();
        }
    }
}

/**
 * Picks one instance out of a list of candidate instances.
 *
 * <p>The implementations in this file return {@code null} when the list is
 * empty; the interface itself does not enforce that.
 */
@FunctionalInterface
interface LoadBalancerStrategy {

    /**
     * Selects an instance.
     *
     * @param instances the candidate instances to choose from
     * @return the selected instance
     */
    String get(List<String> instances);
}

/**
 * Strategy that returns the instance found at a pseudo-random index of the
 * list, or {@code null} when the list is empty.
 */
class RandomStrategy implements LoadBalancerStrategy {

    /**
     * @param instances the candidate instances
     * @return the element at a random index in {@code [0, instances.size())},
     *         or {@code null} if {@code instances} is empty
     */
    @Override
    public String get(List<String> instances) {
        final int count = instances.size();
        if (count == 0) {
            return null;
        }
        final int pick = ThreadLocalRandom.current().nextInt(count);
        return instances.get(pick);
    }
}

/**
 * Strategy that walks through the list in order, wrapping around to the
 * first element after the last one. Returns {@code null} for an empty list.
 *
 * <p>The cursor is advanced with a single atomic
 * {@link AtomicInteger#getAndUpdate} call, so concurrent callers each obtain
 * a distinct cursor position.
 */
class RoundRobinStrategy implements LoadBalancerStrategy {

    /** Position the next call will return, for a list whose size is unchanged. */
    private final AtomicInteger currentIndex = new AtomicInteger(0);

    /**
     * @param instances the candidate instances
     * @return the instance at the current cursor position, or {@code null}
     *         if {@code instances} is empty
     */
    @Override
    public String get(List<String> instances) {
        // Read the size once so the emptiness check, the cursor update and the
        // final lookup all agree on the same value.
        final int size = instances.size();
        if (size == 0) {
            return null;
        }
        final int index = currentIndex.getAndUpdate(i -> (i + 1) % size);
        // The stored cursor was reduced modulo the size seen by an EARLIER
        // call; if this list is shorter, clamp it instead of throwing
        // IndexOutOfBoundsException. For an unchanged size this is a no-op.
        return instances.get(index % size);
    }
}

我的测试:

// Registers three instances, submits 30 get() calls to a thread pool, waits
// for the pool to finish, then reads the futures back in SUBMISSION order and
// expects the values to repeat "1", "2", "3".
@RepeatedTest(100)
    void should_keep_round_robin_pattern_in_concurrent_environment() throws InterruptedException, ExecutionException {
        // given: three registered instances and a queue for the 30 pending results
        loadBalancerRoundRobin.register("1");
        loadBalancerRoundRobin.register("2");
        loadBalancerRoundRobin.register("3");
        Queue<Future<String>> objects = new ArrayBlockingQueue<>(30);

        // One pool thread per available processor.
        int threads = Runtime.getRuntime().availableProcessors();
        var executorService = Executors.newScheduledThreadPool(threads);
        try {
            for (int i = 0; i < 30; i++) {
                // NOTE(review): futures are queued in submission order, but nothing
                // here forces the pool threads to call get() in that same order, so
                // future k need not hold the k-th value the strategy handed out.
                Future<String> submit = executorService.submit(() -> loadBalancerRoundRobin.get());
                objects.add(submit);
            }
        } finally {
            executorService.shutdown();
            // The boolean result (terminated vs. timed out) is not checked.
            executorService.awaitTermination(1, TimeUnit.MINUTES);
        }

        // objects.size() is re-evaluated on every pass and shrinks by three each
        // time (three remove() calls), while i grows by three; starting from 30
        // futures the loop therefore ends after 5 passes, checking 15 of them.
        for (int i = 0; i < objects.size() - 3; i += 3) {
            assertEquals("1", objects.remove().get());
            assertEquals("2", objects.remove().get());
            assertEquals("3", objects.remove().get());
        }
    }

对于每个测试,我都会创建新对象:

@BeforeEach
    void setUp() {
        // Fresh random strategy and the balancer wired to it.
        randomStrategy = new RandomStrategy();
        loadBalancerRandomStrategy = new LoadBalancer1(randomStrategy);

        // Fresh round-robin strategy (its cursor starts at 0) and its balancer.
        roundRobin = new RoundRobinStrategy();
        loadBalancerRoundRobin = new LoadBalancer1(roundRobin);
    }

我已经快想不出办法来找出问题所在了。我猜 `currentIndex` 正在以不安全的方式被修改,但我不知道如何找到原因。也可能是 `size()` 方法有问题,但它已经加了锁,我不知道还能做什么。如果有人能解释发生了什么,我将非常感激。

java concurrency atomic java.util.concurrent
1个回答
0
投票

您测试的并不是 `RoundRobinStrategy` 的行为,而是 `ExecutorService` 的调度特性,正如 @BenManes 在他的评论中指出的那样:任务的提交顺序并不等于各线程实际调用 get() 的顺序,因此按提交顺序读取 Future 得到的结果不一定是 1、2、3 的循环。您需要的是一种记录检索到的“实例”的机制,它不依赖于执行检索的线程,并且能保证记录顺序与检索顺序一致。

由于 `LoadBalancerStrategy` 是一个接口,我们可以在它前面放置一个委托器/装饰器,使“检索”和“记录”这两个操作串行执行。以下示例使用一个简单的 `Semaphore`,保证每次检索及其记录作为一个整体依次完成。

/**
 * Verifies that {@link RoundRobinStrategy} hands out instances in round-robin
 * order when {@link LoadBalancer1#get()} is called from several threads.
 *
 * <p>The order is observed inside the strategy call itself (via
 * {@link Delegator}) rather than through the order in which the executor
 * completes tasks.
 */
class RoundRobinStrategyTest {

    /** Number of concurrent get() calls issued by the test (a multiple of three). */
    private static final int CALLS = 30;

    private Delegator delegator;

    private LoadBalancer1 loadBalancer;

    /**
     * Sits in front of the real strategy and records the order of
     * invocation, assuring that retrieving an instance from the strategy and
     * putting it in the queue happen as one serialized step.
     */
    static class Delegator implements LoadBalancerStrategy {

        private final LoadBalancerStrategy delegate;

        /** Instances in the exact order the delegate returned them. */
        final Queue<String> queue = new LinkedList<>();

        /** Single permit: only one thread at a time may retrieve-and-record. */
        private final Semaphore guard = new Semaphore(1);

        public Delegator(LoadBalancerStrategy delegate) {
            this.delegate = delegate;
        }

        /**
         * Delegates to the wrapped strategy and records the returned instance.
         *
         * @throws IllegalStateException if the calling thread is interrupted
         *         while waiting for the guard; the interrupt flag is restored
         */
        @Override
        public String get(List<String> instances) {
            try {
                guard.acquire();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and preserve the cause.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting to record an instance", e);
            }
            try {
                var s = delegate.get(instances);
                queue.add(s);
                return s;
            } finally {
                guard.release();
            }
        }

    }

    @BeforeEach
    void setUp() throws Exception {
        delegator = new Delegator(new RoundRobinStrategy());

        loadBalancer = new LoadBalancer1(delegator);
    }

    @RepeatedTest(100)
    void should_keep_round_robin_pattern_in_concurrent_environment() throws InterruptedException, ExecutionException {
        // given
        loadBalancer.register("1");
        loadBalancer.register("2");
        loadBalancer.register("3");
        String[] expected = {"1", "2", "3"};

        // when: CALLS concurrent get() invocations
        int threads = Runtime.getRuntime().availableProcessors();
        var executorService = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < CALLS; i++) {
                executorService.submit(() -> loadBalancer.get());
            }
        } finally {
            executorService.shutdown();
        }
        // Do not inspect the queue while workers may still be writing to it.
        if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new AssertionError("Tasks did not finish within one minute");
        }

        // then: every call was recorded, and the whole sequence (including the
        // final triple) follows the 1-2-3 pattern
        var q = delegator.queue;
        assertEquals(CALLS, q.size());
        for (int i = 0; i < CALLS; i++) {
            assertEquals(expected[i % expected.length], q.remove());
        }
    }
}

© www.soinside.com 2019 - 2024. All rights reserved.