[Airflow]:使用 Xcoms 在 docker 容器上应用动态任务映射

问题描述 投票:0回答:1

我正在创建一个 dag,它应该执行以下操作:

  • 获取事件ID
  • 对于每个事件 ID,获取事件详细信息

这两个步骤使用 docker 容器，因为我不想使用 PythonOperator：我计划在某个时刻部署该项目，不希望数据处理发生在与 Airflow 运行所在相同的实例上。

下面的代码是我尝试做我想做的事:

import re
from datetime import datetime

from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
from airflow.sensors.base import PokeReturnValue


leagues = ["league1", "league2", "league3"]


def extract_event_ids_from_logs(ti, previous_task):
    """Parse the event-ID list out of a DockerOperator's captured log line.

    The Docker task prints a Python-style list such as ``[1, 2, 3]``;
    with ``xcom_all=False`` that last log line lands in XCom, and this
    callable converts it back into a list of ints.

    Args:
        ti: Task instance (injected by Airflow into the PythonOperator).
        previous_task: ``task_id`` of the DockerOperator whose log to parse.

    Returns:
        list[int]: The parsed event IDs; ``[]`` when the XCom is missing
        or no bracketed number list is found.
    """
    logs = ti.xcom_pull(key="return_value", task_ids=previous_task)
    if not logs:
        # Upstream pushed nothing (task skipped/failed) — without this guard
        # re.search(None) would raise TypeError.
        return []

    match = re.search(r"\[([0-9, ]+)\]", logs)
    if not match:
        return []
    return [int(token.strip()) for token in match.group(1).split(",")]


def merge_ids(ti, tasks):
    """Fan-in: concatenate the ID lists produced by several upstream tasks.

    Args:
        ti: Task instance (injected by Airflow into the PythonOperator).
        tasks: Iterable of ``task_id`` strings whose ``return_value`` XComs
            hold lists of event IDs.

    Returns:
        list: All IDs in upstream order; tasks with a missing/empty XCom
        are skipped instead of raising ``TypeError`` on ``extend(None)``.
    """
    ids = []
    for task_id in tasks:
        val = ti.xcom_pull(key="return_value", task_ids=task_id)
        if val:
            ids.extend(val)
    return ids


@dag(
    start_date=datetime(2024, 11, 1),
    schedule="@daily",
    catchup=False,
    tags=["scraping"],
)
def scraping():
    """Per-league fetch → extract → merge, then a mapped per-event fetch.

    NOTE(review): the ``fetch_event_details`` section below is the part the
    question reports as not working — see the BUG comment there.
    """
    tasks = []

    for league in leagues:
        # One Docker task per league that prints the event-id list to stdout;
        # xcom_all=False pushes only the last log line as the XCom value.
        fetch_league_events = DockerOperator(
            task_id=f"fetch_{league}_events",
            max_active_tis_per_dag=1,
            image="image:v1",
            command=f"fetch-event-ids-from-league --league {league}",
            api_version="auto",
            auto_remove=True,
            tty=True,
            xcom_all=False,
            mount_tmp_dir=False,
        )
        # Parse the printed "[1, 2, 3]" list back into Python ints.
        extract_ids = PythonOperator(
            task_id=f"extract_ids_{league}",
            python_callable=extract_event_ids_from_logs,
            op_kwargs={"previous_task": fetch_league_events.task_id},
        )

        fetch_league_events >> extract_ids
        tasks.append(extract_ids)

    # Fan-in: concatenate every league's id list into a single XCom.
    merge_task = PythonOperator(
        task_id="merge_ids",
        python_callable=merge_ids,
        op_kwargs={"tasks": [o.task_id for o in tasks]},
        # provide_context=False,
    )
    for task_ in tasks:
        task_ >> merge_task

    # BUG(review): two defects here. (1) DockerOperator(...) builds an operator
    # *instance*, which has no .expand(); dynamic task mapping requires
    # DockerOperator.partial(...).expand(...). (2) Iterating over the Jinja
    # template *string* yields one command per character, not per event id —
    # expand() must be given an XComArg (e.g. a builder task's .output).
    fetch_event_details = DockerOperator(
        task_id="fetch_event_details",
        image="image:v1",
        ).expand(
            command=[f"fetch-event --event-id  {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='merge_ids', key='return_value') }}"]
        )


    merge_task >> fetch_event_details


scraping()

上面的内容显然不起作用，因为我实际上是在对一个字符串进行循环。要实现我的目标，正确的写法应该是什么？

我还尝试了不同的事情,例如,我尝试创建一个 python 运算符来获取 XComs 值并使用 docker 运算符进行循环。我没有收到任何错误,但 docker 任务没有运行。

def fetch_event_using_docker(ti):
        # NOTE(review): this is why "the docker task does not run": operators
        # instantiated inside a PythonOperator callable are only constructed at
        # *runtime*; they are never registered with the DAG, so the scheduler
        # never sees or executes them. Dynamic task mapping
        # (DockerOperator.partial(...).expand(...)) at DAG-definition time is
        # the supported way to create one task per event id.
        event_ids = ti.xcom_pull(task_ids="merge_ids", key="return_value")

        for event_id in event_ids:
            # Constructed but never scheduled — dead object.
            action = DockerOperator(
                task_id=f"fetch_event_{event_id}",
                image="image:v1",
                api_version="auto",
                auto_remove=True,
                tty=True,
                xcom_all=False,
                mount_tmp_dir=False,
                max_active_tis_per_dag=1,
                command=f"fetch-event --event-id {event_id}",
            )

// ... existing code ...
        fetch_task = PythonOperator(
            task_id="fetch_events",
            python_callable=fetch_event_using_docker,
        )

        merge_task >> fetch_task

我尝试在网上查找,但找不到任何有用的信息。我发现这个线程说使用 xcoms 创建 dag 不是一个好主意(here)。我理解逻辑,但是什么是正确的设计模式来实现我想要实现的目标。

python docker airflow airflow-taskflow airflow-xcom
1个回答
0
投票

您需要把上游任务的 XCom 返回值整理成动态任务映射（dynamic task mapping）运算符 `expand()` 所需的参数形式。


from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator

# NOTE: airflow.utils.dates.days_ago is deprecated (removed in newer Airflow);
# a *static* start_date is the recommended pattern — a dynamic one makes the
# scheduler's run calculation unstable.
dag = DAG(
    dag_id="docker_dag",
    schedule_interval=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
with dag:
    def fn_get_work():
        """Stand-in for the real 'fetch event ids' step; returns the work items."""
        return ["a", "b", "c"]

    get_work_task = PythonOperator(
        task_id="get_work",
        python_callable=fn_get_work,
    )

    def fn_build(work):
        """Turn each work item into the CLI command for one mapped DockerOperator."""
        return [f"fetch-event --event-id {i}" for i in work]

    # get_work_task.output is an XComArg: resolved at runtime, so fn_build
    # receives the actual list, not a template string.
    build_work_task = PythonOperator(
        task_id="build_work",
        python_callable=fn_build,
        op_kwargs={"work": get_work_task.output},
    )

    # Dynamic task mapping: one DockerOperator instance per command in the
    # list pushed by build_work_task.
    run_work_task = DockerOperator.partial(
        task_id="run_work",
        image="alpine:3.16.2",
    ).expand(command=build_work_task.output)

    get_work_task >> build_work_task >> run_work_task

© www.soinside.com 2019 - 2024. All rights reserved.