My current problem is loading a large amount of data from a table with roughly 5,000,000 rows in a SQL Server database.
The setup (which I cannot influence) is:
My SQL code is stored as a .sql file in the project folder.
I started with chunks of 500,000 rows, but that crashed the kernel. I tried 250,000 with the same result. I am down to 100,000 now and it still crashes.
Per company rules I have to make the initial connection to the database as shown below, and this part is working:
import os
import pyodbc

# Connection to SQL Server with Kerberos + pyodbc
def mssql_conn_kerberos(server, driver, trusted_connection, trust_server_certificate, kerberos_cmd):
    # Run Kerberos for authentication
    os.system(kerberos_cmd)
    try:
        # First connection attempt
        c_conn = pyodbc.connect(
            f'DRIVER={driver};'
            f'SERVER={server};'
            f'Trusted_Connection={trusted_connection};'
            f'TrustServerCertificate={trust_server_certificate}'
        )
    except pyodbc.Error:
        # Re-run Kerberos and retry authentication
        os.system(kerberos_cmd)
        c_conn = pyodbc.connect(
            f"DRIVER={driver};"
            f"SERVER={server};"
            f"Trusted_Connection={trusted_connection};"
            f"TrustServerCertificate={trust_server_certificate}"
        )
    c_cursor = c_conn.cursor()
    print("Pyodbc connection ready.")
    return c_conn  # Connection to the database
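For context, I call the helper roughly like this; the server, driver, and Kerberos command below are placeholders, not the real values:

# Hypothetical call, all values are placeholders (the real ones come from company config)
conn = mssql_conn_kerberos(
    server="my_server.my_domain.com",
    driver="{ODBC Driver 17 for SQL Server}",
    trusted_connection="yes",
    trust_server_certificate="yes",
    kerberos_cmd="kinit my_user@MY.REALM",
)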
Then I have a function that reads and processes my SQL query (stored in a .sql file in the project folder):
import time
import pandas as pd

def call_my_query(path_to_query, query_name, chunk, connection):
    file_path = os.path.join(path_to_query, query_name)
    with open(file_path, "r") as file:
        query = file.read()
    # SQL processing in chunks + time
    chunks = []
    start_time = time.time()
    for x in pd.read_sql_query(query, connection, chunksize=chunk):
        chunks.append(x)
    # Concatenating the chunks - joining all the chunks together
    df = pd.concat(chunks, ignore_index=True)
    # Process end-time
    end_time = time.time()
    print("Data loaded successfully!")
    print(f'Processed {len(df)} rows in {end_time - start_time:.2f} seconds')
    return df
This crashes the kernel with:
The Kernel crashed while executing code in the current cell or a previous cell. Please review the code in the cell(s) to identify a possible cause of the failure. Click here for more info. View Jupyter log for further details.
I also tried running this task with Dask and changing the call_my_query function, but for some reason Dask runs into problems with pyodbc.
The changed call_my_query for Dask:
import sqlalchemy
import dask.dataframe as dd

def call_my_query_dask(query_name, chunk, connection, index_col):
    # Load query from file
    file_path = os.path.join(path_to_query, query_name)
    with open(file_path, "r") as file:
        query_original = file.read()
    # Convert the SQL string/text
    query = sqlalchemy.select(query_original)
    # Start timing the process
    start_time = time.time()
    # Use Dask to read the SQL query in chunks
    print("Executing query and loading data with Dask...")
    df_dask = dd.read_sql_query(
        sql=query,
        con=connection_url,
        npartitions=10,
        index_col=index_col
    )
    # Process end-time
    end_time = time.time()
    print("Data loaded successfully!")
    print(f"Processed approximately {df_dask.shape[0].compute()} rows in {end_time - start_time:.2f} seconds")
    return df_dask
Which results in this error:
Textual column expression 'SELECT [COL_1] , [COL...' should be explicitly declared as text('SELECT [COL_1] ,[COL...'), or use literal_column('SELECT [COL_1] ,[COL...') for more specificity
Thanks in advance for any help.
Consider using SQL Server's OFFSET n ROWS and FETCH NEXT n ROWS ONLY to do the chunking on the database side rather than on the client. Be sure to set an appropriate ORDER BY clause and make the range limit large enough to cover all rows.
def call_my_query(path_to_query, query_name, chunk, connection):
    # Read SQL string from file
    file_path = os.path.join(path_to_query, query_name)
    with open(file_path, "r") as file:
        base_query = file.read()
    main_query = (
        f"SELECT * FROM ({base_query}) AS sub "
        "ORDER BY id "
        "OFFSET ? ROWS "
        "FETCH NEXT ? ROWS ONLY"
    )
    # Build list of data frames from SQL parameterized chunks
    chunk_frames = []
    start_time = time.time()
    for i in range(0, int(5e6), chunk):
        chunk_frames.append(
            pd.read_sql_query(
                sql=main_query, con=connection, params=[i, chunk]
            )
        )
    # Concatenating the chunks - joining all the chunks together
    final_frame = pd.concat(chunk_frames, ignore_index=True)
    # Process end-time
    end_time = time.time()
    print("Data loaded successfully!")
    print(
        f'Processed {len(final_frame)} rows '
        f'in {end_time - start_time:.2f} seconds'
    )
    return final_frame
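A hypothetical call, reusing the pyodbc connection returned by the question's helper, might look like this; the folder, file name, and chunk size are placeholders, and the query is assumed to expose an id column to order by:

# Hypothetical usage; folder, file name and chunk size are placeholders,
# and conn is the pyodbc connection returned by mssql_conn_kerberos above.
df = call_my_query(
    path_to_query="sql",        # folder holding the .sql files
    query_name="my_query.sql",  # placeholder file name
    chunk=100_000,              # rows fetched per OFFSET/FETCH round trip
    connection=conn,
)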
The dask docstring for read_sql_query includes:
sql : SQLAlchemy Selectable
SQL query to be executed. TextClause is not supported
but you are passing text anyway, which is why you get that error. To use dask and its ability to automatically find partition points in the data, you must express the query in terms of sqlalchemy functions, e.g. starting from sqlalchemy.sql and its submodules (https://docs.sqlalchemy.org/en/20/core/sqlelement.html).
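A minimal sketch of that approach, assuming a table with a numeric id column to partition on; the table name, column names, and connection URL below are placeholders rather than your original query:

import dask.dataframe as dd
import sqlalchemy as sa

# Placeholder table definition built from SQLAlchemy constructs (not raw text),
# so Dask can inspect the query and compute partition boundaries on index_col.
my_table = sa.table(
    "my_table",          # hypothetical table name
    sa.column("id"),     # numeric/indexed column used for partitioning
    sa.column("col_a"),
    sa.column("col_b"),
)
query = sa.select(my_table)

# dd.read_sql_query needs a connection URL string, not a live pyodbc connection;
# this URL is a placeholder for a trusted-connection / Kerberos setup.
connection_url = (
    "mssql+pyodbc://my_server/my_db"
    "?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes"
)

df_dask = dd.read_sql_query(
    sql=query,
    con=connection_url,
    index_col="id",
    npartitions=10,
)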