说我有一个dag_id为run_id的DAG,并且管道为T1> T2> T3,其中T1,T2,T3是Python运算符。
我希望能够将参数从T1传递到T2 无将它们存储在数据库/ S3中,然后将它们读回到T2,因为它很长。
我知道如果T1失败,那么T2将不会执行,并且经过调查,我发现该决定是基于退出代码(0/1)的,因此似乎没有传递参数的方法。
有人知道我是否可以在不进行外部读写的情况下将参数/输出传递给下一个运算符吗?有没有这样的例子,我找不到。
如气流文档中所述,您可以使用Xcoms。 XComs允许任务交换消息,从而实现更细微的控制形式和共享状态。该名称是“交叉通信”的缩写。 XCom主要由键,值和时间戳定义,但也跟踪创建XCom的任务/ DAG等属性以及何时可见。任何可以腌制的对象都可以用作XCom值,因此用户应确保使用适当大小的对象。检查此链接:https://airflow.apache.org/docs/stable/concepts.html?highlight=xcom#xcoms
说T1具有任务ID t1并调用python函数func。
def func():
return some_value
您可以使用xcom获取func的返回值。假设T2调用了某些python函数sample_function。然后您可以获取func返回值作为
def sample_function(**context):
value = context['task_instance'].xcom_pull(task_ids='t1')
注意,您需要在函数中传递上下文,这可以通过在运算符中提及provide_context=True
来完成。