SQLAlchemy 的最大缺点似乎是在使用临时表时需要倒退几个步骤。例如,一个非常常见的用例是创建一个特定于一项任务的临时表,在其中放入一些数据,然后对其进行联接。
对于初学者来说,声明临时表是冗长且有限的。请注意,在这个示例中,我必须编辑它,因为我的类实际上继承了一个基类,所以我在这里给出的内容可能略有不正确。
@as_declarative(metaclass=MetaBase)
class MyTempTable(object):
__tablename__ = "temp"
__table_args__ = {'prefixes': ['TEMPORARY']}
id = Column(Integer(), primary_key=True)
person_id = Column(BigInteger())
a_string = Column(String(100))
创建它并不直观:
MyTempTable.__table__.create(session.bind)
我还必须记住明确删除它,除非我做了一些创造性的事情来让它使用 ON COMMIT DROP 进行渲染:
MyTempTable.__table__.drop(session.bind)
此外,除非临时表完成“顶级”,否则我刚刚给出的内容甚至不起作用。我还没有完全弄清楚这一点(因为不想花时间调查为什么它不起作用),但基本上我尝试使用 session.begin_nested() 在嵌套事务中以这种方式创建临时表,并且您最终出现错误,表明该关系不存在。但是,我在一些情况下在嵌套事务中创建临时表以进行单元测试,并且它们工作得很好。检查 echo 输出,发现区别在于,一个在 BEGIN 语句之前渲染,而另一个在 BEGIN 语句之后渲染。这是使用 Postgresql。
在嵌套事务中起作用的,并且坦率地说可以节省你大量时间的,就是输入该死的 sql 并使用 session.execute 执行它。
session.execute(text(
"CREATE TEMPORARY TABLE temp ("
" id SERIAL,"
" person_id BIGINT,"
" a_string TEXT"
") ON COMMIT DROP;"
))
当然,如果这样做,你仍然需要相应的表模型来使用 ORM 功能,或者必须坚持使用原始 sql 查询,这从一开始就违背了 SQLAlchemy 的目的。
我想知道我是否在这里遗漏了一些东西,或者是否有人提出了一个更优雅的解决方案。
我将 ORM 与 Core 一起使用。 ORM 保留用于更高级别的操作。对于大量数据和临时表,Core 更方便。示例:
temptbl_name = 'temp_del_dup_pk_{}'.format(datestamp)
temptbl = Table(temptbl_name, metadata, Column('col1', Integer, index=True),..., extend_existing=True)
temptbl.create(engine)
更新 这是一个简单的函数,可以动态生成临时表 ORM 定义:
def temp_table(name, cols):
args = dict(col1=Column(Integer, index=True),...)
args['__tablename__'] = name
args['__table_args__'] = dict(extend_existing=True)
return type(name, (Base,), args)
镜像现有表的列可能很有用:
def temp_table(name, base_table):
args = {c.name:c.copy() for c in base_table.__table__.c}
...
我决定以this答案为基础,因为我想要一种更灵活的方式来从现有模型创建复制表,同时仍然支持索引定义并与
alembic
*很好地配合。
我发现这种方法对于创建真正的临时表和创建将与主表交换的动态表都很有用。如果定义不完全匹配,您可能会在后者中遇到更棘手的
alembic
场景。
*以我特定的使用模式
import time
import warnings
import sqlalchemy as sa
def copy_table_args(model, **kwargs):
table_args = model.__table_args__
if isinstance(table_args, tuple):
new_args = []
for arg in table_args:
if isinstance(arg, dict):
table_args_dict = arg.copy()
table_args_dict.update(**kwargs)
new_args.append(table_args_dict)
elif isinstance(arg, sa.Index):
index = sa.Index(
arg.name,
*[col for col in arg.columns.keys()],
unique=arg.unique,
**arg.kwargs,
)
new_args.append(index)
elif isinstance(arg, sa.UniqueConstraint):
new_args.append(arg.copy())
else:
# TODO: need to handle other Constraints
raise Exception(f"Unhandled table arg: {arg}")
table_args = tuple(new_args)
elif isinstance(table_args, dict):
table_args = {
k: (v.copy() if hasattr(v, "copy") else v) for k, v in table_args.items()
}
table_args.update(**kwargs)
else:
raise Exception(f"Unexpected __table_args__ type: {table_args}")
return table_args
def copy_table_from_model(conn, model, **kwargs):
model_name = model.__name__ + "Tmp"
table_name = model.__table__.name + "_" + str(time.time()).replace(".", "_")
table_args = copy_table_args(model, extend_existing=True, **kwargs)
args = {c.name: c.copy() for c in model.__table__.c}
args["__tablename__"] = table_name
args["__table_args__"] = table_args
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=sa.exc.SAWarning)
copy_model = type(model_name, model.__bases__, args)
copy_model.__table__.create(conn)
return copy_model
def temp_table_from_model(conn, model):
return copy_table_from_model(conn, model, prefixes=["TEMPORARY"])
注意:我没有添加逻辑来处理复制约束,并且这是针对 MySQL 进行的简单测试。
如今(SQLAlchemy v2.0+),使用临时表没有问题中描述的那么困难。
创建它并不直观:
临时表可以直接通过其
create
方法或通过其相关 metadata
实例 创建
我还必须记住显式删除它,除非我做了一些创造性的事情来让它使用 ON COMMIT DROP 进行渲染:
ON COMMIT DROP
支持
此外,除非临时表完成“顶级”,否则我刚刚给出的内容甚至不起作用...尝试使用
在嵌套事务中以这种方式创建临时表,最终会出现错误session.begin_nested()
如评论中所述[1,2,3]由metatoaster,Grisha S和ddaa,这是因为如果
Table.create
或metadata.create_all
通过引擎,它们将检查一个新连接来完成它们的工作,这通常会与会话事务隔离:它们必须传递封闭会话使用的连接。
此代码片段演示了创建临时表并将其连接到 ORM 实体:
import sqlalchemy as sa
from sqlalchemy import orm
engine = sa.create_engine('postgresql+psycopg2:///test', echo=True)
metadata = sa.MetaData()
# Assume we have an existing table "users", with column "id".
users = sa.Table('users', metadata, autoload_with=engine)
class User:
pass
# Create the User model.
mapper_registry = orm.registry(metadata=metadata)
mapper_registry.map_imperatively(User, users)
# Define a temporary table.
tmp_tbl = sa.Table(
't34889957',
metadata,
sa.Column('id', sa.Integer, primary_key=True),
prefixes=['TEMPORARY'],
postgresql_on_commit='DROP',
)
Session = orm.sessionmaker(engine)
with Session.begin() as s:
with s.begin_nested():
# Create the temporary table
# tmp_tbl.create(s.connection()) would also work
metadata.create_all(s.connection(), tables=[tmp_tbl])
s.execute(tmp_tbl.insert())
q = sa.select(User).join(tmp_tbl, User.id == tmp_tbl.c.id)
for row in s.execute(q):
print(row)