Python 和 SQLAlchemy 从 API 插入数据(一对多关系和 1000 行以上)

问题描述 投票:0回答:1

我在从 SQLAlchemy 插入方法时遇到问题。当我想在 ORM 模型表中插入数据时,我遇到了下一个问题:

具有一对多关系的 ORM 模型表:

class OzonProductPrice(Base):
  __tablename__ = "ozon_product_price"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    articul: Mapped[str] = mapped_column(String(120))
    brand: Mapped[str] = mapped_column(String(50))
    product_price: Mapped[float]
    product_price_plus_invest: Mapped[float]
    date_update_db: Mapped[datetime] = mapped_column(
      server_default=func.now(),
      default=datetime.utcnow)
    ozon_orders_info: Mapped[list["OzonOrders"]] = relationship( # Для одной цены есть множество    заказов
    back_populates = "ozon_product_price",
    cascade = "all, delete-orphan"

class OzonOrders(Base):
  __tablename__ = "ozon_orders_info"
 
  id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
  articul: Mapped[str] = mapped_column(String(120))
  brand: Mapped[str] = mapped_column(String(30))
  order_number: Mapped[str] = mapped_column(String(120))
  posting_number: Mapped[str] = mapped_column(String(120))
  date_created_at: Mapped[datetime] = mapped_column(
      server_default=func.now(),
      default=datetime.utcnow)
  in_process_at: Mapped[datetime] = mapped_column(
      server_default=func.now(),
      default=datetime.utcnow)
  status: Mapped[str] = mapped_column(String(120))
  product_price_order: Mapped[float]
  currency_code: Mapped[str] = mapped_column(String(90))
  product_name: Mapped[str] = mapped_column(String(200))
  product_sku: Mapped[str] = mapped_column(String(140))
  product_quantity: Mapped[str] = mapped_column(String(140))
  warehouse_name: Mapped[str] = mapped_column(String(140))
  commission_percent: Mapped[float]
  ozon_product_price_id = mapped_column(ForeignKey("ozon_product_price.id"))
  date_update_db: Mapped[datetime] = mapped_column(
      server_default=func.now(),
      default=datetime.utcnow)
  ozon_product_price: Mapped["OzonProductPrice"] = relationship( # Для множества заказов есть только одна цена
      back_populates = "ozon_orders_info"
  )
)
with engine_dwh_staging.connect() as conn:
    for item_price in range(len(data_ozon_price)):
        for item_orders in range(len(data_ozon_orders)):
        # если артикул цены совпадает с артикулом заказа, то собираем заказы в список
            if data_ozon_price.iloc[item_price]['articul'] == data_ozon_orders.iloc[item_orders]['article']:
                sql = insert(OzonProductPrice).values(
                    articul=data_ozon_price.iloc[item_price]['articul'],
                    brand=data_ozon_price.iloc[item_price]['brand'],
                    product_price=data_ozon_price.iloc[item_price]['product_price'],
                    product_price_plus_invest=data_ozon_price.iloc[item_price]['product_price_plus_invest'],
                    date_update_db=data_ozon_price.iloc[item_price]['date_update_db'],
                    ozon_orders_info=[
                            OzonOrders(
                                articul=data_ozon_orders.iloc[item_orders]['article'],
                                brand=data_ozon_orders.iloc[item_orders]['brand'],
                                order_number=data_ozon_orders.iloc[item_orders]['order_number'],
                                posting_number=data_ozon_orders.iloc[item_orders]['posting_number'],
                                date_created_at=data_ozon_orders.iloc[item_orders]['date_created_at'],
                                in_process_at=data_ozon_orders.iloc[item_orders]['in_process_at'],
                                status=data_ozon_orders.iloc[item_orders]['status'],
                                product_price_order=data_ozon_orders.iloc[item_orders]['product_price_order'],
                                currency_code=data_ozon_orders.iloc[item_orders]['currency_code'],
                                product_name=data_ozon_orders.iloc[item_orders]['product_name'],
                                product_sku=data_ozon_orders.iloc[item_orders]['product_sku'],
                                product_quantity=data_ozon_orders.iloc[item_orders]['product_quantity'],
                                warehouse_name=data_ozon_orders.iloc[item_orders]['warehouse_name'],
                                commission_percent=data_ozon_orders.iloc[item_orders]['commission_percent'],
                                date_update_db=data_ozon_orders.iloc[item_orders]['date_update_db']
                            )]
                )
                conn.execute(sql)
                conn.commit()

我收到错误:

sqlalchemy.exc.StatementError:(builtins.TypeError)不可散列类型:“列表” [SQL:插入 ozon_product_price (id = ozon_product_price_id、articul、brand、product_price、product_price_plus_invest、date_update_db) 值 (%(param_1)s、%(articul)s、%(brand)s、%(product_price)s、%(product_price_plus_invest) )s, %(date_update_db)s) 返回 ozon_product_price.id]

什么问题?如何解决这个问题?

python sqlalchemy insert
1个回答
0
投票

以下是如何重构以使用会话确保 SQLAlchemy 管理关系和事务

from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine_dwh_staging)
session = Session()

try:
    for item_price in range(len(data_ozon_price)):
        price_data = data_ozon_price.iloc[item_price]
        # Creating OzonProductPrice object
        ozon_product_price = OzonProductPrice(
            articul=price_data['articul'],
            brand=price_data['brand'],
            product_price=price_data['product_price'],
            product_price_plus_invest=price_data['product_price_plus_invest'],
            date_update_db=price_data['date_update_db'],
        )
        session.add(ozon_product_price)

        for item_orders in range(len(data_ozon_orders)):
            order_data = data_ozon_orders.iloc[item_orders]
            if price_data['articul'] == order_data['article']:
                # Creating OzonOrders object
                ozon_order = OzonOrders(
                    articul=order_data['article'],
                    brand=order_data['brand'],
                    order_number=order_data['order_number'],
                    posting_number=order_data['posting_number'],
                    date_created_at=order_data['date_created_at'],
                    in_process_at=order_data['in_process_at'],
                    status=order_data['status'],
                    product_price_order=order_data['product_price_order'],
                    currency_code=order_data['currency_code'],
                    product_name=order_data['product_name'],
                    product_sku=order_data['product_sku'],
                    product_quantity=order_data['product_quantity'],
                    warehouse_name=order_data['warehouse_name'],
                    commission_percent=order_data['commission_percent'],
                    date_update_db=order_data['date_update_db'],
                    ozon_product_price=ozon_product_price
                )
                session.add(ozon_order)

    session.commit()
except Exception as e:
    session.rollback()
    raise e
finally:
    session.close()
© www.soinside.com 2019 - 2024. All rights reserved.