I check whether there are overlapping IDs between a database table and a pandas DataFrame, and for those IDs I try to update the records using the sqlalchemy.update and sqlalchemy.bindparam features:
import pandas as pd
from sqlalchemy import MetaData, bindparam, update

def update_table(engine, user_table, dataframe: pd.DataFrame):
    # Prefix the DataFrame columns so they match the bindparam names below
    dataframe = dataframe.add_prefix("b_")
    stmt = (
        update(user_table)
        .where(user_table.c.ID == bindparam("b_ID"))
        .values(
            other_column_a=bindparam("b_other_column_a"),
            other_column_b=bindparam("b_other_column_b"),
            other_column_c=bindparam("b_other_column_c"),
        )
    )
    with engine.connect() as conn:
        # Passing a list of dicts triggers an executemany-style update
        result = conn.execute(stmt, dataframe.to_dict(orient="records"))
        conn.commit()
    return result
def main():
    # other lines here (engine has already been initialized using sqlalchemy.create_engine())
    # Find if there are any IDs to update
    ids_to_update = old_ids.intersection(new_ids)
    if isinstance(ids_to_update, set) and len(ids_to_update) != 0:
        with engine.connect() as conn:
            meta_data = MetaData(schema="uesm")
            meta_data.reflect(bind=conn)
            user_table = meta_data.tables[f"uesm.{table_name}"]
            df_to_update = new_dataset[
                new_dataset[id_names[table_name]].isin(ids_to_update)
            ]
            result = update_table(engine, user_table, df_to_update)
            print(f"Updated {result.rowcount} records in table {table_name}.")
What might be going wrong here? When I run the code above, the SQL statement generated by sqlalchemy.update, as printed to the console, looks correct:
[SQL: UPDATE schema_name.table_name SET other_column_a=?, other_column_b=?, other_column_c=? WHERE schema_name.table_name.[ID] = ?]
The dictionary of data supplied for statement execution also looks exactly right, and the same data can be inserted into the same target table via pd.to_sql without any issue. However, I get the following error:
sqlalchemy.exc.DataError: (pyodbc.DataError) ('22003', '[22003] [Microsoft][ODBC Driver 18 for SQL Server]Numeric value out of range (0) (SQLExecDirectW)')
FYI: none of the numeric values I am trying to pass is larger than 3 digits, and I am passing them into a decimal(8,3) column.
In the comments I was asked to provide the DDL of the table:
CREATE TABLE [schema_name].[table_name](
[ID] [nvarchar](200) NOT NULL,
[valid_from] [date] NOT NULL,
[valid_to] [date] NOT NULL,
[column_A] [nvarchar](200) NOT NULL,
[column_B] [nvarchar](200) NOT NULL,
[column_C] [decimal](8, 3) NOT NULL,
[column_D_flag] [char](1) NOT NULL,
[column_E] [nvarchar](200) NULL,
CONSTRAINT [PK_table_name] PRIMARY KEY CLUSTERED
(
[ID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY],
CONSTRAINT [UNIQUE_UEBERGABESTELLE] UNIQUE NONCLUSTERED
(
[valid_from] ASC,
[valid_to] ASC,
[column_A] ASC,
[column_B] ASC,
[column_C] ASC,
[ID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
and an excerpt of the dictionaries passed to that statement:
params = [
{
"b_ID": "AMAG_Ranshofen 110_110_1.0",
"b_valid_from": Timestamp("2023-01-01 00:00:00"),
"b_valid_to": Timestamp("2050-01-01 00:00:00"),
"b_column_A": "AMAG",
"b_column_B": "Ranshofen 110",
"b_column_C": 110,
"b_column_D_flag": 0,
"b_column_E": nan,
},
{
"b_ID": "EKW_Großraming 110_110_1.0",
"b_valid_from": Timestamp("2023-01-01 00:00:00"),
"b_valid_to": Timestamp("2050-01-01 00:00:00"),
"b_column_A": "EKW",
"b_column_B": "Großraming 110",
"b_column_C": 110,
"b_column_D_flag": 0,
"b_column_E": nan,
}
]
Could it be the nan values that are causing the problem? However, these only appear in the non-numeric columns.
Your .to_dict() renders the "empty" values in float/decimal columns as nan rather than None:
>>> df = pd.DataFrame([(1, 1.23), (2, None), (3, 3.14), ], columns=["new_id", "new_d"])
>>> df
new_id new_d
0 1 1.23
1 2 NaN
2 3 3.14
>>> params = df.to_dict(orient="records")
>>> params
[{'new_id': 1, 'new_d': 1.23}, {'new_id': 2, 'new_d': nan}, {'new_id': 3, 'new_d': 3.14}]
>>> null_value = params[1]["new_d"]
>>> type(null_value)
<class 'float'>
>>> repr(null_value)
'nan'
A "quick and dirty" fix is to create the dicts by dumping the DataFrame to JSON and loading it back:
>>> as_json = df.to_json(orient="records")
>>> as_json
'[{"new_id":1,"new_d":1.23},{"new_id":2,"new_d":null},{"new_id":3,"new_d":3.14}]'
>>> import json
>>> params = json.loads(as_json)
>>> params
[{'new_id': 1, 'new_d': 1.23}, {'new_id': 2, 'new_d': None}, {'new_id': 3, 'new_d': 3.14}]
>>> null_value = params[1]["new_d"]
>>> type(null_value)
<class 'NoneType'>
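An alternative (not from the original answer) is to replace the NaN cells with None on the DataFrame itself before calling .to_dict(). A minimal sketch, assuming a reasonably recent pandas, continuing the transcript above; the .astype(object) cast is what keeps pandas from coercing None straight back to nan in float columns:
>>> # keep values where the mask is True, insert None where it is False (NaN cells)
>>> params = df.astype(object).where(df.notna(), None).to_dict(orient="records")
>>> params
[{'new_id': 1, 'new_d': 1.23}, {'new_id': 2, 'new_d': None}, {'new_id': 3, 'new_d': 3.14}]
In the question's update_table() this would mean building the parameter list as dataframe.astype(object).where(dataframe.notna(), None).to_dict(orient="records"). This variant also leaves the Timestamp values in the records untouched, whereas df.to_json(orient="records") serializes dates as epoch milliseconds by default, so it may be the safer of the two fixes here.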