I'm running Airflow on Kubernetes, and I'm trying to sync my DAGs from S3 instead of using git-sync.
My Airflow deployment uses the apache-airflow Helm chart (https://airflow.apache.org/), --version 1.5.0.
My values.yaml is configured as follows:
airflow:
  env:
    - name: AIRFLOW__CORE__ENABLE_XCOM_PICKLING
      value: "True"
  config:
    core:
      dags_folder: /opt/airflow/dags/repo/
      executor: KubernetesExecutor
  dags:
    persistence:
      enabled: false
  executor: KubernetesExecutor
  scheduler:
    extraVolumeMounts:
      - name: dags-volume
        mountPath: /opt/airflow/dags/repo
    extraVolumes:
      - name: dags-volume
        emptyDir: {}
    extraContainers:
      - name: s3-sync
        image: amazon/aws-cli
        command:
          - /bin/sh
          - -c
          - >
            while true; do
              aws s3 sync s3://bucket/airflow/ /opt/airflow/dags/repo/ --exclude "*" --include "dags/*";
              sleep 600;
            done
        env:
          - name: AWS_ACCESS_KEY_ID
            value: "key"
          - name: AWS_SECRET_ACCESS_KEY
            value: "secret"
          - name: AWS_DEFAULT_REGION
            value: "region"
        volumeMounts:
          - name: dags-volume
            mountPath: /opt/airflow/dags/repo
  triggerer:
    extraVolumeMounts:
      - name: dags-volume
        mountPath: /opt/airflow/dags/repo
    extraVolumes:
      - name: dags-volume
        emptyDir: {}
    extraContainers:
      - name: s3-sync
        image: amazon/aws-cli
        command:
          - /bin/sh
          - -c
          - >
            while true; do
              aws s3 sync s3://bucket/airflow/ /opt/airflow/dags/repo/ --exclude "*" --include "dags/*";
              sleep 600;
            done
        env:
          - name: AWS_ACCESS_KEY_ID
            value: "key"
          - name: AWS_SECRET_ACCESS_KEY
            value: "secret"
          - name: AWS_DEFAULT_REGION
            value: "region"
        volumeMounts:
          - name: dags-volume
            mountPath: /opt/airflow/dags/repo
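A note on the sidecar's sync command: `aws s3 sync` applies `--exclude`/`--include` filters in order, and the last matching filter wins, so `--exclude "*" --include "dags/*"` copies only keys under the `dags/` prefix. This is a rough Python model of that filter evaluation (not the AWS CLI's actual code, just an illustration of the semantics):

```python
from fnmatch import fnmatch

def should_sync(key, filters=(("exclude", "*"), ("include", "dags/*"))):
    """Rough model of aws s3 sync filter evaluation: filters are applied
    in order and the LAST matching pattern decides; by default, every
    key is included."""
    decision = True  # default: include everything
    for kind, pattern in filters:
        if fnmatch(key, pattern):
            decision = (kind == "include")
    return decision

print(should_sync("dags/dag_manual.py"))  # True: last match is the include
print(should_sync("plugins/helper.py"))   # False: only the exclude matches
```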
The sync from the S3 bucket into the pod works: when I exec into the pod, the DAG file is in the expected folder:
/opt/airflow/dags/repo/dags/dag_manual.py
But when I try to run any DAG, I get the following error:
[2023-03-24 23:01:40,968] {dagbag.py:507} INFO - Filling up the DagBag from /opt/airflow/dags/repo/dags/dag_manual.py
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 38, in main
args.func(args)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 51, in command
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 99, in wrapper
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 360, in task_run
dag = get_dag(args.subdir, args.dag_id)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 204, in get_dag
f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
airflow.exceptions.AirflowException: Dag 'DAG_MANUAL' could not be found; either it does not exist or it failed to parse.
I haven't been able to figure this out; any help would be appreciated.