Problems loading large amounts of data from a SQL Server database

Problem description · Votes: 0 · Answers: 2

My current problem is loading a large amount of data from a SQL Server table of roughly 5,000,000 rows.

The setup (which I cannot influence) is:

  • 0 GPU
  • 4000 CPU
  • 15.0 Gi memory

My SQL code is stored as a .sql file in the project folder.

I started with chunks of 500,000 rows, but that crashed the kernel. I tried 250,000 with the same result. I am down to 100,000 now, and it still crashes.
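A back-of-the-envelope size estimate (a sketch with made-up column counts and dtypes; the real table will differ) shows why this can crash regardless of chunk size: since every chunk is appended and concatenated at the end, the full result set has to fit in memory either way.

```python
import numpy as np

# Rough estimate: 5,000,000 rows of, say, ten float64 columns.
# pd.concat of all chunks holds the whole result (plus copies)
# in RAM at once, so a smaller chunksize does not lower the peak.
rows, cols = 5_000_000, 10
bytes_per_value = np.dtype("float64").itemsize  # 8 bytes
total_mb = rows * cols * bytes_per_value / 1e6
print(f"~{total_mb:.0f} MB")  # ~400 MB
```

With wide rows, string columns, or intermediate copies during concatenation, the real footprint can be several times larger, which is how a 15 Gi machine runs out of memory.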

Per company rules, I have to make the initial connection to the database as shown below; this part works:

# Connection to SQL Server with Kerberos + pyodbc
import os
import pyodbc

def mssql_conn_kerberos(server, driver, trusted_connection, trust_server_certificate, kerberos_cmd):
    # Run Kerberos for authentication
    os.system(kerberos_cmd)

    conn_str = (
        f"DRIVER={driver};"
        f"SERVER={server};"
        f"Trusted_Connection={trusted_connection};"
        f"TrustServerCertificate={trust_server_certificate}"
    )

    try:
        # First connection attempt
        c_conn = pyodbc.connect(conn_str)
    except pyodbc.Error:
        # Re-run Kerberos and retry authentication
        os.system(kerberos_cmd)
        c_conn = pyodbc.connect(conn_str)

    print("Pyodbc connection ready.")

    return c_conn  # Connection to the database

Then I have a function that reads and processes my SQL query (stored in a .sql file saved in the project folder):

def call_my_query(path_to_query, query_name, chunk, connection):
    
    file_path = os.path.join(path_to_query, query_name)
    with open(file_path, "r") as file:
        query = file.read()

    # SQL processing in chunks + time
    chunks = []
    start_time = time.time()

    for x in pd.read_sql_query(query, connection, chunksize=chunk):
        chunks.append(x)

    # Concatenating the chunks - joining all the chunks together
    df = pd.concat(chunks, ignore_index=True)

    # Process end-time
    end_time = time.time()

    print("Data loaded successfully!")
    print(f'Processed {len(df)} rows in {end_time - start_time:.2f} seconds')

    return df

This crashes the kernel:

The Kernel crashed while executing code in the current cell or a previous cell.
Please review the code in the cell(s) to identify a possible cause of the failure.
Click here for more info.
View Jupyter log for further details.

I also tried running this task through Dask, changing the call_my_query function, but for some reason Dask causes problems with pyodbc.

The changes to call_my_query for Dask:

def call_my_query_dask(path_to_query, query_name, chunk, connection, index_col):
    
    # Load query from file
    file_path = os.path.join(path_to_query, query_name)
    with open(file_path, "r") as file:
        query_original = file.read()

    # Convert the SQL string/text
    query = sqlalchemy.select(query_original)

    # Start timing the process
    start_time = time.time()

    # Use Dask to read the SQL query in chunks
    print("Executing query and loading data with Dask...")
    df_dask = dd.read_sql_query(
        sql=query,
        con=connection_url,
        npartitions=10,
        index_col = index_col
    )

    # Process end-time
    end_time = time.time()
    print("Data loaded successfully!")
    print(f"Processed approximately {df_dask.shape[0].compute()} rows in {end_time - start_time:.2f} seconds")

    return df_dask

Which produces this error:

Textual column expression 'SELECT [FIRST_COLUMN] , [COL...' should be explicitly declared with text('SELECT [FIRST_COLUMN] ,[COL...'), or use literal_column('SELECT [FIRST_COLUMN] ,[COL...') for more specificity

Thank you all for your help.

python sql-server pandas sqlalchemy dask
2 Answers

1 vote

Consider using SQL Server's

OFFSET n ROWS
FETCH NEXT n ROWS ONLY

to chunk on the database side instead of the client side. Be sure to set a proper
ORDER BY
clause and a
range
limit large enough to cover all rows.

def call_my_query(path_to_query, query_name, chunk, connection):
    # Read SQL string from file
    file_path = os.path.join(path_to_query, query_name)
    with open(file_path, "r") as file:
        base_query = file.read()

    main_query = (
        f"SELECT * FROM ({base_query}) AS src "  # T-SQL derived tables require an alias
        "ORDER BY id "
        "OFFSET ? ROWS "
        "FETCH NEXT ? ROWS ONLY"
    )

    # Build list of data frames from SQL parameterized chunks
    chunk_frames = []
    start_time = time.time()

    for i in range(0, int(5e6), chunk):
        chunk_frames.append(
            pd.read_sql_query(
                sql=main_query, con=connection, params=[i, chunk]
            )
        )

    # Concatenating the chunks - joining all the chunks together
    final_frame = pd.concat(chunk_frames, ignore_index=True)

    # Process end-time
    end_time = time.time()

    print("Data loaded successfully!")
    print(
        f'Processed {len(final_frame)} rows '
        f'in {end_time - start_time:.2f} seconds'
    )

    return final_frame
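The same database-side paging loop can be exercised end-to-end against a throwaway SQLite table (SQLite spells the paging clause LIMIT ? OFFSET ? where T-SQL uses OFFSET ? ROWS FETCH NEXT ? ROWS ONLY; the table and column names here are invented for the demo):

```python
import sqlite3
import pandas as pd

# Toy table standing in for the real SQL Server table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
conn.executemany(
    "INSERT INTO t (id, val) VALUES (?, ?)",
    [(i, f"row{i}") for i in range(10)],
)

chunk = 4
frames = []
offset = 0
while True:
    # ORDER BY gives a stable paging order; the DB returns one page at a time.
    part = pd.read_sql_query(
        "SELECT * FROM t ORDER BY id LIMIT ? OFFSET ?",
        conn,
        params=(chunk, offset),
    )
    if part.empty:
        break
    frames.append(part)
    offset += chunk

df = pd.concat(frames, ignore_index=True)
print(len(df))  # 10
```

Note that paging only bounds how much each query transfers; if the final concatenated frame itself exceeds RAM, processing each page (aggregate, filter, write to disk) instead of keeping all of them is the safer pattern.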

-1 votes

The Dask docstring for read_sql_query includes:

sql : SQLAlchemy Selectable
    SQL query to be executed. TextClause is not supported

But you are passing text anyway, which is why you get that error. To use Dask and its ability to automatically find partition points in the data, you must express the query in terms of sqlalchemy functions, e.g. starting from sqlalchemy.sql and its submodules (https://docs.sqlalchemy.org/en/20/core/sqlelement.html).
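A minimal sketch of what that looks like (table and column names are hypothetical; in practice you would reflect the real table with sa.Table("...", metadata, autoload_with=engine)). The point is that sqlalchemy.select() over a Table yields a Selectable, not a TextClause:

```python
import sqlalchemy as sa

# Hypothetical schema standing in for the real SQL Server table.
metadata = sa.MetaData()
my_table = sa.Table(
    "my_table", metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("value", sa.String),
)

# A real Selectable -- this is the kind of object dd.read_sql_query accepts:
query = sa.select(my_table)

# Then, roughly (connection_url is your SQLAlchemy URL for SQL Server):
# df_dask = dd.read_sql_query(sql=query, con=connection_url,
#                             index_col="id", npartitions=10)
print(type(query).__name__)  # Select
```

With a Selectable and a numeric or datetime index_col, Dask can compute min/max bounds and split the query into partitions itself.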
