I've implemented multiprocessing in a data analysis I'm running, but errors sometimes occur, and rather than having the whole queue killed I'd like those cases to be skipped, so I wrapped the work in a try statement. New errors keep appearing though. Is there a more exhaustive list of exceptions I could include in the except clause? In general I just want to log the error and let the code carry on.
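For example, is catching Exception itself the sanest option, rather than enumerating exception types? A minimal sketch of what I have in mind (the wrapper name is just for illustration, not something in my real code):

import logging
import traceback

def run_safely(func, *args):
    # Exception covers practically every ordinary runtime error; KeyboardInterrupt
    # and SystemExit derive from BaseException and so are deliberately not swallowed.
    try:
        return func(*args)
    except Exception:
        logging.error(traceback.format_exc())
        return None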
Secondly, the errors are not being written to the log file I'm creating, and I don't know why. Here is a sample of my code, with the analysis inside the try statement replaced by some simpler steps. It does more or less the same thing: take the data and write output to a csv for each dataset a process runs on (represented here by a dict of dataframes). Note: I've deliberately introduced a KeyError here by misnaming a column in the second dataframe.
Example code...
import os
import pandas as pd
import numpy as np
import logging
import traceback
from multiprocessing import Pool, Manager, Process
output_dir = '.'  # current directory; '' would produce an absolute path like '/df_0_col1_stats.csv'
input_dir = 'df_folder'
# make our data
# Create a dict of 5 dataframes with 3 columns of ten entries each
df_dict = {i: pd.DataFrame(np.random.rand(10, 3), columns=['col1', 'col2', 'col3']) for i in range(5)}
# Introduce an error by changing a column name in one of the dataframes
df_dict[1].columns = ['col1', 'col2', 'wrong_col']
# write each dataframe to its own csv file in input_dir
os.makedirs(input_dir, exist_ok=True)
for key, df in df_dict.items():
    file_name = f"{key}_df.csv"
    file_path = os.path.join(input_dir, file_name)
    df.to_csv(file_path, index=False)
# define functions for multiprocessing and error logging...
def listener_process(queue):
    logging.basicConfig(filename='abi_detector_app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
    while True:
        message = queue.get()
        if message == 'kill':
            break
        logging.error(message)
def example_process(df_id, queue):
    df = pd.read_csv(f"{input_dir}/{df_id}_df.csv")
    try:
        for col in ['col1', 'col2', 'col3']:
            mean = df[col].mean()
            std = df[col].std()
            result = pd.DataFrame({'mean': [mean], 'std': [std]}, index=[col])
            result.to_csv(f'{output_dir}/df_{df_id}_{col}_stats.csv')
    except (IndexError, KeyError) as e:
        logging.error('Error in dataframe id: %s', df_id)
        logging.error(traceback.format_exc())
manager = Manager()
queue = manager.Queue()
listener = Process(target=listener_process, args=(queue,))
listener.start()
pool_size = 5
df_list = df_dict.keys()
# run the processes with the specified number of cores
# new code which passes the error messages to the listener process
with Pool(pool_size) as p:
    p.starmap(example_process, [(df_id, queue) for df_id in df_list])
queue.put('kill')
listener.join()
Note that df_dict is a placeholder for what my script actually does. I edited the example so that it writes the generated dataframes out to a folder as csv files, and the example process then loads them. That's a better picture of what actually happens, because df_dict doesn't really need to be shared across processes. Only the error log file does.
I have modified your code following Logging to a single file from multiple processes.
Update
The code now executes successfully whether the platform is Unix-like (which by default uses the fork method to start new processes) or Windows (which must use the spawn method).
We want the code that creates the dataframes to execute only once. If that code lives at global scope but outside an
if __name__ == '__main__':
block, then under Windows every pool process will re-create the dataframes. Depending on how expensive that code is, this can be inefficient. Moreover, the code uses a random number generator, so each pool process would end up with dataframes containing different data, which could be a problem. We therefore need to make sure the dataframe-creation code is executed exactly once, by the main process, and use a pool initializer function to make the data available once in each pool process. That technique works whether we run under Windows or Linux. However, if we are running under Linux or another platform that supports the fork start method, it is usually more efficient to let the pool processes inherit the dataframes from the main process than to pass them to each pool process via the pool initializer. The code below now checks whether the fork method can be used and, if so, lets the pool processes inherit the dataframes. The pool initializer is still used to set up, once per pool process, the multiprocessing-aware logging that routes messages to the listener process.
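As an aside, if you prefer to check up front which start methods the platform offers instead of wrapping set_start_method in a try/except as the code below does, multiprocessing.get_all_start_methods does that. This is just an alternative sketch, not part of the code that follows:

import multiprocessing

# Typically includes 'fork' on Linux/macOS; on Windows only 'spawn' is available.
available = multiprocessing.get_all_start_methods()
using_fork = 'fork' in available
if using_fork:
    multiprocessing.set_start_method('fork')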
import pandas as pd
import numpy as np
import logging
import logging.handlers
import traceback
import sys
from multiprocessing import Pool, Queue, Process, set_start_method
def listener_process(queue):
    root = logging.getLogger()
    h = logging.FileHandler('abi_detector_app.log', mode='w')
    f = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
    h.setFormatter(f)
    root.addHandler(h)
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)
def init_pool(*args):
    queue = args[0]
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)
    if len(args) == 2:
        # We have additionally been passed the dataframes because they could not
        # be inherited.
        global df_dict
        df_dict = args[1]
output_dir = '.'  # write the per-column stats files to the current directory
def example_process(df_id):
    df = df_dict[df_id]
    try:
        for col in ['col1', 'col2', 'col3']:
            mean = df[col].mean()
            std = df[col].std()
            result = pd.DataFrame({'mean': [mean], 'std': [std]}, index=[col])
            result.to_csv(f'{output_dir}/df_{df_id}_{col}_stats.csv')
        logging.debug(f'Processing complete for id {df_id}.')
    except Exception:  # Catch all possible exceptions
        logging.error('Error in dataframe id: %s', df_id)
        logging.error(traceback.format_exc())
if __name__ == '__main__':
    # If we can use the fork method of starting new processes, then the pool processes
    # can inherit the dataframes from the main process, which is normally more efficient
    # than passing the dataframes to each pool process using the pool initializer.
    try:
        set_start_method('fork')
    except Exception:
        # 'fork' is not available on this platform (e.g. Windows).
        using_fork = False
    else:
        using_fork = True
    # Create a dict of 5 dataframes with 3 columns of ten entries each.
    # We are ensuring that the code to create the dataframes is only executed once (by
    # the main process):
    df_dict = {i: pd.DataFrame(np.random.rand(10, 3), columns=['col1', 'col2', 'col3']) for i in range(5)}
    # Introduce an error by changing a column name in one of the dataframes
    df_dict[1].columns = ['col1', 'col2', 'wrong_col']
    queue = Queue()
    listener = Process(target=listener_process, args=(queue,))
    listener.start()
    pool_size = 5
    df_list = df_dict.keys()
    # run the processes with the specified number of cores
    # Can the pool processes inherit the dataframes or must we pass them
    # to each pool process?
    pool_initargs = (queue,) if using_fork else (queue, df_dict)
    # new code which passes the error messages to the listener process
    with Pool(pool_size, initializer=init_pool, initargs=pool_initargs) as p:
        p.map(example_process, df_list)
    #queue.put('kill')
    queue.put(None)  # Use None as the sentinel
    listener.join()
Contents of abi_detector_app.log:
root - DEBUG - Processing complete for id 0.
root - ERROR - Error in dataframe id: 1
root - ERROR - Traceback (most recent call last):
File "C:\Program Files\Python38\lib\site-packages\pandas\core\indexes\base.py", line 3361, in get_loc
return self._engine.get_loc(casted_key)
File "pandas\_libs\index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
File "pandas\_libs\index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
File "pandas\_libs\hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
File "pandas\_libs\hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
KeyError: 'col3'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Booboo\test\test.py", line 46, in example_process
mean = df[col].mean()
File "C:\Program Files\Python38\lib\site-packages\pandas\core\frame.py", line 3455, in __getitem__
indexer = self.columns.get_loc(key)
File "C:\Program Files\Python38\lib\site-packages\pandas\core\indexes\base.py", line 3363, in get_loc
raise KeyError(key) from err
KeyError: 'col3'
root - DEBUG - Processing complete for id 2.
root - DEBUG - Processing complete for id 3.
root - DEBUG - Processing complete for id 4.