在 @zyd 在 this answer 中对 Dagster 中的并行深度优先执行提供了非常感谢的帮助后,我现在正在寻找一种方法来在收集的图形运行结果上运行
@op
,或者至少等待它们全部完成的一个,因为它们本身没有硬依赖关系。我的工作代码如下:
@op
def get_csv_filenames(context) -> List[str]:
@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
for csv_filename in csv_list:
yield DynamicOutput(csv_filename, mapping_key=csv_filename)
@op # no dep since 1st task
def load_csv_into_duckdb(context, csv_filename)
@op(ins={"start":In(Nothing)}
def transform_dates(context, csv_filename)
@op(ins={"start":In(Nothing)}
def from_code_2_categories(context, csv_filename)
@op(ins={"start":In(Nothing)}
def export_2_parquet(context, csv_filename)
@op(ins={"start":In(Nothing)}
def profile_dataset(context, csv_filename)
@graph
def process(context, csv_filename:str):
task1 = load_csv_into_duckdb(context=context, csv_filename=csv_filename)
task2 = transform_dates(start=task1, context=context, csv_filename=csv_filename)
task3 = from_code_2_categories(start=task2, context=context, csv_filename=csv_filename)
task4 = export_2_parquet(start=task3, context=context, csv_filename=csv_filename)
profile_dataset(start=task4, context=context, csv_filename=csv_filename)
@job
def pipeline():
csv_filename_list = get_csv_filenames()
generate_subtasks(csv_filename_list).map(process)
我尝试过
.map(process).collect()
方法,但 Dagster 抱怨 Nonetype
没有属性 collect
。但是,我在网上看到了几个相同方法的示例,显然它应该有效。
我还尝试过使用
@graph
返回单个任务返回值的列表,但 DagsterUI 抱怨图形装饰函数应该返回带有映射键的字典。我可以构建它,但我觉得我应该从 Dagster 的执行上下文中获取它,我不知道如何从图形函数中访问它。
有人指点一下吗?
这是一个对我有用的例子:
from dagster import Definitions, op, DynamicOutput, graph, GraphOut, DynamicOut
@op
def a_op(path):
return path
@op
def op2(path):
return path
@op
def op3(path):
return path
@op(out=DynamicOut(str))
def mapper():
for i in range(10):
yield DynamicOutput(str(i), mapping_key=str(i))
# I think what you were missing is returning the output from the graph here
@graph(out={"out": GraphOut()})
def nested(path: str):
return op2(a_op(path))
@op
def consumer(context, paths: list[str]):
context.log.info(paths)
@graph
def the_graph():
consumer(mapper().map(nested).collect())
the_job = the_graph.to_job()
defs = Definitions(
jobs=[the_job],
)
图实际上只是一个用于对操作进行分组的组织概念。在运行时,嵌套图被展平为整个作业的单个图。这意味着如果您想在嵌套图中使用操作的输出,则需要从嵌套图中返回操作的输出。然后其他一切都与任何其他 op->op 依赖项一样工作。