DAG中的运算符之间共享

问题描述 投票:0回答:1

说我有一个dag_id为run_id的DAG,并且管道为T1> T2> T3,其中T1,T2,T3是Python运算符。

我希望能够将参数从T1传递到T2 将它们存储在数据库/ S3中,然后将它们读回到T2,因为它很长。

我知道如果T1失败,那么T2将不会执行,并且经过调查,我发现该决定是基于退出代码(0/1)的,因此似乎没有传递参数的方法。

有人知道我是否可以在不进行外部读写的情况下将参数/输出传递给下一个运算符吗?有没有这样的例子,我找不到。

airflow airflow-scheduler apache-airflow-xcom
1个回答
0
投票

如气流文档中所述,您可以使用Xcoms。 XComs允许任务交换消息,从而实现更细微的控制形式和共享状态。该名称是“交叉通信”的缩写。 XCom主要由键,值和时间戳定义,但也跟踪创建XCom的任务/ DAG等属性以及何时可见。任何可以腌制的对象都可以用作XCom值,因此用户应确保使用适当大小的对象。检查此链接:https://airflow.apache.org/docs/stable/concepts.html?highlight=xcom#xcoms

说T1具有任务ID t1并调用python函数func

def func():
    return some_value

您可以使用xcom获取func的返回值。假设T2调用了某些python函数sample_function。然后您可以获取func返回值作为

def sample_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='t1')

注意,您需要在函数中传递上下文,这可以通过在运算符中提及provide_context=True来完成。

© www.soinside.com 2019 - 2024. All rights reserved.