从失败的 Airflow KubernetesPodOperator 任务中提取 XCOM 值

问题描述 投票:0回答:1

我正在开发一个继承自 KubernetesPodOperator (KPO) 的操作符来执行 bash 脚本。 我有内置的错误处理来处理这个 bash 脚本失败的许多可能原因。

我正在尝试创建测试 DAG,以证明此错误处理有效。该 DAG 将包含一个后续任务,该任务断言基于 KPO 的任务的退出代码等于预期的退出代码值。 为此,我一直在尝试使用 XCOM 从基于 KPO 的操作员任务中推送退出代码,并在后续断言任务中拉出此退出代码。

我已经设法在成功运行基于 KPO 的任务的情况下使其正常工作,但在任务最初失败时尚未做到这一点。 我在 KPO 的文档中看到 “XCOM 将仅针对标记为 State.SUCCESS 的任务推送。” 并一直在尝试找到解决方法。

我尝试将失败的任务标记为成功,如此处所示。 虽然这确实将任务状态更改为成功,但退出代码仍然不作为 Xcom 值出现,因此我无法在后续任务中通过 xcom 提取退出代码。

python kubernetes airflow astronomer
1个回答
0
投票

KubernetesPodOperator 接受回调参数,请参阅此处的文档:KubernetesPodOperator — apache-airflow-providers-cncf-kubernetes 文档

因此,您可以创建类似的内容,确保在 Pod 完成之后但在删除之前调用此方法。

class MyKpoCallback(KubernetesPodOperatorCallback):
    @staticmethod
    def on_pod_completion(*, pod: k8s.V1Pod, client: client_type, mode: str, **kwargs) -> None:
        context = get_current_context()
        exit_code = pod.status.container_statuses[0].state.terminated.exit_code
        context["ti"].xcom_push(key="exit_code", value=exit_code)

并将此类(不是实例!)传递给 KubernetesPodOperator。考虑上面的伪代码;这未经测试,并且基于对 k8s 客户端库的高级审查。

© www.soinside.com 2019 - 2024. All rights reserved.