我使用的是 Python 3.12 或更高版本。我需要一个线程锁,仅阻塞,具有冷却功能,可用作简单的速率限制器。这将帮助我避免过度锤击资源。
预期功能:
对应期望的接口是:
def __init__(self, cooldown: float = 1)
def acquire(self) # Note: For simplicity, there is no 'blocking' or 'timeout' arg.
# As with `threading.Lock`:
def release(self) # Must not block!
def __enter__(self)
def __exit__(self, exc_type, exc_val, exc_tb):
我有一个当前的解决方案,作为答案发布在下面,但我觉得由于潜在的竞争条件,它不是 100% 线程安全的,尽管它接近它。
第三方包如果有准确的实现也是可以接受的。如果它们以稍微不同的方式解决速率限制问题也没关系,只要它们是线程安全的。非线程安全产品将不起作用。
我当前的解决方案如下。它跟踪允许的最早重新获取锁的时间。
import threading
import time
from typing import Optional, Self
class CooldownLock:
"""A lock mechanism that also enforces a cooldown period between releases and subsequent acquisitions.
This is useful in scenarios where it's also necessary to limit the frequency of lock acquisition,
such as for managing access to a shared resource in a way that prevents excessively rapid consecutive uses.
Caution: The implementation may have a slight race condition, although efforts have been made while acquiring the lock to minimize its impact.
"""
def __init__(self, cooldown: float = 1, name: Optional[str] = None):
"""Initialize with a specified cooldown period.
Args:
cooldown: The cooldown period in seconds after releasing the lock before it can be acquired again.
name: Optional name of lock used in log messages.
"""
self._cooldown_period = cooldown
self._name = f"{name} lock" if name else "lock"
self._earliest_use_time = 0
self._main_lock = threading.Lock()
self._cooldown_lock = threading.Lock()
def acquire(self) -> bool:
num_approvals = 0
with self._cooldown_lock:
while num_approvals < 2: # Intended to minimize effect of race condition.
wait_time = self._earliest_use_time - time.monotonic()
if wait_time > 0:
print(f"Sleeping for {wait_time:.1f}s to acquire {self._name}.")
time.sleep(wait_time)
num_approvals = 0
else:
num_approvals += 1
self._main_lock.acquire()
return True
def release(self):
self._earliest_use_time = time.monotonic() + self._cooldown_period
self._main_lock.release()
def locked(self) -> bool:
return self._main_lock.locked()
def __enter__(self) -> Self:
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()