Dagster 中的并行、深度优先操作,将操作、图形和作业结合在一起

问题描述 投票:0回答:1

(也发布在 r/dagster 上)

这里是 Dagster N00b。

我有一个非常具体的用例。我的 ETL 执行以下步骤:

  1. 查询数据库以获取 CSV 文件列表
  2. 转到文件系统并针对每个 CSV 文件:
  • 将其加载到DuckDB中
  • 将一些列转换为日期
  • 将一些数字代码转换为文本类别
  • 将干净的表导出到
    .parquet
    文件
  • 运行干净数据的配置文件报告

为了方便起见,DuckDB 表的命名与 CSV 文件相同。

2a 到 2e 可以针对每个 CSV 文件并行完成。在单个 CSV 文件的上下文中,它们需要串行运行。

我当前的代码是:

@op
def get_csv_filenames(context) -> List[str]:

@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
  for csv_filename in csv_list:
    yield DynamicOutput(csv_filename, mapping_key=csv_filename)

def load_csv_into_duckdb(context, csv_filename)

def transform_dates(context, csv_filename)

def from_code_2_categories(context, csv_filename)

def export_2_parqu
python pipeline duckdb dagster data-engineering
1个回答
1
投票

如果我理解正确的话,你想要深度优先处理,而不是广度优先?我认为您可能能够在动态输出步骤之后使用嵌套图触发深度优先处理。从概念上讲,您还缺少如何在 Dagster 中设置操作之间的依赖关系。像这样的东西应该有效:

@op
def get_csv_filenames(context) -> List[str]:

@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
  for csv_filename in csv_list:
    yield DynamicOutput(csv_filename, mapping_key=csv_filename)

@op
def load_csv_into_duckdb(context, csv_filename)
  ...
  return csv_filename

@op
def transform_dates(context, csv_filename)
  ...
  return csv_filename

@op
def from_code_2_categories(context, csv_filename)
  ...
  return csv_filename

@op
def export_2_parquet(context, csv_filename)
  ...
  return csv_filename

@op
def profile_dataset(context, csv_filename)
  ...
  return csv_filename

@graph
def process(context, csv_filename:str):
  profile_dataset(export_2_parquet(from_code_2_categories(transform_dates(load_csv_into_duckdb(csv_filename)))))

  
@job
def pipeline():
  csv_filename_list = get_csv_filenames()
  generate_subtasks(csv_filename_list).map(process)
© www.soinside.com 2019 - 2024. All rights reserved.