我如何将数据从python_callable传递给on_success_callback?

问题描述 投票:0回答:1

我有一个带有PythonOperator的DAG,它正在调用一个函数。我也有一个回调函数。我想将一些信息从可调用传递到回调。我该怎么办?

我尝试使用上下文字典:

from datetime import datetime
from pprint import pprint
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

args = {"owner": "airflow",
        "start_date": datetime(2019, 11, 18)}

dag = DAG(dag_id="context_demo",
          default_args=args)


def test_callable(**context):
    print('Starting callable...')
    context['z_my_var'] = 'my_val'
    pprint(context)
    print('Callable finished.')


def test_callback(context):
    print('Starting callback...')
    pprint(context)
    print('Callback finished.')


print_exec_date = PythonOperator(
    task_id="print_exec_date",
    python_callable=test_callable,
    on_success_callback=test_callback,
    provide_context=True,
    dag=dag,
)

但是我在test_callable中设置的值不会保留在test_callback中。

callback airflow
1个回答
0
投票

就我而言,我使用xCom来解决。这是来自官方气流文档的完整example。您可以阅读有关xcome here的更多信息。

# inside a PythonOperator called 'pushing_task'
def push_function():
   return value

# inside another PythonOperator where provide_context=True
def pull_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='pushing_task')
© www.soinside.com 2019 - 2024. All rights reserved.