我一直在尝试使用并发期货,但是由于它是一个非常新鲜的Python库,因此其文档并不是最好的。
我正在使用一个称为广义随机Petri网的数学模型,但为简化我的问题,它们很像图。在每个节点上,应该有一个函数可以执行并代表我们当前在图形上的位置,我们有标记。我将在下面提供一张图片,以便您了解模型的外观:simple example。黑色小点是标记,而矩形称为过渡,但它们与我的问题无关。总之,这就是我要在项目中执行的操作:
我想发生的是,当线程完成任务时,我希望它执行代码状态为“已占用”的部分,但是,这仅在所有线程完成其任务时才会发生,这实际上意味着并发不是100%起作用。
在显示我的代码之前,我只想解释一些变量:
代码如下:
'''
def decide_function_to_execute(self):
counter = 1
while counter != 3:
with ThreadPoolExecutor(max_workers=self.__number_of_tokens) as executor:
number_tokens = self.__gspn.get_number_of_tokens()
for thread_number in range(number_tokens):
if self.__token_states[thread_number] == 'Free':
place = self.look_for_token(thread_number + 1)
splitted_path = self.__place_to_function_mapping[place].split(".")
# On the first case we have path = FILE.FUNCTION
if len(splitted_path) <= 2:
function_location = splitted_path[0]
function_name = splitted_path[1]
module_to_exec = __import__(function_location)
function_to_exec = getattr(module_to_exec, function_name)
# On the second case we have path = FOLDER. ... . FILE.FUNCTION
else:
new_path = splitted_path[0]
for element in splitted_path[1:]:
if element != splitted_path[-1]:
new_path = new_path + "." + element
function_location = new_path
function_name = splitted_path[-1]
module_to_exec = __import__(function_location, fromlist=[function_name])
function_to_exec = getattr(module_to_exec, function_name)
self.__token_states[thread_number] = 'Occupied'
self.__futures[thread_number] = executor.submit(function_to_exec, thread_number)
if self.__token_states[thread_number] == 'Occupied' and self.__futures[thread_number].done():
print("I am occupied and finished the task", thread_number + 1)
self.__token_states[thread_number] = 'Done'
continue
if self.__token_states[thread_number] == 'Done':
print("I am done", thread_number + 1)
self.apply_policy()
self.__token_states[thread_number] = 'Free'
counter = counter + 1
'''
您可以忽略从“ place = self.look_for_token(thread_number + 1)”开始直到“ function_to_exec = getattr(module_to_exec,function_name)”的代码行,因为这与我需要的项目的另一部分有关以便能够在Python文件中查找函数。
为什么线程只有在全部完成后才执行“如果self .__ token_states [thread_number] =='被占用'和self .__ futures [thread_number] .done():“?
我已经在这个问题上停留了两个星期,我无法取得进展。谁能帮忙?
谢谢
简化的功能如下。 (这是伪python,因此并非每个函数都存在)
def decide_function_to_execute(self):
counter = 1
while counter < 3:
with ThreadPoolExecutor(...) as executor:
for thread in pool:
if not thread.started():
thread.start()
if thread.done():
do_something()
counter += 1
第一次访问每个线程后,您将退出ThreadPoolExecutor上下文。当代码离开执行程序上下文时,将隐式调用executor.shutdown()。连接每个线程(请参见documentation)。然后,您增加计数器并再次输入执行程序上下文。因为每个线程都已经加入,所以thread.done()对于每个线程都是True。然后每个线程都执行do_something()。