我需要限制同时处理相同资源的客户端数量
所以我尝试实现模拟
lock.lock();
try {
do work
} finally {
lock.unlock();
}
但以非阻塞方式使用 Reactor 库。 我有这样的东西。
但我有一个问题:
有没有更好的方法来做到这一点
或者也许有人知道已实施的解决方案
或者这不是在反应世界中应该如何完成的,并且还有另一种方法可以解决此类问题?
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
public class NonblockingLock {
private static final Logger LOG = LoggerFactory.getLogger(NonblockingLock.class);
private String currentOwner;
private final AtomicInteger lockCounter = new AtomicInteger();
private final FluxSink<Boolean> notifierSink;
private final Flux<Boolean> notifier;
private final String resourceId;
public NonblockingLock(String resourceId) {
this.resourceId = resourceId;
EmitterProcessor<Boolean> processor = EmitterProcessor.create(1, false);
notifierSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
notifier = processor.startWith(true);
}
/**
* Nonblocking version of
* <pre><code>
* lock.lock();
* try {
* do work
* } finally {
* lock.unlock();
* }
* </code></pre>
* */
public <T> Flux<T> processWithLock(String owner, @Nullable Duration tryLockTimeout, Flux<T> work) {
Objects.requireNonNull(owner, "owner");
return notifier.filter(it -> tryAcquire(owner))
.next()
.transform(locked -> tryLockTimeout == null ? locked : locked.timeout(tryLockTimeout))
.doOnSubscribe(s -> LOG.debug("trying to obtain lock for resourceId: {}, by owner: {}", resourceId, owner))
.doOnError(err -> LOG.error("can't obtain lock for resourceId: {}, by owner: {}, error: {}", resourceId, owner, err.getMessage()))
.flatMapMany(it -> work)
.doFinally(s -> {
if (tryRelease(owner)) {
LOG.debug("release lock resourceId: {}, owner: {}", resourceId, owner);
notifierSink.next(true);
}
});
}
private boolean tryAcquire(String owner) {
boolean acquired;
synchronized (this) {
if (currentOwner == null) {
currentOwner = owner;
}
acquired = currentOwner.equals(owner);
if (acquired) {
lockCounter.incrementAndGet();
}
}
return acquired;
}
private boolean tryRelease(String owner) {
boolean released = false;
synchronized (this) {
if (currentOwner.equals(owner)) {
int count = lockCounter.decrementAndGet();
if (count == 0) {
currentOwner = null;
released = true;
}
}
}
return released;
}
}
这就是我认为它应该工作的方式
@Test
public void processWithLock() throws Exception {
NonblockingLock lock = new NonblockingLock("work");
String client1 = "client1";
String client2 = "client2";
Flux<String> requests = getWork(client1, lock)
//emulate async request for resource by another client
.mergeWith(Mono.delay(Duration.ofMillis(300)).flatMapMany(it -> getWork(client2, lock)))
//emulate async request for resource by the same client
.mergeWith(Mono.delay(Duration.ofMillis(400)).flatMapMany(it -> getWork(client1, lock)));
StepVerifier.create(requests)
.expectSubscription()
.expectNext(client1)
.expectNext(client1)
.expectNext(client1)
.expectNext(client1)
.expectNext(client1)
.expectNext(client1)
.expectNext(client2)
.expectNext(client2)
.expectNext(client2)
.expectComplete()
.verify(Duration.ofMillis(5000));
}
private static Flux<String> getWork(String client, NonblockingLock lock) {
return lock.processWithLock(client, null,
Flux.interval(Duration.ofMillis(300))
.take(3)
.map(i -> client)
.log(client)
);
}
既然Reactor引入了
Sinks
,实现这样的锁就更简单了。我写了一个库,你可以用它编写这样的代码:
import party.iroiro.lock.Lock;
import party.iroiro.lock.ReactiveLock;
Flux<String> getWork(String client, Duration delay, Lock lock) {
return Mono.delay(delay)
.flatMapMany(l -> lock.withLock(() ->
Flux.interval(Duration.ofMillis(300))
.take(3)
.map(i -> client)
.log(client)));
}
它在内部使用一个
Sinks.Empty
队列来跟踪锁请求。在每次解锁时,它只是从队列中轮询并向 Mono
发送一个 ON_COMPLETE
信号,这可能比使用 Sinks.many().multicast()
向所有请求者广播要好一些。它利用了 Sinks.Empty
不能被多次发射的特性,因此取消锁定(对于那些想要设置超时或处理复杂情况的人)将阻止 ON_COMPLETE
的发射,反之亦然.
并且通过将
Flux.using
包裹在锁上,可以确保在所有情况下都能正确解锁锁,例如 try-finally
.
如果您有兴趣,这里是实现的一部分。原来的答案是
synchronized
可能会在竞争条件下阻塞,下面用CAS操作重写,使锁是非阻塞的。 (在图书馆,现在所有的锁都是用CAS操作实现的。)
private volatile int count = 0; // 0 if unlocked
public LockHandle tryLock() {
if (COUNT.compareAndSet(this, 0, 1)) {
// Optimistic acquiring
return LockHandle.empty();
} else {
LockHandle handle = SinkUtils.queueSink(queue);
fairDecrement(false);
return handle;
}
}
public void unlock() {
if (fairness) {
fairDecrement(true);
} else {
COUNT.set(this, 0);
fairDecrement(false);
}
}
/*
* If not "unlocking", fairDecrement first increments COUNT so that it does not end up unlocking a lock.
* If "unlocking", we jump directly to the decrementing.
*/
private void fairDecrement(boolean unlocking) {
/*
* COUNT states:
* - COUNT == 0: The lock is unlocked, with no ongoing decrement operations.
* - COUNT >= 1: Either the lock is being held, or there is an ongoing decrement operation.
* Note that the two are mutual exclusive, since they both require COUNT++ == 0.
*
* If "unlocking", then we are responsible for decrements.
*
* Otherwise,
* 1. If COUNT++ >= 1, either someone is holding the lock, or there is an ongoing
* decrement operation. Either way, some thread will eventually emit to pending requests.
* We increment COUNT to signal to the emitter that the queue could have potentially been
* appended to after its last emission.
* 2. If COUNT++ == 0, then we are responsible for decrementing.
*/
if (unlocking || COUNT.incrementAndGet(this) == 1) {
do {
if (SinkUtils.emitAnySink(queue)) {
/*
* Leaves the decrementing job to the next lock holder, who will unlock somehow.
*/
return;
}
/*
* It is now safe to decrement COUNT, since there is no concurrent decrements.
*/
} while (COUNT.decrementAndGet(this) != 0);
}
}
另外,如果你想限制客户端数量为N而不是一个,库提供了
ReactiveSemaphore
,对应于java.util.concurrent.Semaphore
。
我有一个使用相同参数的远程服务独占调用的解决方案。也许对你的情况有帮助。
它基于立即
tryLock
如果资源繁忙和Mono.retryWhen
“等待”释放。
所以我有
LockData
锁的元数据类
public final class LockData {
// Lock key to identify same operation (same cache key, for example).
private final String key;
// Unique identifier for equals and hashCode.
private final String uuid;
// Date and time of the acquiring for lock duration limiting.
private final OffsetDateTime acquiredDateTime;
...
}
LockCommand
接口是 LockData 上阻塞操作的抽象
public interface LockCommand {
Tuple2<Boolean, LockData> tryLock(LockData lockData);
void unlock(LockData lockData);
...
}
UnlockEventsRegistry
接口是解锁事件监听器收集器的抽象。
public interface UnlockEventsRegistry {
// initialize event listeners collection when acquire lock
Mono<Void> add(LockData lockData);
// notify event listeners and remove collection when release lock
Mono<Void> remove(LockData lockData);
// register event listener for given lockData
Mono<Boolean> register(LockData lockData, Consumer<Integer> unlockEventListener);
}
和
Lock
类可以用锁包装源Mono,解锁和用解锁包装CacheMono writer。
public final class Lock {
private final LockCommand lockCommand;
private final LockData lockData;
private final UnlockEventsRegistry unlockEventsRegistry;
private final EmitterProcessor<Integer> unlockEvents;
private final FluxSink<Integer> unlockEventSink;
public Lock(LockCommand lockCommand, String key, UnlockEventsRegistry unlockEventsRegistry) {
this.lockCommand = lockCommand;
this.lockData = LockData.builder()
.key(key)
.uuid(UUID.randomUUID().toString())
.build();
this.unlockEventsRegistry = unlockEventsRegistry;
this.unlockEvents = EmitterProcessor.create(false);
this.unlockEventSink = unlockEvents.sink();
}
...
public final <T> Mono<T> tryLock(Mono<T> source, Scheduler scheduler) {
return Mono.fromCallable(() -> lockCommand.tryLock(lockData))
.subscribeOn(scheduler)
.flatMap(isLocked -> {
if (isLocked.getT1()) {
return unlockEventsRegistry.add(lockData)
.then(source
.switchIfEmpty(unlock().then(Mono.empty()))
.onErrorResume(throwable -> unlock().then(Mono.error(throwable))));
} else {
return Mono.error(new LockIsNotAvailableException(isLocked.getT2()));
}
});
}
public Mono<Void> unlock(Scheduler scheduler) {
return Mono.<Void>fromRunnable(() -> lockCommand.unlock(lockData))
.then(unlockEventsRegistry.remove(lockData))
.subscribeOn(scheduler);
}
public <KEY, VALUE> BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> unlockAfterCacheWriter(
BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> cacheWriter) {
Objects.requireNonNull(cacheWriter);
return cacheWriter.andThen(voidMono -> voidMono.then(unlock())
.onErrorResume(throwable -> unlock()));
}
public final <T> UnaryOperator<Mono<T>> retryTransformer() {
return mono -> mono
.doOnError(LockIsNotAvailableException.class,
error -> unlockEventsRegistry.register(error.getLockData(), unlockEventSink::next)
.doOnNext(registered -> {
if (!registered) unlockEventSink.next(0);
})
.then(Mono.just(2).map(unlockEventSink::next)
.delaySubscription(lockCommand.getMaxLockDuration()))
.subscribe())
.doOnError(throwable -> !(throwable instanceof LockIsNotAvailableException),
ignored -> unlockEventSink.next(0))
.retryWhen(errorFlux -> errorFlux.zipWith(unlockEvents, (error, integer) -> {
if (error instanceof LockIsNotAvailableException) return integer;
else throw Exceptions.propagate(error);
}));
}
}
现在如果我必须用 CacheMono 包装我的 Mono 并锁定,我可以这样做:
private Mono<String> getCachedLockedMono(String cacheKey, Mono<String> source, LockCommand lockCommand, UnlockEventsRegistry unlockEventsRegistry) {
Lock lock = new Lock(lockCommand, cacheKey, unlockEventsRegistry);
return CacheMono.lookup(CACHE_READER, cacheKey)
// Lock and double check
.onCacheMissResume(() -> lock.tryLock(Mono.fromCallable(CACHE::get).switchIfEmpty(source)))
.andWriteWith(lock.unlockAfterCacheWriter(CACHE_WRITER))
// Retry if lock is not available
.transform(lock.retryTransformer());
}
您可以在 GitHub 上找到带有示例的代码和测试
我知道这已经有几个合理的答案,但我认为有一个(主观上)更简单的解决方案可以利用
flatMap
(在类似信号量的用例中)或concatMap
(在lock
/synchronized
用例)来控制并行化。
本方案仅使用Sinks和Reactor算子来支持加锁。未订阅的发布者也不会使用锁。
public class ReactiveSemaphore {
/**
* This can be thought of as a queue of lock handles. The first argument of the tuple is a signaler that accepts a value
* value when a lock is available. The second argument is a Mono that completes when the lock is released.
*/
private final Sinks.Many<Tuple2<Sinks.One<Boolean>, Mono<Boolean>>> taskQueue;
private final Sinks.One<Boolean> close = Sinks.one();
/**
* Creates a ReactiveSemaphore that only allows one Publisher to be subscribed at a time. Executed by order
* of subscription.
*/
public ReactiveSemaphore() {
this(1);
}
/**
* Creates a ReactiveSemaphore that allows up to poolSize Publishers to be subscribed in parallel.
* @param poolSize The number of allowed subscriptions to run in parallel.
*/
public ReactiveSemaphore(int poolSize) {
taskQueue = Sinks.many().unicast().onBackpressureBuffer();
Flux<Boolean> tasks;
if (poolSize <= 1)
// We could use flatMap with parallelism of 1, but that seems weird
tasks = taskQueue
.asFlux()
.concatMap(ReactiveSemaphore::dispatchTask);
else {
tasks = taskQueue
.asFlux()
.flatMap(ReactiveSemaphore::dispatchTask, poolSize);
}
tasks
.takeUntilOther(close.asMono())
.subscribe();
}
private static Mono<Boolean> dispatchTask(Tuple2<Sinks.One<Boolean>, Mono<Boolean>> task) {
task.getT1().tryEmitValue(true); // signal that lock is available and consume lock
return task.getT2(); // return Mono that completes when lock is released
}
@PreDestroy
private void cleanup() {
close.tryEmitValue(true);
}
public <T> Publisher<T> lock(Publisher<T> publisher) {
return Flux.defer(() -> this.waitForNext(publisher));
}
public <T> Mono<T> lock(Mono<T> publisher) {
return Mono.defer(() -> this.waitForNext(publisher).next());
}
public <T> Flux<T> lock(Flux<T> publisher) {
return Flux.defer(() -> this.waitForNext(publisher));
}
/**
* Waits for an available lock in the taskQueue. When ReactiveSemaphore is ready, a lock will be allocated for the task
* and will not be released until the provided task errors or completes. For this reason this operation should
* only be performed on a hot publisher (a publisher that has been subscribed to). Therefore, this method should
* always be wrapped inside a call to {@link Flux#defer(Supplier)} or {@link Mono#defer(Supplier)}.
* @param task The task to execute once the ReactiveSemaphore has an available lock.
* @return The task wrapped in a Flux
* @param <T> The type of value returned by the task
*/
private <T> Flux<T> waitForNext(Publisher<T> task) {
var ready = Sinks.<Boolean>one();
var release = Sinks.<Boolean>one();
taskQueue.tryEmitNext(Tuples.of(ready, release.asMono()));
return ready.asMono()
.flatMapMany(ignored -> Flux.from(task))
.doOnComplete(() -> release.tryEmitValue(true))
.doOnError(err -> release.tryEmitValue(true));
}
}
用法:
ReactiveSemaphore semaphore = new ReactiveSemaphore();
semaphore.lock(someFluxMonoOrPublisher);
示例测试——在这个测试中,我们创建了 10 个 Monos,它们在 1 秒后发出一个值,并尝试并行运行所有它们,但是我们将它们包装在一个池大小为 2 的 ReactiveSemaphore 中,以便在其中运行的不超过 2 个并行:
@Test
public void testParallelExecution() {
var semaphore = new ReactiveSemaphore(2);
var monos = IntStream.range(0, 10)
.mapToObj(i -> Mono.fromSupplier(() -> {
log.info("Executing Mono {}", i);
return i;
})
.delayElement(Duration.ofMillis(1000)))
.map(mono -> semaphore.lock(mono));
var allMonos = Flux.fromStream(monos).flatMap(m -> m).doOnNext(v -> log.info("Got value {}", v));
StepVerifier.create(allMonos)
.expectNextCount(10)
.verifyComplete();
}
/* OUTPUT:
12:52:40.752 [main] INFO my.package.ReactiveSemaphoreTest - Executing Mono 0
12:52:40.755 [main] INFO my.package.ReactiveSemaphoreTest - Executing Mono 1
12:52:41.762 [parallel-1] INFO my.package.ReactiveSemaphoreTest - Got value 0
12:52:41.765 [parallel-1] INFO my.package.ReactiveSemaphoreTest - Executing Mono 2
12:52:41.767 [parallel-2] INFO my.package.ReactiveSemaphoreTest - Got value 1
12:52:41.767 [parallel-2] INFO my.package.ReactiveSemaphoreTest - Executing Mono 3
12:52:42.780 [parallel-3] INFO my.package.ReactiveSemaphoreTest - Got value 2
12:52:42.780 [parallel-4] INFO my.package.ReactiveSemaphoreTest - Executing Mono 4
12:52:42.780 [parallel-3] INFO my.package.ReactiveSemaphoreTest - Got value 3
12:52:42.780 [parallel-4] INFO my.package.ReactiveSemaphoreTest - Executing Mono 5
12:52:43.790 [parallel-6] INFO my.package.ReactiveSemaphoreTest - Executing Mono 6
12:52:43.790 [parallel-5] INFO my.package.ReactiveSemaphoreTest - Got value 4
12:52:43.790 [parallel-5] INFO my.package.ReactiveSemaphoreTest - Got value 5
12:52:43.791 [parallel-6] INFO my.package.ReactiveSemaphoreTest - Executing Mono 7
12:52:44.802 [parallel-7] INFO my.package.ReactiveSemaphoreTest - Got value 6
12:52:44.802 [parallel-7] INFO my.package.ReactiveSemaphoreTest - Got value 7
12:52:44.802 [parallel-8] INFO my.package.ReactiveSemaphoreTest - Executing Mono 8
12:52:44.802 [parallel-8] INFO my.package.ReactiveSemaphoreTest - Executing Mono 9
12:52:45.814 [parallel-10] INFO my.package.ReactiveSemaphoreTest - Got value 9
12:52:45.814 [parallel-10] INFO my.package.ReactiveSemaphoreTest - Got value 8