我在Python 3.7中编写了一个ETL框架,它使用函数作为带有特殊装饰器的“任务”。大多数这些任务都会循环运行。如果某些事情在循环中引发异常,我希望该函数通过记录有关失败的数据来处理此异常,并继续循环。
这是我到目前为止的简化示例:
class TaskExecutionError(RuntimeError):
def __init__(self, msg="", context={}):
self.msg = msg
self.context = context
def __str__(self):
return self.msg or "Error executing task."
def task(fn):
@wraps(fn)
def _wrapper(*args, **kwargs):
start_ts = datetime.utcnow()
try:
return fn(*args, **kwargs)
except TaskExecutionError as e:
logger.exception(f"Task execution error will be logged: {e}.")
fail_data = {
"task_name": fn.__name__,
"args": list(args),
"kwargs": kwargs,
"context": e.context,
"fail_message": str(e),
"fail_time": str(datetime.utcnow()),
# etc.
}
)
# Write failure data in an object store
finally:
end_ts = datetime.utcnow()
logger.info(f"*** Wallclock: {end_ts - start_ts}.")
_wrapper.is_task = True
return _wrapper
@task
def test_fail_log(a, b, c, kwa=1, kwb=2):
"""
Test handling failures.
"""
for i in range(10):
if i % 3:
raise TaskExecutionError(context={"i": i})
else:
print("All's well")
只要我看到正在打印和保存的消息,这种方法就可以正常工作,但当然,一旦引发第一个异常,执行就会中断。
我该如何解决这个问题,以便继续执行?
好像我不能使用非常方便的异常机制,我可能不得不设计一个自定义的handle_failure()
函数。但我不确定将函数装饰器的上下文传递给handle failure()
函数的最佳方法,而我在装饰函数中调用它。
由于我将在几个@task
修饰函数中使用此机制,我希望尽可能轻量级调用,没有很多参数。
感谢您提出的任何建议。
我使用inspect
解决了这个问题,我不喜欢经常使用它,但这似乎是必要的:
def task(fn):
@wraps(fn)
def _wrapper(*args, **kwargs):
start_ts = datetime.utcnow()
try:
return fn(*args, **kwargs)
finally:
end_ts = datetime.utcnow()
logger.info(f"*** Wallclock: {end_ts - start_ts}.")
_wrapper.is_task = True
def handle_task_failure(exc, local_ctx={}):
caller_frame = inspect.currentframe().f_back
wrapper_frame = caller_frame.f_back
caller_ctx = wrapper_frame.f_locals
print(f"Context: {caller_ctx}")
logger.exception(f"Task execution error will be logged: {exc}.")
fail_data = {
"start_time": caller_ctx.get("start_ts"),
"runid": caller_ctx.get("runid"),
"task_name": caller_ctx.get("fn").__name__,
"args": list(caller_ctx.get("args")),
"kwargs": caller_ctx.get("kwargs"),
"context": local_ctx,
"fail_message": str(exc),
"fail_time": str(datetime.utcnow()),
"traceback": format_stack(caller_frame),
}
# Save failure data in object store
@task
def test_fail_log(a, b, c, kwa=1, kwb=2):
"""
Test handling failures.
"""
for i in range(10):
try:
if i % 3:
raise RuntimeError("All's borked.")
else:
print("All's well.")
except Exception as exc:
handle_task_failure(exc, {"i": i})
另一方面,我不需要自定义异常类和处理失败的调用,这在几个有趣的重复是非常轻量级的。