使用
dagster
我想在单个文件中创建一组可重用资产,我希望能够传递国家/地区参数并为每个国家/地区/时间表制定不同的时间表。我知道如何做到这一点的唯一方法是为每个国家/地区设置单独的资产文件,然后在 __init__.py
文件中为每个国家定义时间表。
下面是一个示例资产,其中除了国家/地区变量之外,所有资产代码都是相同的,我不确定如何将国家/地区从
Config
传递到共享 data_loader
对象:
country = 'JP'
data_loader= DataLoader(country=country)
@asset(partitions_def=daily_partitions_def)
def my_asset(context):
dt = context.partition_key
return RunMyAsset(data_loader=data_loader, country=country).calc(dt)
@asset(partitions_def=daily_partitions_def)
def my_asset2(context):
dt = context.partition_key
return RunMyAsset2(data_loader=data_loader, country=country).calc(dt)
理想情况下,我想重用上述资产,而不是为每个国家/地区创建具有单独资产的单独文件。
您可以使用静态或动态分区定义
https://docs.dagster.io/_apidocs/partitions
from dagster import StaticPartitionsDefinition, asset
oceans_partitions_def = StaticPartitionsDefinition(
["arctic", "atlantic", "indian", "pacific", "southern"]
)
@asset(partitions_def=oceans_partitions_defs)
def ml_model_for_each_ocean():
...