我需要在一个方法中实现等待,直到不同线程中的其他一些方法完成。
这是我的实现(未按预期工作):
import queue
from concurrent.futures import ThreadPoolExecutor
import threading
import time
import logging
logging.basicConfig(
level=logging.INFO,
format="%(message)s",
)
class Foo(object):
def func_1(self):
log = logging.getLogger(__name__)
log.info("start func_1")
time.sleep(2)
log.info("end func_1")
def func_2(self):
log = logging.getLogger(__name__)
log.info("start func_2")
time.sleep(2)
log.info("end func_2")
def func_3(self):
log = logging.getLogger(__name__)
log.info("start func_3")
time.sleep(2)
log.info("end func_3")
def func_4_free(self):
log = logging.getLogger(__name__)
log.info("start func_4_free")
time.sleep(2)
log.info("end func_4_free")
def do(self):
log = logging.getLogger(__name__)
log.info("start do")
time.sleep(1)
log.info("This is do")
time.sleep(1)
log.info("end do")
class Bar(Foo):
cv_from_do = threading.Event()
FUNC_SYNCHRONISE = {"do": ["func_1", "func_2", "func_3", "do"]}
def __init__(self):
super().__init__()
self.funcs = queue.Queue()
self.cv_from_do.set()
def __getattribute__(self, name):
attribute = super().__getattribute__(name)
if name in ["cv_from_do", "funcs", "FUNC_SYNCHRONISE"]:
return attribute
if name in self.FUNC_SYNCHRONISE.keys():
def wrap(*args, **kwargs):
log = logging.getLogger(__name__)
log.info(f"start {name}. wait till other functions finish...")
try:
self.cv_from_do.wait()
self.cv_from_do.clear() # force to wait other functions
while not self.funcs.empty():
pass
self.funcs.put(name)
attribute(*args, **kwargs)
self.funcs.get(name)
self.cv_from_do.set()
except Exception as ex:
log.exception(ex)
return wrap
elif name in self.FUNC_SYNCHRONISE["do"]:
def wrap(*args, **kwargs):
log = logging.getLogger(__name__)
try:
self.cv_from_do.wait()
self.funcs.put(name)
attribute(*args, **kwargs)
self.funcs.get(name)
except Exception as ex:
log.exception(ex)
return wrap
else:
return attribute
with ThreadPoolExecutor(max_workers=None) as executor:
obj = Bar()
# function of 'obj' can be called any times, in any threads, in any order
# following is just for test
for f in [obj.func_1, obj.func_3, obj.func_4_free, obj.do, obj.func_2, obj.func_1, obj.do, obj.func_4_free, obj.do, obj.func_3, obj.func_2, obj.func_1]:
future = executor.submit(
f,
**{}
)
必须保留遗产
Bar(Foo)
。 Foo
- 不得修改
期望是:
obj = Bar()
的函数可以在线程中任意数量、任意时间调用。do
在线程中启动 - 它会等待其他函数(包括其他 do
,如果它们存在于其他线程中)完成,然后其他函数等待,直到当前 do
完成。当前 do
完成后 -> 其他功能可以继续。Foo
中的FUNC_SYNCHRONISE
功能需要同步。 Foo
的其他功能可以独立工作。我得到以下输出:
start func_1
start func_3
start func_4_free
start do. wait till other functions finish...
start do. wait till other functions finish...
start func_4_free
start do. wait till other functions finish...
end func_1
end func_3
end func_4_free
start do <<< - from here
end func_4_free
This is do
end do <<< - to here is correct. 'do' behaves atomic
start do <<< - from here
start func_1 <<< - to here is NOT correct, as 'do' has to be finished before other methods sart
start func_3
start func_2
start func_2
start func_1
This is do
end func_1
end do
end func_3
end func_2
end func_2
end func_1
start do
This is do
end do
问题是,如果线程中只有一个
do
- 没问题,我的代码可以正常工作。但是,如果我在不同的线程中调用多个 do
- 第一个按预期工作,但下一个 do
会被调用,而无需等待彼此完成。简而言之 - do
应该以顺序模式调用,切勿同时调用。
如果可以的话请帮忙..
UPD
我正在考虑创建一个
do
队列并允许它们在适当的时候执行..但它不是一个原子(因为我需要检查队列中的元素并且仅在弹出它之后)并且可能会导致我相信并发问题
您可以通过使用读写锁来简化问题:
from concurrent.futures import ThreadPoolExecutor
import logging
from rwlock import RWLock # e.g. https://gist.github.com/tylerneylon/a7ff6017b7a1f9a506cf75aa23eacfd6
from so79082898 import Foo
class Bar(Foo):
FUNC_SYNCHRONISE = {"do": ["func_1", "func_2", "func_3", "do"]}
def __init__(self):
super().__init__()
self._lock = RWLock()
def __getattribute__(self, name):
attribute = super().__getattribute__(name)
if name in ["_lock", "FUNC_SYNCHRONISE"]:
return attribute
if name in self.FUNC_SYNCHRONISE.keys():
def wrap(*args, **kwargs):
log = logging.getLogger(__name__)
log.info(f"start {name}. wait till other functions finish...")
with self._lock.w_locked():
attribute(*args, **kwargs)
return wrap
elif name in self.FUNC_SYNCHRONISE["do"]:
def wrap(*args, **kwargs):
with self._lock.r_locked():
attribute(*args, **kwargs)
return wrap
else:
return attribute
if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=None) as executor:
obj = Bar()
# function of 'obj' can be called any times, in any threads, in any order
# following is just for test
for f in [obj.func_1, obj.func_3, obj.func_4_free, obj.do, obj.func_2, obj.func_1, obj.do, obj.func_4_free, obj.do, obj.func_3, obj.func_2, obj.func_1]:
future = executor.submit(
f,
**{}
)
FUNC_SYNCHRONISE
中有键和值,这意味着您可能希望独立同步不同的函数集,但原始实现或此实现都没有这样做。但有了这个指令会让事情变得更加复杂。FUNC_SYNCRONISE