我正在设置一个使用 dagster 中的 Config 的管道,以便用户可以在运行管道时输入用户定义的参数。我的用例是运行 SQL 查询来提取数据,用户可以定义数据提取的开始和结束日期。将结果从该资产传递到另一个资产时,我遇到配置错误。
这是有关运行配置的 dagster 文档:https://docs.dagster.io/concepts/configuration/config-schema
我用下面的简单脚本重现了该错误。
当我在 dagster UI 中运行此脚本时,它提示我缺少脚手架,然后生成脚手架,我可以输入参数 - 在本例中,我将“Bananas”添加到fruit_select参数中。该参数在 filter_data 资产中使用。然后,我想在以下名为 filter_again 的资产中使用该资产(即 df2)的结果。
这是我不断遇到的错误:
dagster._core.errors.DagsterInvalidConfigError: Error in config for op Error 1: Missing required config entry "config" at the root. Sample config for missing entry: {'config': {'fruit_select': '...'}}
任何帮助将不胜感激!我的目标是让用户在运行管道时定义一个参数,该参数在其中一个资产中使用来提取数据或操作数据,然后将这些结果传递到另一个资产以执行另一项任务。
import pandas as pd
import random
from datetime import datetime, timedelta
from dagster import asset, MaterializeResult, MetadataValue, Config, materialize
@asset
def generate_dataset():
# Function to generate random dates
def random_dates(start_date, end_date, n=10):
date_range = end_date - start_date
random_dates = [start_date + timedelta(days=random.randint(0, date_range.days)) for _ in range(n)]
return random_dates
# Set seed for reproducibility
random.seed(42)
# Define the number of rows
num_rows = 100
# Generate random data
fruits = ['Apple', 'Banana', 'Orange', 'Grapes', 'Kiwi']
fruit_column = [random.choice(fruits) for _ in range(num_rows)]
units_column = [random.randint(1, 10) for _ in range(num_rows)]
start_date = datetime(2022, 1, 1)
end_date = datetime(2022, 12, 31)
date_column = random_dates(start_date, end_date, num_rows)
# Create a DataFrame
df = pd.DataFrame({
'fruit': fruit_column,
'units': units_column,
'date': date_column
})
# Display the DataFrame
print(df)
return df
class fruit_config(Config):
fruit_select: str
@asset(deps=[generate_dataset])
def filter_data(config: fruit_config):
df = generate_dataset()
df2 = df[df['fruit'] == config.fruit_select]
print(df2)
return df2
@asset(deps=[filter_data])
def filter_again():
df2 = filter_data()
df3 = df2[df2['units'] > 5]
print(df3)
return df3
我意识到代码只是被错误地实现了......这个 dagster 文档页面有点帮助https://docs.dagster.io/concepts/assets/software-define-assets 但演示视频 https://www.youtube.com/watch?v=lRwpcyd6w8k 更有帮助。
本质上,你必须在下游资产中传递上游资产,并且有一个重要的部分是分配资产的预期回报输出。更正后的代码如下:
import pandas as pd
import random
from datetime import datetime, timedelta
from dagster import asset, MaterializeResult, MetadataValue, Config, materialize
@asset
def generate_dataset() -> pd.DataFrame:
# Function to generate random dates
def random_dates(start_date, end_date, n=10):
date_range = end_date - start_date
random_dates = [start_date + timedelta(days=random.randint(0, date_range.days)) for _ in range(n)]
return random_dates
# Set seed for reproducibility
random.seed(42)
# Define the number of rows
num_rows = 100
# Generate random data
fruits = ['Apple', 'Banana', 'Orange', 'Grapes', 'Kiwi']
fruit_column = [random.choice(fruits) for _ in range(num_rows)]
units_column = [random.randint(1, 10) for _ in range(num_rows)]
start_date = datetime(2022, 1, 1)
end_date = datetime(2022, 12, 31)
date_column = random_dates(start_date, end_date, num_rows)
# Create a DataFrame
df = pd.DataFrame({
'fruit': fruit_column,
'units': units_column,
'date': date_column
})
# Display the DataFrame
print(df)
return df
class fruit_config(Config):
fruit_select: str
@asset
def filter_data(generate_dataset: pd.DataFrame, config: fruit_config) -> pd.DataFrame:
generate_dataset = generate_dataset[generate_dataset['fruit'] == config.fruit_select]
print(generate_dataset)
return generate_dataset
@asset
def filter_again(filter_data: pd.DataFrame) -> pd.DataFrame:
filter_data = filter_data[filter_data['units'] > 5]
print(filter_data)
return filter_data