将先前 Python 运算符任务的返回值传递给气流中的另一个

问题描述 投票:0回答:1

我是 Apache Airflow 的新用户。我正在构建如下所示的 DAG 来安排任务:

def add():
    return 1 + 1

def multiply(a):
    return a * 999
dag_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 27),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=3)}
with DAG(
    dag_id='dag',
    start_date=datetime(2023, 2, 27),
    default_args=dag_args,
    schedule_interval='@once',
    end_date=None,) as dag:

    t1 = PythonOperator(task_id="t1",
                        python_callable=add,
                        dag=dag
                        )
    t2 = PythonOperator(task_id="t2",
                        python_callable=multiply,
                        dag=dag)

如您所见,

t2
取决于
t1
的结果。 请问有什么办法可以把
t1
的返回结果直接传给
t2
。我正在使用 Apache Airflow 2.5.1 版本和 Python 3.9.

我对

xcom
做了一些研究,发现Airflow任务的所有结果都存储在那里,可以通过代码访问
task_instance = kwargs['t1'] task_instance.xcom_pull(task_ids='t1')

python python-3.x airflow etl directed-acyclic-graphs
1个回答
0
投票

您的 DAG 可以使用任务流 API 进行简化。它将处理 xcom 并简化代码。

import pendulum

from airflow.decorators import dag, task


@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['example'],
)
def taskflow_api_etl():
    @task()
    def add():
        return 1+1
    
    @task()
    def multiply(a: int):

        return a * 99

    order_data = add()
    multiply(order_data)


etl_dag = taskflow_api_etl()
© www.soinside.com 2019 - 2024. All rights reserved.