Python 多处理日志记录错误 - TypeError:需要一个整数(类型为 NoneType)

问题描述 投票:0回答:3

我一直在开发一个 python3 (3.4.5) 项目,该项目利用

multiprocessing.Pool
通过 4 个工作人员运行大约 50 多个作业。 我有一个带有
logging.handlers.QueueListener
的单独进程设置,因此我可以通过与
Queue
一起使用的
multiprocessing.Manager()
将全局内容记录到单个文件中。 所以基本上流程是这样的

  1. 主程序开始
  2. 通过
    Queue
     创建 
    multiprocessing.Manager()
  3. 使用
    QueueListener
    监听我刚刚为全局日志创建的
    Queue
    启动专用日志记录进程。 (我也尝试过使用主程序之外的线程来实现相同的结果。)
  4. 创建一个
    multiprocessing.Pool
    来处理各个作业,向它们传递之前创建的
    Queue
    以及运行和设置其日志记录所需的配置信息(有一个全局日志,以及每个作业的单独日志,其中包含更精细的信息)。作业以
    map_async
    开始。
  5. 等待所有作业处理完毕,然后执行一些最后步骤和清理。

不过,我在某些作业上不断收到间歇性错误,通常其中 1 个作业有错误(每次都有一个不同的错误),偶尔会有 2 个相同的错误或零个错误。据我所知,导致错误的不是作业中的代码,而是

multiprocessing
logging
设置中的某些内容。 这是我收到的错误的示例:

--- Logging error ---
Traceback (most recent call last):
  File "/usr/lib64/python3.4/logging/handlers.py", line 1347, in emit
    self.enqueue(self.prepare(record))
  File "/usr/lib64/python3.4/logging/handlers.py", line 1313, in enqueue
    self.queue.put_nowait(record)
  File "<string>", line 2, in put_nowait
  File "/usr/lib64/python3.4/multiprocessing/managers.py", line 731, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib64/python3.4/multiprocessing/connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/usr/lib64/python3.4/multiprocessing/connection.py", line 413, in _send_bytes
    self._send(chunk)
  File "/usr/lib64/python3.4/multiprocessing/connection.py", line 369, in _send
    n = write(self._handle, buf)
TypeError: an integer is required (got type NoneType)
Call stack:
  File "./sampling__test__py.py", line 100, in <module>
    run_pool     = multiprocessing.Pool(4)
  File "/usr/lib64/python3.4/multiprocessing/context.py", line 118, in Pool
    context=self.get_context())
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 168, in __init__
    self._repopulate_pool()
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 233, in _repopulate_pool
    w.start()
  File "/usr/lib64/python3.4/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/usr/lib64/python3.4/multiprocessing/context.py", line 267, in _Popen
    return Popen(process_obj)
  File "/usr/lib64/python3.4/multiprocessing/popen_fork.py", line 21, in __init__
    self._launch(process_obj)
  File "/usr/lib64/python3.4/multiprocessing/popen_fork.py", line 77, in _launch
    code = process_obj._bootstrap()
  File "/usr/lib64/python3.4/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/usr/lib64/python3.4/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "/home/username/value_check.py", line 338, in value_check
    global_logger.info("SplitTime: {str_timeDelta} -- COMPLETED: {Check_Name} --- Total Txn Count: {var_Total_Txn_Count} --- Criteria Txn Count: {var_Criteria_Txn_Count} --- Threshold: {Threshold} --- Low_Vol Threshold: {LowVolThresh}".format(str_timeDelta = timeDelta(datetime.now() - YAML_Config['start_time']), **YAML_Config))
Message: 'SplitTime: 00:01:05,031 -- COMPLETED: ALPHA_CHECK --- Total Txn Count: 1234--- Criteria Txn Count: 0 --- Threshold: 10 --- Low_Vol Threshold: 0'
Arguments: None

代码中的错误引用了我的代码中的日志记录对象,但即使我在调用周围放置

try/except
逻辑,它也不会执行任何操作,错误似乎发生在上游。 我还尝试将记录的内容从格式化字符串更改为简单字符串,但无济于事。 似乎在这个过程中的某个地方,各个作业要么失去了与
Queue
的连接,要么
Queue
中的某些内容失败并导致了问题。

有什么想法吗? 我一直在努力获取更新版本的 Python,这出于多种原因(特别是 f 字符串)是有益的,但我不知道这是否可以解决这个问题,而且我已经用完了故障排除思路。

python python-3.x logging multiprocessing
3个回答
3
投票

即使我在调用周围放置 try/ except 逻辑,它也不会执行任何操作。

这可能是因为,如果日志记录包遇到与日志记录本身有关的异常,它将打印回溯,但不会引发异常本身。 这在

logging.Handler.handleError
的文档字符串中有更完整的解释。

从设置开始:

logging.raiseExceptions = True

如果模块级属性 raiseExceptions 为 False,异常将被默默忽略。

如果这没有帮助,您可以在

import pdb; pdb.set_trace()
 的代码中添加 
.emit()
调用;像这样的东西:

def emit(self, record):
    try:
        msg = self.format(record)
        stream = self.stream
        stream.write(msg)
        stream.write(self.terminator)
        self.flush()
    except Exception as e:
        import pdb; pdb.set_trace()  # < ---
        self.handleError(record)

其中

record
将是一个
LogRecord
实例。 通常,当我看到弹出日志错误时,这是因为我对给定的格式字符串使用了错误数量的参数,但检查
record
对象应该会告诉您更多信息。

最后,从调用堆栈中,您的日志记录调用本身:

global_logger.info(
    "SplitTime: {str_timeDelta} -- "
    "COMPLETED: {Check_Name} --- "
    "Total Txn Count: {var_Total_Txn_Count} --- "
    "Criteria Txn Count: {var_Criteria_Txn_Count} --- "
    "Threshold: {Threshold} --- "
    "Low_Vol Threshold: {LowVolThresh}".format(
    str_timeDelta = timeDelta(datetime.now() - YAML_Config['start_time']), **YAML_Config))

很难确切地说出最终引发异常的原因,因为字符串似乎已完全格式化。 (虽然我们看不到

YAML_Config
。)

无论如何,一个建议:您可以利用日志记录的“惰性”字符串格式,而不是像当前那样使用

str.format()
str.format()
调用将尽快进行评估,而如果您将 kwargs 传递给
global_logger.info()
,日志记录包将等待评估它们,直到必须评估为止。


2
投票

我刚刚在 python3.10 中遇到了类似的问题。我正在将日志消息写入多处理队列,并随机得到这个:

    self.queue.put(obj)
  File "<string>", line 2, in put
  File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
TypeError: 'NoneType' object cannot be interpreted as an integer

它在我看来是某种竞争条件,并用锁修复了它:

with self.lock:
    self.queue.put(msg)

锁和队列都是用管理器创建的:

self.pool = ProcessPoolExecutor(max_workers=1)
self.manager = multiprocessing.Manager()
self.queue = self.manager.Queue(-1)
self.lock = self.manager.Lock()

0
投票

这很可能是 cPython 的一个错误。请参阅此拉取请求。 (现在还没有合并,所以我猜你需要自己修补它)

© www.soinside.com 2019 - 2024. All rights reserved.