Dagster 使用 Config 作为用户定义参数并将资产结果传递给下游资产时出现错误

问题描述 投票:0回答:1

我正在设置一个使用 dagster 中的 Config 的管道,以便用户可以在运行管道时输入用户定义的参数。我的用例是运行 SQL 查询来提取数据,用户可以定义数据提取的开始和结束日期。将结果从该资产传递到另一个资产时,我遇到配置错误。

这是有关运行配置的 dagster 文档:https://docs.dagster.io/concepts/configuration/config-schema

我用下面的简单脚本重现了该错误。

当我在 dagster UI 中运行此脚本时,它提示我缺少脚手架,然后生成脚手架,我可以输入参数 - 在本例中,我将“Bananas”添加到fruit_select参数中。该参数在 filter_data 资产中使用。然后,我想在以下名为 filter_again 的资产中使用该资产(即 df2)的结果。

这是我不断遇到的错误:

dagster._core.errors.DagsterInvalidConfigError: Error in config for op Error 1: Missing required config entry "config" at the root. Sample config for missing entry: {'config': {'fruit_select': '...'}}

任何帮助将不胜感激!我的目标是让用户在运行管道时定义一个参数,该参数在其中一个资产中使用来提取数据或操作数据,然后将这些结果传递到另一个资产以执行另一项任务。

import pandas as pd
import random
from datetime import datetime, timedelta
from dagster import asset, MaterializeResult, MetadataValue, Config, materialize

@asset 
def generate_dataset():
    # Function to generate random dates
    def random_dates(start_date, end_date, n=10):
        date_range = end_date - start_date
        random_dates = [start_date + timedelta(days=random.randint(0, date_range.days)) for _ in range(n)]
        return random_dates
    
    # Set seed for reproducibility
    random.seed(42)
    
    # Define the number of rows
    num_rows = 100
    
    # Generate random data
    fruits = ['Apple', 'Banana', 'Orange', 'Grapes', 'Kiwi']
    fruit_column = [random.choice(fruits) for _ in range(num_rows)]
    units_column = [random.randint(1, 10) for _ in range(num_rows)]
    start_date = datetime(2022, 1, 1)
    end_date = datetime(2022, 12, 31)
    date_column = random_dates(start_date, end_date, num_rows)
    
    # Create a DataFrame
    df = pd.DataFrame({
        'fruit': fruit_column,
        'units': units_column,
        'date': date_column
    })
    
    # Display the DataFrame
    print(df)
    return df

class fruit_config(Config):
    fruit_select: str 

@asset(deps=[generate_dataset]) 
def filter_data(config: fruit_config):
    df = generate_dataset()
    df2 = df[df['fruit'] == config.fruit_select]
    print(df2)
    return df2

@asset(deps=[filter_data]) 
def filter_again():
    df2 = filter_data()
    df3 = df2[df2['units'] > 5]
    print(df3)
    return df3
python config assets dagster
1个回答
0
投票

我意识到代码只是被错误地实现了......这个 dagster 文档页面有点帮助https://docs.dagster.io/concepts/assets/software-define-assets 但演示视频 https://www.youtube.com/watch?v=lRwpcyd6w8k 更有帮助。

本质上,你必须在下游资产中传递上游资产,并且有一个重要的部分是分配资产的预期回报输出。更正后的代码如下:

import pandas as pd
import random
from datetime import datetime, timedelta
from dagster import asset, MaterializeResult, MetadataValue, Config, materialize

@asset 
def generate_dataset() -> pd.DataFrame:
    # Function to generate random dates
    def random_dates(start_date, end_date, n=10):
        date_range = end_date - start_date
        random_dates = [start_date + timedelta(days=random.randint(0, date_range.days)) for _ in range(n)]
        return random_dates
    
    # Set seed for reproducibility
    random.seed(42)
    
    # Define the number of rows
    num_rows = 100
    
    # Generate random data
    fruits = ['Apple', 'Banana', 'Orange', 'Grapes', 'Kiwi']
    fruit_column = [random.choice(fruits) for _ in range(num_rows)]
    units_column = [random.randint(1, 10) for _ in range(num_rows)]
    start_date = datetime(2022, 1, 1)
    end_date = datetime(2022, 12, 31)
    date_column = random_dates(start_date, end_date, num_rows)
    
    # Create a DataFrame
    df = pd.DataFrame({
        'fruit': fruit_column,
        'units': units_column,
        'date': date_column
    })
    
    # Display the DataFrame
    print(df)
    return df

class fruit_config(Config):
    fruit_select: str 

@asset 
def filter_data(generate_dataset: pd.DataFrame, config: fruit_config) -> pd.DataFrame:
    generate_dataset = generate_dataset[generate_dataset['fruit'] == config.fruit_select]
    print(generate_dataset)
    return generate_dataset

@asset
def filter_again(filter_data: pd.DataFrame) -> pd.DataFrame:
    filter_data = filter_data[filter_data['units'] > 5]
    print(filter_data)
    return filter_data
© www.soinside.com 2019 - 2024. All rights reserved.