我正在创建一个 dag,它应该执行以下操作:
下面的代码是我尝试做我想做的事:
import re
from datetime import datetime
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
from airflow.sensors.base import PokeReturnValue
leagues = ["league1", "league2", "league3"]
@dag(
start_date=datetime(2024, 11, 1),
schedule="@daily",
)
task_fetch_ids = PythonOperator(
task_id="fetch_detail",
...)
task_fetch_detail = DockerOperator(
task_id="fetch_detail",
image="image:v1",
).expand(
command=[f"fetch-event --event-id {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"]
)
task_fetch_ids >> task_fetch_detail
上面的方法显然不起作用,因为我正在循环一个字符串。 正确的语法是什么?
您必须将 xcom 返回调整为动态任务映射运算符的参数
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
dag = DAG(
dag_id="docker_dag",
schedule_interval=None,
start_date=days_ago(1),
)
with dag:
def fn_get_work():
return ["a", "b", "c"]
get_work_task = PythonOperator(task_id="get_work",
python_callable=fn_get_work
)
def fn_build(work):
rst = []
for i in work:
rst.append(f"fetch-event --event-id {i}")
return rst
build_work_task = PythonOperator(task_id="build_work",
python_callable=fn_build,
op_kwargs={"work": get_work_task.output})
run_work_task = DockerOperator.partial(
task_id="run_work",
image="alpine:3.16.2",
).expand(command=build_work_task.output)
get_work_task >> build_work_task >> run_work_task