I am trying to use Ray to speed up my processing, and I want log messages to be passed to the standard Python logger. That way, the application can handle the formatting, filtering, and saving of log messages. However, when I use Ray, the log messages are not formatted according to my logger configuration and are not propagated back to the root logger. I tried setting configure_logging=True and log_to_driver=True in ray.init(), but that did not solve the problem. How can I configure Ray to use the standard Python logger with multiprocessing?
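For reference, what I tried looks roughly like this (a minimal sketch; these two keyword arguments are the only ones I changed):

import ray

# Attempted fix: initialize Ray explicitly before creating the Pool,
# with both logging-related flags enabled.
ray.init(configure_logging=True, log_to_driver=True)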
Here is an example that should demonstrate the problem:
from ray.util.multiprocessing import Pool
import pathlib
import logging
import logging.config  # needed for logging.config.dictConfig
import json


def setup_logging(config_file: pathlib.Path):
    with open(config_file) as f_in:
        config = json.load(f_in)
    logging.config.dictConfig(config)


logger = logging.getLogger(__name__)

config_file = pathlib.Path(__file__).parent / "log_setup/config_logging.json"
setup_logging(config_file=config_file)


def f(index):
    logger.warning(f"index: {index}")
    return (index, "model")


if __name__ == "__main__":
    logger.warning("Starting")
    pool = Pool(1)
    results = pool.map(f, range(10))
    print(list(results))
My logger configuration is:
{
  "version": 1,
  "disable_existing_loggers": false,
  "formatters": {
    "detailed": {
      "format": "[%(levelname)s|%(name)s|%(module)s|L%(lineno)d] %(asctime)s: %(message)s",
      "datefmt": "%Y-%m-%dT%H:%M:%S%z"
    }
  },
  "handlers": {
    "stdout": {
      "class": "logging.StreamHandler",
      "level": "INFO",
      "formatter": "detailed"
    }
  },
  "loggers": {
    "root": {
      "level": "DEBUG",
      "handlers": [
        "stdout"
      ]
    }
  }
}
If I just use the plain Python map, I get the following output:
[WARNING|__main__|ray_trial|L28] 2024-03-27T15:14:21+0100: Starting
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 0
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 1
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 2
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 3
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 4
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 5
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 6
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 7
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 8
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 9
But when I use Ray, I get:
2024-03-27 14:54:55,064 INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
(PoolActor pid=43261) index: 0
(PoolActor pid=43261) index: 1
(PoolActor pid=43261) index: 2
(PoolActor pid=43261) index: 3
(PoolActor pid=43261) index: 4
(PoolActor pid=43261) index: 5
(PoolActor pid=43261) index: 6
(PoolActor pid=43261) index: 7
(PoolActor pid=43261) index: 8
(PoolActor pid=43261) index: 9
Since ray's Pool is a drop-in replacement for Python's Pool, it provides the same signature and supports an initializer (see the documentation here):

"initializer(*initargs) is called at the start of each worker process."

So you can call the setup_logging function with the config path, like this:
def f(index):
    logger.warning(f"index: {index}")
    return (index, "model")


if __name__ == "__main__":
    logger.warning("Starting")
    pool = Pool(1, initializer=setup_logging, initargs=(config_file,))
    results = pool.map(f, range(10))
You can add print(f"id:{id(logger)} | name: {logger.name}") in both the driver and the worker to confirm that, even though the loggers share the same name, they are not the same object across processes.
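A minimal sketch of that check (the print calls are illustrative additions, not part of the original script):

from ray.util.multiprocessing import Pool
import logging

logger = logging.getLogger(__name__)


def f(index):
    # Runs in the worker process: same logger name as in the driver,
    # but a different object in a different interpreter.
    print(f"worker id: {id(logger)} | name: {logger.name}")
    return index


if __name__ == "__main__":
    print(f"driver id: {id(logger)} | name: {logger.name}")
    pool = Pool(1)
    print(list(pool.map(f, range(1))))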
Ray overrides the logger.makeRecord method in cli_logger.py, as can be seen at Line#146.
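If you want to confirm that in your own environment (an illustrative check of my own, not something from the Ray docs), you can inspect which module a logger's makeRecord comes from; an unpatched logger reports the stdlib logging module:

import logging

logger = logging.getLogger(__name__)
# Prints "logging" for the stock implementation; a monkey-patched
# makeRecord would report the patching module instead.
print(logger.makeRecord.__module__)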
Since each process's logger is a different object, you have to reload the configuration inside the worker:
def f(index):
    setup_logging(config_file=config_file)
    logger = logging.getLogger(__name__)
    logger.warning(f"index: {index},{__name__}")
    return (index, "model")
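Reloading the configuration on every task call is wasteful. One way to avoid that (my own sketch, not from the Ray docs; setup_logging_once is a hypothetical helper) is to cache the setup so it runs at most once per worker process, reusing setup_logging and config_file from above:

import functools
import logging
import pathlib


@functools.lru_cache(maxsize=None)
def setup_logging_once(config_file: pathlib.Path):
    # lru_cache turns repeated calls into no-ops after the first
    # invocation within a given process (pathlib.Path is hashable).
    setup_logging(config_file=config_file)


def f(index):
    setup_logging_once(config_file)
    logger = logging.getLogger(__name__)
    logger.warning(f"index: {index}")
    return (index, "model")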
Alternatively, you can switch to a queue-based approach: start each process explicitly in a loop (instead of using a pool) and let a dedicated thread handle the logging. Example code:
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import threading
import pathlib
import json


def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)


def worker_process(q, i):
    # Each worker only attaches a QueueHandler; the records are
    # formatted and emitted by the main process.
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.WARNING)
    root.addHandler(qh)
    logger = logging.getLogger(__name__)
    logger.log(logging.WARNING, f'Message no. {i}')


config_file = pathlib.Path(__file__).parent / "log_setup/config_logging.json"


def setup_logging(config_file: pathlib.Path):
    # Note: in this variant setup_logging only loads the dict;
    # dictConfig is applied in the main process below.
    with open(config_file) as f_in:
        config = json.load(f_in)
    return config


if __name__ == '__main__':
    q = Queue()
    d = setup_logging(config_file=config_file)
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q, i))
        workers.append(wp)
        wp.start()
    logging.config.dictConfig(d)
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()
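As a design note, the standard library also provides logging.handlers.QueueListener, which can replace the hand-rolled logger_thread. A minimal sketch, assuming the worker_process, setup_logging, and config_file definitions from the example above:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue

if __name__ == '__main__':
    q = Queue()
    logging.config.dictConfig(setup_logging(config_file=config_file))
    # QueueListener drains the queue on its own internal thread and feeds
    # each record to the handlers passed in here (the root logger's handlers).
    listener = logging.handlers.QueueListener(
        q, *logging.getLogger().handlers, respect_handler_level=True
    )
    listener.start()
    workers = [Process(target=worker_process, args=(q, i)) for i in range(5)]
    for wp in workers:
        wp.start()
    for wp in workers:
        wp.join()
    listener.stop()  # flush remaining records and join the listener thread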