下面的代码是我根据我正在处理的代码建模的,并为了解决这个问题而进行了简化,它似乎没有正确使用 p.join() 来管理 p.start( 创建的进程) )导致内存泄漏,运行该代码的K8S pod资源耗尽。我有明显遗漏的东西吗?
import multiprocessing
import random
import time
def process_C(semaphore):
try:
sleep_time = random.random() * 6
print("Process C sleeping for {:.2f} seconds".format(sleep_time))
if sleep_time > 4.2:
raise Exception("Random exception")
time.sleep(sleep_time)
print("Process C completed")
semaphore.release()
except Exception as e:
print("Process C failed: {}".format(e))
semaphore.release()
def process_B(semaphore):
p = multiprocessing.Process(target=process_C, args=(semaphore,))
semaphore.acquire()
p.start()
p.join(timeout=0) # Set timeout to 0 to prevent blocking
print("Process B completed")
def process_A(semaphore):
process_B(semaphore)
print("Process A completed")
if __name__ == "__main__":
semaphore = multiprocessing.Semaphore(20)
while True:
process_A(semaphore)
我尝试检查进程是否还活着并一一终止它们,但没有成功。
主要问题是
p.join(timeout=0)
不会等待该过程完成。
它立即返回并进入下一个循环迭代,导致僵尸进程并导致内存泄漏。 要检查进程是否处于活动状态并终止它们(如果是),您应该跟踪每个子进程。 您可以将每个子进程附加到一个列表中,检查每个子进程是否还活着,然后终止它们。
def process_B(semaphore, list_of_child_processes):
p = multiprocessing.Process(target=process_C, args=(semaphore,))
semaphore.acquire()
p.start()
list_of_child_processes.append(p) # Keep track of child processes
print("Process B completed")
def process_A(semaphore, list_of_child_processes):
process_B(semaphore, list_of_child_processes)
def terminate_processes(list_of_child_processes):
"""
Call this function to terminate a process if it's alive
"""
for p in list_of_child_processes:
if p.is_alive():
print("Terminating process:", p.pid)
p.terminate()
然后
if __name__ == "__main__":
semaphore = multiprocessing.Semaphore(20)
list_of_child_processes = []
while True:
process_A(semaphore, list_of_child_processes)
根据您在 Pod 上运行的应用程序,您可以决定如何调用
terminate_processes
函数。