文本文件行 → 通过现有存储过程插入单行的表行

问题描述 投票:0回答:2

一直在努力解决这个问题。长话短说,我有一个问题,我们需要在客户提供的一个月的不同时间读取文件。该文件可以有几百行,也可以超过一百万行。我已经搜索过,但没有什么能解决存储过程调用问题(我已经看到使用executemany等)

对于每一行,我需要读取行内容(分隔),并将内容写入数据库。我目前所有这些都可以工作。问题在于时机。我将其从界面引擎中取出,因为读取大约 5000 行需要 1 小时。我用 python 创建了这个应用程序,希望能够显着加快这个过程。

我的问题是:“多线程”以提高速度的最佳方法是什么?基本上我需要尽快读取文件并将数据尽快插入数据库中。下面的粗体部分是我想要以某种方式多线程并一次执行多个的部分。

def ReadMetaDataFile (srcFile, fileDelimiter, clientID, mysqlCnx):
    functionResponse = ''
    path, filename = os.path.split(srcFile)

    try:
        with open(srcFile, 'r') as delimitedFile:
            fileReader = csv.DictReader(delimitedFile, delimiter=fileDelimiter)
            #next(fileReader, None)
            
            if(mysqlCnx and mysqlCnx.is_connected()):
                cursor = mysqlCnx.cursor()
                for row in fileReader:
                    **rowArgs = (clientID, row['FILENAME'], '', row['DATA_FORMAT'], 1)
                    cursor.callproc('sp_InsertMetaData', rowArgs)
                    mysqlCnx.commit()**
                cursor.close()
                mysqlCnx.close()
                functionResponse = "MetaData File Loaded Successfully"
    except mysql.connector.Error as err:
        functionResponse =  "Error processing MySQL SPROC call: " + str(err)
    except (mysql.connector.Error, IOError) as err:
        functionResponse =  "Error connecting to MySQL in function: " + str(err)
    except Exception as err:
        functionResponse =  "Exception Found in function: " + str(err)
    finally:
        mysqlCnx.close()
        return functionResponse

--更新 我已经尝试了几种不同的方法,此时我只是设置存储过程调用(并未实际执行),并且从文件中读取 20k 行需要 20 多分钟。某些文件中有数百万行,这太长了。有什么建议请。

def ReadMetaDataFile (srcFile, fileDelimiter, clientID, engine):
    functionResponse = ''
    path, filename = os.path.split(srcFile)

    try:
        query = 'CALL sp_InsertMetaData (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
        with open(srcFile, 'r') as delimitedFile:
            fileReader = csv.DictReader(delimitedFile, delimiter=fileDelimiter)
            conn = engine.raw_connection()
            cursor = conn.cursor()
            for row in fileReader:
                rowArgs = (clientID, row['FILENAME'], '', row['DATA_FORMAT'], row['SOURCE_SYSTEM_NAME'], row['CONFIDENTIALITY_CODE'], row['STATUS'], row['DOCUMENT_TYPE_SOURCE_SYSTEM'], row['DOCUMENT_TYPE_ID'], row['DOCUMENT_TYPE_DESCRIPTION'], row['DATE_OF_SERVICE'], row['DOCUMENT_ID'], row['SOURCE_CREATED_DATE'], row['SOURCE_LAST_MODIFIED_DATE'], row['TIMEZONE'], row['MRNSOURCE_SYSTEM'], row['PATIENT_MRN'], row['MEMBER_NBR'], row['PATIENT_LAST_NAME'], row['PATIENT_FIRST_NAME'], row['PATIENT_MIDDLE_NAME'], row['GENDER'], row['PATIENT_DATE_OF_BIRTH'], row['ENCOUNTER_SOURCE'], row['ENCOUNTER_ID'], row['ENCOUNTER_TYPE'], row['ADMIT_TIME'], row['DISCHARGE_TIME'], row['FACILITY_NAME'], row['FACILITY_SOURCE_SYSTEM'], row['PROVIDER_TYPE'], row['PROVIDER_SOURCE_SYSTEM'], row['PROVIDER_IDENTIFIER'], row['PROVIDER_LAST_NAME'], row['PROVIDER_FIRST_NAME'], row['PROVIDER_MIDDLE_NAME'], row['PROVIDER_CREDENTIAL'], row['PROVIDER_SPECIALTY'], 1)
                cursor.callproc("sp_InsertMetaData", rowArgs)
            #conn.commit()
            cursor.close()

            functionResponse = "MetaData File Loaded Successfully"
python mysql sqlalchemy mysql-connector
2个回答
0
投票

为了重现您的问题,我准备了 5000 行的文件,并尝试逐行读取它。它花了

0.07 s
,所以我假设你的瓶颈是在数据库中插入数据。尝试使用批量插入进行此类操作。我很确定这足以完成您的任务。


0
投票

您的代码速度很慢,因为您为要插入的每一行调用一次

.callproc()
,并且每个
.callproc()
都会导致两次往返数据库。 (一个用于设置参数值,另一个用于实际
CALL
存储过程。)

如果将所有行上传到临时表,然后使用另一个存储过程循环访问临时表并调用现有存储过程,您可能会获得更好更好的性能。填充临时表后,所有后续处理都在服务器上进行。如果临时表被称为

tmp_tbl
,新的存储过程将如下所示:

CREATE DEFINER=`root`@`localhost` PROCEDURE `sp_InsertMyDataBatch`()
   MODIFIES SQL DATA
BEGIN
DECLARE done INT DEFAULT FALSE;
DECLARE new_id INT;
DECLARE new_txt VARCHAR(50);
DECLARE cur1 CURSOR FOR SELECT id, txt FROM tmp_tbl;
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
OPEN cur1;
read_loop: LOOP
   FETCH cur1 INTO new_id, new_txt;
   IF done THEN
     LEAVE read_loop;
   END IF;
   CALL sp_InsertMyData(new_id, new_txt);
END LOOP;
CLOSE cur1;
END
© www.soinside.com 2019 - 2024. All rights reserved.