在使用多线程进行测试时,基于 Bucket Token 算法创建的速率限制器会给出错误的总请求消耗值

问题描述 投票:0回答:1

我在 Java 中创建了一个基于存储桶令牌的速率限制器,并使用 Executor Service 通过单线程和多线程方法对其进行了测试。当我使用单个线程运行时,当我将每秒 20 个请求设置为速率时,我得到 200 作为消耗的总请求数。当我尝试使用执行器以多线程方法运行时,相同的配置我得到 220 作为消耗的总请求,即使我使用了同步关键字。我运行这两种方法总共 10 秒。

请找到以下两个课程:

BucketToken.class

package algo.interviewquestions;

public class BucketToken {
    private Integer maxBucketSize;
    private Integer tokensAvailable;
    private Long refillBucketRate;
    private Long nextRefillTime;
    private Long totalRequestsReceived;
    private Long totalRequestsConsumed;
    private Integer bucketRefillCount;

    public BucketToken(Integer maxBucketSize, Long refillBucketRate) {
        this.maxBucketSize = maxBucketSize;
        this.refillBucketRate = refillBucketRate;
        this.nextRefillTime = System.currentTimeMillis() + refillBucketRate;
        this.tokensAvailable = maxBucketSize;
        this.totalRequestsReceived = 0L;
        this.totalRequestsConsumed = 0L;
        this.bucketRefillCount = 0;
        refill();
    }

    public Long getTotalRequestsConsumed() {
        return totalRequestsConsumed;
    }

    public Integer getTokensAvailable() {
        return tokensAvailable;
    }

    public Integer getBucketRefillCount() {
        return bucketRefillCount;
    }

    public Long getTotalRequestsReceived() {
        return totalRequestsReceived;
    }

     public synchronized boolean tryConsume() {
        totalRequestsReceived++;
        refill();
        if (this.tokensAvailable > 0) {
            this.tokensAvailable --;
            this.totalRequestsConsumed++;
            return true;
        }
        return false;
    }

    private synchronized void refill() {
        if (System.currentTimeMillis() < this.nextRefillTime) {
            return;
        }
        this.nextRefillTime = System.currentTimeMillis() + this.refillBucketRate;
        this.tokensAvailable = Math.max(this.maxBucketSize, this.tokensAvailable);
        this.bucketRefillCount++;
    }

}

BucketTokenTest.class

package algo.interviewquestions;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class BucketTokenTest {
    public static void main(String[] args) throws InterruptedException {
        BucketToken bucketToken = new BucketToken(20, 1000L);
        AtomicInteger totalRequestsConsumed1 = runInMultiThreadedApproach(bucketToken);

        BucketToken bucketToken2 = new BucketToken(20, 1000L);
        AtomicInteger totalRequestsConsumed2 = runWithSingleThread(bucketToken2);

        printRequestsStats(bucketToken, totalRequestsConsumed1);
        printRequestsStats(bucketToken2, totalRequestsConsumed2);
    }

    private static AtomicInteger runWithSingleThread(BucketToken bucketToken) {
        AtomicInteger totalRequestsConsumed = new AtomicInteger(0);
        long startTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - startTime < 10000L) {
            if (bucketToken.tryConsume()) {
                totalRequestsConsumed.incrementAndGet();
                System.out.println(Thread.currentThread().getName() + " - Request Accepted");
                continue;
            }
            System.out.println(Thread.currentThread().getName() + " - Request Declined");
        }
        return totalRequestsConsumed;
    }

    private static void printRequestsStats(BucketToken bucketToken, AtomicInteger totalRequestsConsumed) {
        System.out.println("Total Requests Received = " + bucketToken.getTotalRequestsReceived());
        System.out.println("Total Requests Accepted = " + totalRequestsConsumed.get());
        System.out.println("Total Requests Accepted = " + bucketToken.getTotalRequestsConsumed());
        System.out.println("Bucket refill count = " + bucketToken.getBucketRefillCount());
    }

    private static AtomicInteger runInMultiThreadedApproach(BucketToken bucketToken) throws InterruptedException {
        int threadCount = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        AtomicInteger totalRequestsConsumed = new AtomicInteger(0);
        long startTime = System.currentTimeMillis();
        CountDownLatch latch = new CountDownLatch(threadCount);
        IntStream.rangeClosed(1,threadCount).boxed().forEach(t -> executorService.submit(() -> {
            while (System.currentTimeMillis() - startTime < 10000L) {
                if (bucketToken.tryConsume()) {
                    totalRequestsConsumed.incrementAndGet();
                    System.out.println(Thread.currentThread().getName() + " - Request Accepted");
                    continue;
                }
                System.out.println(Thread.currentThread().getName() + " - Request Declined");
            }
            latch.countDown();
        }));
        long startTime2 = System.currentTimeMillis();
        latch.await();
        System.out.println("Completed in " + (System.currentTimeMillis() - startTime2) + "ms");
        executorService.shutdown();
        return totalRequestsConsumed;
    }
}

有人可以告诉我需要做哪些更改来处理并发修改或者我缺少什么吗?

我希望单线程和多线程方法都能在 10 秒内给出相同的请求消耗计数,即 200。

我尝试在完成修改的方法上使用同步关键字,但我仍然看到不同的值。

java rate-limiting thread-synchronization concurrentmodification
1个回答
0
投票

我终于找到了我发布的问题。我在存储桶令牌中使用的增量运算符是后一元运算符(++ 和 --),这会导致消耗额外的请求,因为令牌递减发生在下一次迭代中。这表明我们在使用这些后置一元运算符时需要非常小心,或者如果我们想要在不处理额外增量的情况下拥有一致的逻辑,则需要避免使用它们。 更新后的同步块如下。

public boolean tryConsume() {
        synchronized (objectLock) {
            totalRequestsReceived += 1;
            refill();
            if (this.tokensAvailable > 0) {
                this.tokensAvailable -= 1;
                this.totalRequestsConsumed += 1;
                return true;
            }
            return false;
        }
    }
© www.soinside.com 2019 - 2024. All rights reserved.