(也发布在 r/dagster 上)
这里是 Dagster N00b。
我有一个非常具体的用例。我的 ETL 执行以下步骤:
.parquet
文件为了方便起见,DuckDB 表的命名与 CSV 文件相同。
2a 到 2e 可以针对每个 CSV 文件并行完成。在单个 CSV 文件的上下文中,它们需要串行运行。
我当前的代码是:
@op
def get_csv_filenames(context) -> List[str]:
@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
for csv_filename in csv_list:
yield DynamicOutput(csv_filename, mapping_key=csv_filename)
def load_csv_into_duckdb(context, csv_filename)
def transform_dates(context, csv_filename)
def from_code_2_categories(context, csv_filename)
def export_2_parqu
如果我理解正确的话,你想要深度优先处理,而不是广度优先?我认为您可能能够在动态输出步骤之后使用嵌套图触发深度优先处理。从概念上讲,您还缺少如何在 Dagster 中设置操作之间的依赖关系。像这样的东西应该有效:
@op
def get_csv_filenames(context) -> List[str]:
@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
for csv_filename in csv_list:
yield DynamicOutput(csv_filename, mapping_key=csv_filename)
@op
def load_csv_into_duckdb(context, csv_filename)
...
return csv_filename
@op
def transform_dates(context, csv_filename)
...
return csv_filename
@op
def from_code_2_categories(context, csv_filename)
...
return csv_filename
@op
def export_2_parquet(context, csv_filename)
...
return csv_filename
@op
def profile_dataset(context, csv_filename)
...
return csv_filename
@graph
def process(context, csv_filename:str):
profile_dataset(export_2_parquet(from_code_2_categories(transform_dates(load_csv_into_duckdb(csv_filename)))))
@job
def pipeline():
csv_filename_list = get_csv_filenames()
generate_subtasks(csv_filename_list).map(process)