我需要在运行系统代码时记录大量数据。我可以使用哪些日志记录包来进行高效的异步日志记录? 默认情况下,标准 Python 日志记录包 (https://docs.python.org/2/library/logging.html) 是异步的吗?
您可以使用
nworker 池执行
logging.info()
消息,使用 concurrent.futures.ThreadPoolExecutor,n 应该始终等于一:
import concurrent.futures
import logging
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
def info(self, msg, *args):
executor.submit(logging.info, msg, *args)
异步代码可以使用通常的日志记录功能,而无需诉诸特殊的异步模块或包装器。像这样的代码是可能的。
import logging
async def do_some_async_stuff(self):
logging.getLogger(__name__).info("Started doing stuff...")
logging.getLogger(__name__).warn("Things went awry...")
这里关心的是提交日志条目是否会在日志条目写入文件时产生一些延迟,从而剥夺异步系统在失效期间运行其他任务的机会。如果将写入文件的阻塞处理程序直接添加到日志记录层次结构的某处,就会发生这种情况。
标准
logging
模块为此提供了一个简单的解决方案:使用非阻塞处理程序将其消息排队到在其自己的私有线程中运行的所需阻塞处理程序。
撇开纯粹主义,没有硬性规定阻止使用
QueueHandler
提供使用非阻塞日志处理程序记录日志的异步代码,与 QueueListener
. 中托管的阻塞处理程序一起使用
下面的解决方案与调用
logging
记录器并以典型方式提交条目的协程完全兼容 - 不需要调用 .run_in_executor()
的包装器。异步代码不会遇到来自日志系统的任何阻塞行为。
例如可以设置一个
QueueHandler
为root handler
import queue
from logging.handlers import QueueHandler
log_queue = queue.Queue()
queue_handler = QueueHandler(log_queue) # Non-blocking handler.
root = logging.getLogger()
root.addHandler(queue_handler) # Attached to the root logger.
你想要的阻塞处理程序可以放在一个
QueueListener
中:
from logging.handlers import QueueListener
from logging.handlers import RotatingFileHandler
rot_handler = RotatingFileHandler(...) # The blocking handler.
queue_listener = QueueListener(log_queue,
rot_handler) # Sitting comfortably in its
# own thread, isolated from
# async code.
queue_listener.start()
然后使用您需要的任何日志条目格式配置嵌套在侦听器中的处理程序。
我个人喜欢旋转文件处理程序,因为它限制了生成的日志文件的大小和数量,并在创建新备份时删除最旧的文件。
如果对 Python 和 GIL 的单核线程模型有任何真正的担忧,队列可以来自
multiprocessing
模块而不是 queue
模块。与日志记录相关的本地进程中的线程任务切换应该以稍微复杂一些为代价。
import multiprocessing as mp
import logging
from logging.handlers import QueueHandler, QueueListener, RotatingFileHandler
def log_receiver_setup(log_queue):
# In a separate process, set up the rotating log file handler that receives
# messages from the queue.
rot_handler = RotatingFileHandler(filename="log.txt", backupCount=3)
q_listener = QueueListener(log_queue, rot_handler)
q_listener.start()
if __name__ == '__main__':
# Set up the logging queue and process that will run the rotating log file
# handler.
log_queue = mp.Queue()
log_proc = mp.Process(target=log_receiver_setup, args=(log_queue,))
log_proc.start()
# On the __main__ process side, configure logging to log to the queue.
queue_handler = QueueHandler(log_queue)
root = logging.getLogger()
root.addHandler(queue_handler)
# Example log messages.
logging.getLogger(__name__).setLevel(logging.DEBUG)
logging.getLogger(__name__).info("Started doing stuff...")
logging.getLogger(__name__).warning("Things went awry...")
# Shut down the logging process.
log_queue.close()
log_proc.join()
最简单的方法是通过 Polog 库。这个库最初是基于线程池异步的,你只需要启用这个特性。
安装它:
$ pip install polog
并使用:
from polog import log, config, file_writer
config.add_handlers(file_writer('file.log')) # Add a file handler.
config.set(pool_size=2) # The size of the thread pool. By default it's 0, that means "not asynchronous".
log('message')
如您所见,很难让它变得更简单。