我正在运行下面的代码。我有一份工作和一份工作,但是当我做 dagster dev 时,我遇到了以下问题。还有另一个操作从表中加载数据,在这里我想将此数据放入另一个表中。正如我通过 dagster 预览看到的那样,数据框正在加载。加载操作以数据帧的形式给出输出。
@op
def save_df_to_db(db: resource.sql_resource, dataframe, custom_table_name: str):
log=get_dagster_logger()
log.info("Connecting to db", custom_table_name)
with db.get().begin() as con:
dataframe.to_sql(custom_table_name, con, if_exists='append', index=False)
...
职位定义
@job
def start_day_job():
delivery_location=import_ops.get_location()
import_ops.save_df_to_db(dataframe=delivery_location, custom_table_name="Delivery_Location")
import_schedule=ScheduleDefinition(
job=start_day_job, cron_schedule="0 7 * * 2-6",execution_timezone="Europe/Zurich"
)
但是我收到此错误。
```在 @job start_day_job 中,在 op 调用“save_df_to_db”中收到输入 (custom_table_name) 的无效类类型 str。在组合期间调用 ndoes 时,必须将先前节点调用的输出或输入传递给复合函数作为输入`
How do i resolve this error?
I have tried the following.
1. Passed as custom parameter.
2. Passed as config schema
Still same issue.
这是因为文字值(在本例中为
pd.DataFrame
和 str
custom_table_name)无法直接传递给运算符。相反,它们必须作为 Out
从另一个运算符返回,或者作为配置值返回。请参阅以下示例,了解如何指定 RunConfig
table_name
。
import pandas as pd
from dagster import op, job, Out, Definitions, Config, RunConfig
from dagster_duckdb import DuckDBResource
class SaveTableConfig(Config):
custom_table_name: str
@op(out=Out(pd.DataFrame))
def create_dataframe():
data = {
"Column1": [1, 2, 3],
"Column2": ["A", "B", "C"],
"Column3": [True, False, True],
}
df = pd.DataFrame(data)
return df
@op
def save_df_to_db(dataframe: pd.DataFrame, db: DuckDBResource, config: SaveTableConfig):
with db.get_connection() as conn:
dataframe.to_sql(
config.custom_table_name, conn, if_exists="append", index=False
)
@job(
config=RunConfig(
ops={"save_df_to_db": SaveTableConfig(custom_table_name="Delivery_Location")}
)
)
def start_day_job():
df = create_dataframe()
save_df_to_db(dataframe=df)
defs = Definitions(
jobs=[start_day_job],
resources={
"db": DuckDBResource(
database="my.duckdb",
)
},
)
在此示例中,我选择使用 DuckDB 来确认一切按预期工作。干杯。