我是 Apache Airflow 的新用户。我正在构建如下所示的 DAG 来安排任务:
def add():
return 1 + 1
def multiply(a):
return a * 999
dag_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2023, 2, 27),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=3)}
with DAG(
dag_id='dag',
start_date=datetime(2023, 2, 27),
default_args=dag_args,
schedule_interval='@once',
end_date=None,) as dag:
t1 = PythonOperator(task_id="t1",
python_callable=add,
dag=dag
)
t2 = PythonOperator(task_id="t2",
python_callable=multiply,
dag=dag)
如您所见,
t2
取决于t1
的结果。
请问有什么办法可以把t1
的返回结果直接传给t2
。我正在使用 Apache Airflow 2.5.1 版本和 Python 3.9.
我对
xcom
做了一些研究,发现Airflow任务的所有结果都存储在那里,可以通过代码访问
task_instance = kwargs['t1'] task_instance.xcom_pull(task_ids='t1')
您的 DAG 可以使用任务流 API 进行简化。它将处理 xcom 并简化代码。
import pendulum
from airflow.decorators import dag, task
@dag(
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
)
def taskflow_api_etl():
@task()
def add():
return 1+1
@task()
def multiply(a: int):
return a * 99
order_data = add()
multiply(order_data)
etl_dag = taskflow_api_etl()