在一个方法中等待,直到某些方法在另一个线程中完成

问题描述 投票:0回答:1

我需要在一个方法中实现等待,直到不同线程中的其他一些方法完成。

这是我的实现(未按预期工作):

import queue
from concurrent.futures import ThreadPoolExecutor
import threading
import time
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
)


class Foo(object):

    def func_1(self):
        log = logging.getLogger(__name__)
        log.info("start func_1")
        time.sleep(2)
        log.info("end func_1")

    def func_2(self):
        log = logging.getLogger(__name__)
        log.info("start func_2")
        time.sleep(2)
        log.info("end func_2")

    def func_3(self):
        log = logging.getLogger(__name__)
        log.info("start func_3")
        time.sleep(2)
        log.info("end func_3")

    def func_4_free(self):
        log = logging.getLogger(__name__)
        log.info("start func_4_free")
        time.sleep(2)
        log.info("end func_4_free")

    def do(self):
        log = logging.getLogger(__name__)
        log.info("start do")
        time.sleep(1)
        log.info("This is do")
        time.sleep(1)
        log.info("end do")

class Bar(Foo):

    cv_from_do = threading.Event()

    FUNC_SYNCHRONISE = {"do": ["func_1", "func_2", "func_3", "do"]}

    def __init__(self):
        super().__init__()
        self.funcs = queue.Queue()
        self.cv_from_do.set()

    def __getattribute__(self, name):
        attribute = super().__getattribute__(name)
        if name in ["cv_from_do", "funcs", "FUNC_SYNCHRONISE"]:
            return attribute
        if name in self.FUNC_SYNCHRONISE.keys():
            def wrap(*args, **kwargs):
                log = logging.getLogger(__name__)
                log.info(f"start {name}. wait till other functions finish...")
                try:
                    self.cv_from_do.wait()
                    self.cv_from_do.clear()  # force to wait other functions
                    while not self.funcs.empty():
                        pass
                    self.funcs.put(name)
                    attribute(*args, **kwargs)
                    self.funcs.get(name)
                    self.cv_from_do.set()
                except Exception as ex:
                    log.exception(ex)
            return wrap
        elif name in self.FUNC_SYNCHRONISE["do"]:
            def wrap(*args, **kwargs):
                log = logging.getLogger(__name__)
                try:
                    self.cv_from_do.wait()
                    self.funcs.put(name)
                    attribute(*args, **kwargs)
                    self.funcs.get(name)
                except Exception as ex:
                    log.exception(ex)
            return wrap
        else:
            return attribute


with ThreadPoolExecutor(max_workers=None) as executor:
    obj = Bar()
    # function of 'obj' can be called any times, in any threads, in any order
    # following is just for test
    for f in [obj.func_1, obj.func_3, obj.func_4_free, obj.do, obj.func_2, obj.func_1, obj.do, obj.func_4_free, obj.do, obj.func_3, obj.func_2, obj.func_1]:
        future = executor.submit(
            f,
            **{}
        )

必须保留遗产

Bar(Foo)
Foo
- 不得修改

期望是:

  1. obj = Bar()
    的函数可以在线程中任意数量、任意时间调用。
  2. 当任何函数在线程中启动时,它们就会工作 独立并联。
  3. 一旦
    do
    在线程中启动 - 它会等待其他函数(包括其他
    do
    ,如果它们存在于其他线程中)完成,然后其他函数等待,直到当前
    do
    完成。当前
    do
    完成后 -> 其他功能可以继续。
  4. Foo
    中的
    FUNC_SYNCHRONISE
    功能需要同步。
    Foo
    的其他功能可以独立工作。

我得到以下输出:

start func_1
start func_3
start func_4_free
start do. wait till other functions finish...
start do. wait till other functions finish...
start func_4_free
start do. wait till other functions finish...
end func_1
end func_3
end func_4_free
start do              <<< - from here
end func_4_free
This is do
end do                <<< - to here is correct. 'do' behaves atomic
start do              <<< - from here
start func_1          <<< - to here is NOT correct, as 'do' has to be finished before other methods sart
start func_3
start func_2
start func_2
start func_1
This is do
end func_1
end do
end func_3
end func_2
end func_2
end func_1
start do
This is do
end do

问题是,如果线程中只有一个

do
- 没问题,我的代码可以正常工作。但是,如果我在不同的线程中调用多个
do
- 第一个按预期工作,但下一个
do
会被调用,而无需等待彼此完成。简而言之 -
do
应该以顺序模式调用,切勿同时调用。

如果可以的话请帮忙..

UPD

我正在考虑创建一个

do
队列并允许它们在适当的时候执行..但它不是一个原子(因为我需要检查队列中的元素并且仅在弹出它之后)并且可能会导致我相信并发问题

python multithreading concurrency
1个回答
0
投票

您可以通过使用读写锁来简化问题:

from concurrent.futures import ThreadPoolExecutor
import logging

from rwlock import RWLock # e.g. https://gist.github.com/tylerneylon/a7ff6017b7a1f9a506cf75aa23eacfd6
from so79082898 import Foo

class Bar(Foo):
    FUNC_SYNCHRONISE = {"do": ["func_1", "func_2", "func_3", "do"]}

    def __init__(self):
        super().__init__()
        self._lock = RWLock()

    def __getattribute__(self, name):
        attribute = super().__getattribute__(name)
        if name in ["_lock", "FUNC_SYNCHRONISE"]:
            return attribute
        if name in self.FUNC_SYNCHRONISE.keys():
            def wrap(*args, **kwargs):
                log = logging.getLogger(__name__)
                log.info(f"start {name}. wait till other functions finish...")
                with self._lock.w_locked():
                    attribute(*args, **kwargs)
            return wrap
        elif name in self.FUNC_SYNCHRONISE["do"]:
            def wrap(*args, **kwargs):
                with self._lock.r_locked():
                    attribute(*args, **kwargs)
            return wrap
        else:
            return attribute


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=None) as executor:
        obj = Bar()
        # function of 'obj' can be called any times, in any threads, in any order
        # following is just for test
        for f in [obj.func_1, obj.func_3, obj.func_4_free, obj.do, obj.func_2, obj.func_1, obj.do, obj.func_4_free, obj.do, obj.func_3, obj.func_2, obj.func_1]:
            future = executor.submit(
                f,
                **{}
            )
  • FUNC_SYNCHRONISE
    中有键和值,这意味着您可能希望独立同步不同的函数集,但原始实现或此实现都没有这样做。但有了这个指令会让事情变得更加复杂。
  • 这使用每个实例的锁而不是每个类的锁,不确定这是否重要
  • 可能可以通过在构造时仅包装一次函数来改进,例如https://stackoverflow.com/a/11350487
  • 如果某些方法调用其他方法,则可能需要切换到可重入锁,并且使用 R/W 锁库而不是允许读取到写入的提升可以简化代码(因为所有方法都可以获得读锁,并且只有“do”可以然后促进写入),如果您在
    FUNC_SYNCRONISE
  • 中有不止一组键/值对,则可能需要
© www.soinside.com 2019 - 2024. All rights reserved.