Python 线程锁具有冷却时间以限制速率

问题描述 投票:0回答:1

我使用的是 Python 3.12 或更高版本。我需要一个线程锁,仅阻塞,具有冷却功能,可用作简单的速率限制器。这将帮助我避免过度锤击资源。

预期功能:

  1. 它必须具有线程锁的典型互斥特性。
  2. 在最后一次释放后的 1 秒内必须无法重新获取它。这是初始化时设置的冷却时间。

对应期望的接口是:

def __init__(self, cooldown: float = 1)

def acquire(self)  # Note: For simplicity, there is no 'blocking' or 'timeout' arg.

# As with `threading.Lock`:
def release(self)  # Must not block!
def __enter__(self)
def __exit__(self, exc_type, exc_val, exc_tb):

我有一个当前的解决方案,作为答案发布在下面,但我觉得由于潜在的竞争条件,它不是 100% 线程安全的,尽管它接近它。

第三方包如果有准确的实现也是可以接受的。如果它们以稍微不同的方式解决速率限制问题也没关系,只要它们是线程安全的。非线程安全产品将不起作用。

python multithreading locking rate-limiting
1个回答
0
投票

我当前的解决方案如下。它跟踪允许的最早重新获取锁的时间。

import threading
import time
from typing import Optional, Self


class CooldownLock:
    """A lock mechanism that also enforces a cooldown period between releases and subsequent acquisitions.

    This is useful in scenarios where it's also necessary to limit the frequency of lock acquisition,
    such as for managing access to a shared resource in a way that prevents excessively rapid consecutive uses.

    Caution: The implementation may have a slight race condition, although efforts have been made while acquiring the lock to minimize its impact.
    """

    def __init__(self, cooldown: float = 1, name: Optional[str] = None):
        """Initialize with a specified cooldown period.

        Args:
            cooldown: The cooldown period in seconds after releasing the lock before it can be acquired again.
            name: Optional name of lock used in log messages.
        """
        self._cooldown_period = cooldown
        self._name = f"{name} lock" if name else "lock"

        self._earliest_use_time = 0
        self._main_lock = threading.Lock()
        self._cooldown_lock = threading.Lock()

    def acquire(self) -> bool:
        num_approvals = 0
        with self._cooldown_lock:
            while num_approvals < 2:  # Intended to minimize effect of race condition.
                wait_time = self._earliest_use_time - time.monotonic()
                if wait_time > 0:
                    print(f"Sleeping for {wait_time:.1f}s to acquire {self._name}.")
                    time.sleep(wait_time)
                    num_approvals = 0
                else:
                    num_approvals += 1
            self._main_lock.acquire()
            return True

    def release(self):
        self._earliest_use_time = time.monotonic() + self._cooldown_period
        self._main_lock.release()

    def locked(self) -> bool:
        return self._main_lock.locked()

    def __enter__(self) -> Self:
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
© www.soinside.com 2019 - 2024. All rights reserved.