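I have collected a large amount of financial market data in multiple CSV files, and I need to store it in a database for later use in my software.
I'm using Python with SQLAlchemy and SQLite.
This is what my data and my database models look like: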
```
               Open     High      Low    Close
Timestamp
31798800    0.53690  0.53690  0.53690  0.53690
31802400    0.53690  0.53690  0.53690  0.53690
31885200    0.53660  0.53660  0.53660  0.53660
31888800    0.53660  0.53660  0.53660  0.53660
31971600    0.53650  0.53650  0.53650  0.53650
...             ...      ...      ...      ...
1731113460  1.07177  1.07185  1.07174  1.07182
1731113520  1.07182  1.07182  1.07177  1.07178
1731113580  1.07179  1.07187  1.07178  1.07186
1731113640  1.07186  1.07186  1.07178  1.07180
1731113700  1.07181  1.07181  1.07181  1.07181
```
```python
import datetime
from typing import List

from sqlalchemy import ForeignKey
from sqlalchemy import String, Integer, Float, Date, Time
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


# Note: the model classes Date and Time shadow sqlalchemy.Date / sqlalchemy.Time.
# Inside each class body the names still refer to the SQLAlchemy column types;
# after the class statement they refer to the models.
class Date(Base):
    __tablename__ = "dates"
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, unique=True)
    date: Mapped[datetime.date] = mapped_column(Date, nullable=False, unique=True)

    def __repr__(self) -> str:
        return f"Date<id={self.id!r}, date={self.date!r}>"


class Time(Base):
    __tablename__ = "times"
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, unique=True)
    time: Mapped[datetime.time] = mapped_column(Time, nullable=False, unique=True)

    def __repr__(self) -> str:
        return f"Time<id={self.id!r}, time={self.time!r}>"


class Region(Base):
    __tablename__ = "regions"
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, unique=True)
    name: Mapped[str] = mapped_column(String(30), nullable=False, unique=True)
    markets: Mapped[List["Market"]] = relationship()

    def __repr__(self) -> str:
        return f"Region<id={self.id!r}, name={self.name!r}, markets={self.markets!r}>"


class Market(Base):
    __tablename__ = "markets"
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, unique=True)
    name: Mapped[str] = mapped_column(String(20), nullable=False)
    region: Mapped[int] = mapped_column(ForeignKey("regions.id"))
    assets: Mapped[List["Asset"]] = relationship()

    def __repr__(self) -> str:
        return f"Market<id={self.id!r}, name={self.name!r}, region={self.region!r}, assets={self.assets!r}>"


class Asset(Base):
    __tablename__ = "assets"
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, unique=True)
    symbol: Mapped[str] = mapped_column(String(10), nullable=False, unique=True)
    name: Mapped[str] = mapped_column(String(30), nullable=True)
    market: Mapped[int] = mapped_column(ForeignKey("markets.id"))

    def __repr__(self) -> str:
        return f"Asset<id={self.id!r}, symbol={self.symbol!r}, name={self.name!r}, market={self.market!r}>"


class MarketData(Base):
    __tablename__ = "market_data"
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True, unique=True)
    date: Mapped[int] = mapped_column(ForeignKey("dates.id"))
    time: Mapped[int] = mapped_column(ForeignKey("times.id"))
    asset: Mapped[int] = mapped_column(ForeignKey("assets.id"))
    opening: Mapped[float] = mapped_column(Float, nullable=False)
    high: Mapped[float] = mapped_column(Float, nullable=False)
    low: Mapped[float] = mapped_column(Float, nullable=False)
    closing: Mapped[float] = mapped_column(Float, nullable=False)
    volume: Mapped[float] = mapped_column(Float, nullable=True)

    def __repr__(self) -> str:
        return f"Market Value<id={self.id!r}, date={self.date!r}, time={self.time!r}, asset={self.asset!r}, opening={self.opening!r}, high={self.high!r}, low={self.low!r}, closing={self.closing!r}, volume={self.volume!r}>"
```
I use the following code to insert the initial dates:
```python
from datetime import date

from pandas import concat, read_csv
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

# Base, Region, Market, Asset and Date are the models defined above.

symbol = "EURUSD"
filenames = get_symbol_data_filenames(symbol)  # my own helper (not shown): CSV paths for a symbol

# Merge all CSV files into one DataFrame indexed by timestamp
total_df = None
for filename in filenames:
    df = read_csv(filename)
    df = df.set_index(["Timestamp"])
    if total_df is None:
        total_df = df
    else:
        total_df = concat([df, total_df])
total_df = total_df.sort_index()
total_df = total_df[~total_df.index.duplicated(keep="last")]

database_engine = create_engine("sqlite:///data/db.sqlite3")
Base.metadata.create_all(database_engine)

with Session(database_engine) as session:
    global_region = session.query(Region).filter_by(name="Global").first()
    if global_region is None:
        global_region = Region(name="Global")
        session.add(global_region)
        session.commit()

    forex = session.query(Market).filter_by(name="Forex", region=global_region.id).first()
    if forex is None:
        forex = Market(name="Forex", region=global_region.id)
        session.add(forex)
        session.commit()

    eurusd = session.query(Asset).filter_by(symbol="EURUSD", name="Euro/US-Dollar", market=forex.id).first()
    if eurusd is None:
        eurusd = Asset(symbol="EURUSD", name="Euro/US-Dollar", market=forex.id)
        session.add(eurusd)
        session.commit()

    # One entry per distinct calendar day (local time), in first-seen order
    dates = [date.fromtimestamp(ts) for ts in total_df.index]
    seen = set()
    dates = [x for x in dates if x not in seen and not seen.add(x)]
    session.bulk_insert_mappings(Date, [{"date": date_} for date_ in dates])
    session.commit()
```
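However, this dataset is updated regularly, and I need to insert new data into the database while preserving data integrity, but my current approach is far too slow.
This is how I try to insert new data into the database: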
```python
with Session(database_engine) as session:
    dates = session.query(Date).all()
    if not dates:  # .all() returns a (possibly empty) list, never None
        new_dates = [date.fromtimestamp(ts) for ts in total_df.index]
        seen = set()
        new_dates = [x for x in new_dates if x not in seen and not seen.add(x)]
        session.bulk_insert_mappings(Date, [{"date": date_} for date_ in new_dates])
        session.commit()
    else:
        dates = [x.date for x in dates]
        new_dates = [date.fromtimestamp(ts) for ts in total_df.index]
        seen = set()
        # `x not in dates` scans the whole list of existing dates for every timestamp
        new_dates = [x for x in new_dates if x not in seen and x not in dates and not seen.add(x)]
        session.bulk_insert_mappings(Date, [{"date": date_} for date_ in new_dates])
        session.commit()
```
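Because `dates` is a plain list, each `x not in dates` check walks all existing dates, so the filter grows with (number of timestamps) × (number of stored dates). A minimal sketch of the same filter using a set for constant-time lookups; `find_new_dates` is a hypothetical helper name, and it keeps the question's use of `date.fromtimestamp` (local time).

```python
from datetime import date
from typing import Iterable, List


def find_new_dates(existing: Iterable[date], timestamps: Iterable[int]) -> List[date]:
    """Return the days of `timestamps` not already in `existing`, in first-seen order."""
    known = set(existing)  # set membership is O(1) on average, unlike a list scan
    new_dates: List[date] = []
    for ts in timestamps:
        d = date.fromtimestamp(ts)  # local time, same as the original code
        if d not in known:
            known.add(d)  # also removes duplicates within `timestamps`
            new_dates.append(d)
    return new_dates
```

With the question's names this would be called as `find_new_dates((x.date for x in session.query(Date).all()), total_df.index)`, and the result passed to `bulk_insert_mappings` as before.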
How can I do a bulk insert combined with `ON CONFLICT DO NOTHING`? I think that is the most efficient approach available to me.
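For reference, this is what the clause does at the SQL level. A minimal sketch using Python's built-in `sqlite3` module (the upsert syntax needs SQLite 3.24.0 or newer); the `dates` table mirrors the question's `Date` model and stores dates as ISO strings, which is also the format SQLAlchemy's SQLite `Date` type uses. `insert_dates` is a hypothetical helper.

```python
import sqlite3
from datetime import date
from typing import Iterable


def insert_dates(conn: sqlite3.Connection, dates: Iterable[date]) -> None:
    """Insert dates, letting SQLite skip the ones that already exist."""
    conn.executemany(
        # ON CONFLICT (date) targets the UNIQUE constraint on the date column:
        # a row that would violate it is skipped instead of raising an error.
        "INSERT INTO dates (date) VALUES (?) ON CONFLICT (date) DO NOTHING",
        [(d.isoformat(),) for d in dates],
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dates ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "date DATE NOT NULL UNIQUE)"
)
insert_dates(conn, [date(2024, 11, 8), date(2024, 11, 9)])
insert_dates(conn, [date(2024, 11, 9), date(2024, 11, 10)])  # 2024-11-09 already exists
print([row[0] for row in conn.execute("SELECT date FROM dates ORDER BY id")])
# ['2024-11-08', '2024-11-09', '2024-11-10']
```

The duplicate check happens inside SQLite against the unique index, so there is no need to load the existing dates into Python first.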
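To use `ON CONFLICT DO NOTHING` for bulk inserts, which keeps the data consistent and speeds up insertion, you can modify your code as follows. First make sure the database you use (SQLite here) supports the `ON CONFLICT DO NOTHING` clause: SQLite supports it since version 3.24.0, and SQLAlchemy exposes it through `sqlalchemy.dialects.sqlite.insert(...).on_conflict_do_nothing()` since version 1.4. Assuming the table you are inserting into is `MarketData`, you can replace the `bulk_insert_mappings` part with something like this:

```python
from sqlalchemy.dialects.sqlite import insert

# your_data_source and the some_* names are placeholders for your own data.
# date, time and asset are foreign keys, so they take the ids of the
# corresponding rows in dates, times and assets.
data_to_insert = [
    {
        "date": some_date_id,
        "time": some_time_id,
        "asset": some_asset_id,
        "opening": some_opening_value,
        "high": some_high_value,
        "low": some_low_value,
        "closing": some_closing_value,
        "volume": some_volume_value,
    }
    for (some_date_id, some_time_id, some_asset_id, some_opening_value,
         some_high_value, some_low_value, some_closing_value,
         some_volume_value) in your_data_source
]

insert_stmt = insert(MarketData).values(data_to_insert)
# index_elements must match a UNIQUE (or PRIMARY KEY) constraint, here one on (date, time, asset)
on_conflict_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["date", "time", "asset"])
session.execute(on_conflict_stmt)
session.commit()
```

The code first builds the insert statement `insert_stmt`, then uses `on_conflict_do_nothing` to specify that nothing should happen when a row conflicts on the given columns, and finally executes the statement with `session.execute` and commits the transaction. Rows that would duplicate an existing `(date, time, asset)` combination are silently skipped, so duplicates are avoided without first loading the existing rows into Python.

Note the following:

- `ON CONFLICT` only fires on a `UNIQUE` or `PRIMARY KEY` constraint. `MarketData` as defined above has none on these columns, so add one, e.g. `__table_args__ = (UniqueConstraint("date", "time", "asset"),)`; otherwise SQLite rejects the statement because the conflict target matches no such constraint. Conflicting on `date` alone would not work either, since many rows share the same date.
- `.values(list)` renders one multi-row `INSERT` with a bound parameter for every value, so very large lists can exceed SQLite's limit on host parameters (999 by default before SQLite 3.32.0, 32766 since). Insert in chunks in that case.
- Adjust the keys in `data_to_insert` and the conflict columns to your actual table structure and data.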