我一直在开发一个 python3 (3.4.5) 项目,该项目利用
multiprocessing.Pool
通过 4 个工作人员运行大约 50 多个作业。 我有一个带有 logging.handlers.QueueListener
的单独进程设置,因此我可以通过与 Queue
一起使用的 multiprocessing.Manager()
将全局内容记录到单个文件中。 所以基本上流程是这样的
Queue
创建
multiprocessing.Manager()
QueueListener
监听我刚刚为全局日志创建的 Queue
启动专用日志记录进程。 (我也尝试过使用主程序之外的线程来实现相同的结果。)multiprocessing.Pool
来处理各个作业,向它们传递之前创建的 Queue
以及运行和设置其日志记录所需的配置信息(有一个全局日志,以及每个作业的单独日志,其中包含更精细的信息)。作业以 map_async
开始。不过,我在某些作业上不断收到间歇性错误,通常其中 1 个作业有错误(每次都有一个不同的错误),偶尔会有 2 个相同的错误或零个错误。据我所知,导致错误的不是作业中的代码,而是
multiprocessing
或 logging
设置中的某些内容。 这是我收到的错误的示例:
--- Logging error ---
Traceback (most recent call last):
File "/usr/lib64/python3.4/logging/handlers.py", line 1347, in emit
self.enqueue(self.prepare(record))
File "/usr/lib64/python3.4/logging/handlers.py", line 1313, in enqueue
self.queue.put_nowait(record)
File "<string>", line 2, in put_nowait
File "/usr/lib64/python3.4/multiprocessing/managers.py", line 731, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/lib64/python3.4/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/usr/lib64/python3.4/multiprocessing/connection.py", line 413, in _send_bytes
self._send(chunk)
File "/usr/lib64/python3.4/multiprocessing/connection.py", line 369, in _send
n = write(self._handle, buf)
TypeError: an integer is required (got type NoneType)
Call stack:
File "./sampling__test__py.py", line 100, in <module>
run_pool = multiprocessing.Pool(4)
File "/usr/lib64/python3.4/multiprocessing/context.py", line 118, in Pool
context=self.get_context())
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 168, in __init__
self._repopulate_pool()
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 233, in _repopulate_pool
w.start()
File "/usr/lib64/python3.4/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
File "/usr/lib64/python3.4/multiprocessing/context.py", line 267, in _Popen
return Popen(process_obj)
File "/usr/lib64/python3.4/multiprocessing/popen_fork.py", line 21, in __init__
self._launch(process_obj)
File "/usr/lib64/python3.4/multiprocessing/popen_fork.py", line 77, in _launch
code = process_obj._bootstrap()
File "/usr/lib64/python3.4/multiprocessing/process.py", line 254, in _bootstrap
self.run()
File "/usr/lib64/python3.4/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "/home/username/value_check.py", line 338, in value_check
global_logger.info("SplitTime: {str_timeDelta} -- COMPLETED: {Check_Name} --- Total Txn Count: {var_Total_Txn_Count} --- Criteria Txn Count: {var_Criteria_Txn_Count} --- Threshold: {Threshold} --- Low_Vol Threshold: {LowVolThresh}".format(str_timeDelta = timeDelta(datetime.now() - YAML_Config['start_time']), **YAML_Config))
Message: 'SplitTime: 00:01:05,031 -- COMPLETED: ALPHA_CHECK --- Total Txn Count: 1234--- Criteria Txn Count: 0 --- Threshold: 10 --- Low_Vol Threshold: 0'
Arguments: None
代码中的错误引用了我的代码中的日志记录对象,但即使我在调用周围放置
try/except
逻辑,它也不会执行任何操作,错误似乎发生在上游。 我还尝试将记录的内容从格式化字符串更改为简单字符串,但无济于事。 似乎在这个过程中的某个地方,各个作业要么失去了与 Queue
的连接,要么 Queue
中的某些内容失败并导致了问题。
有什么想法吗? 我一直在努力获取更新版本的 Python,这出于多种原因(特别是 f 字符串)是有益的,但我不知道这是否可以解决这个问题,而且我已经用完了故障排除思路。
即使我在调用周围放置 try/ except 逻辑,它也不会执行任何操作。
这可能是因为,如果日志记录包遇到与日志记录本身有关的异常,它将打印回溯,但不会引发异常本身。 这在
logging.Handler.handleError
的文档字符串中有更完整的解释。
从设置开始:
logging.raiseExceptions = True
如果模块级属性 raiseExceptions 为 False,异常将被默默忽略。
如果这没有帮助,您可以在
import pdb; pdb.set_trace()
的代码中添加
.emit()
调用;像这样的东西:
def emit(self, record):
try:
msg = self.format(record)
stream = self.stream
stream.write(msg)
stream.write(self.terminator)
self.flush()
except Exception as e:
import pdb; pdb.set_trace() # < ---
self.handleError(record)
其中
record
将是一个 LogRecord
实例。 通常,当我看到弹出日志错误时,这是因为我对给定的格式字符串使用了错误数量的参数,但检查 record
对象应该会告诉您更多信息。
最后,从调用堆栈中,您的日志记录调用本身:
global_logger.info(
"SplitTime: {str_timeDelta} -- "
"COMPLETED: {Check_Name} --- "
"Total Txn Count: {var_Total_Txn_Count} --- "
"Criteria Txn Count: {var_Criteria_Txn_Count} --- "
"Threshold: {Threshold} --- "
"Low_Vol Threshold: {LowVolThresh}".format(
str_timeDelta = timeDelta(datetime.now() - YAML_Config['start_time']), **YAML_Config))
很难确切地说出最终引发异常的原因,因为字符串似乎已完全格式化。 (虽然我们看不到
YAML_Config
。)
无论如何,一个建议:您可以利用日志记录的“惰性”字符串格式,而不是像当前那样使用
str.format()
。 str.format()
调用将尽快进行评估,而如果您将 kwargs 传递给 global_logger.info()
,日志记录包将等待评估它们,直到必须评估为止。
我刚刚在 python3.10 中遇到了类似的问题。我正在将日志消息写入多处理队列,并随机得到这个:
self.queue.put(obj)
File "<string>", line 2, in put
File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
TypeError: 'NoneType' object cannot be interpreted as an integer
它在我看来是某种竞争条件,并用锁修复了它:
with self.lock:
self.queue.put(msg)
锁和队列都是用管理器创建的:
self.pool = ProcessPoolExecutor(max_workers=1)
self.manager = multiprocessing.Manager()
self.queue = self.manager.Queue(-1)
self.lock = self.manager.Lock()
这很可能是 cPython 的一个错误。请参阅此拉取请求。 (现在还没有合并,所以我猜你需要自己修补它)