Python: logging to a single file from Process workers and a ProcessPoolExecutor using QueueHandler


I want to log to a single file from different processes, including the workers of a ProcessPoolExecutor.

My application has the following structure:

  • a main process
  • a worker process that owns a process pool (ProcessPoolExecutor)

Both the worker process and the process pool stay alive until the application shuts down.

The example below mirrors the architecture of my application. I am trying to set up a multiprocessing queue that handles all log records coming from the Process and from the ProcessPoolExecutor, so that access to the file is safe.

I tried to follow the example from the Python documentation and added the ProcessPoolExecutor, but in the example below the log messages from the main process never end up in the file. What am I missing?

import logging
import logging.handlers
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import sys
import traceback
from random import choice

# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]

NUMBER_OF_WORKERS = 2
NUMBER_OF_POOL_WORKERS = 1
NUMBER_OF_MESSAGES = 1

LOG_SIZE = 1024  # 1 KiB
BACKUP_COUNT = 10
LOG_FORMAT = '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'
LOG_NAME = 'mptest.log'


def configure_logging_format():
    """
    Configure the listener process logging.
    """
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler(LOG_NAME, 'a', LOG_SIZE, BACKUP_COUNT)
    f = logging.Formatter(LOG_FORMAT)
    h.setLevel(logging.DEBUG)
    h.setFormatter(f)
    root.addHandler(h)


def main_process_listener(queue: multiprocessing.Queue):
    """
    This is the listener process top-level loop: wait for logging events
    (LogRecords)on the queue and handle them, quit when you get a None for a
    LogRecord.

    Parameters
    ----------
    queue: Queue
        Queue to get the log records from.
    """
    print('Trigger main listener.')
    configure_logging_format()
    while True:
        try:
            record = queue.get()
            if record is None:  # sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            traceback.print_exc(file=sys.stderr)


def broadcast_logs_from_pool_to_main_listener(pool_process_queue, main_process_queue):
    """
    This is the listener process top-level loop: wait for logging events from the pool process
    and broadcast them to the main listener process.

    pool_process_queue: Queue
        Pool process queue to get the log records from.
    main_process_queue: Queue
        Main process queue to put the log records to.
    """
    print('Broadcasting logs from pool to main listener.')
    # configure_logging_format()
    while True:
        try:
            record = pool_process_queue.get()
            if record is None:  # sentinel to tell the listener to quit.
                break
            # TODO: apply level of filtering
            main_process_queue.put(record)
        except Exception:
            traceback.print_exc(file=sys.stderr)


def configure_logging_for_multiprocessing(queue):
    """
    The worker configuration is done at the start of the worker process run.
    Note that on Windows you can't rely on fork semantics, so each process
    will run the logging configuration code when it starts.
    """
    print('Configuring logging for multiprocessing...')
    h = logging.handlers.QueueHandler(queue)  # Handler needed to send records to queue
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)


def pool_process(queue):
    configure_logging_for_multiprocessing(queue)

    name = multiprocessing.current_process().name
    print('Pool process started: %s' % name)
    logging.getLogger(name).log(choice(LEVELS), 'message')
    print('Pool process finished: %s' % name)


def worker_process(queue):
    """
    Worker process that logs messages to the queue.

    Parameters
    ----------
    queue: Queue
        Queue to log the messages to.
    """
    configure_logging_for_multiprocessing(queue)

    pool_queue = multiprocessing.Manager().Queue(-1)
    lp = multiprocessing.Process(target=broadcast_logs_from_pool_to_main_listener, args=(pool_queue, queue))
    lp.start()

    # Create ProcessPoolExecutor
    executor = ProcessPoolExecutor(max_workers=NUMBER_OF_POOL_WORKERS)
    for i in range(NUMBER_OF_POOL_WORKERS):
        executor.submit(pool_process, pool_queue)

    # Send message
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    logging.getLogger(name).log(choice(LEVELS), 'message')
    print('Worker finished: %s' % name)

    # Shutdown the executor and the listener
    executor.shutdown()
    pool_queue.put_nowait(None)


if __name__ == '__main__':
    main_logging_queue = multiprocessing.Manager().Queue()

    # Start the listener process
    lp = multiprocessing.Process(target=main_process_listener, args=(main_logging_queue,))
    lp.start()

    logging.getLogger('main_1').log(choice(LEVELS), 'main process 1')

    # Start the worker processes
    workers = []
    for i in range(NUMBER_OF_WORKERS):
        worker = multiprocessing.Process(target=worker_process, args=(main_logging_queue,))
        workers.append(worker)
        worker.start()

    # Log a message from the main process
    logging.getLogger('main_2').log(choice(LEVELS), 'main process 1')

    # Wait for all of the workers to finish
    for w in workers:
        w.join()

    main_logging_queue.put_nowait(None)
    lp.join()
1 Answer (0 votes)

I took the liberty of simplifying your example a bit, but I think this is what you were after? A few unrelated things may cause problems down the line (nested process creation makes it hard to guarantee proper cleanup in failure scenarios), and I would definitely recommend mCoding's YouTube tutorial on modern logging. It has a lot of great information.

import logging
import logging.handlers
import multiprocessing
import threading
from concurrent.futures import ProcessPoolExecutor

LOG_SIZE = 2**16  # 64 KiB
BACKUP_COUNT = 10
LOG_FORMAT = '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'
LOG_NAME = 'mptest.log'

def init_file_logger():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler(LOG_NAME, 'a', LOG_SIZE, BACKUP_COUNT)
    f = logging.Formatter(LOG_FORMAT)
    h.setLevel(logging.DEBUG)
    h.setFormatter(f)
    root.addHandler(h)
    root.setLevel(logging.DEBUG)

def init_q_logger(log_q):  # send log records to the log_q_reader thread
    root = logging.getLogger()
    h = logging.handlers.QueueHandler(log_q)
    root.addHandler(h)
    root.setLevel(logging.DEBUG)

# Use a thread to receive logs so it can share the main process's logging config
def log_q_reader(log_q: multiprocessing.Queue):
    root = logging.getLogger()
    for record in iter(log_q.get, None):
        root.handle(record)

def pool_task(*args):
    logging.debug(f"pool task({args})")

def process_task(log_q, *args):
    init_q_logger(log_q)  # init logging queue handler for child process
    logging.debug(f"process task({args})")

    # Use the pool initializer function to set up the queue handler for grandchild processes.
    #   Also, grandchild processes are in my opinion a code smell in general. From the Zen
    #   of Python: "Flat is better than nested."
    with ProcessPoolExecutor(initializer=init_q_logger, initargs=(log_q,)) as pool:
        futures = []
        for x in [(1,2),(3,4),(5,6)]:
            futures.append(pool.submit(pool_task, x))
        for f in futures:  # wait on all results before exiting the context manager
            f.result()

if __name__ == "__main__":
    init_file_logger()  # could inline this
    log_q = multiprocessing.Queue()
    log_thread = threading.Thread(target=log_q_reader, args=(log_q,))
    log_thread.start()
    logging.debug("before process1")
    p = multiprocessing.Process(target=process_task, args=(log_q, "a", "b", "c"))
    p.start()
    p.join()
    logging.debug("before process2")
    p = multiprocessing.Process(target=process_task, args=(log_q, "x", "y", "z"))
    p.start()
    p.join()
    logging.debug("end")
    log_q.put(None)
    log_q.close()
    log_thread.join()
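
For completeness: as far as I can tell, the reason your main-process messages never reached the file is that the main process's root logger never got any handler at all, since configure_logging_format() only runs inside the listener process. The answer above addresses this by calling init_file_logger() in __main__ and draining the queue in a thread of the main process. A closely related variant (my own sketch, not part of the answer above) uses the standard library's logging.handlers.QueueListener instead of a hand-rolled reader thread; the listener owns the RotatingFileHandler and every process, including the main one, only talks to the queue:

import logging
import logging.handlers
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

LOG_FORMAT = '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'

def init_q_logger(log_q):
    # Child processes (and the main process) only get a QueueHandler;
    # the actual file writing happens in the QueueListener's thread.
    root = logging.getLogger()
    root.addHandler(logging.handlers.QueueHandler(log_q))
    root.setLevel(logging.DEBUG)

def pool_task(x):
    logging.debug(f"pool task({x})")

if __name__ == "__main__":
    file_handler = logging.handlers.RotatingFileHandler('mptest.log', 'a', 2**16, 10)
    file_handler.setFormatter(logging.Formatter(LOG_FORMAT))

    log_q = multiprocessing.Queue()
    # QueueListener starts a background thread that pulls records off log_q
    # and passes them to the handlers it was constructed with.
    listener = logging.handlers.QueueListener(log_q, file_handler)
    listener.start()

    init_q_logger(log_q)  # route the main process's own logging through the queue too
    logging.debug("main: before pool")

    with ProcessPoolExecutor(initializer=init_q_logger, initargs=(log_q,)) as pool:
        list(pool.map(pool_task, [1, 2, 3]))

    logging.debug("main: done")
    listener.stop()

Either approach works; QueueListener just saves you from writing the sentinel/loop logic yourself.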