如何在 AWS Glue python shell 作业中将数据批量插入 MSSQL 数据库?

问题描述 投票:0回答:1

我在 s3 中有大量数据。在我的 Python 胶水作业中,我将以 pandas 数据框架的形式从这些文件中提取数据,并对数据框架应用必要的转换,然后使用 PYMSSQL 库将其加载到 Microsoft SQL 数据库中。最终的数据帧平均包含 100-200K 行和 180 列数据。 目前我正在使用 PYMSSQL 连接到数据库。问题是游标类的executemany 需要太多时间来加载数据。 100k 行大约需要 20 分钟。我检查了日志,总是加载缓慢。 附有屏幕截图。如何更快地加载它们?我在这里附上我的代码:

file=s3.get_object(Bucket=S3_BUCKET_NAME,Key=each_file)
for chunk in pd.read_csv(file['Body'],sep=",",header=None,low_memory=False,chunksize=100000):
 all_data.append(chunk)

data_frame = pd.concat(all_data, axis= 0)
all_data.clear()
cols = data_frame.select_dtypes(object).columns
    data_frame[cols] = data_frame[cols].apply(lambda x: x.str.strip())
    data_frame.replace(to_replace ='',value =np.nan,inplace=True)
    data_frame.fillna(value=np.nan, inplace=True)
    data_frame.insert(0,'New-column', 1111)
    sql_data_array =data_frame.replace({np.nan:None}).to_numpy()
    sql_data_tuple=tuple(map(tuple, sql_data_array))
try:
    sql="insert into [db].[schema].[table](column_names)values(%d,%s,%s,%s,%s,%s...)"
    db_cursor.executemany(sql,sql_data_tuple)
    print("loading completed on {}".format(datetime.datetime.now()))
except Exception as e:
    print(e)
python python-3.x aws-glue pymssql
1个回答
-1
投票

我最终这样做了,并给了我更好的结果(11 分钟内 100 万): (使用 Glue 2.0 python 作业代替 python shell 作业)

  1. 从s3中提取数据
  2. 使用 Pandas 对其进行转换
  3. 将转换后的文件作为 CSV 上传到 s3。
  4. 从目录表创建动态框架,该目录表是使用爬网程序通过爬网转换后的 CSV 文件创建的。或者您可以直接使用选项创建动态框架。
  5. 通过爬网目标 MSSQL 表将动态框架同步到使用爬网程序创建的目录表。

这是我使用的代码:

csv_buffer = StringIO()
s3_resource = boto3.resource("s3", region_name=AWS_REGION)
file = s3.get_object(Bucket=S3_BUCKET_NAME, Key=each_file)
for chunk in pd.read_csv(file['Body'], sep=",", header=None, low_memory=False, chunksize=100000):
    all_data.append(chunk)

data_frame = pd.concat(all_data, axis=0)
all_data.clear()
cols = data_frame.select_dtypes(object).columns
data_frame[cols] = data_frame[cols].apply(lambda x: x.str.strip())
data_frame.replace(to_replace='', value=np.nan, inplace=True)
data_frame.fillna(value=np.nan, inplace=True)
data_frame.insert(0, 'New-column', 1234)

data_frame.to_csv(csv_buffer)
result = s3_resource.Object(S3_BUCKET_NAME, 'path in s3').put(Body=csv_buffer.getvalue())
datasource0 = glueContext.create_dynamic_frame.from_catalog(database="source db name", table_name="source table name",
                                                            transformation_ctx="datasource0")

applymapping1 = ApplyMapping.apply(frame=datasource0, mappings=[mappings], transformation_ctx="applymapping1")

selectfields2 = SelectFields.apply(frame=applymapping1, paths=[column names of destination catalog table],
                                   transformation_ctx="selectfields2")

resolvechoice3 = ResolveChoice.apply(frame=selectfields2, choice="MATCH_CATALOG", database="destination dbname",
                                     table_name="destination table name", transformation_ctx="resolvechoice3")

resolvechoice4 = ResolveChoice.apply(frame=resolvechoice3, choice="make_cols", transformation_ctx="resolvechoice4")

datasink5 = glueContext.write_dynamic_frame.from_catalog(frame=resolvechoice4, database="destination db name",
                                                         table_name="destination table name",
                                                         transformation_ctx="datasink5")

job.commit()
© www.soinside.com 2019 - 2024. All rights reserved.