文本文件行 → 通过存储过程一次插入一行的表行

问题描述 投票:0回答:1

一直在努力解决这个问题。长话短说,我有一个问题,我们需要在客户提供的一个月的不同时间读取文件。该文件可以有几百行,也可以超过一百万行。我已经搜索过,但没有什么能解决存储过程调用问题(我已经看到使用executemany等)

对于每一行,我需要读取行内容(分隔),并将内容写入数据库。我目前所有这些都可以工作。问题在于时机。我将其从界面引擎中取出,因为读取大约 5000 行需要 1 小时。我用 python 创建了这个应用程序,希望能够显着加快这个过程。

我的问题是:“多线程”以提高速度的最佳方法是什么?基本上我需要尽快读取文件并将数据尽快插入数据库中。下面的粗体部分是我想要以某种方式多线程并一次执行多个的部分。

def ReadMetaDataFile (srcFile, fileDelimiter, clientID, mysqlCnx):
    functionResponse = ''
    path, filename = os.path.split(srcFile)

    try:
        with open(srcFile, 'r') as delimitedFile:
            fileReader = csv.DictReader(delimitedFile, delimiter=fileDelimiter)
            #next(fileReader, None)
            
            if(mysqlCnx and mysqlCnx.is_connected()):
                cursor = mysqlCnx.cursor()
                for row in fileReader:
                    **rowArgs = (clientID, row['FILENAME'], '', row['DATA_FORMAT'], 1)
                    cursor.callproc('sp_InsertMetaData', rowArgs)
                    mysqlCnx.commit()**
                cursor.close()
                mysqlCnx.close()
                functionResponse = "MetaData File Loaded Successfully"
    except mysql.connector.Error as err:
        functionResponse =  "Error processing MySQL SPROC call: " + str(err)
    except (mysql.connector.Error, IOError) as err:
        functionResponse =  "Error connecting to MySQL in function: " + str(err)
    except Exception as err:
        functionResponse =  "Exception Found in function: " + str(err)
    finally:
        mysqlCnx.close()
        return functionResponse

--更新 我已经尝试了几种不同的方法,此时我只是设置存储过程调用(并未实际执行),并且从文件中读取 20k 行需要 20 多分钟。某些文件中有数百万行,这太长了。有什么建议请。

def ReadMetaDataFile (srcFile, fileDelimiter, clientID, engine):
    functionResponse = ''
    path, filename = os.path.split(srcFile)

    try:
        query = 'CALL sp_InsertMetaData (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
        with open(srcFile, 'r') as delimitedFile:
            fileReader = csv.DictReader(delimitedFile, delimiter=fileDelimiter)
            conn = engine.raw_connection()
            cursor = conn.cursor()
            for row in fileReader:
                rowArgs = (clientID, row['FILENAME'], '', row['DATA_FORMAT'], row['SOURCE_SYSTEM_NAME'], row['CONFIDENTIALITY_CODE'], row['STATUS'], row['DOCUMENT_TYPE_SOURCE_SYSTEM'], row['DOCUMENT_TYPE_ID'], row['DOCUMENT_TYPE_DESCRIPTION'], row['DATE_OF_SERVICE'], row['DOCUMENT_ID'], row['SOURCE_CREATED_DATE'], row['SOURCE_LAST_MODIFIED_DATE'], row['TIMEZONE'], row['MRNSOURCE_SYSTEM'], row['PATIENT_MRN'], row['MEMBER_NBR'], row['PATIENT_LAST_NAME'], row['PATIENT_FIRST_NAME'], row['PATIENT_MIDDLE_NAME'], row['GENDER'], row['PATIENT_DATE_OF_BIRTH'], row['ENCOUNTER_SOURCE'], row['ENCOUNTER_ID'], row['ENCOUNTER_TYPE'], row['ADMIT_TIME'], row['DISCHARGE_TIME'], row['FACILITY_NAME'], row['FACILITY_SOURCE_SYSTEM'], row['PROVIDER_TYPE'], row['PROVIDER_SOURCE_SYSTEM'], row['PROVIDER_IDENTIFIER'], row['PROVIDER_LAST_NAME'], row['PROVIDER_FIRST_NAME'], row['PROVIDER_MIDDLE_NAME'], row['PROVIDER_CREDENTIAL'], row['PROVIDER_SPECIALTY'], 1)
                cursor.callproc("sp_InsertMetaData", rowArgs)
            #conn.commit()
            cursor.close()

            functionResponse = "MetaData File Loaded Successfully"
python mysql sqlalchemy mysql-connector
1个回答
0
投票

为了重现您的问题,我准备了 5000 行的文件,并尝试逐行读取它。它花了

0.07 s
,所以我假设你的瓶颈是在数据库中插入数据。尝试使用批量插入进行此类操作。我很确定这足以完成您的任务。

© www.soinside.com 2019 - 2024. All rights reserved.