我正在使用这样的 Python 3 序列:
lock = threading.Lock()
# Wait at most 10 seconds; acquire() returns True only if the lock was obtained.
res = lock.acquire(timeout=10)
if res:
    try:
        # do something ....
        pass
    finally:
        # Release even when the protected work raises, so the lock is not leaked.
        lock.release()
else:
    # do something else ...
    pass
我更愿意使用with语句而不是显式的“获取”和“释放”,但我不知道如何获得超时效果。
您可以使用上下文管理器轻松完成此操作:
import threading
from contextlib import contextmanager
@contextmanager
def acquire_timeout(lock, timeout):
    """Try to take *lock* for the duration of a ``with`` block.

    Waits up to *timeout* seconds and yields the value returned by
    ``lock.acquire(timeout=timeout)`` so the body can tell whether it holds
    the lock. The lock is released on exit only if it was acquired.
    """
    got_it = lock.acquire(timeout=timeout)
    if not got_it:
        # Nothing was acquired, so there is nothing to release afterwards.
        yield got_it
        return
    try:
        yield got_it
    finally:
        lock.release()
# Usage:
lock = threading.Lock()
with acquire_timeout(lock, 2) as acquired:
    # `acquired` is False when the 2-second wait expired without the lock.
    if not acquired:
        print('timeout: lock not available')
        # do something else ...
    else:
        print('got the lock')
        # do something ....
*注意:这在 Python 2.x 中不起作用,因为 Python 2 的 `Lock.acquire` 没有 `timeout` 参数。
import threading
from contextlib import contextmanager
class TimeoutLock(object):
    """Wrapper around ``threading.Lock`` that adds a timed ``with``-style acquire."""

    def __init__(self):
        self._lock = threading.Lock()

    def acquire(self, blocking=True, timeout=-1):
        """Acquire the underlying lock; arguments and result are those of ``Lock.acquire``."""
        return self._lock.acquire(blocking, timeout)

    @contextmanager
    def acquire_timeout(self, timeout):
        """Context manager that waits up to *timeout* seconds for the lock.

        Yields True if the lock was acquired and False on timeout. When
        acquired, the lock is released on exit from the ``with`` block.
        """
        result = self._lock.acquire(timeout=timeout)
        try:
            yield result
        finally:
            # Without try/finally an exception raised inside the `with` body
            # would skip the release and leave the lock held forever.
            if result:
                self._lock.release()

    def release(self):
        """Release the underlying lock."""
        self._lock.release()
# Usage:
lock = TimeoutLock()
with lock.acquire_timeout(3) as result:
    # `result` is False when the 3-second wait expired without the lock.
    if not result:
        print('timeout: lock not available')
        # do something else ...
    else:
        print('got the lock')
        # do something ....
看来你不能子类化 `threading.Lock`,所以我不得不创建一个包装类。
from datetime import datetime
import time
from queue import Queue
from threading import Thread
def _log(msg : str):
print(f"{datetime.utcnow()} {msg}")
import threading
from contextlib import contextmanager
from typing import TypeVar
class TimeoutLock(object):
    """``threading.Lock`` wrapper whose acquire timeout is fixed at construction."""

    def __init__(self, timeout_sec = -1):
        """Create the lock.

        ``timeout_sec`` is passed as the ``timeout`` of every acquire made
        through ``acquire_timeout``; the default -1 waits without limit.
        """
        self._lock = threading.Lock()
        self.timeout_sec = timeout_sec

    @contextmanager
    def acquire_timeout(self):
        """Context manager that waits up to ``self.timeout_sec`` for the lock.

        Yields True if the lock was acquired and False on timeout. When
        acquired, the lock is released on exit from the ``with`` block.
        """
        result = self._lock.acquire(timeout=self.timeout_sec)
        try:
            yield result
        finally:
            # Without try/finally an exception raised inside the `with` body
            # would skip the release and leave the lock held forever.
            if result:
                self._lock.release()
