我在 Java 中创建了一个基于存储桶令牌的速率限制器,并使用 Executor Service 通过单线程和多线程方法对其进行了测试。当我使用单个线程运行时,当我将每秒 20 个请求设置为速率时,我得到 200 作为消耗的总请求数。当我尝试使用执行器以多线程方法运行时,相同的配置我得到 220 作为消耗的总请求,即使我使用了同步关键字。我运行这两种方法总共 10 秒。
请找到以下两个课程:
BucketToken.class
package algo.interviewquestions;
public class BucketToken {
private Integer maxBucketSize;
private Integer tokensAvailable;
private Long refillBucketRate;
private Long nextRefillTime;
private Long totalRequestsReceived;
private Long totalRequestsConsumed;
private Integer bucketRefillCount;
public BucketToken(Integer maxBucketSize, Long refillBucketRate) {
this.maxBucketSize = maxBucketSize;
this.refillBucketRate = refillBucketRate;
this.nextRefillTime = System.currentTimeMillis() + refillBucketRate;
this.tokensAvailable = maxBucketSize;
this.totalRequestsReceived = 0L;
this.totalRequestsConsumed = 0L;
this.bucketRefillCount = 0;
refill();
}
public Long getTotalRequestsConsumed() {
return totalRequestsConsumed;
}
public Integer getTokensAvailable() {
return tokensAvailable;
}
public Integer getBucketRefillCount() {
return bucketRefillCount;
}
public Long getTotalRequestsReceived() {
return totalRequestsReceived;
}
public synchronized boolean tryConsume() {
totalRequestsReceived++;
refill();
if (this.tokensAvailable > 0) {
this.tokensAvailable --;
this.totalRequestsConsumed++;
return true;
}
return false;
}
private synchronized void refill() {
if (System.currentTimeMillis() < this.nextRefillTime) {
return;
}
this.nextRefillTime = System.currentTimeMillis() + this.refillBucketRate;
this.tokensAvailable = Math.max(this.maxBucketSize, this.tokensAvailable);
this.bucketRefillCount++;
}
}
BucketTokenTest.class
package algo.interviewquestions;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
public class BucketTokenTest {
public static void main(String[] args) throws InterruptedException {
BucketToken bucketToken = new BucketToken(20, 1000L);
AtomicInteger totalRequestsConsumed1 = runInMultiThreadedApproach(bucketToken);
BucketToken bucketToken2 = new BucketToken(20, 1000L);
AtomicInteger totalRequestsConsumed2 = runWithSingleThread(bucketToken2);
printRequestsStats(bucketToken, totalRequestsConsumed1);
printRequestsStats(bucketToken2, totalRequestsConsumed2);
}
private static AtomicInteger runWithSingleThread(BucketToken bucketToken) {
AtomicInteger totalRequestsConsumed = new AtomicInteger(0);
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < 10000L) {
if (bucketToken.tryConsume()) {
totalRequestsConsumed.incrementAndGet();
System.out.println(Thread.currentThread().getName() + " - Request Accepted");
continue;
}
System.out.println(Thread.currentThread().getName() + " - Request Declined");
}
return totalRequestsConsumed;
}
private static void printRequestsStats(BucketToken bucketToken, AtomicInteger totalRequestsConsumed) {
System.out.println("Total Requests Received = " + bucketToken.getTotalRequestsReceived());
System.out.println("Total Requests Accepted = " + totalRequestsConsumed.get());
System.out.println("Total Requests Accepted = " + bucketToken.getTotalRequestsConsumed());
System.out.println("Bucket refill count = " + bucketToken.getBucketRefillCount());
}
private static AtomicInteger runInMultiThreadedApproach(BucketToken bucketToken) throws InterruptedException {
int threadCount = 5;
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
AtomicInteger totalRequestsConsumed = new AtomicInteger(0);
long startTime = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(threadCount);
IntStream.rangeClosed(1,threadCount).boxed().forEach(t -> executorService.submit(() -> {
while (System.currentTimeMillis() - startTime < 10000L) {
if (bucketToken.tryConsume()) {
totalRequestsConsumed.incrementAndGet();
System.out.println(Thread.currentThread().getName() + " - Request Accepted");
continue;
}
System.out.println(Thread.currentThread().getName() + " - Request Declined");
}
latch.countDown();
}));
long startTime2 = System.currentTimeMillis();
latch.await();
System.out.println("Completed in " + (System.currentTimeMillis() - startTime2) + "ms");
executorService.shutdown();
return totalRequestsConsumed;
}
}
有人可以告诉我需要做哪些更改来处理并发修改或者我缺少什么吗?
我希望单线程和多线程方法都能在 10 秒内给出相同的请求消耗计数,即 200。
我尝试在完成修改的方法上使用同步关键字,但我仍然看到不同的值。
我终于找到了我发布的问题。我在存储桶令牌中使用的增量运算符是后一元运算符(++ 和 --),这会导致消耗额外的请求,因为令牌递减发生在下一次迭代中。这表明我们在使用这些后置一元运算符时需要非常小心,或者如果我们想要在不处理额外增量的情况下拥有一致的逻辑,则需要避免使用它们。 更新后的同步块如下。
public boolean tryConsume() {
synchronized (objectLock) {
totalRequestsReceived += 1;
refill();
if (this.tokensAvailable > 0) {
this.tokensAvailable -= 1;
this.totalRequestsConsumed += 1;
return true;
}
return false;
}
}