http 运算符和查询字符串的动态任务映射

问题描述 投票:0回答:1

假设我有以下(简化的)dag:我有一个返回一系列查询参数值的任务,并且我想生成一个 httpoperator 的动态任务实例来执行类似

http://example.com?key=value1
http://example.com?key=value2
、 ETC。 但我无法映射运算符中的
data
字段

@task
def get_values():
  return ["value1","value2","value3"]

@dag
def mydag() -> None:
    values = get_values()
    gets = HttpOperator.partial(task_id='gets', method='GET').expand(
        data={ 'key': values }, # expanding in the ui, but "ValueError: too many values to unpack (expected 2)" at runtime
        data=[{'key': value} for value in values], # dag error: "TypeError: 'XComArg' object is not iterable"
    )

我不确定如何设置参数以实际正确使用值对象。

更糟糕的是,在真实的 dag 数据中通常还会引用额外的 qp,其中一些来自其他任务(具有静态映射)

airflow airflow-taskflow
1个回答
0
投票

我认为问题在于它试图扩展“键”和数组值,因此扩展两个参数,而不仅仅是一个数组。 我对 HttpOperator 没有太多经验,但也许可以在部分部分中传递键,然后您可以简单地执行 data=values 。 另一种选择是使用 requests.get,如下所示:

@task
def send_get_request(data):
    response = requests.get(f"https://www.something.com?{data}")
    return response.json()

然后你可以简单地做:

.expand(data=values)

希望有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.