Upsert 与 sqlalchemy 2.x 和 Postgresql 的冲突

问题描述 投票:0回答:1

我正在使用 python 查询外部 api,转换数据并在内部将其写入 postgresql 数据库。

在此过程中,我使用 pandas 将 api 的结果与数据库中的现有数据进行比较,并生成一个数据帧,其中包含新记录和在一个数据帧中已更改的现有记录。

我想做的是,将数据框或字典交给 sqlalchemy 并以如下方式处理它:

  1. 新记录刚刚添加
  2. 现有记录已更新

这就是我的方法(我是Python新手,所以请对我的初学者技能保持耐心......)

def update_absence(year):
    api_result = get_absence(year)
    db_result = get_database_absence(year)
    df = compare_dataframes(api_result, db_result, 'id')
    metadata_obj = MetaData()
    metadata_obj.reflect(bind=engine)
    some_table = Table("tb_absence", metadata_obj, autoload_with=engine)

    for item in df.to_dict('records'):
        insert_stmt = insert(some_table).values(item).on_conflict_do_update(constraint='tb_absence_pkey', set_=item)
        print(insert_stmt.compile())
        with engine.connect() as conn:
            result = conn.execute(insert_stmt)
            print(result.rowcount)

    conn.commit()

insert_stmt.compile() 的输出如下:

INSERT INTO tb_absence (id, start_date, end_date, half_day, morning, user_id, employee_id, type, extra_vacation, state, substitute_state, workdays, hours, medical_certificate, comments, substitute_user_id, name) VALUES (%(id)s, %(start_date)s, %(end_date)s, %(half_day)s, %(morning)s, %(user_id)s, %(employee_id)s, %(type)s, %(extra_vacation)s, %(state)s, %(substitute_state)s, %(workdays)s, %(hours)s, %(medical_certificate)s, %(comments)s, %(substitute_user_id)s, %(name)s) ON CONFLICT ON CONSTRAINT tb_absence_pkey DO UPDATE SET id = %(param_1)s, start_date = %(param_2)s, end_date = %(param_3)s, half_day = %(param_4)s, morning = %(param_5)s, user_id = %(param_6)s, employee_id = %(param_7)s, type = %(param_8)s, extra_vacation = %(param_9)s, state = %(param_10)s, substitute_state = %(param_11)s, workdays = %(param_12)s, hours = %(param_13)s, medical_certificate = %(param_14)s, comments = %(param_15)s, substitute_user_id = %(param_16)s, name = %(param_17)s

对于我迭代的每个项目,行计数都是 1(一旦我理解了该方法,打印语句对于真正的日志条目将消失)。但是,数据库永远不会更新。我无法真正理解冲突执行更新的事情以及如何处理连接和引擎。

我想我在理解它时遇到了一些基本问题,并且我在 sqlalchemy 教程中找到的该部分的示例对我来说很难理解,因为它们只提供了解决方案的小片段和片段。我可能需要一个完整的工作示例。另外,在这里回顾其他问题并没有让我产生很好的理解。

用于识别数据集差异的所有方法都运行良好。

我非常感谢任何可以帮助我进步的提示。

python-3.x postgresql sqlalchemy upsert
1个回答
0
投票

通过循环

for item in df.to_dict('records'):
,您将为每一行创建并发送单独的 INSERT。例如,我的桌子......

some_table = sa.Table(
    "thing",
    sa.MetaData(),
    sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
    sa.Column("txt", sa.String),
)

...和数据框...

df = pd.DataFrame([(1, "txt_1"), (2, "txt_2")], columns=["id", "txt"])

...

engine.echo = True
显示您的代码结果为

INSERT INTO thing (id, txt) VALUES (%(id)s, %(txt)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET id = %(param_1)s, txt = %(param_2)s
[no key 0.00090s] {'id': 1, 'txt': 'txt_1', 'param_1': 1, 'param_2': 'txt_1'}
INSERT INTO thing (id, txt) VALUES (%(id)s, %(txt)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET id = %(param_1)s, txt = %(param_2)s
[no key 0.00066s] {'id': 2, 'txt': 'txt_2', 'param_1': 2, 'param_2': 'txt_2'}

我们可以使用文本 SQL 将其转换为“executemany”

sql = sa.text("INSERT INTO thing (id, txt) VALUES (:id, :txt) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET txt = :txt")
with engine.begin() as conn:
    conn.execute(sql, df.to_dict("records"))

产生

INSERT INTO thing (id, txt) VALUES (%(id)s, %(txt)s) ON CONFLICT ON CONSTRAINT thing_pkey DO UPDATE SET txt = %(txt)s
[generated in 0.00086s] [{'id': 1, 'txt': 'txt_1'}, {'id': 2, 'txt': 'txt_2'}]
© www.soinside.com 2019 - 2024. All rights reserved.