def producer(name, q, delay_sec):
    """Put 0, 1, 2, ... on *q* forever, pausing *delay_sec* seconds after each.

    Every value put is logged with *name*. Any exception is logged and then
    re-raised.
    """
    try:
        i: int = 0
        while True:
            q.put(i)
            _log(f"{name} {i}")
            time.sleep(delay_sec)
            i += 1
    except Exception as e:
        _log(f"{name} error: {e!s}")
        raise
def consumer(name, q, lock, delay_sec):
    """Loop forever taking items from *q* while holding *lock*.

    Each iteration tries ``lock.acquire_timeout()``. On success one item is
    fetched and logged, and the lock stays held for a further *delay_sec*
    seconds; on timeout a message is logged and the loop retries.
    """
    while True:
        with lock.acquire_timeout() as acquired:
            if not acquired:
                _log(f"{name} wait timeout'ed")
                continue
            # NOTE: q.task_done() is never called for the fetched item.
            item = q.get()
            _log(f'{name} {item}')
            # Sleeping inside the `with` keeps the lock held for the delay.
            time.sleep(delay_sec)
# Demo driver: one producer feeding two consumers that compete for one lock.
try:
    q = Queue()
    lock = TimeoutLock(timeout_sec=3)

    consumer1_thread = Thread(target=consumer, args=('consumer1', q, lock, 5))
    consumer2_thread = Thread(target=consumer, args=('consumer2', q, lock, 5))
    producer1_thread = Thread(target=producer, args=('producer1', q, 1))

    producer1_thread.start()
    consumer1_thread.start()
    # The second consumer starts 5 seconds after the first.
    time.sleep(5)
    consumer2_thread.start()

    q.join()
except Exception as e:
    err_msg = f"main thread error: {e!s}"
    _log(err_msg)
finally:
    _log('main thread done!')
from threading import Lock # Nb. would also work with RLock
class TimeoutLock():
    """Lock wrapper usable directly in a ``with`` statement with a timeout.

    Entering the ``with`` block raises ``TimeoutError`` if the lock cannot be
    acquired within ``timeout`` seconds. ``timeout=None`` waits without limit.
    """

    timeout = None
    lock = None

    # Semi-transparent __init__ method
    def __init__(self, timeout=None, *args, **kwargs):
        """Store *timeout* and create the wrapped lock from the remaining arguments."""
        self.timeout = timeout
        self.lock = Lock(*args, **kwargs)

    # Context management protocol __enter__ method
    def __enter__(self, *args, **kwargs):
        """Acquire the lock or raise ``TimeoutError``; returns True on success."""
        # Lock.acquire() rejects timeout=None with a TypeError; -1 is its
        # "wait without limit" value.
        timeout = -1 if self.timeout is None else self.timeout
        rc = self.lock.acquire(timeout=timeout)
        if rc is False:
            raise TimeoutError(f"Could not acquire lock within "
                               f"specified timeout of {self.timeout}s")
        return rc

    # Context management protocol __exit__ method
    def __exit__(self, *args, **kwargs):
        """Release the lock; exceptions from the ``with`` body are not suppressed."""
        # `args` holds (exc_type, exc_value, traceback). They must not be
        # forwarded: release() takes no arguments, so passing them raised
        # TypeError on every exit and left the lock held.
        self.lock.release()
        return None

    # Transparent method calls for rest of Lock's public methods:
    def acquire(self, *args, **kwargs):
        """Forward to the wrapped lock's ``acquire``."""
        return self.lock.acquire(*args, **kwargs)

    def release(self, *args, **kwargs):
        """Forward to the wrapped lock's ``release``."""
        return self.lock.release(*args, **kwargs)

    def locked(self, *args, **kwargs):
        """Forward to the wrapped lock's ``locked``."""
        return self.lock.locked(*args, **kwargs)
如果在指定的超时时间内无法获取锁,上面的示例将引发 TimeoutError 异常。
超时为 1.0 秒时的用法如下:
my_lock = TimeoutLock(timeout=1.0)
with my_lock:
    # Code placed here is meant to run only while the lock is held. If the
    # lock cannot be acquired within the timeout, TimeoutError is raised on
    # entry and this block is not executed.
    pass