处理异常并继续循环

问题描述 投票:1回答:1

我在Python 3.7中编写了一个ETL框架,它使用函数作为带有特殊装饰器的“任务”。大多数这些任务都会循环运行。如果某些事情在循环中引发异常,我希望该函数通过记录有关失败的数据来处理此异常,并继续循环。

这是我到目前为止的简化示例:

class TaskExecutionError(RuntimeError):
    def __init__(self, msg="", context={}):
        self.msg = msg
        self.context = context

    def __str__(self):
        return self.msg or "Error executing task."


def task(fn):
    @wraps(fn)
    def _wrapper(*args, **kwargs):
        start_ts = datetime.utcnow()
        try:
            return fn(*args, **kwargs)

        except TaskExecutionError as e:
            logger.exception(f"Task execution error will be logged: {e}.")
            fail_data = {
                    "task_name": fn.__name__,
                    "args": list(args),
                    "kwargs": kwargs,
                    "context": e.context,
                    "fail_message": str(e),
                    "fail_time": str(datetime.utcnow()),
                    # etc.
                }
            )
            # Write failure data in an object store

        finally:
            end_ts = datetime.utcnow()
            logger.info(f"*** Wallclock: {end_ts - start_ts}.")

    _wrapper.is_task = True
    return _wrapper


@task
def test_fail_log(a, b, c, kwa=1, kwb=2):
    """
    Test handling failures.
    """
    for i in range(10):
        if i % 3:
            raise TaskExecutionError(context={"i": i})
        else:
            print("All's well")

只要我看到正在打印和保存的消息,这种方法就可以正常工作,但当然,一旦引发第一个异常,执行就会中断。

我该如何解决这个问题,以便继续执行?

好像我不能使用非常方便的异常机制,我可能不得不设计一个自定义的handle_failure()函数。但我不确定将函数装饰器的上下文传递给handle failure()函数的最佳方法,而我在装饰函数中调用它。

由于我将在几个@task修饰函数中使用此机制,我希望尽可能轻量级调用,没有很多参数。

感谢您提出的任何建议。

python-3.x exception python-decorators
1个回答
0
投票

我使用inspect解决了这个问题,我不喜欢经常使用它,但这似乎是必要的:

def task(fn):
    @wraps(fn)
    def _wrapper(*args, **kwargs):
        start_ts = datetime.utcnow()
        try:
            return fn(*args, **kwargs)

        finally:
            end_ts = datetime.utcnow()
            logger.info(f"*** Wallclock: {end_ts - start_ts}.")

    _wrapper.is_task = True


def handle_task_failure(exc, local_ctx={}):
    caller_frame = inspect.currentframe().f_back
    wrapper_frame = caller_frame.f_back
    caller_ctx = wrapper_frame.f_locals
    print(f"Context: {caller_ctx}")
    logger.exception(f"Task execution error will be logged: {exc}.")
    fail_data = {
            "start_time": caller_ctx.get("start_ts"),
            "runid": caller_ctx.get("runid"),
            "task_name": caller_ctx.get("fn").__name__,
            "args": list(caller_ctx.get("args")),
            "kwargs": caller_ctx.get("kwargs"),
            "context": local_ctx,
            "fail_message": str(exc),
            "fail_time": str(datetime.utcnow()),
            "traceback": format_stack(caller_frame),
    }
    # Save failure data in object store

@task
def test_fail_log(a, b, c, kwa=1, kwb=2):
    """
    Test handling failures.
    """
    for i in range(10):
        try:
            if i % 3:
                raise RuntimeError("All's borked.")
            else:
                print("All's well.")
        except Exception as exc:
            handle_task_failure(exc, {"i": i})

另一方面,我不需要自定义异常类和处理失败的调用,这在几个有趣的重复是非常轻量级的。

© www.soinside.com 2019 - 2024. All rights reserved.