我是 Python 中的多处理/线程和异步的新手,我想并行两个函数调用,以便第一个函数在 5 分钟内无限循环中更新运行状况检查文本文件。直到第二个功能完成为止。之后第一个函数调用的循环应该停止。
这是一个尝试,但不起作用:
import multiprocessing
import time
done_event = multiprocessing.Event()
# Function to update the healthcheck text file
def update_healthcheck():
interval = 5 * 60 # 5 minutes interval in seconds
while not done_event.is_set():
with open("healthcheck.txt", "w") as f:
f.write("Health is okay.")
time.sleep(interval)
# Function that simulates the second task
def second_task():
time.sleep(20) # Simulating some work
done_event.set() # Set the event to signal the first function to stop
if __name__ == "__main__":
# Start the first function in a separate process
healthcheck_process = multiprocessing.Process(target=update_healthcheck)
healthcheck_process.start()
# Start the second function in the main process
second_task()
# Wait for the healthcheck process to finish
healthcheck_process.join()
print("Both tasks completed.")
该代码片段的正确且更好的实现是什么?
谢谢!
是的 - 就是这个想法 -
只需将“多处理”改为“线程”即可。您没有详细说明那里“不起作用”的内容 - 但这是因为以这种方式编写的代码在子流程中创建了一个独立的
Event
实例 - 很可能只是将 done_event
作为参数传递给 update_healthcheck
就可以了。
但是在这种情况下没有理由使用多重处理 - 它会在共享对象上带来一些边缘情况,就像您遇到的那样。另一方面,如果您的主任务有可能失败并且程序停止运行,“健康检查”子进程将随之消失:如果需要进程独立性,您应该将健康检查保留在主进程中,并且将主要任务委托给子流程,并可能向其传递一个
multiprocessing.Queue
,以便它可以发布通知主流程它仍然存在。
否则,如果这样的检查是多余的,并且您只想在任务完成时进行注释,那么只要将其更改为使用线程,您编写的代码就可以工作:
import threading
import time, datetime
done_event = threading.Event()
# Function to update the healthcheck text file
def update_healthcheck():
interval = 5 * 60 # 5 minutes interval in seconds
while not done_event.is_set():
# append timestamped logs:
with open("healthcheck.txt", "at") as f:
f.write(f"{datetime.datetime.now()} - Health is okay. ")
time.sleep(interval)
# Function that simulates the second task
def second_task():
time.sleep(20) # Simulating some work
done_event.set() # Set the event to signal the first function to stop
def main():
# bring code inside functions for better
# readability/maintaninability
# Start the first function in a separate process
healthcheck_process = threading.Thread(target=update_healthcheck)
healthcheck_process.start()
# Start the second function in the main thread
second_task()
# Wait for the healthcheck process to finish
healthcheck_process.join()
print("Both tasks completed.")
if __name__ == "__main__":
# the guarded code should ideally be short -
# by moving the logic to a function, we need a single line:
main()