我用ORM系统开发了一个数据库来管理金融市场数据。
我将 Python 与 SQLAlchemy 和 SQLite 结合使用。
我需要从 market_data 表中检索每个资产的最新记录,提取其日期和时间,从服务器请求更新的数据,然后将新数据插入到 market_data 表中。我计划每分钟多次执行此操作,因此速度至关重要。但是,当前查询需要几分钟才能执行。我发现每个资产的过滤过程是主要瓶颈;如果没有它,查询会立即完成,但返回不正确的结果。
我认为将每个资产的数据存储在单独的表中会更有效,但我无法为每个资产创建特定的类,因为它们可以在运行时动态添加。
有没有一种方法可以从一个 ORM 类生成多个表?或者,您对增强数据库结构以提高查询速度有什么建议吗?
数据库的当前模型:
class Date(Base):
__tablename__ = "dates"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, unique=True)
date: Mapped[datetime.date] = mapped_column(Date, nullable=False, unique=True)
class Time(Base):
__tablename__ = "times"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, unique=True)
time: Mapped[datetime.time] = mapped_column(Time, nullable=False, unique=True)
class Region(Base):
__tablename__ = "regions"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, unique=True)
name: Mapped[str] = mapped_column(String(30), nullable=False, unique=True)
markets: Mapped[List["Market"]] = relationship()
class Market(Base):
__tablename__ = "markets"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, unique=True)
name: Mapped[str] = mapped_column(String(20), nullable=False)
region: Mapped[int] = mapped_column(ForeignKey("regions.id"))
assets: Mapped[List["Asset"]] = relationship()
class Asset(Base):
__tablename__ = "assets"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, unique=True)
symbol: Mapped[str] = mapped_column(String(10), nullable=False, unique=True)
name: Mapped[str] = mapped_column(String(30), nullable=True)
market: Mapped[int] = mapped_column(ForeignKey("markets.id"))
class MarketData(Base):
__tablename__ = "market_data"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, unique=True)
date: Mapped[int] = mapped_column(ForeignKey("dates.id"))
time: Mapped[int] = mapped_column(ForeignKey("times.id"))
asset: Mapped[int] = mapped_column(ForeignKey("assets.id"))
opening: Mapped[float] = mapped_column(Float, nullable=False)
high: Mapped[float] = mapped_column(Float, nullable=False)
low: Mapped[float] = mapped_column(Float, nullable=False)
closing: Mapped[float] = mapped_column(Float, nullable=False)
volume: Mapped[float] = mapped_column(Float, nullable=True)
我测试获取一项资产的最后 x 条记录的查询:
session.query(MarketData).filter_by(asset=asset.id).order_by(desc(MarketData.id)).limit(10).all()
mapped_class = type("Asset", (Asset, Base), {"__tablename__": "assets"})
>>> "assets" in Base.metadata.tables
True
>>> from sqlalchemy.sql.schema import Table
>>> isinstance(Base.metadata.tables["assets"], Table)
True
>>> Base.metadata.tables["assets"] is mapped_class.__table__
True
如果每个资产都有不同的表,只需更改表名称,您就可以创建引用该表的另一个 ORM 类。
type("Asset", (Asset, Base), {"__tablename__": "another_assets"})
>>> isinstance(Base.metadata.tables["another_assets"], Table)
True
对于给定的表名,
Table
中的metadata
一次只能存在一个实例:
>>> type("Asset", (Asset, Base), {"__tablename__": "another_assets"})
{InvalidRequestError}InvalidRequestError("Table 'another_assets' is already defined for this MetaData instance. Specify 'extend_existing=True' to redefine options and columns on an existing Table object.")
>>> from sqlalchemy import select
>>> session.scalars(select(mapped_class).filter_by(name="asset_name")).one_or_none()
如果之前创建的话,您还可以在此处使用
Table
实例:
session.scalars(select(Base.metadata.tables["assets"]).filter_by(name="asset_name")).one_or_none()
仅通过在
Table
中加载 metadata
的实例不足以发送查询。您的表也必须存在于数据库中。通过使用之前创建的 ORM 类,您还可以动态创建表。
from sqlalchemy import create_engine
engine = create_engine(url=...)
mapped_class = type("Asset", (Asset, Base), {"__tablename__": "new_assets"})
with engine.begin() as conn:
mapped_class.__table__.create(bind=conn, checkfirst=True)