Why doesn't BeamRunPythonPipelineOperator track the Dataflow job status, instead waiting until the job ends without returning any Dataflow logs?

Problem description

I am using BeamRunPythonPipelineOperator() in Airflow on Cloud Composer (composer-2.9.8-airflow-2.9.3) to trigger a Dataflow pipeline. The job is submitted to Dataflow successfully, but the Airflow task keeps running without logging any job status updates from Dataflow, and it eventually finishes with

INFO - Process exited with return code: 0
I want to track the job status in Airflow so that I can trigger downstream tasks based on the job state (e.g. JOB_STATE_DONE).

My operator is configured as follows:

start_dataflow_job = BeamRunPythonPipelineOperator(
        task_id="start_dataflow_job",
        runner="DataflowRunner",
        py_file=GCS_FILE_LOCATION,
        pipeline_options={
            "tempLocation": GCS_BUCKET,
            "stagingLocation": GCS_BUCKET,
            "output_project": PROJECT,
            "service_account_email": GCP_CUSTOM_SERVICE_ACCOUNT, 
            "requirements_file": "gs://GCS_CODE_BUCKET/requirements.txt", 
            "max_num_workers": "2", 
            "region": "us-east1", 
             "experiments": [
                "streaming_boot_disk_size_gb=100", 
                "workerLogLevelOverrides=com.google.cloud.dataflow#DEBUG", 
                "dataflow_service_options=enable_prime"
            ],
        },
        py_options=[],
        py_requirements=["apache-beam[gcp]~=2.60.0"],
        py_interpreter="python3",
        py_system_site_packages=False,
        dataflow_config=DataflowConfiguration(
            job_name="{{task.task_id}}",
            project_id=PROJECT,
            location="us-east1",
            wait_until_finished=False,
            gcp_conn_id="google_cloud_default",
        ),
        do_xcom_push=True,
    )

The logs are:

[2024-11-04, 04:31:42 UTC] {beam.py:151} INFO - Start waiting for Apache Beam process to complete.
[2024-11-04, 04:41:09 UTC] {beam.py:172} INFO - Process exited with return code: 0
[2024-11-04, 04:41:11 UTC] {taskinstance.py:441} ▼ Post task execution logs
[2024-11-04, 04:41:11 UTC] {taskinstance.py:1206} INFO - Marking task as SUCCESS. dag_id=test_data_pipeline, task_id=start_dataflow_job, run_id=manual__2024-11-04T04:24:59.026429+00:00, execution_date=20241104T042459, start_date=20241104T042502, end_date=20241104T044111
[2024-11-04, 04:41:12 UTC] {local_task_job_runner.py:243} INFO - Task exited with return code 0
[2024-11-04, 04:41:12 UTC] {taskinstance.py:3506} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2024-11-04, 04:41:12 UTC] {local_task_job_runner.py:222} ▲▲▲ Log group end
google-cloud-dataflow apache-beam google-cloud-composer airflow-2.x
1 Answer

Airflow uses the standard Python logging framework to write logs, and while a task is running the root logger is configured to write to that task's log. So, to follow the Dataflow pipeline's progress in Airflow, the logging level inside the Dataflow pipeline code has to be set to INFO; I had originally set it to ERROR. Once I raised the log level, the operator submitted the job, got the dataflow_job_id into XCom, and marked itself successful shortly afterwards; a sensor then followed up and tracked the job status through to completion (a sketch of that sensor is further down).

logging.getLogger().setLevel(logging.INFO)
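
For context, here is a minimal sketch of what the pipeline entrypoint (the script uploaded to GCS_FILE_LOCATION) can look like with the log level raised before the pipeline is submitted; the transforms are placeholders, only the logging call at the bottom matters:

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    # The operator passes pipeline_options as command-line flags;
    # PipelineOptions() picks them up from sys.argv.
    options = PipelineOptions()
    pipeline = beam.Pipeline(options=options)
    (
        pipeline
        | "Create" >> beam.Create(["hello", "world"])  # placeholder transforms
        | "Print" >> beam.Map(print)
    )
    # With DataflowRunner, run() submits the job and returns without blocking here.
    pipeline.run()


if __name__ == "__main__":
    # Root logger at INFO so the Dataflow submission messages (including the job id)
    # end up in the process output that Airflow captures in the task log.
    logging.getLogger().setLevel(logging.INFO)
    run()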

Read more here: Write to Airflow task logs from your code
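
To trigger downstream work off JOB_STATE_DONE, as asked in the question, a DataflowJobStatusSensor can pick up the job id from XCom. A minimal sketch, assuming the operator pushes the id under the dataflow_job_id XCom key (the task id wait_for_dataflow_job is made up here):

from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

wait_for_dataflow_job = DataflowJobStatusSensor(
    task_id="wait_for_dataflow_job",
    # Assumed XCom key; pulls the id that start_dataflow_job pushed after submission.
    job_id="{{ task_instance.xcom_pull(task_ids='start_dataflow_job', key='dataflow_job_id') }}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=PROJECT,
    location="us-east1",
    gcp_conn_id="google_cloud_default",
)

start_dataflow_job >> wait_for_dataflow_job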
