Why does SQLAlchemy give the error "num (INTEGER) not a string" when used with a Session, but not when used with engine.begin()?


I'm trying to bulk-insert into an MSSQL database. The application fetches data from an API, arranges it in a pandas.DataFrame, and forces the "num" column to int with `.astype(int)`.

After that, I want to start a transaction with the database that will:

  1. Create a temp table.
  2. Insert the data into tempTable.
  3. Merge tempTable into prodTable.
  4. Commit the transaction if no error occurred, or roll back if the database raised an error.
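For illustration only, the four steps above can be sketched with Python's built-in `sqlite3` module so the flow runs without SQL Server. This is a swapped-in technique, not the question's code: SQLite has no `MERGE`, so step 3 uses SQLite's `INSERT ... ON CONFLICT DO UPDATE` upsert (SQLite 3.24 or later), which assumes a `UNIQUE` constraint on `num` that the SQL Server schema in the question does not have. `upsert_discounts` is a hypothetical helper name.

```python
from __future__ import annotations

import sqlite3


def upsert_discounts(conn: sqlite3.Connection, rows: list[tuple[int, str, float | None]]) -> bool:
    """Hypothetical helper: steps 1-4 against SQLite, all inside one transaction.

    Assumes `conn` was opened with isolation_level=None, so BEGIN/COMMIT/ROLLBACK
    are issued explicitly below instead of by the sqlite3 module.
    """
    cur = conn.cursor()
    cur.execute("BEGIN")
    try:
        # 1. create the temp (staging) table; TEMP tables are private to this connection
        cur.execute("DROP TABLE IF EXISTS temp.tempDiscountDimensions")
        cur.execute(
            "CREATE TEMP TABLE tempDiscountDimensions ("
            "num INTEGER PRIMARY KEY, name TEXT NOT NULL, posPercent REAL)"
        )
        # 2. insert the batch into the temp table
        cur.executemany("INSERT INTO tempDiscountDimensions VALUES (?, ?, ?)", rows)
        # 3. SQLite has no MERGE; an INSERT ... ON CONFLICT upsert plays that role here.
        #    "WHERE true" avoids a parsing ambiguity between the SELECT and ON CONFLICT.
        cur.execute(
            "INSERT INTO discountDimensions (num, name, posPercent) "
            "SELECT num, name, posPercent FROM tempDiscountDimensions WHERE true "
            "ON CONFLICT(num) DO UPDATE SET "
            "name = excluded.name, posPercent = excluded.posPercent"
        )
        cur.execute("DROP TABLE tempDiscountDimensions")
        # 4. commit only if every step succeeded...
        cur.execute("COMMIT")
        return True
    except sqlite3.Error:
        # ...otherwise undo everything, including the temp table
        cur.execute("ROLLBACK")
        return False


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE discountDimensions ("
    "ID INTEGER PRIMARY KEY, num INTEGER NOT NULL UNIQUE, name TEXT, posPercent REAL)"
)
upsert_discounts(conn, [(1, "Staff", 10.0), (2, "Senior", None)])
upsert_discounts(conn, [(1, "Staff", 15.0)])  # updates the num=1 row in place
```

Opening the connection with `isolation_level=None` is what lets the helper issue `BEGIN`, `COMMIT` and `ROLLBACK` itself; because SQLite DDL is transactional, the rollback also discards the temp table.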

Can someone explain why I get the error "num (INTEGER) not a string" when I run this version of the code:

SQL table schema:

    CREATE TABLE [dbo].[discountDimensions](
        [ID] [int] IDENTITY(1,1) NOT NULL,
        [num] [int] NOT NULL,
        [name] [varchar](60) NULL,
        [posPercent] [float] NULL
    )

In DataProcessor.py:

    def discounts_dim_to_df(self, data:dict):
        df = pd.DataFrame(data["discounts"])
        df = df.drop(columns=['num', 'name'])
        df['mstrNum'] = df["mstrNum"].astype(int)
        return df.rename(columns={"mstrNum":"num","mstrName":"name"})
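A quick way to check the dtype side of the question is to run the same transformation on a sample payload. The payload below is hypothetical (the question does not show the API response); it only assumes the keys the method touches, and the function is a standalone copy of the method without `self`.

```python
import pandas as pd


def discounts_dim_to_df(data: dict) -> pd.DataFrame:
    # same steps as the method above, as a plain function
    df = pd.DataFrame(data["discounts"])
    df = df.drop(columns=["num", "name"])
    df["mstrNum"] = df["mstrNum"].astype(int)
    return df.rename(columns={"mstrNum": "num", "mstrName": "name"})


# hypothetical API payload; the real field set is not shown in the question
payload = {"discounts": [
    {"num": "A1", "name": "old", "mstrNum": "7", "mstrName": "Staff", "posPercent": 10.0},
    {"num": "A2", "name": "old", "mstrNum": "12", "mstrName": "Senior", "posPercent": None},
]}
df = discounts_dim_to_df(payload)
print(df.dtypes)
```

`num` comes out with an integer dtype before `to_sql` is ever called, which fits the answer's conclusion that the dtype was not the cause.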

In Conn.py:

    import pandas as pd
    from sqlalchemy import Float, Integer, String, create_engine, text
    from sqlalchemy.orm import sessionmaker

    # Mailman is the author's own notification helper (not shown)

    class Conn:
        def __init__(self, server: str, database: str, username: str, passwd: str):
            driver = 'ODBC Driver 17 for SQL Server'
            connection_string = f'mssql+pyodbc://{username}:{passwd}@{server}:1433/{database}?driver={driver}'
            try:
                self.engine = create_engine(connection_string, pool_pre_ping=True)
                Session = sessionmaker(bind=self.engine)
                self.session = Session()
            except Exception as err:
                print(f'Failed to connect to {server} -> {database}')
                mm = Mailman()
                mm.connect_to_db_failed(server=server, db=database, error_message=err)

        def update_discounts_db(self, data: pd.DataFrame) -> bool:
            create_temp_sql = '''
            CREATE TABLE #tempDiscountDimensions (
                num INT PRIMARY KEY,
                name VARCHAR(60) NOT NULL,
                posPercent FLOAT NULL
            );
            '''
            merge_sql = '''
            MERGE INTO discountDimensions AS target
            USING #tempDiscountDimensions AS source
            ON target.num = source.num
            WHEN MATCHED THEN
                UPDATE SET target.name = source.name,
                           target.posPercent = source.posPercent
            WHEN NOT MATCHED THEN
                INSERT (num, name, posPercent) VALUES (source.num, source.name, source.posPercent);
            '''
            try:
                # drop a leftover temp table from an earlier run, then recreate it
                self.session.execute(text("""IF OBJECT_ID('tempdb..#tempDiscountDimensions', 'U') IS NOT NULL
                                             DROP TABLE #tempDiscountDimensions;"""))
                self.session.execute(text(create_temp_sql))
                # load the DataFrame into the temp table
                data.to_sql('#tempDiscountDimensions',
                            con=self.session,
                            if_exists='append',
                            index=False,
                            dtype={
                                "num": Integer(),
                                "name": String(),
                                "posPercent": Float()
                            })

                self.session.execute(text(merge_sql))
                self.session.execute(text('DROP TABLE #tempDiscountDimensions'))
                self.session.commit()

            except Exception as err:
                self.session.rollback()
                print(f'Failed to update/insert data from discountDimensions table:\n{err}')
                mm = Mailman()
                mm.process_failed(option='db_connection', error=err)
                return None

This gives the error:

    Traceback (most recent call last):
      File "path\to\folder\main.py", line 81, in <module>
        conn.update_discounts_db(data=data_disc)
      File "path\to\folder\Conn.py", line 82, in update_discounts_db
        data.to_sql('#tempDiscountDimensions',
      File "path\to\folder\_env\Lib\site-packages\pandas\util\_decorators.py", line 333, in wrapper
        return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
      File "path\to\folder\_env\Lib\site-packages\pandas\core\generic.py", line 3087, in to_sql
        return sql.to_sql(
              ^^^^^^^^^^^
      File "path\to\folder\_env\Lib\site-packages\pandas\io\sql.py", line 842, in to_sql
        return pandas_sql.to_sql(
              ^^^^^^^^^^^^^^^^^^
      File "path\to\folder\_env\Lib\site-packages\pandas\io\sql.py", line 2839, in to_sql
        raise ValueError(f"{col} ({my_type}) not a string")
    ValueError: num (INTEGER) not a string

But when I change `self.session` to `with self.engine.begin() as connection`, it works. However, I lose the ability to roll back if an error occurs.

        try:
            with self.engine.begin() as connection:
                connection.execute(text("""IF OBJECT_ID('tempdb..#tempDiscountDimensions', 'U') IS NOT NULL
                                    DROP TABLE #tempDiscountDimensions;"""))
                connection.execute(text(create_temp_sql))
                data.to_sql('#tempDiscountDimensions',
                            con=connection,
                            if_exists='append',
                            index=False,
                            dtype={
                                "num": Integer(),
                                "name": String(),
                                "posPercent": Float()
                            })

                connection.execute(text(merge_sql))
                connection.execute(text('DROP TABLE #tempDiscountDimensions'))
        except Exception as err:
            # error handling was not shown in the question; this mirrors the Session
            # version above, minus the explicit rollback
            print(f'Failed to update/insert data from discountDimensions table:\n{err}')
            return None
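`Engine.begin()` does in fact roll back: it yields a `Connection` inside a transaction that is committed when the `with` block exits normally and rolled back if an exception escapes the block. A minimal sketch, assuming an in-memory SQLite engine in place of the SQL Server one and a hypothetical `demo` table:

```python
from sqlalchemy import create_engine, text

# Assumption: an in-memory SQLite engine stands in for the mssql+pyodbc engine.
engine = create_engine("sqlite://")

with engine.begin() as connection:
    connection.execute(text("CREATE TABLE demo (num INTEGER PRIMARY KEY)"))

try:
    with engine.begin() as connection:
        connection.execute(text("INSERT INTO demo (num) VALUES (1)"))
        # simulate a later step (e.g. the MERGE) failing
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass  # by now the with-block has already rolled the INSERT back

with engine.connect() as connection:
    count = connection.execute(text("SELECT COUNT(*) FROM demo")).scalar()
print(count)  # 0
```

So catching the exception outside the `with` block is enough for error reporting; no explicit `rollback()` call is needed.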
Answer:

The problem I ran into was caused by passing a Session instance as `con` to pandas.DataFrame.to_sql, not by the data type problem I had suspected.

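According to the pandas documentation for DataFrame.to_sql,

the `con` parameter should be a `sqlalchemy.engine.(Engine or Connection)` or a `sqlite3.Connection`. A Session is neither, so pandas falls back to its sqlite3 code path, which only accepts SQL type names given as strings in `dtype`; that fallback is what raises "num (INTEGER) not a string".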
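The fallback can be reproduced without SQL Server. A minimal sketch, assuming pandas and SQLAlchemy are installed and using an in-memory SQLite engine; `try_to_sql` and the `repro` table are hypothetical names. The first message matches the traceback in the question, though the accompanying warning text may differ between pandas versions.

```python
import warnings

import pandas as pd
from sqlalchemy import Integer, create_engine
from sqlalchemy.orm import Session

engine = create_engine("sqlite://")  # stand-in engine; only the type of `con` matters here
df = pd.DataFrame({"num": [1, 2]})


def try_to_sql(con) -> str:
    """Return 'ok', or the ValueError message pandas raises for this `con`."""
    try:
        with warnings.catch_warnings():
            # pandas warns when it does not recognise `con` as an SQLAlchemy connectable
            warnings.simplefilter("ignore", UserWarning)
            df.to_sql("repro", con=con, if_exists="replace", index=False,
                      dtype={"num": Integer()})
        return "ok"
    except ValueError as err:
        return str(err)


with Session(engine) as session:
    print(try_to_sql(session))               # → num (INTEGER) not a string
    print(try_to_sql(session.connection()))  # → ok
```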

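Thanks to @GordThompson for pointing me in the right direction. I checked the SQLAlchemy documentation for Session.connection(), which returns the Connection for the session's current transaction on its bound engine, beginning a transaction if none is in progress. That is exactly what df.to_sql(con=...) needs.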
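A minimal sketch of that fix, assuming an in-memory SQLite engine instead of SQL Server and a regular `staging` table instead of `#tempDiscountDimensions`; `load_batch` and its `fail_after_insert` flag are hypothetical. It shows `session.connection()` being accepted by `to_sql`, and `session.rollback()` undoing the inserted rows when a later step fails.

```python
import pandas as pd
from sqlalchemy import Float, Integer, String, create_engine, text
from sqlalchemy.orm import Session

# Assumption: in-memory SQLite stands in for the mssql+pyodbc engine.
engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE staging (num INTEGER PRIMARY KEY, name VARCHAR(60), posPercent FLOAT)"))


def load_batch(session: Session, df: pd.DataFrame, fail_after_insert: bool = False) -> bool:
    try:
        # hand pandas the Connection behind the session's transaction, not the Session
        df.to_sql("staging", con=session.connection(), if_exists="append", index=False,
                  dtype={"num": Integer(), "name": String(60), "posPercent": Float()})
        if fail_after_insert:
            # stands in for a failing MERGE or DROP step
            raise RuntimeError("simulated failure in a later step")
        session.commit()
        return True
    except Exception:
        session.rollback()  # also undoes the rows to_sql inserted
        return False


df = pd.DataFrame({"num": [1, 2], "name": ["a", "b"], "posPercent": [0.1, None]})
with Session(engine) as session:
    ok = load_batch(session, df)
    failed = load_batch(session, pd.DataFrame({"num": [3], "name": ["c"], "posPercent": [0.3]}),
                        fail_after_insert=True)

with engine.connect() as conn:
    nums = conn.execute(text("SELECT num FROM staging ORDER BY num")).scalars().all()
print(nums)  # [1, 2]
```

Keeping all statements on the same session connection is what lets one `commit()` or `rollback()` cover the temp table, the `to_sql` insert and the merge together.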
