How do I configure Ray to use the standard Python logger with multiprocessing?

Votes: 0 | Answers: 2

I am trying to use Ray to speed up a process, and I would like the log messages to be passed to the standard Python logger. That way, the application can handle the formatting, filtering, and saving of log messages. However, when I use Ray, the log messages are not formatted according to my logger configuration and are not propagated back to the root logger. I tried setting `log_to_driver=True` and `configure_logging=True` in `ray.init()`, but that did not solve the problem. How do I configure Ray to use the standard Python logger with multiprocessing?

Here is an example that should demonstrate the problem:

from ray.util.multiprocessing import Pool
import pathlib
import logging
import logging.config  # needed for logging.config.dictConfig below
import json

def setup_logging(config_file: pathlib.Path):
    with open(config_file) as f_in:
        config = json.load(f_in)
    logging.config.dictConfig(config)

logger = logging.getLogger(__name__)
config_file = pathlib.Path(__file__).parent / "log_setup/config_logging.json"
setup_logging(config_file=config_file)

def f(index):
    logger.warning(f"index: {index}")
    return (index, "model")

if __name__ == "__main__":
    logger.warning("Starting")
    pool = Pool(1)
    results = pool.map(f, range(10))
    print(list(results))

My logger configuration is:

{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
      "detailed": {
        "format": "[%(levelname)s|%(name)s|%(module)s|L%(lineno)d] %(asctime)s: %(message)s",
        "datefmt": "%Y-%m-%dT%H:%M:%S%z"
      }
    },
    "handlers": {
      "stdout": {
        "class": "logging.StreamHandler",
        "level": "INFO",
        "formatter": "detailed"
      }
    },
    "loggers": {
      "root": {
        "level": "DEBUG",
        "handlers": [
          "stdout"
        ]
      }
    }
}
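One subtlety worth flagging in this config (an observation, not part of the original question): `logging.StreamHandler` writes to `sys.stderr` by default, so a handler merely *named* "stdout" still goes to stderr unless a `stream` key is added. A minimal sketch, using an abridged copy of the config above with the stream made explicit:

```python
import logging
import logging.config
import sys

# Abridged version of the config above, with an explicit "stream" key.
config = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "detailed": {"format": "[%(levelname)s|%(name)s] %(message)s"}
    },
    "handlers": {
        "stdout": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "detailed",
            # Without this key, StreamHandler defaults to sys.stderr.
            "stream": "ext://sys.stdout",
        }
    },
    "root": {"level": "DEBUG", "handlers": ["stdout"]},
}

logging.config.dictConfig(config)
root_handler = logging.getLogger().handlers[0]
print(root_handler.stream is sys.stdout)  # True
```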

If I use the plain Python `map`, I get the following output:

[WARNING|__main__|ray_trial|L28] 2024-03-27T15:14:21+0100: Starting
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 0
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 1
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 2
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 3
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 4
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 5
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 6
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 7
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 8
[WARNING|__main__|ray_trial|L22] 2024-03-27T15:14:21+0100: index: 9

But when I use Ray, I get:

2024-03-27 14:54:55,064 INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
(PoolActor pid=43261) index: 0
(PoolActor pid=43261) index: 1
(PoolActor pid=43261) index: 2
(PoolActor pid=43261) index: 3
(PoolActor pid=43261) index: 4
(PoolActor pid=43261) index: 5
(PoolActor pid=43261) index: 6
(PoolActor pid=43261) index: 7
(PoolActor pid=43261) index: 8
(PoolActor pid=43261) index: 9
Tags: python, logging, multiprocessing, ray
2 Answers

0 votes

Since Ray's `Pool` is a drop-in replacement for Python's `Pool`, it provides the same signature and allows the use of an `initializer` (see the documentation here).

"Each worker process will call initializer(*initargs) when it starts."

So you can call the `setup_logging` function with the config path, like this:

def f(index):
    logger.warning(f"index: {index}")
    return (index, "model")


if __name__ == "__main__":
    logger.warning("Starting")
    pool = Pool(1, initializer=setup_logging, initargs=(config_file,))
    results = pool.map(f, range(10))

You can confirm this by printing `f"id: {id(logger)} | name: {logger.name}"`: even though the loggers share the same name, they are not the same object.
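As a standalone illustration, the stdlib `multiprocessing.Pool` (whose signature the Ray `Pool` mirrors) shows the same mechanism: the initializer runs once in each worker process before it executes any tasks, so logging configured there applies to everything that worker logs. The names below are illustrative, not from the question:

```python
import logging
import multiprocessing

def init_worker(level):
    # Runs once per worker process, before any tasks are dispatched to it.
    logging.basicConfig(level=level,
                        format="[%(levelname)s|%(name)s] %(message)s")

def work(i):
    logging.getLogger(__name__).warning("index: %s", i)
    return i * 2

if __name__ == "__main__":
    with multiprocessing.Pool(1, initializer=init_worker,
                              initargs=(logging.WARNING,)) as pool:
        results = pool.map(work, range(3))
    print(results)  # [0, 2, 4]
```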


0 votes

Ray overrides the `logger.makeRecord` method in `cli_logger.py`, as can be seen at line 146.
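To see what overriding `makeRecord` can do, here is a sketch of a `Logger` subclass that rewrites every record it produces. This is illustrative only, not Ray's actual code:

```python
import logging

class TaggedLogger(logging.Logger):
    """Illustrative subclass: prefixes every record's message."""

    def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
                   func=None, extra=None, sinfo=None):
        record = super().makeRecord(name, level, fn, lno, msg, args,
                                    exc_info, func, extra, sinfo)
        record.msg = f"[tagged] {record.msg}"
        return record

logging.setLoggerClass(TaggedLogger)
logger = logging.getLogger("makerecord.demo")  # created as a TaggedLogger

record = logger.makeRecord("makerecord.demo", logging.WARNING,
                           __file__, 1, "hello", (), None)
print(record.getMessage())  # [tagged] hello
```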

Since each process's `logger` is a different object, you have to reload the configuration:

def f(index):
    setup_logging(config_file=config_file)
    logger = logging.getLogger(__name__)
    logger.warning(f"index: {index},{__name__}")
    return (index, "model")

Alternatively, you can switch to a queue-based approach: run each process explicitly in a loop (rather than through a pool) and let a thread handle the logging. Example code:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import threading
import pathlib
import json

def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)


def worker_process(q, i):
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.WARNING)
    root.addHandler(qh)
    
    logger = logging.getLogger(__name__)
    logger.log(logging.WARNING, f'Message no. {i}')

config_file = pathlib.Path(__file__).parent / "log_setup/config_logging.json"
def setup_logging(config_file: pathlib.Path):
    with open(config_file) as f_in:
        config = json.load(f_in)
    return config

if __name__ == '__main__':
    q = Queue()
    d = setup_logging(config_file=config_file)
    workers = []

    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q, i))
        workers.append(wp)
        wp.start()
    
    logging.config.dictConfig(d)
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()
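A stdlib alternative worth knowing about: `logging.handlers.QueueListener` does the same job as the hand-rolled `logger_thread` above, including starting and joining the internal thread. A small self-contained sketch (the logger name and the `StringIO` sink are illustrative):

```python
import io
import logging
import logging.handlers
import queue

q = queue.Queue()
buf = io.StringIO()

# The listener pops records off the queue and hands them to its handlers,
# replacing the manual while-loop-plus-sentinel thread above.
sink = logging.StreamHandler(buf)
sink.setFormatter(logging.Formatter("[%(levelname)s|%(name)s] %(message)s"))
listener = logging.handlers.QueueListener(q, sink)
listener.start()

log = logging.getLogger("queue.demo")
log.addHandler(logging.handlers.QueueHandler(q))
log.warning("routed through the queue")

listener.stop()  # drains the queue and joins the internal thread
print(buf.getvalue().strip())  # [WARNING|queue.demo] routed through the queue
```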