一直在努力解决这个问题。长话短说,我有一个问题,我们需要在客户提供的一个月的不同时间读取文件。该文件可以有几百行,也可以超过一百万行。我已经搜索过,但没有什么能解决存储过程调用问题(我已经看到使用executemany等)
对于每一行,我需要读取行内容(分隔),并将内容写入数据库。我目前所有这些都可以工作。问题在于时机。我将其从界面引擎中取出,因为读取大约 5000 行需要 1 小时。我用 python 创建了这个应用程序,希望能够显着加快这个过程。
我的问题是:“多线程”以提高速度的最佳方法是什么?基本上我需要尽快读取文件并将数据尽快插入数据库中。下面的粗体部分是我想要以某种方式多线程并一次执行多个的部分。
def ReadMetaDataFile (srcFile, fileDelimiter, clientID, mysqlCnx):
functionResponse = ''
path, filename = os.path.split(srcFile)
try:
with open(srcFile, 'r') as delimitedFile:
fileReader = csv.DictReader(delimitedFile, delimiter=fileDelimiter)
#next(fileReader, None)
if(mysqlCnx and mysqlCnx.is_connected()):
cursor = mysqlCnx.cursor()
for row in fileReader:
**rowArgs = (clientID, row['FILENAME'], '', row['DATA_FORMAT'], 1)
cursor.callproc('sp_InsertMetaData', rowArgs)
mysqlCnx.commit()**
cursor.close()
mysqlCnx.close()
functionResponse = "MetaData File Loaded Successfully"
except mysql.connector.Error as err:
functionResponse = "Error processing MySQL SPROC call: " + str(err)
except (mysql.connector.Error, IOError) as err:
functionResponse = "Error connecting to MySQL in function: " + str(err)
except Exception as err:
functionResponse = "Exception Found in function: " + str(err)
finally:
mysqlCnx.close()
return functionResponse
--更新 我已经尝试了几种不同的方法,此时我只是设置存储过程调用(并未实际执行),并且从文件中读取 20k 行需要 20 多分钟。某些文件中有数百万行,这太长了。有什么建议请。
def ReadMetaDataFile (srcFile, fileDelimiter, clientID, engine):
functionResponse = ''
path, filename = os.path.split(srcFile)
try:
query = 'CALL sp_InsertMetaData (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
with open(srcFile, 'r') as delimitedFile:
fileReader = csv.DictReader(delimitedFile, delimiter=fileDelimiter)
conn = engine.raw_connection()
cursor = conn.cursor()
for row in fileReader:
rowArgs = (clientID, row['FILENAME'], '', row['DATA_FORMAT'], row['SOURCE_SYSTEM_NAME'], row['CONFIDENTIALITY_CODE'], row['STATUS'], row['DOCUMENT_TYPE_SOURCE_SYSTEM'], row['DOCUMENT_TYPE_ID'], row['DOCUMENT_TYPE_DESCRIPTION'], row['DATE_OF_SERVICE'], row['DOCUMENT_ID'], row['SOURCE_CREATED_DATE'], row['SOURCE_LAST_MODIFIED_DATE'], row['TIMEZONE'], row['MRNSOURCE_SYSTEM'], row['PATIENT_MRN'], row['MEMBER_NBR'], row['PATIENT_LAST_NAME'], row['PATIENT_FIRST_NAME'], row['PATIENT_MIDDLE_NAME'], row['GENDER'], row['PATIENT_DATE_OF_BIRTH'], row['ENCOUNTER_SOURCE'], row['ENCOUNTER_ID'], row['ENCOUNTER_TYPE'], row['ADMIT_TIME'], row['DISCHARGE_TIME'], row['FACILITY_NAME'], row['FACILITY_SOURCE_SYSTEM'], row['PROVIDER_TYPE'], row['PROVIDER_SOURCE_SYSTEM'], row['PROVIDER_IDENTIFIER'], row['PROVIDER_LAST_NAME'], row['PROVIDER_FIRST_NAME'], row['PROVIDER_MIDDLE_NAME'], row['PROVIDER_CREDENTIAL'], row['PROVIDER_SPECIALTY'], 1)
cursor.callproc("sp_InsertMetaData", rowArgs)
#conn.commit()
cursor.close()
functionResponse = "MetaData File Loaded Successfully"
为了重现您的问题,我准备了 5000 行的文件,并尝试逐行读取它。它花了
0.07 s
,所以我假设你的瓶颈是在数据库中插入数据。尝试使用批量插入进行此类操作。我很确定这足以完成您的任务。
您的代码速度很慢,因为您为要插入的每一行调用一次
.callproc()
,并且每个 .callproc()
都会导致两次往返数据库。 (一个用于设置参数值,另一个用于实际 CALL
存储过程。)
如果将所有行上传到临时表,然后使用另一个存储过程循环访问临时表并调用现有存储过程,您可能会获得更好更好的性能。填充临时表后,所有后续处理都在服务器上进行。如果临时表被称为
tmp_tbl
,新的存储过程将如下所示:
CREATE DEFINER=`root`@`localhost` PROCEDURE `sp_InsertMyDataBatch`()
MODIFIES SQL DATA
BEGIN
DECLARE done INT DEFAULT FALSE;
DECLARE new_id INT;
DECLARE new_txt VARCHAR(50);
DECLARE cur1 CURSOR FOR SELECT id, txt FROM tmp_tbl;
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
OPEN cur1;
read_loop: LOOP
FETCH cur1 INTO new_id, new_txt;
IF done THEN
LEAVE read_loop;
END IF;
CALL sp_InsertMyData(new_id, new_txt);
END LOOP;
CLOSE cur1;
END