[在气流中,我知道您可以通过on_success_callback和on_failure_callback自动发送松弛通知,在我看来,这些通知已正常运行。
在我的用例中,如果当前数据为空且可以正常工作,那么我有一个ETL会引发AirflowSkipException。但这会向我的懈怠发送成功通知
我想知道是否有类似on_skip_callback之类的方法或一种将我今天DAG被跳过的通知发送给我的闲暇人。
任何帮助将不胜感激。谢谢
Edit:为我的ETL添加了代码参考。数据点是从数据库中获取的,并且可能每天变化,有时如果没有要处理的数据,则数据点将为空,反之亦然。
def ETL_function():
# Retrieve data code
....
# Validation to check if ETL data is empty
if not datapoints:
print("OUTPUT LOG : ETL Data not found/empty")
print("OUTPUT LOG : ETL skipped due to empty data, Skipping ETL ...... ")
raise AirflowSkipException
# return False
else :
print("OUTPUT LOG : ETL Data found")
print("OUTPUT LOG : ETL continued due data available , Running ETL ...... ")
# return True
# ETL Process code
....
ETL_function_Task = PythonOperator(
task_id='ETL_function',
provide_context=True,
python_callable=fleet_behavior_idling,
on_success_callback=task_success_slack_alert,
dag=dag,
)
嗨,@ Anindhito Irmandharu,
您可以使用派生自PythonOperator的ShortCircuitOperator
。
def ETL_function():
...
# Validation to check if ETL data is empty
if not datapoints:
print("OUTPUT LOG : ETL Data not found/empty")
print("OUTPUT LOG : ETL skipped due to empty data, Skipping ETL ...... ")
return False
else :
print("OUTPUT LOG : ETL Data found")
print("OUTPUT LOG : ETL continued due data available , Running ETL ...... ")
return True
ETL_function_Task = ShortCircuitOperator(
task_id="ETL_function",
python_callable= ETL_function,
provide_context=True,
dag=dag,
)
ETL_function_Task >> downstream_Tasks
注意:您的下游任务将被跳过,但是此任务'ETL_function_Task'将进入成功状态。我不确定是否要为成功执行的任务发送松弛通知。虽然您可以轻松更改
on_success_callback = task_success_slack_alert
根据您的要求。在您使用的slack_hook中写一个新的task_skipped_slack_alert
。