我有一个小项目(请记住我只是一个Python初学者)。 该项目由几个较小的 .py 文件组成。首先是
main.py
,看起来像这样:
from Controller import Controller
import config as cfg
if __name__ == "__main__":
for path in cfg.paths.values():
if not os.path.exists(path):
os.system(f"mkdir {path} -p")
Con = Controller()
Con.start()
所以这个程序只是创建一些目录,创建控制器对象并运行它的方法。
Controller.py
看起来像这样:
import multiprocessing
import watchdog
import watchdog.events
import watchdog.observers
import watchdog.utils.dirsnapshot
import concurrent.futures
from AM import AM
import config as cfg
m = multiprocessing.Manager()
q = m.Queue()
class Handler(watchdog.events.PatternMatchingEventHandler):
def __init__(self):
# Set the patterns for PatternMatchingEventHandler
watchdog.events.PatternMatchingEventHandler.__init__(self, patterns=['TEST*'],
ignore_directories=True, case_sensitive=False)
def on_created(self, event):
logging.info("AM Watchdog received created event - % s." % event.src_path)
q.put(event.src_path)
def on_moved(self, event):
logging.info("AM Watchdog received modified event - % s." % event.src_path)
q.put(event.src_path)
class Controller:
def __init__(self):
pass
def _start_func(self, newFname):
try:
res = AM(f"{newFname}").start()
return res
except:
return 1
def start(self):
event_handler = Handler()
observer = watchdog.observers.Observer()
observer.schedule(event_handler, path=cfg.paths["ipath"], recursive=True)
observer.start()
try:
while True:
time.sleep(1)
with concurrent.futures.ThreadPoolExecutor(max_workers=cfg.workers) as executor:
futures = {}
while not q.empty():
newFname = q.get()
futures_to_work = executor.submit(self._start_func, newFname)
futures[futures_to_work] = newFname
for future in concurrent.futures.as_completed(futures):
name = futures.pop(future)
print(f"{name} completed")
except KeyboardInterrupt:
observer.stop()
observer.join()
这个程序比上一个程序更复杂(而且可能有一些问题)。它的目的是观察一个目录(
cfg.paths["ipath"]
)并等待TEST*
文件出现。当它完成时,它的名称将被添加到队列中。当队列不为空时,会创建来自 concurrent.futures.ThreadPoolExecutor
的新 future,名称将传递给 _start_func
方法。此方法从 AM.py 创建一个新对象并运行它。其背后的思考过程是,我想要一个程序等待 TEST*
文件出现,然后对其进行一些操作,同时能够同时处理多个文件并按照它们出现的顺序对其进行处理。
AM.py
看起来像这样:
import subprocess
class AM():
def __init__(self, fname):
pass
def test_func(self, fname):
c = f"some_faulty_unix_program {fname}".split(" ")
p = subprocess.run(c, capture_output=True, text = True)
out, err = p.stdout, p.stderr
if out:
print(out)
if err:
print(err)
return 1
return 0
def start(self, fname):
res = self.test_func(fname)
return res
该程序正在新进程中运行一些unix程序(在
Controller.py
中检测到的文件上)。该程序经常会产生错误(由于 TEST*
文件并不总是有效)。我认为这个程序是什么并不重要,但以防万一这个程序是来自 solve-field
的 astrometry.net
和 TEST*
文件是天空的图像。
整个项目作为服务运行,如下所示:
[Unit]
Description = test astrometry service
After = network.target
[Service]
Type = simple
ExecStart = /bin/bash -c "/home/project/main.py"
Restart = always
RestartSec = 2
TimeoutStartSec = infinity
User = root
Group = users
PrivateTmp = true
Environment = "PATH=/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/project"
[Install]
WantedBy = multi-user.target
当我启用此服务并使用
systemctl status my_project.service
检查它时,大约需要76.0M内存。我倾向于让这个工作整个晚上(我有一个每分钟拍摄一张夜空照片的系统,这个项目是为了计算这张夜空照片的天体测量)。第二天早上,当我使用 systemctl status
进行测试时,如果没有错误,内存约为 200-300M;如果出现问题,内存约为 3.5G(例如,我移动了此 UNIX 程序使用的配置文件,因此它会产生错误)在开始时)。为什么记忆力会这样增加?是我的代码有问题导致的,还是这个unix程序有问题?
我不清楚内存泄漏发生在哪里。如果它在
AM.test_func
中运行的“some_faulty_unix_program”中,那么您需要找到或创建它的替代品。但我相信可以对代码进行一些简化/优化,以减少在其他地方发生内存泄漏的可能性。
首先,我认为您不需要一遍又一遍地重新创建多线程池。似乎
watchdog
使用多线程,因此您可以 使用更高效的 queue.Queue
实例而不是托管队列。但最终我认为,通过对Controller.py 代码进行一些重构,以便您的处理程序将任务提交到多线程池,您可以完全消除显式队列。下面的工作可以吗?
import concurrent.futures
from threading import Event
import watchdog
import watchdog.events
import watchdog.observers
import watchdog.utils.dirsnapshot
from AM import AM
import config as cfg
class Handler(watchdog.events.PatternMatchingEventHandler):
def __init__(self):
# Create multithreading pool just once:
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=cfg.workers)
# Set the patterns for PatternMatchingEventHandler
watchdog.events.PatternMatchingEventHandler.__init__(self, patterns=['TEST*'],
ignore_directories=True, case_sensitive=False)
def on_created(self, event):
logging.info("AM Watchdog received created event - %s.", event.src_path)
self._run_start_func(event.src_path)
def on_moved(self, event):
logging.info("AM Watchdog received modified event - %s.", event.src_path)
self._run_start_func(event.src_path)
def _run_start_func(self, newFname):
future = self._executor.submit(self._start_func, newFname)
future.result() # Wait for completion
print(f"{newFname} completed")
def _start_func(self, newFname):
try:
res = AM(newFname).start()
return res
except:
return 1
class Controller:
def __init__(self):
pass
def start(self):
event_handler = Handler()
observer = watchdog.observers.Observer()
observer.schedule(event_handler, path=cfg.paths["ipath"], recursive=True)
observer.start()
event = Event()
try:
# Block until keyboard interrupt:
event.wait()
except KeyboardInterrupt:
observer.stop()
observer.join()