您会如何将
threading.RLock
与 threading.Semaphore
结合起来?或者这样的结构已经存在吗?
在Python中,有一个可重入锁的原语,
threading.RLock(N)
,它允许同一线程多次获取锁,但其他线程不能。还有threading.Semaphore(N)
,允许在阻塞前获取N
次锁。如何将这两种结构结合起来?我希望最多 N
个单独的线程能够获取锁,但我希望线程上的每个单独的锁都是可重入的。
所以我猜可重入信号量不存在。这是我想出的实现,很高兴接受评论。
import threading
import datetime
class ReentrantSemaphore(object):
'''A counting Semaphore which allows threads to reenter.'''
def __init__(self, value = 1):
self.local = threading.local()
self.sem = threading.Semaphore(value)
def acquire(self):
if not getattr(self.local, 'lock_level', 0):
# We do not yet have the lock, acquire it.
start = datetime.datetime.utcnow()
self.sem.acquire()
end = datetime.datetime.utcnow()
if end - start > datetime.timedelta(seconds = 3):
logging.info("Took %d Sec to lock."%((end - start).total_seconds()))
self.local.lock_time = end
self.local.lock_level = 1
else:
# We already have the lock, just increment it due to the recursive call.
self.local.lock_level += 1
def release(self):
if getattr(self.local, 'lock_level', 0) < 1:
raise Exception("Trying to release a released lock.")
self.local.lock_level -= 1
if self.local.lock_level == 0:
self.sem.release()
__enter__ = acquire
def __exit__(self, t, v, tb):
self.release()
您的实施已经很好了。
threading.local()
方法简单高效,因此通过对超时和非阻塞调用的额外支持,您将拥有一个完整的可重入信号量。不过,我还编写了自己的组合这两种结构的变体,我称之为 aiologic.RCapacityLimiter
(我是 aiologic 的创建者)。
import time
from concurrent.futures import ThreadPoolExecutor
from aiologic import RCapacityLimiter
limiter = RCapacityLimiter(2)
def subfunc(i):
with limiter:
assert limiter.borrowed_tokens <= 2
time.sleep(0.5)
print(f"it works! (thread #{i})")
def func(i):
with limiter:
subfunc(i)
with ThreadPoolExecutor(4) as executor:
for i in range(4):
executor.submit(func, i)
与信号量相比,可重入容量限制器为您提供有关运行时发生的情况的更多信息:
RCapacityLimiter.waiting
是等待获取限制器的线程数。RCapacityLimiter.available_tokens
是可用的非阻塞调用的数量。RCapacityLimiter.borrowed_tokens
是已获取限制器的线程数。RCapacityLimiter.total_tokens
是最大线程数。RCapacityLimiter.borrowers
是一个字典,包含有关哪个线程已获取限制器多少次的信息。除此之外,您还可以获得
aiologic
包的所有其他功能,例如对 asyncio
等异步库的支持。