从 Dagster 图收集 dyn @op 输出的方法

问题描述 投票:0回答:1

在 @zyd 在 this answer 中对 Dagster 中的并行深度优先执行提供了非常感谢的帮助后,我现在正在寻找一种方法来在收集的图形运行结果上运行

@op
,或者至少等待它们全部完成的一个,因为它们本身没有硬依赖关系。我的工作代码如下:

@op
def get_csv_filenames(context) -> List[str]:

@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
  for csv_filename in csv_list:
    yield DynamicOutput(csv_filename, mapping_key=csv_filename)

@op # no dep since 1st task
def load_csv_into_duckdb(context, csv_filename)

@op(ins={"start":In(Nothing)}
def transform_dates(context, csv_filename)

@op(ins={"start":In(Nothing)}
def from_code_2_categories(context, csv_filename)

@op(ins={"start":In(Nothing)}
def export_2_parquet(context, csv_filename)

@op(ins={"start":In(Nothing)}
def profile_dataset(context, csv_filename)

@graph
def process(context, csv_filename:str):
  task1 = load_csv_into_duckdb(context=context, csv_filename=csv_filename)
  task2 = transform_dates(start=task1, context=context, csv_filename=csv_filename)
  task3 = from_code_2_categories(start=task2, context=context, csv_filename=csv_filename)
  task4 = export_2_parquet(start=task3, context=context, csv_filename=csv_filename)
  profile_dataset(start=task4, context=context, csv_filename=csv_filename)

  
@job
def pipeline():
  csv_filename_list = get_csv_filenames()
  generate_subtasks(csv_filename_list).map(process)

我尝试过

.map(process).collect()
方法,但 Dagster 抱怨
Nonetype
没有属性
collect
。但是,我在网上看到了几个相同方法的示例,显然它应该有效。

我还尝试过使用

@graph
返回单个任务返回值的列表,但 DagsterUI 抱怨图形装饰函数应该返回带有映射键的字典。我可以构建它,但我觉得我应该从 Dagster 的执行上下文中获取它,我不知道如何从图形函数中访问它。

有人指点一下吗?

python pipeline duckdb dagster data-engineering
1个回答
0
投票

这是一个对我有用的例子:

from dagster import Definitions, op, DynamicOutput, graph, GraphOut, DynamicOut


@op
def a_op(path):
    return path

@op
def op2(path):
    return path

@op
def op3(path):
    return path

@op(out=DynamicOut(str))
def mapper():
    for i in range(10):
        yield DynamicOutput(str(i), mapping_key=str(i))

# I think what you were missing is returning the output from the graph here
@graph(out={"out": GraphOut()})
def nested(path: str):
    return op2(a_op(path))

@op
def consumer(context, paths: list[str]):
    context.log.info(paths)


@graph
def the_graph():
    consumer(mapper().map(nested).collect())

the_job = the_graph.to_job()

defs = Definitions(
    jobs=[the_job],
)

图实际上只是一个用于对操作进行分组的组织概念。在运行时,嵌套图被展平为整个作业的单个图。这意味着如果您想在嵌套图中使用操作的输出,则需要从嵌套图中返回操作的输出。然后其他一切都与任何其他 op->op 依赖项一样工作。

© www.soinside.com 2019 - 2024. All rights reserved.