我有一个气流作业,它在一个任务中启动 Spark 作业,下一个任务提取应用程序日志以查找 Spark 作业应用程序 ID。我在 Spark-Submit 任务中使用 Xcom Push,在下一个任务中使用 Xcom_pull。
def extract_app_id(**kwargs):
ti = kwargs['ti']
log = ti.xcom_pull(task_ids='submit_spark_job')
log_str = str(log)
logger.info("Xcom pulled str log %s",log_str)
app_id = re.search(r'application_\d+_\d+', str(log))
spark_submit_task = SparkSubmitOperator(
name=f"{job_name}",
task_id="submit_spark_job",
conn_id="spark3",
conf=conf,
java_class="Application",
application=f"{jar_path}{jar_file}",
do_xcom_push=True,
application_args=application_args,
execution_timeout=timedelta(minutes=5)
)
extract_app_id_task = PythonOperator(
task_id='extract_app_id',
python_callable=extract_app_id,
provide_context=True,
trigger_rule=TriggerRule.ALL_DONE
)
spark_submit_task >> extract_app_id_task
问题是
spark_submit_task
成功启动了spark作业,并且日志正在打印spark应用程序ID,如下所示来自实际日志:
4/06/14 22:37:37 INFO Client: Application report for application_1718047116285_4363 (state: RUNNING)
Identified spark driver id: application_1718047116285_4363
但是,即使 Spark 作业已成功启动,
xcom_pull
始终会返回 null
。
我用
Python 3
和Airflow 2.0.0
。
我相信这个想法是 SparkSubmitOperator 默认情况下不返回任何 XCOM。根据我的经验,总是有一个简单的检查来验证源代码中运算符的执行方法末尾指定了什么返回值。因此,我研究了它,没有 return 语句,只有一个钩子可以使用所需的配置启动作业:https://github.com/apache/airflow/blob/providers-apache-spark/4.7 .1/airflow/providers/apache/spark/operators/spark_submit.py#L30
例如,为了进行比较,当查询处理完成时,AthenaQueryOperator 将返回一个 QueryExecutionID。请参阅:https://github.com/apache/airflow/blob/providers-amazon/8.21.0/airflow/providers/amazon/aws/operators/athena.py#L181
考虑到这一点,我假设您可能会在需要时尝试自定义现有运算符或决定替代方法