I am trying to run an Airflow script made up of several functions. I want to pass the value of "program_no" as an argument in the Spark submit request; I obtain it from the API call via the context in the get_conf method. I tried passing it as {ti.xcom_pull(task_ids='parameterized_task')}, but I get the error - NameError: name 'ti' is not defined. How can I fix this?
I also tried passing {prog_no} instead of {ti.xcom_pull(task_ids='parameterized_task')}, but got the same kind of error - prog_no is not defined.
dag = DAG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
    default_args=default_args,
    start_date=datetime(2023, 12, 21),
    schedule_interval=None,
    description='Event based job for calculating missed sales based allowances for retroactive program setup'
)

def get_conf(**context):
    global prog_no
    #ti = context['ti']
    prog_no = context['dag_run'].conf['program_no']
    return prog_no

parameterized_task = PythonOperator(
    task_id="parameterized_task",
    python_callable=get_conf,
    provide_context=True,
    dag=dag
)

sparkway_request = SimpleHttpOperator(
    task_id='sparkway_request',
    endpoint=Variable.get('sparkway_api_endpoint'),
    method="POST",
    http_conn_id="SPARKWAY_CONN",
    data=json.dumps({
        "cmd": "sparkway-submit --master kubernetes --job-name spark-allowance-calculation --class com.xxx.CalculationApplication --spark-app s3a://xyz.jar --arguments SuspendedProgramStatus --num-executors 2 --executor-cores 2 --executor-memory 3G --driver-memory 3G",
        "arguments": f"{dag.latest_execution_date},{ti.xcom_pull(task_ids='parameterized_task')}",
        "type": "job"
    }),
    headers={
        "Authorization": f"Bearer {Variable.get('sparkway_token')}",
        "Content-Type": "application/json",
        "X-CSRF-TOKEN": Variable.get('sparkway_csrf_token')
    },
    response_check=lambda response: handle_sparkway_response(response),
    log_response=True,
    dag=dag
)
parameterized_task >> sparkway_request
`ti` will not be defined there, because it is only available through Jinja templating at render time. `SimpleHttpOperator` templates its `data` field (along with `endpoint` and `headers`), so you can try this:
data=json.dumps({
    "cmd": "sparkway-submit ........",
    "arguments": "{{ execution_date }},{{ task_instance.xcom_pull(task_ids='parameterized_task') }}",
    "type": "job"
})
Note that get_conf pushes its value by returning it, so it lands in XCom under the default key 'return_value'; don't pass key='program_no' to xcom_pull, or you will get None.
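A plain-Python sketch (no Airflow needed; the store and helper names here are illustrative, not Airflow API) of why the default key works: a PythonOperator stores its callable's return value in XCom under the key 'return_value', which is exactly what xcom_pull(task_ids=...) reads when no key is given.

```python
from types import SimpleNamespace

# Toy XCom store keyed by (task_id, key), mimicking how Airflow pushes
# a PythonOperator's return value under the key 'return_value'.
xcom_store = {}

def run_python_task(task_id, callable_, **context):
    value = callable_(**context)
    xcom_store[(task_id, "return_value")] = value

def xcom_pull(task_ids, key="return_value"):
    return xcom_store.get((task_ids, key))

def get_conf(**context):
    # Same logic as the DAG's get_conf: read the conf supplied
    # when the run was triggered via the API.
    return context["dag_run"].conf["program_no"]

# Simulate a dag_run triggered with conf {"program_no": "P123"}
ctx = {"dag_run": SimpleNamespace(conf={"program_no": "P123"})}
run_python_task("parameterized_task", get_conf, **ctx)

print(xcom_pull(task_ids="parameterized_task"))                    # P123
print(xcom_pull(task_ids="parameterized_task", key="program_no"))  # None
```

Pulling with an explicit key="program_no" returns None, which is why the default key is the right choice here.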
Accessing `ti` inside the Jinja template this way should work, but the string must not be an f-string: in an f-string, `{{` and `}}` are escapes for literal single braces, so Jinja never sees its `{{ ... }}` markers. Also use `{{ execution_date }}` instead of `dag.latest_execution_date`, so the date is resolved per run at render time rather than once at parse time:
data=json.dumps({
    "cmd": "sparkway-submit --master kubernetes --job-name spark-allowance-calculation --class com.xxx.CalculationApplication --spark-app s3a://xyz.jar --arguments SuspendedProgramStatus --num-executors 2 --executor-cores 2 --executor-memory 3G --driver-memory 3G",
    "arguments": "{{ execution_date }},{{ ti.xcom_pull(task_ids='parameterized_task') }}",
    "type": "job"
}),
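A quick demonstration of the f-string pitfall in the original attempt, runnable as plain Python:

```python
# In an f-string, '{{' and '}}' are escape sequences for literal single
# braces, so the Jinja markers are destroyed before Airflow ever renders
# the template.
broken = f"{{ ti.xcom_pull(task_ids='parameterized_task') }}"
print(broken)  # { ti.xcom_pull(task_ids='parameterized_task') }

# A plain (non-f) string keeps the double braces intact for Jinja:
ok = "{{ ti.xcom_pull(task_ids='parameterized_task') }}"
print(ok)      # {{ ti.xcom_pull(task_ids='parameterized_task') }}
```

This is why the corrected `arguments` value above has no `f` prefix.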