I want to log to a single file from several different processes, one of which uses a ProcessPoolExecutor.
My application has the following structure: both the worker processes and the process pool stay open until the application shuts down.
The example below mirrors the architecture of my application. I am trying to set up a multiprocessing queue that handles all log records coming from the Process workers and from the ProcessPoolExecutor, so that access to the file is safe.
I tried to adapt the example from the Python documentation and added a ProcessPoolExecutor, but in the example below the log messages from the main process never end up in the file. What am I missing?
import logging
import logging.handlers
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import sys
import traceback
from random import choice
# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]
NUMBER_OF_WORKERS = 2
NUMBER_OF_POOL_WORKERS = 1
NUMBER_OF_MESSAGES = 1
LOG_SIZE = 1024  # 1 KiB
BACKUP_COUNT = 10
LOG_FORMAT = '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'
LOG_NAME = 'mptest.log'
def configure_logging_format():
    """
    Configure the listener process logging.
    """
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler(LOG_NAME, 'a', LOG_SIZE, BACKUP_COUNT)
    f = logging.Formatter(LOG_FORMAT)
    h.setLevel(logging.DEBUG)
    h.setFormatter(f)
    root.addHandler(h)
def main_process_listener(queue: multiprocessing.Queue):
    """
    This is the listener process top-level loop: wait for logging events
    (LogRecords) on the queue and handle them, quit when you get a None for a
    LogRecord.

    Parameters
    ----------
    queue: Queue
        Queue to get the log records from.
    """
    print('Trigger main listener.')
    configure_logging_format()
    while True:
        try:
            record = queue.get()
            if record is None:  # sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            traceback.print_exc(file=sys.stderr)
def broadcast_logs_from_pool_to_main_listener(pool_process_queue, main_process_queue):
    """
    This is the listener process top-level loop: wait for logging events from the pool process
    and broadcast them to the main listener process.

    pool_process_queue: Queue
        Pool process queue to get the log records from.
    main_process_queue: Queue
        Main process queue to put the log records to.
    """
    print('Broadcasting logs from pool to main listener.')
    # configure_logging_format()
    while True:
        try:
            record = pool_process_queue.get()
            if record is None:  # sentinel to tell the listener to quit.
                break
            # TODO: apply level of filtering
            main_process_queue.put(record)
        except Exception:
            traceback.print_exc(file=sys.stderr)
def configure_logging_for_multiprocessing(queue):
    """
    The worker configuration is done at the start of the worker process run.
    Note that on Windows you can't rely on fork semantics, so each process
    will run the logging configuration code when it starts.
    """
    print('Configuring logging for multiprocessing...')
    h = logging.handlers.QueueHandler(queue)  # Handler needed to send records to the queue
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)
def pool_process(queue):
    configure_logging_for_multiprocessing(queue)
    name = multiprocessing.current_process().name
    print('Pool process started: %s' % name)
    logging.getLogger(name).log(choice(LEVELS), 'message')
    print('Pool process finished: %s' % name)
def worker_process(queue):
    """
    Worker process that logs messages to the queue.

    Parameters
    ----------
    queue: Queue
        Queue to log the messages to.
    """
    configure_logging_for_multiprocessing(queue)
    pool_queue = multiprocessing.Manager().Queue(-1)
    lp = multiprocessing.Process(target=broadcast_logs_from_pool_to_main_listener, args=(pool_queue, queue))
    lp.start()
    # Create ProcessPoolExecutor
    executor = ProcessPoolExecutor(max_workers=NUMBER_OF_POOL_WORKERS)
    for i in range(NUMBER_OF_POOL_WORKERS):
        executor.submit(pool_process, pool_queue)
    # Send message
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    logging.getLogger(name).log(choice(LEVELS), 'message')
    print('Worker finished: %s' % name)
    # Shutdown the executor and the listener
    executor.shutdown()
    pool_queue.put_nowait(None)
if __name__ == '__main__':
    main_logging_queue = multiprocessing.Manager().Queue()
    # Start the listener process
    lp = multiprocessing.Process(target=main_process_listener, args=(main_logging_queue,))
    lp.start()
    logging.getLogger('main_1').log(choice(LEVELS), 'main process 1')
    # Start the worker processes
    workers = []
    for i in range(NUMBER_OF_WORKERS):
        worker = multiprocessing.Process(target=worker_process, args=(main_logging_queue,))
        workers.append(worker)
        worker.start()
    # Log a message from the main process
    logging.getLogger('main_2').log(choice(LEVELS), 'main process 1')
    # Wait for all of the workers to finish
    for w in workers:
        w.join()
    main_logging_queue.put_nowait(None)
    lp.join()
I took the opportunity to simplify your example a bit, but I believe this is what you were after (read the code comments for more detail). I also noticed a couple of unrelated things that may cause problems later on (with nested process creation it is hard to guarantee proper cleanup in failure scenarios), and I would definitely recommend mCoding's tutorial on modern logging on YouTube; it has a lot of great information.
import logging
import logging.handlers
import multiprocessing
import threading
from concurrent.futures import ProcessPoolExecutor
LOG_SIZE = 2**16 # 64 KiB
BACKUP_COUNT = 10
LOG_FORMAT = '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'
LOG_NAME = 'mptest.log'
def init_file_logger():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler(LOG_NAME, 'a', LOG_SIZE, BACKUP_COUNT)
    f = logging.Formatter(LOG_FORMAT)
    h.setLevel(logging.DEBUG)
    h.setFormatter(f)
    root.addHandler(h)
    root.setLevel(logging.DEBUG)
def init_q_logger(log_q):  # send log records to the log_q_reader thread
    root = logging.getLogger()
    h = logging.handlers.QueueHandler(log_q)
    root.addHandler(h)
    root.setLevel(logging.DEBUG)
# Use a thread to receive logs so it can share the main process's logging config
def log_q_reader(log_q: multiprocessing.Queue):
    root = logging.getLogger()
    for record in iter(log_q.get, None):
        root.handle(record)
def pool_task(*args):
    logging.debug(f"pool task({args})")  # logs to the QueueHandler configured by init_q_logger() in the pool initializer
def process_task(log_q, *args):
    init_q_logger(log_q)  # init the logging queue handler for the child process
    logging.debug(f"process task({args})")  # logs to the QueueHandler configured by init_q_logger()

    # Use the pool initializer function to init the queue handler for grandchild processes.
    # Also, grandchild processes are in my opinion a code smell in general. From the Zen
    # of Python: "Flat is better than nested."
    with ProcessPoolExecutor(initializer=init_q_logger, initargs=(log_q,)) as pool:
        futures = []
        for x in [(1, 2), (3, 4), (5, 6)]:
            futures.append(pool.submit(pool_task, x))
        for f in futures:  # wait on all results before exiting the context manager
            f.result()
if __name__ == "__main__":
    init_file_logger()  # could inline this
    log_q = multiprocessing.Queue()  # A real multiprocessing.Queue (as opposed to a managed proxy) can only be handed to the pool via initargs, not as an argument to a submitted task.
    log_thread = threading.Thread(target=log_q_reader, args=(log_q,))
    log_thread.start()

    logging.debug("before process1")  # logs directly to the RotatingFileHandler configured by init_file_logger()
    p = multiprocessing.Process(target=process_task, args=(log_q, "a", "b", "c"))
    p.start()
    p.join()

    logging.debug("before process2")
    p = multiprocessing.Process(target=process_task, args=(log_q, "x", "y", "z"))
    p.start()
    p.join()

    logging.debug("end")
    log_q.put(None)
    log_q.close()
    log_thread.join()
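As an aside, the standard library already ships a helper that does the same job as the hand-rolled log_q_reader thread: logging.handlers.QueueListener runs its own background thread and forwards every record it pulls off the queue to the handlers it was given. Below is a minimal sketch of that variant (it reuses the LOG_NAME/LOG_SIZE/BACKUP_COUNT/LOG_FORMAT constants from the example above; the child-process side stays exactly as written):

import logging
import logging.handlers
import multiprocessing

if __name__ == "__main__":
    log_q = multiprocessing.Queue()
    # One file handler, owned by the listener instead of the root logger.
    file_handler = logging.handlers.RotatingFileHandler(LOG_NAME, 'a', LOG_SIZE, BACKUP_COUNT)
    file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
    listener = logging.handlers.QueueListener(log_q, file_handler)
    listener.start()  # starts the internal reader thread

    # ... start the Process / ProcessPoolExecutor children exactly as above,
    # with each child attaching a QueueHandler(log_q) via init_q_logger() ...

    listener.stop()  # flushes remaining records and joins the thread

With this variant the main process would attach a QueueHandler(log_q) as well (instead of the file handler) if it wants its own records to go through the same serialised path to the file.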