try {
do work
} finally {
但我希望借助 Reactor 库以非阻塞方式实现同样的效果。我目前的实现如下。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
public class NonblockingLock {
private static final Logger LOG = LoggerFactory.getLogger(NonblockingLock.class);
private String currentOwner;
private final AtomicInteger lockCounter = new AtomicInteger();
private final FluxSink<Boolean> notifierSink;
private final Flux<Boolean> notifier;
private final String resourceId;
// Creates a lock for the given resource and wires up the notification stream
// that processWithLock() filters on while trying to acquire the lock.
public NonblockingLock(String resourceId) {
this.resourceId = resourceId;
// Processor created with a buffer size of 1 and auto-cancel disabled.
EmitterProcessor<Boolean> processor = EmitterProcessor.create(1, false);
// Sink used to push notifications; the LATEST overflow strategy is requested.
notifierSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
// Each subscriber first receives `true`, which triggers an immediate acquisition attempt.
notifier = processor.startWith(true);
* Nonblocking version of
* <pre><code>
* lock.lock();
* try {
* do work
* } finally {
* lock.unlock();
* }
* </code></pre>
* */
// Runs `work` while holding this lock on behalf of `owner`.
//   owner          - identifier of the requester; must not be null.
//   tryLockTimeout - optional timeout applied to the wait for the lock; null disables it.
//   work           - publisher that is subscribed to once the lock has been acquired.
public <T> Flux<T> processWithLock(String owner, @Nullable Duration tryLockTimeout, Flux<T> work) {
Objects.requireNonNull(owner, "owner");
// Every notifier signal triggers an acquisition attempt; only successful attempts pass the filter.
return notifier.filter(it -> tryAcquire(owner))
// The timeout is applied only when one was supplied.
.transform(locked -> tryLockTimeout == null ? locked : locked.timeout(tryLockTimeout))
.doOnSubscribe(s -> LOG.debug("trying to obtain lock for resourceId: {}, by owner: {}", resourceId, owner))
.doOnError(err -> LOG.error("can't obtain lock for resourceId: {}, by owner: {}, error: {}", resourceId, owner, err.getMessage()))
// NOTE(review): flatMapMany is a Mono operator, so a Flux-to-Mono step (e.g. next()) appears
// to be missing from this excerpt - confirm against the original source.
.flatMapMany(it -> work)
// Attempt the release whenever the sequence terminates, for whatever reason.
.doFinally(s -> {
if (tryRelease(owner)) {
LOG.debug("release lock resourceId: {}, owner: {}", resourceId, owner);
// NOTE(review): the remainder of this branch and the closing braces are not visible here.
// Tries to take the lock for `owner`; returns true when `owner` is the current owner afterwards.
private boolean tryAcquire(String owner) {
boolean acquired;
// The owner check and update happen under the instance monitor.
synchronized (this) {
// No owner yet: the caller becomes the owner.
if (currentOwner == null) {
currentOwner = owner;
// Also true when the same owner asks again while already holding the lock.
acquired = currentOwner.equals(owner);
if (acquired) {
// NOTE(review): the body of this branch is missing from the excerpt; presumably it
// increments lockCounter to mirror the decrement in tryRelease - confirm.
return acquired;
private boolean tryRelease(String owner) {
boolean released = false;
synchronized (this) {
if (currentOwner.equals(owner)) {
int count = lockCounter.decrementAndGet();
if (count == 0) {
currentOwner = null;
released = true;
return released;
// Test scenario: two clients request the same NonblockingLock at staggered times.
// NOTE(review): the subscription/verification part of this test is not visible in this excerpt.
public void processWithLock() throws Exception {
NonblockingLock lock = new NonblockingLock("work");
String client1 = "client1";
String client2 = "client2";
// client1 requests the lock immediately.
Flux<String> requests = getWork(client1, lock)
//emulate async request for resource by another client
.mergeWith(Mono.delay(Duration.ofMillis(300)).flatMapMany(it -> getWork(client2, lock)))
//emulate async request for resource by the same client
.mergeWith(Mono.delay(Duration.ofMillis(400)).flatMapMany(it -> getWork(client1, lock)));
// Builds the locked work for `client` (no lock timeout) and maps every emitted item to the client name.
// NOTE(review): the work publisher passed as the third argument is missing from this excerpt.
private static Flux<String> getWork(String client, NonblockingLock lock) {
return lock.processWithLock(client, null,
.map(i -> client)
import party.iroiro.lock.Lock;
import party.iroiro.lock.ReactiveLock;
// Variant of getWork based on the party.iroiro.lock API: waits for `delay`, then runs work under `lock`.
// NOTE(review): the supplier body passed to withLock is missing from this excerpt.
Flux<String> getWork(String client, Duration delay, Lock lock) {
return Mono.delay(delay)
.flatMapMany(l -> lock.withLock(() ->
.map(i -> client)
它在内部使用队列来跟踪锁请求。每次解锁时,它只是从队列中取出一个请求,并向对应的 Mono
发出信号,这可能比使用 Sinks.many().multicast()
向所有请求者广播要好一些。它利用了 Sinks.Empty
不能被多次发射的特性,因此取消加锁请求(适用于想要设置超时或处理复杂情况的人)将阻止 ON_COMPLETE 信号的发射。
此外,通过对锁进行包装,可以确保在所有情况下都能正确解锁,效果类似 try-finally。
之前的实现可能会在竞争条件下阻塞,下面改用 CAS 操作重写,使锁成为非阻塞的。(在该库中,现在所有的锁都是用 CAS 操作实现的。)
private volatile int count = 0; // 0 if unlocked
// Requests the lock without blocking and returns a handle for the request.
public LockHandle tryLock() {
// The CAS 0 -> 1 succeeds only when COUNT is currently 0 (unlocked).
if (COUNT.compareAndSet(this, 0, 1)) {
// Optimistic acquiring
return LockHandle.empty();
} else {
// Presumably registers a sink for this request in the wait queue - confirm against SinkUtils.
LockHandle handle = SinkUtils.queueSink(queue);
// NOTE(review): a statement may be missing here in this excerpt (possibly a call to
// fairDecrement(false), as its comments describe) - confirm.
return handle;
// Releases the lock.
public void unlock() {
if (fairness) {
// NOTE(review): the fair-mode body is missing from this excerpt; presumably it calls
// fairDecrement(true) - confirm.
} else {
// Non-fair mode: reset COUNT to 0, the unlocked state.
COUNT.set(this, 0);
* If not "unlocking", fairDecrement first increments COUNT so that it does not end up unlocking a lock.
* If "unlocking", we jump directly to the decrementing.
// Emits to queued lock requests while decrementing COUNT; `unlocking` is true when the caller
// is releasing a held lock. See the state description in the body.
private void fairDecrement(boolean unlocking) {
* COUNT states:
* - COUNT == 0: The lock is unlocked, with no ongoing decrement operations.
* - COUNT >= 1: Either the lock is being held, or there is an ongoing decrement operation.
* Note that the two are mutual exclusive, since they both require COUNT++ == 0.
* If "unlocking", then we are responsible for decrements.
* Otherwise,
* 1. If COUNT++ >= 1, either someone is holding the lock, or there is an ongoing
* decrement operation. Either way, some thread will eventually emit to pending requests.
* We increment COUNT to signal to the emitter that the queue could have potentially been
* appended to after its last emission.
* 2. If COUNT++ == 0, then we are responsible for decrementing.
if (unlocking || COUNT.incrementAndGet(this) == 1) {
do {
// Presumably returns true when a queued request was successfully signalled - confirm.
if (SinkUtils.emitAnySink(queue)) {
* Leaves the decrementing job to the next lock holder, who will unlock somehow.
* It is now safe to decrement COUNT, since there is no concurrent decrements.
// Keep looping until the decrement brings COUNT back to 0.
} while (COUNT.decrementAndGet(this) != 0);
// Immutable description of a single lock acquisition.
// NOTE(review): constructor/builder and accessors are not visible in this excerpt.
public final class LockData {
// Lock key to identify same operation (same cache key, for example).
private final String key;
// Unique identifier for equals and hashCode.
private final String uuid;
// Date and time of the acquiring for lock duration limiting.
private final OffsetDateTime acquiredDateTime;
LockCommand 接口是对 LockData 上阻塞操作的抽象。
// Lock operations on a LockData. Lock invokes these through Mono.fromCallable / Mono.fromRunnable.
public interface LockCommand {
// Attempts to take the lock. T1 tells whether the lock was obtained; when it was not, callers
// pass T2 to LockIsNotAvailableException (presumably the data of the current holder - confirm).
Tuple2<Boolean, LockData> tryLock(LockData lockData);
// Releases the lock described by lockData.
void unlock(LockData lockData);
// Registry of listeners that want to be told when a given lock is released.
public interface UnlockEventsRegistry {
// initialize event listeners collection when acquire lock
Mono<Void> add(LockData lockData);
// notify event listeners and remove collection when release lock
Mono<Void> remove(LockData lockData);
// register event listener for given lockData
// Lock.retryTransformer() treats an emitted `false` as "not registered" and then fires the
// retry event itself.
Mono<Boolean> register(LockData lockData, Consumer<Integer> unlockEventListener);
Lock 类可以用加锁逻辑包装源 Mono、执行解锁,并用解锁逻辑包装 CacheMono 的 writer。
// Reactive wrapper around a LockCommand: acquires the lock before running a source Mono,
// releases it afterwards and offers a retry transformer driven by unlock events.
// NOTE(review): several statements and all closing braces are missing from this excerpt.
public final class Lock {
private final LockCommand lockCommand;
private final LockData lockData;
private final UnlockEventsRegistry unlockEventsRegistry;
// Stream of unlock events consumed by retryWhen in retryTransformer().
private final EmitterProcessor<Integer> unlockEvents;
// Sink used to push events into unlockEvents.
private final FluxSink<Integer> unlockEventSink;
// Creates a lock for `key` backed by the given command and unlock-event registry.
public Lock(LockCommand lockCommand, String key, UnlockEventsRegistry unlockEventsRegistry) {
this.lockCommand = lockCommand;
// NOTE(review): the builder chain is truncated in this excerpt.
this.lockData = LockData.builder()
this.unlockEventsRegistry = unlockEventsRegistry;
this.unlockEvents = EmitterProcessor.create(false);
this.unlockEventSink = unlockEvents.sink();
// Tries to take the lock and, on success, continues with `source`; fails with
// LockIsNotAvailableException when the lock is not obtained.
public final <T> Mono<T> tryLock(Mono<T> source, Scheduler scheduler) {
return Mono.fromCallable(() -> lockCommand.tryLock(lockData))
.flatMap(isLocked -> {
if (isLocked.getT1()) {
// Lock obtained: register it with the registry; on failure unlock and re-raise the error.
return unlockEventsRegistry.add(lockData)
.onErrorResume(throwable -> unlock().then(Mono.error(throwable))));
} else {
return Mono.error(new LockIsNotAvailableException(isLocked.getT2()));
// Releases the lock through the lock command.
public Mono<Void> unlock(Scheduler scheduler) {
return Mono.<Void>fromRunnable(() -> lockCommand.unlock(lockData))
// Decorates a cache writer so that unlock() runs after the write, whether it succeeds or fails.
public <KEY, VALUE> BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> unlockAfterCacheWriter(
BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> cacheWriter) {
return cacheWriter.andThen(voidMono -> voidMono.then(unlock())
.onErrorResume(throwable -> unlock()));
// Transformer that retries the Mono after a LockIsNotAvailableException once an unlock event arrives.
public final <T> UnaryOperator<Mono<T>> retryTransformer() {
return mono -> mono
// NOTE(review): the operator wrapping the following lambda is missing from this excerpt.
error -> unlockEventsRegistry.register(error.getLockData(), unlockEventSink::next)
.doOnNext(registered -> {
// Registration did not happen: fire the event directly.
if (!registered) unlockEventSink.next(0);
// For any other error an event is still emitted so the zip below can pair it with the error.
.doOnError(throwable -> !(throwable instanceof LockIsNotAvailableException),
ignored -> unlockEventSink.next(0))
// Each error is paired with the next unlock event; errors of any other type are propagated.
.retryWhen(errorFlux -> errorFlux.zipWith(unlockEvents, (error, integer) -> {
if (error instanceof LockIsNotAvailableException) return integer;
else throw Exceptions.propagate(error);
现在,如果需要用 CacheMono 包装我的 Mono 并加锁,可以这样做:
// Looks up `cacheKey` in the cache and, on a miss, evaluates `source` under a Lock created for that key.
// NOTE(review): the remaining operators (retry, cache write, closing braces) are missing from this excerpt.
private Mono<String> getCachedLockedMono(String cacheKey, Mono<String> source, LockCommand lockCommand, UnlockEventsRegistry unlockEventsRegistry) {
Lock lock = new Lock(lockCommand, cacheKey, unlockEventsRegistry);
return CacheMono.lookup(CACHE_READER, cacheKey)
// Lock and double check
.onCacheMissResume(() -> lock.tryLock(Mono.fromCallable(CACHE::get).switchIfEmpty(source)))
// Retry if lock is not available
您可以在 GitHub 上找到带有示例的代码和测试
public class ReactiveSemaphore {
* This can be thought of as a queue of lock handles. The first argument of the tuple is a signaler that accepts a value
* value when a lock is available. The second argument is a Mono that completes when the lock is released.
private final Sinks.Many<Tuple2<Sinks.One<Boolean>, Mono<Boolean>>> taskQueue;
private final Sinks.One<Boolean> close = Sinks.one();
* Creates a ReactiveSemaphore that only allows one Publisher to be subscribed at a time. Executed by order
* of subscription.
public ReactiveSemaphore() {
* Creates a ReactiveSemaphore that allows up to poolSize Publishers to be subscribed in parallel.
* @param poolSize The number of allowed subscriptions to run in parallel.
// Builds the pipeline that hands out permits to queued tasks, at most poolSize at a time.
public ReactiveSemaphore(int poolSize) {
// Unicast sink with a backpressure buffer; holds the pending lock requests.
taskQueue = Sinks.many().unicast().onBackpressureBuffer();
Flux<Boolean> tasks;
if (poolSize <= 1)
// We could use flatMap with parallelism of 1, but that seems weird
tasks = taskQueue
// NOTE(review): the single-permit pipeline is truncated in this excerpt.
else {
tasks = taskQueue
// Up to poolSize dispatched tasks are in flight concurrently.
.flatMap(ReactiveSemaphore::dispatchTask, poolSize);
// NOTE(review): the subscription to `tasks` and the closing braces are not visible here.
// Hands a permit to one queued task and returns the Mono that signals when the permit is released.
private static Mono<Boolean> dispatchTask(Tuple2<Sinks.One<Boolean>, Mono<Boolean>> task) {
task.getT1().tryEmitValue(true); // signal that lock is available and consume lock
return task.getT2(); // return Mono that completes when lock is released
private void cleanup() {
// Wraps a generic Publisher so it runs only while holding a permit; the request is queued on subscription.
public <T> Publisher<T> lock(Publisher<T> publisher) {
return Flux.defer(() -> this.waitForNext(publisher));
// Mono overload: same as above, keeping only the first element of the wrapped sequence.
public <T> Mono<T> lock(Mono<T> publisher) {
return Mono.defer(() -> this.waitForNext(publisher).next());
// Flux overload: same as the Publisher variant with a Flux return type.
public <T> Flux<T> lock(Flux<T> publisher) {
return Flux.defer(() -> this.waitForNext(publisher));
* Waits for an available lock in the taskQueue. When ReactiveSemaphore is ready, a lock will be allocated for the task
* and will not be released until the provided task errors or completes. For this reason this operation should
* only be performed on a hot publisher (a publisher that has been subscribed to). Therefore, this method should
* always be wrapped inside a call to {@link Flux#defer(Supplier)} or {@link Mono#defer(Supplier)}.
* @param task The task to execute once the ReactiveSemaphore has an available lock.
* @return The task wrapped in a Flux
* @param <T> The type of value returned by the task
private <T> Flux<T> waitForNext(Publisher<T> task) {
var ready = Sinks.<Boolean>one();
var release = Sinks.<Boolean>one();
taskQueue.tryEmitNext(Tuples.of(ready, release.asMono()));
return ready.asMono()
.flatMapMany(ignored -> Flux.from(task))
.doOnComplete(() -> release.tryEmitValue(true))
.doOnError(err -> release.tryEmitValue(true));
ReactiveSemaphore semaphore = new ReactiveSemaphore();
示例测试——在这个测试中,我们创建了 10 个 Mono,每个都在 1 秒后发出一个值,并尝试并行运行全部 Mono;但我们用一个池大小为 2 的 ReactiveSemaphore 包装它们,因此同时运行的 Mono 不会超过 2 个:
// Runs ten Monos through a ReactiveSemaphore created with a pool size of 2.
public void testParallelExecution() {
var semaphore = new ReactiveSemaphore(2);
var monos = IntStream.range(0, 10)
.mapToObj(i -> Mono.fromSupplier(() -> {
log.info("Executing Mono {}", i);
return i;
// NOTE(review): the end of the supplier (and any delay operator) is missing from this excerpt.
.map(mono -> semaphore.lock(mono));
// Merges all wrapped Monos with flatMap and logs each emitted value.
var allMonos = Flux.fromStream(monos).flatMap(m -> m).doOnNext(v -> log.info("Got value {}", v));
12:52:40.752 [main] INFO my.package.ReactiveSemaphoreTest - Executing Mono 0
12:52:40.755 [main] INFO my.package.ReactiveSemaphoreTest - Executing Mono 1
12:52:41.762 [parallel-1] INFO my.package.ReactiveSemaphoreTest - Got value 0
12:52:41.765 [parallel-1] INFO my.package.ReactiveSemaphoreTest - Executing Mono 2
12:52:41.767 [parallel-2] INFO my.package.ReactiveSemaphoreTest - Got value 1
12:52:41.767 [parallel-2] INFO my.package.ReactiveSemaphoreTest - Executing Mono 3
12:52:42.780 [parallel-3] INFO my.package.ReactiveSemaphoreTest - Got value 2
12:52:42.780 [parallel-4] INFO my.package.ReactiveSemaphoreTest - Executing Mono 4
12:52:42.780 [parallel-3] INFO my.package.ReactiveSemaphoreTest - Got value 3
12:52:42.780 [parallel-4] INFO my.package.ReactiveSemaphoreTest - Executing Mono 5
12:52:43.790 [parallel-6] INFO my.package.ReactiveSemaphoreTest - Executing Mono 6
12:52:43.790 [parallel-5] INFO my.package.ReactiveSemaphoreTest - Got value 4
12:52:43.790 [parallel-5] INFO my.package.ReactiveSemaphoreTest - Got value 5
12:52:43.791 [parallel-6] INFO my.package.ReactiveSemaphoreTest - Executing Mono 7
12:52:44.802 [parallel-7] INFO my.package.ReactiveSemaphoreTest - Got value 6
12:52:44.802 [parallel-7] INFO my.package.ReactiveSemaphoreTest - Got value 7
12:52:44.802 [parallel-8] INFO my.package.ReactiveSemaphoreTest - Executing Mono 8
12:52:44.802 [parallel-8] INFO my.package.ReactiveSemaphoreTest - Executing Mono 9
12:52:45.814 [parallel-10] INFO my.package.ReactiveSemaphoreTest - Got value 9
12:52:45.814 [parallel-10] INFO my.package.ReactiveSemaphoreTest - Got value 8