I am using BeamRunPythonPipelineOperator() in Airflow on Cloud Composer (composer-2.9.8-airflow-2.9.3) to trigger a Dataflow pipeline. The job is submitted to Dataflow successfully, but the Airflow task keeps running without logging any job status updates from Dataflow, and the task exits with
INFO - Process exited with return code: 0
I want to track the job status in Airflow so that I can trigger downstream tasks based on it (e.g. JOB_STATE_DONE).
My operator is set up as follows:
start_dataflow_job = BeamRunPythonPipelineOperator(
    task_id="start_dataflow_job",
    runner="DataflowRunner",
    py_file=GCS_FILE_LOCATION,
    pipeline_options={
        "tempLocation": GCS_BUCKET,
        "stagingLocation": GCS_BUCKET,
        "output_project": PROJECT,
        "service_account_email": GCP_CUSTOM_SERVICE_ACCOUNT,
        "requirements_file": "gs://GCS_CODE_BUCKET/requirements.txt",
        "max_num_workers": "2",
        "region": "us-east1",
        "experiments": [
            "streaming_boot_disk_size_gb=100",
            "workerLogLevelOverrides=com.google.cloud.dataflow#DEBUG",
            "dataflow_service_options=enable_prime"
        ],
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]~=2.60.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        project_id=PROJECT,
        location="us-east1",
        wait_until_finished=False,
        gcp_conn_id="google_cloud_default",
    ),
    do_xcom_push=True,
)
The logs are:
[2024-11-04, 04:31:42 UTC] {beam.py:151} INFO - Start waiting for Apache Beam process to complete.
[2024-11-04, 04:41:09 UTC] {beam.py:172} INFO - Process exited with return code: 0
[2024-11-04, 04:41:11 UTC] {taskinstance.py:441} ▼ Post task execution logs
[2024-11-04, 04:41:11 UTC] {taskinstance.py:1206} INFO - Marking task as SUCCESS. dag_id=test_data_pipeline, task_id=start_dataflow_job, run_id=manual__2024-11-04T04:24:59.026429+00:00, execution_date=20241104T042459, start_date=20241104T042502, end_date=20241104T044111
[2024-11-04, 04:41:12 UTC] {local_task_job_runner.py:243} INFO - Task exited with return code 0
[2024-11-04, 04:41:12 UTC] {taskinstance.py:3506} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2024-11-04, 04:41:12 UTC] {local_task_job_runner.py:222} ▲▲▲ Log group end
Airflow uses the standard Python logging framework to write logs, and while a task is running the root logger is configured to write to that task's log. So to track the Dataflow pipeline's progress in Airflow, the logging level inside the Dataflow pipeline code needs to be set to INFO; I had originally set it to ERROR. Once I updated the logging level, the operator submitted the job, pushed the dataflow_job_id to XCom, marked itself as success shortly afterwards, and a sensor followed up and tracked the job status until completion.
import logging
logging.getLogger().setLevel(logging.INFO)
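For context, this call belongs at the entry point of the Beam pipeline file that the operator submits (GCS_FILE_LOCATION). Below is a minimal sketch of that layout; the run() function and the placeholder transforms are illustrative assumptions, not the original pipeline:

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    # Pipeline options come from the command line the operator builds
    # out of pipeline_options (tempLocation, region, etc.).
    options = PipelineOptions(argv)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Create" >> beam.Create(["placeholder"])  # stand-in for the real source
            | "Log" >> beam.Map(lambda x: logging.info("processed %s", x) or x)
        )


if __name__ == "__main__":
    # Root logger at INFO (not ERROR) so the DataflowRunner submission logs,
    # including the Dataflow job id, surface in the Airflow task log.
    logging.getLogger().setLevel(logging.INFO)
    run()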
Read more here: Writing to Airflow task logs from your code
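Once the dataflow_job_id is available in XCom, downstream tasks can be gated on the job reaching JOB_STATE_DONE with the Google provider's DataflowJobStatusSensor. A minimal sketch of that follow-up sensor, reusing the PROJECT variable and the start_dataflow_job task from the question (the dataflow_job_id XCom key is the one the operator pushes in this setup):

from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

wait_for_dataflow_job = DataflowJobStatusSensor(
    task_id="wait_for_dataflow_job",
    # Job id pushed to XCom by the operator once the submission logs are visible.
    job_id="{{ task_instance.xcom_pull(task_ids='start_dataflow_job', key='dataflow_job_id') }}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=PROJECT,
    location="us-east1",
    gcp_conn_id="google_cloud_default",
)

start_dataflow_job >> wait_for_dataflow_job

Any task chained after the sensor then only runs once the Dataflow job reports JOB_STATE_DONE.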