一种并行运行 Dagster 操作的方法,在 UI 中也可见

问题描述 投票:0回答:1

(也发布在 r/dagster 上)

这里是 Dagster N00b。

我有一个非常具体的用例。我的 ETL 执行以下步骤:

  1. 查询数据库以获取 CSV 文件列表
  2. 转到文件系统并针对每个 CSV 文件:
  • 将其加载到DuckDB中
  • 将一些列转换为日期
  • 将一些数字代码转换为文本类别
  • 将干净的表导出到
    .parquet
    文件
  • 运行干净数据的配置文件报告

为了方便起见,DuckDB 表的命名与 CSV 文件相同。

2a 到 2e 可以针对每个 CSV 文件并行完成。在单个 CSV 文件的上下文中,它们需要串行运行。

我当前的代码是:

@op
def get_csv_filenames(context) -> List[str]:

@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
  for csv_filename in csv_list:
    yield DynamicOutput(csv_filename, mapping_key=csv_filename)

def load_csv_into_duckdb(context, csv_filename)

def transform_dates(context, csv_filename)

def from_code_2_categories(context, csv_filename)

def export_2_parquet(context, csv_filename)

def profile_dataset(context, csv_filename)

@op
def process(context, csv_filename:str):
  load_csv_into_duckdb(context, csv_filename)
  transform_dates(context, csv_filename)
  from_code_2_categories(context, csv_filename)
  export_2_parquet(context, csv_filename)
  profile_dataset(context, csv_filename)
  
@job
def pipeline():
  csv_filename_list = get_csv_filenames()
  generate_subtasks(csv_filename_list).map(process)

管道运行,但实际执行加载到 DuckDB、转换和导出到 parquet 的函数“隐藏”在

process()
操作中。

有没有办法在遵循 Dagster 的最佳实践的同时正确地进行模块化?我想将我的

process()
定义为
graph
并将我的
job
定义为
graph
的执行,同时能够在 DagsterUI 中查看各个任务,以便我只能重新运行那些失败的。

我尝试过

generate_subtasks(csv_filename_list).map(load_csv_into_duckdb).map(transform_dates).map(from_code_2_categories).map(...)
路线,但任务不会等待前一个任务完成后再启动。

愿意帮忙吗?

python pipeline duckdb dagster data-engineering
1个回答
0
投票

如果我理解正确的话,你想要深度优先处理,而不是广度优先?我认为您可能能够在动态输出步骤之后使用嵌套图触发深度优先处理。像这样的东西:

@op
def get_csv_filenames(context) -> List[str]:

@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
  for csv_filename in csv_list:
    yield DynamicOutput(csv_filename, mapping_key=csv_filename)

@op
def load_csv_into_duckdb(context, csv_filename)

@op
def transform_dates(context, csv_filename)

@op
def from_code_2_categories(context, csv_filename)

@op
def export_2_parquet(context, csv_filename)

@op
def profile_dataset(context, csv_filename)

@graph
def process(context, csv_filename:str):
  load_csv_into_duckdb(context, csv_filename)
  transform_dates(context, csv_filename)
  from_code_2_categories(context, csv_filename)
  export_2_parquet(context, csv_filename)
  profile_dataset(context, csv_filename)
  
@job
def pipeline():
  csv_filename_list = get_csv_filenames()
  generate_subtasks(csv_filename_list).map(process)
© www.soinside.com 2019 - 2024. All rights reserved.