(也发布在 r/dagster 上)
这里是 Dagster N00b。
我有一个非常具体的用例。我的 ETL 执行以下步骤:
.parquet
文件为了方便起见,DuckDB 表的命名与 CSV 文件相同。
2a 到 2e 可以针对每个 CSV 文件并行完成。在单个 CSV 文件的上下文中,它们需要串行运行。
我当前的代码是:
@op
def get_csv_filenames(context) -> List[str]:
@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
for csv_filename in csv_list:
yield DynamicOutput(csv_filename, mapping_key=csv_filename)
def load_csv_into_duckdb(context, csv_filename)
def transform_dates(context, csv_filename)
def from_code_2_categories(context, csv_filename)
def export_2_parquet(context, csv_filename)
def profile_dataset(context, csv_filename)
@op
def process(context, csv_filename:str):
load_csv_into_duckdb(context, csv_filename)
transform_dates(context, csv_filename)
from_code_2_categories(context, csv_filename)
export_2_parquet(context, csv_filename)
profile_dataset(context, csv_filename)
@job
def pipeline():
csv_filename_list = get_csv_filenames()
generate_subtasks(csv_filename_list).map(process)
管道运行,但实际执行加载到 DuckDB、转换和导出到 parquet 的函数“隐藏”在
process()
操作中。
有没有办法在遵循 Dagster 的最佳实践的同时正确地进行模块化?我想将我的
process()
定义为 graph
并将我的 job
定义为 graph
的执行,同时能够在 DagsterUI 中查看各个任务,以便我只能重新运行那些失败的。
我尝试过
generate_subtasks(csv_filename_list).map(load_csv_into_duckdb).map(transform_dates).map(from_code_2_categories).map(...)
路线,但任务不会等待前一个任务完成后再启动。
愿意帮忙吗?
如果我理解正确的话,你想要深度优先处理,而不是广度优先?我认为您可能能够在动态输出步骤之后使用嵌套图触发深度优先处理。像这样的东西:
@op
def get_csv_filenames(context) -> List[str]:
@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
for csv_filename in csv_list:
yield DynamicOutput(csv_filename, mapping_key=csv_filename)
@op
def load_csv_into_duckdb(context, csv_filename)
@op
def transform_dates(context, csv_filename)
@op
def from_code_2_categories(context, csv_filename)
@op
def export_2_parquet(context, csv_filename)
@op
def profile_dataset(context, csv_filename)
@graph
def process(context, csv_filename:str):
load_csv_into_duckdb(context, csv_filename)
transform_dates(context, csv_filename)
from_code_2_categories(context, csv_filename)
export_2_parquet(context, csv_filename)
profile_dataset(context, csv_filename)
@job
def pipeline():
csv_filename_list = get_csv_filenames()
generate_subtasks(csv_filename_list).map(process)