from sqlalchemy.orm import sessionmaker
Session = sessionmaker(engine)
with Session() as session:
"""Here upsert functionality"""
session.insert_or_update(company)
session.commit()
工作,因为我只需要检查主键而不是其他唯一值。 文献说:
session.merge()检查源实例的主要密钥属性,并试图将其与会话中同一主键的实例调和。如果在本地找不到,它将尝试根据主键从数据库加载对象,并且如果无法找到对象,则会创建一个新实例。然后将源实例上每个属性的状态复制到目标实例。然后由该方法返回所得的目标实例。原始源实例未修改,如果还没有,则与会话不相关。 我如何为多个对象执行此操作?
您已经注意到,
Session.merge()
将以对象为基础完成任务。例如,如果我们有
class Thing(Base):
__tablename__ = "thing"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=False)
txt: Mapped[str] = mapped_column(String(50))
my_thing = Thing(id=1, txt="foo")
with Session(engine) as sess:
sess.merge(my_thing)
sess.commit()
,但是,我们不想为大量对象做到这一点,例如,rows_to_upsert = 4000
things = [Thing(id=i, txt=f"txt_{i}") for i in range(rows_to_upsert)]
with Session(engine, autoflush=False) as sess:
t0 = time.perf_counter()
for thing in things:
sess.merge(thing)
sess.commit()
print(
f"merge: {rows_to_upsert:,} rows upserted in {(time.perf_counter() - t0):0.1f} seconds"
)
由于将导致服务器4000往返以执行每个对象的选择,这将很慢。在我的测试中,上升4000行大约需要40秒,或大约100行/秒。
,我们应该将对象列表转换为
dict
列表
list_of_dict = [dict(id=thing.id, txt=thing.txt) for thing in things]
然后使用
INSERT … ON DUPLICATE KEY
语句
from sqlalchemy.dialects.mysql import insert
insert_stmt = insert(Thing).values(list_of_dict)
on_duplicate_stmt = insert_stmt.on_duplicate_key_update(
dict(txt=insert_stmt.inserted.txt)
)
with Session(engine) as sess:
t0 = time.perf_counter()
sess.execute(on_duplicate_stmt)
sess.commit()
print(
f"insert: {rows_to_upsert:,} rows upserted in {(time.perf_counter() - t0):0.1f} seconds"
)
仅需2秒钟,即大约2000行/秒。