[Airflow]: Dynamic task mapping on a DockerOperator using XComs


I am building a DAG that should do the following:

  • Fetch the event IDs
  • For each event ID, fetch the event details (DockerOperator)

The code below is my attempt:

import re
from datetime import datetime

from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
from airflow.sensors.base import PokeReturnValue


leagues = ["league1", "league2", "league3"]




@dag(
    start_date=datetime(2024, 11, 1),
    schedule="@daily",
)
    task_fetch_ids = PythonOperator(
        task_id="task_fetch_ids",
        ...)


    task_fetch_detail = DockerOperator(
        task_id="fetch_detail",
        image="image:v1",
        ).expand(
            command=[f"fetch-event --event-id  {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"]
        )


    task_fetch_ids >> task_fetch_detail


The code above obviously does not work, because the comprehension iterates over a string. What is the correct syntax?
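
To make the failure concrete, here is a minimal sketch that needs no Airflow installation. It assumes only that the Jinja expression is still an ordinary Python string when the DAG file is parsed, since rendering happens later inside a running task. The names `template` and `commands` are illustrative.

```python
# At parse time the Jinja expression is just a str; nothing has rendered it yet.
template = "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"

# Iterating over a str yields single characters, so this builds one command
# per character of the template rather than one per event ID.
commands = [f"fetch-event --event-id  {event_id}" for event_id in template]

print(len(commands) == len(template))
print(commands[:2])
```

The list of event IDs only exists at run time, so the mapping has to be expressed with something Airflow resolves at run time rather than with a comprehension in the DAG file.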

python docker airflow airflow-taskflow airflow-xcom
1 Answer

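You have to shape the XCom return value into the argument that the dynamically mapped operator expects. Build the list of commands in an upstream task, then pass that task's `.output` (an XComArg that Airflow resolves at run time) to `.expand()`, and put the constant arguments in `.partial()`. A Jinja template string cannot be used here, because it is not rendered when the DAG file is parsed.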


from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator


def fn_get_work():
    # Stand-in for the real ID lookup; the returned list is pushed to XCom.
    return ["a", "b", "c"]


def fn_build(work):
    # One command string per event ID; each element becomes one mapped task.
    return [f"fetch-event --event-id {i}" for i in work]


# `schedule` replaces the deprecated `schedule_interval` (Airflow 2.4+), and a
# fixed start_date replaces the deprecated days_ago() helper.
with DAG(
    dag_id="docker_dag",
    schedule=None,
    start_date=datetime(2024, 11, 1),
):
    get_work_task = PythonOperator(task_id="get_work", python_callable=fn_get_work)

    # get_work_task.output is an XComArg; it is resolved to the list at run time.
    build_work_task = PythonOperator(
        task_id="build_work",
        python_callable=fn_build,
        op_kwargs={"work": get_work_task.output},
    )

    # partial() takes the constant arguments, expand() the per-instance ones.
    run_work_task = DockerOperator.partial(
        task_id="run_work",
        image="alpine:3.16.2",
    ).expand(command=build_work_task.output)

    get_work_task >> build_work_task >> run_work_task
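
As a rough mental model of what `.partial(...).expand(...)` produces, the following sketch simulates the fan-out in plain Python. It is not Airflow's implementation: `simulate_expand` is a hypothetical helper written for illustration, and it handles only a single mapped argument. `fn_build` is repeated so that the snippet runs on its own.

```python
def fn_build(work):
    # Same shape as the answer's helper: one command string per event ID.
    return [f"fetch-event --event-id {i}" for i in work]


def simulate_expand(partial_kwargs, **mapped_kwargs):
    """Illustrative only: fan one mapped argument out into per-instance kwargs."""
    # This sketch supports exactly one mapped argument (raises otherwise).
    [(name, values)] = mapped_kwargs.items()
    return [
        {"map_index": index, **partial_kwargs, name: value}
        for index, value in enumerate(values)
    ]


instances = simulate_expand(
    {"task_id": "run_work", "image": "alpine:3.16.2"},
    command=fn_build(["a", "b", "c"]),
)
for instance in instances:
    print(instance)
```

Each dictionary stands for one mapped task instance: the constant arguments are shared, and only `command` differs, which is why the upstream task must return a list with one ready-to-run command per element.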
