Airflow syncing DAGs from S3 leads to DAG execution errors


I'm running Airflow on Kubernetes, and I'm trying to sync my DAGs from S3 instead of using git-sync.

My Airflow is deployed with the apache-airflow Helm chart from https://airflow.apache.org, --version 1.5.0.
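
For completeness, this is roughly how I apply the chart (release name and namespace are placeholders for my actual setup):

helm upgrade --install airflow apache-airflow/airflow \
  --version 1.5.0 \
  --namespace airflow \
  -f values.yaml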

My values.yaml is set up as follows:

airflow:
  env:
  - name: AIRFLOW__CORE__ENABLE_XCOM_PICKLING
    value: "True"
config:
  core:
    dags_folder: /opt/airflow/dags/repo/
    executor: KubernetesExecutor
dags:
  persistence:
    enabled: false
executor: KubernetesExecutor

scheduler:
  extraVolumeMounts:
    - name: dags-volume
      mountPath: /opt/airflow/dags/repo
  extraVolumes:
    - name: dags-volume
      emptyDir: {}
  extraContainers:
    - name: s3-sync
      image: amazon/aws-cli
      command:
        - /bin/sh
        - -c
        - >
          while true; do
            aws s3 sync s3://bucket/airflow/ /opt/airflow/dags/repo/ --exclude "*" --include "dags/*";
            sleep 600;
          done
      env:
        - name: AWS_ACCESS_KEY_ID
          value: "key"
        - name: AWS_SECRET_ACCESS_KEY
          value: "secret"
        - name: AWS_DEFAULT_REGION
          value: "region"
      volumeMounts:
      - name: dags-volume
        mountPath: /opt/airflow/dags/repo

triggerer:
  extraVolumeMounts:
    - name: dags-volume
      mountPath: /opt/airflow/dags/repo
  extraVolumes:
    - name: dags-volume
      emptyDir: {}
  extraContainers:
    - name: s3-sync
      image: amazon/aws-cli
      command:
        - /bin/sh
        - -c
        - >
          while true; do
            aws s3 sync s3://bucket/airflow/ /opt/airflow/dags/repo/ --exclude "*" --include "dags/*";
            sleep 600;
          done
      env:
        - name: AWS_ACCESS_KEY_ID
          value: "key"
        - name: AWS_SECRET_ACCESS_KEY
          value: "secret"
        - name: AWS_DEFAULT_REGION
          value: "region"
      volumeMounts:
      - name: dags-volume
        mountPath: /opt/airflow/dags/repo

I'm syncing from the S3 bucket into a folder inside the pod. When I exec into the pod, the DAG file is in the folder as expected:

/opt/airflow/dags/repo/dags/dag_manual.py
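
That check was done roughly like this (the scheduler deployment name comes from the chart's defaults and may differ in your cluster):

kubectl -n airflow exec deploy/airflow-scheduler -c scheduler -- \
  ls -l /opt/airflow/dags/repo/dags/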

But when I try to run any DAG, I get the following error:

[2023-03-24 23:01:40,968] {dagbag.py:507} INFO - Filling up the DagBag from /opt/airflow/dags/repo/dags/dag_manual.py
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 38, in main
args.func(args)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 51, in command
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 99, in wrapper
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 360, in task_run
dag = get_dag(args.subdir, args.dag_id)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 204, in get_dag
f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
airflow.exceptions.AirflowException: Dag 'DAG_MANUAL' could not be found; either it does not exist or it failed to parse.
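
Note that the error references dag_id 'DAG_MANUAL' while the file is dag_manual.py, so one thing worth ruling out is a mismatch between the dag_id and the file name. The dag_ids the scheduler actually registered can be listed like this (same assumed names as above):

kubectl -n airflow exec deploy/airflow-scheduler -c scheduler -- \
  airflow dags list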

I haven't been able to figure this out yet; any help would be appreciated.

kubernetes amazon-s3 airflow kubernetes-helm