我在使用 SQLAlchemy 的 insert 方法时遇到了问题。当我想向 ORM 模型对应的表中插入数据时,出现了下面的问题:
具有一对多关系的 ORM 模型表:
class OzonProductPrice(Base):
    """ORM model mapped to the ``ozon_product_price`` table.

    Parent ("one") side of the one-to-many relationship with
    :class:`OzonOrders`.
    """

    __tablename__ = "ozon_product_price"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    articul: Mapped[str] = mapped_column(String(120))
    brand: Mapped[str] = mapped_column(String(50))
    product_price: Mapped[float]
    product_price_plus_invest: Mapped[float]
    # Declares both a database-side default (now()) and a Python-side default
    # (datetime.utcnow) for rows inserted without an explicit value.
    date_update_db: Mapped[datetime] = mapped_column(
        server_default=func.now(),
        default=datetime.utcnow,
    )
    # One price has many orders. The "all, delete-orphan" cascade makes the
    # orders follow their price through session operations and deletes an
    # order once it is removed from this collection.
    ozon_orders_info: Mapped[list["OzonOrders"]] = relationship(
        back_populates="ozon_product_price",
        cascade="all, delete-orphan",
    )


class OzonOrders(Base):
    """ORM model mapped to the ``ozon_orders_info`` table.

    Child ("many") side of the relationship: each order row points at one
    :class:`OzonProductPrice` row through ``ozon_product_price_id``.
    """

    __tablename__ = "ozon_orders_info"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    articul: Mapped[str] = mapped_column(String(120))
    brand: Mapped[str] = mapped_column(String(30))
    order_number: Mapped[str] = mapped_column(String(120))
    posting_number: Mapped[str] = mapped_column(String(120))
    date_created_at: Mapped[datetime] = mapped_column(
        server_default=func.now(),
        default=datetime.utcnow,
    )
    in_process_at: Mapped[datetime] = mapped_column(
        server_default=func.now(),
        default=datetime.utcnow,
    )
    status: Mapped[str] = mapped_column(String(120))
    product_price_order: Mapped[float]
    currency_code: Mapped[str] = mapped_column(String(90))
    product_name: Mapped[str] = mapped_column(String(200))
    product_sku: Mapped[str] = mapped_column(String(140))
    product_quantity: Mapped[str] = mapped_column(String(140))
    warehouse_name: Mapped[str] = mapped_column(String(140))
    commission_percent: Mapped[float]
    # Foreign key to the parent price row; filled in by the ORM when an order
    # is attached to a price through the relationship below.
    ozon_product_price_id = mapped_column(ForeignKey("ozon_product_price.id"))
    date_update_db: Mapped[datetime] = mapped_column(
        server_default=func.now(),
        default=datetime.utcnow,
    )
    # Many orders share exactly one price.
    ozon_product_price: Mapped["OzonProductPrice"] = relationship(
        back_populates="ozon_orders_info",
    )
# Question's original attempt: for every (price row, order row) pair with the
# same article, build a Core INSERT for OzonProductPrice and execute it on a
# plain engine connection.
# NOTE(review): this is the statement behind the StatementError quoted below.
with engine_dwh_staging.connect() as conn:
    for item_price in range(len(data_ozon_price)):
        for item_orders in range(len(data_ozon_orders)):
            # If the price article matches the order article, collect the
            # orders into a list.
            if data_ozon_price.iloc[item_price]['articul'] == data_ozon_orders.iloc[item_orders]['article']:
                sql = insert(OzonProductPrice).values(
                    articul=data_ozon_price.iloc[item_price]['articul'],
                    brand=data_ozon_price.iloc[item_price]['brand'],
                    product_price=data_ozon_price.iloc[item_price]['product_price'],
                    product_price_plus_invest=data_ozon_price.iloc[item_price]['product_price_plus_invest'],
                    date_update_db=data_ozon_price.iloc[item_price]['date_update_db'],
                    # NOTE(review): ``ozon_orders_info`` is a relationship on
                    # the model, not a column, and it is given a list of ORM
                    # objects here -- the "unhashable type: 'list'" error
                    # points at this argument.
                    ozon_orders_info=[
                        OzonOrders(
                            articul=data_ozon_orders.iloc[item_orders]['article'],
                            brand=data_ozon_orders.iloc[item_orders]['brand'],
                            order_number=data_ozon_orders.iloc[item_orders]['order_number'],
                            posting_number=data_ozon_orders.iloc[item_orders]['posting_number'],
                            date_created_at=data_ozon_orders.iloc[item_orders]['date_created_at'],
                            in_process_at=data_ozon_orders.iloc[item_orders]['in_process_at'],
                            status=data_ozon_orders.iloc[item_orders]['status'],
                            product_price_order=data_ozon_orders.iloc[item_orders]['product_price_order'],
                            currency_code=data_ozon_orders.iloc[item_orders]['currency_code'],
                            product_name=data_ozon_orders.iloc[item_orders]['product_name'],
                            product_sku=data_ozon_orders.iloc[item_orders]['product_sku'],
                            product_quantity=data_ozon_orders.iloc[item_orders]['product_quantity'],
                            warehouse_name=data_ozon_orders.iloc[item_orders]['warehouse_name'],
                            commission_percent=data_ozon_orders.iloc[item_orders]['commission_percent'],
                            date_update_db=data_ozon_orders.iloc[item_orders]['date_update_db']
                        )]
                )
                conn.execute(sql)
                conn.commit()
我收到错误:
sqlalchemy.exc.StatementError: (builtins.TypeError) unhashable type: 'list' [SQL: INSERT INTO ozon_product_price (id = ozon_product_price_id, articul, brand, product_price, product_price_plus_invest, date_update_db) VALUES (%(param_1)s, %(articul)s, %(brand)s, %(product_price)s, %(product_price_plus_invest)s, %(date_update_db)s) RETURNING ozon_product_price.id]
问题出在哪里?该如何解决?
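出错的原因是:Core 的 insert().values() 只接受列的值,而 ozon_orders_info 是关系(relationship)属性,不能把 ORM 对象列表传给它。下面的重构改用 ORM 会话(Session),由 SQLAlchemy 负责管理关系和事务: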
from sqlalchemy.orm import sessionmaker

# Session factory bound to the staging engine.
Session = sessionmaker(bind=engine_dwh_staging)

# Index the order rows by article once, so each price row finds its orders
# with one lookup instead of rescanning the whole orders frame. groupby()
# leaves out rows whose article is missing, so those never match a price;
# row order inside each group is preserved.
orders_by_article = {
    article: group.to_dict("records")
    for article, group in data_ozon_orders.groupby("article", sort=False)
}

# Session.begin() opens a session and a transaction: it commits when the
# block completes, rolls back if anything raises (the exception still
# propagates to the caller), and closes the session in both cases.
with Session.begin() as session:
    for price_data in data_ozon_price.to_dict("records"):
        # Build the parent together with its children. Passing the orders
        # through the relationship lets the ORM fill in the foreign key.
        ozon_product_price = OzonProductPrice(
            articul=price_data["articul"],
            brand=price_data["brand"],
            product_price=price_data["product_price"],
            product_price_plus_invest=price_data["product_price_plus_invest"],
            date_update_db=price_data["date_update_db"],
            ozon_orders_info=[
                OzonOrders(
                    # The orders frame calls the column "article"; the model
                    # attribute is "articul".
                    articul=order_data["article"],
                    brand=order_data["brand"],
                    order_number=order_data["order_number"],
                    posting_number=order_data["posting_number"],
                    date_created_at=order_data["date_created_at"],
                    in_process_at=order_data["in_process_at"],
                    status=order_data["status"],
                    product_price_order=order_data["product_price_order"],
                    currency_code=order_data["currency_code"],
                    product_name=order_data["product_name"],
                    product_sku=order_data["product_sku"],
                    product_quantity=order_data["product_quantity"],
                    warehouse_name=order_data["warehouse_name"],
                    commission_percent=order_data["commission_percent"],
                    date_update_db=order_data["date_update_db"],
                )
                for order_data in orders_by_article.get(price_data["articul"], [])
            ],
        )
        # Every price row is stored, including those without matching orders.
        # Adding the parent is enough: the relationship's cascade brings the
        # attached orders into the session with it.
        session.add(ozon_product_price)