I have a large amount of data in S3. In my Python Glue job, I extract the data from these files into a pandas DataFrame, apply the necessary transformations, and then load it into a Microsoft SQL Server database using the pymssql library. The final DataFrame contains on average 100-200K rows and 180 columns. Currently I connect to the database with pymssql, and the problem is that the cursor's executemany takes far too long to load the data: roughly 20 minutes for 100K rows. I checked the logs and loading is consistently slow. A screenshot is attached. How can I load the data faster? Here is my code:
file = s3.get_object(Bucket=S3_BUCKET_NAME, Key=each_file)

# Read the CSV from S3 in chunks and concatenate into one DataFrame
for chunk in pd.read_csv(file['Body'], sep=",", header=None, low_memory=False, chunksize=100000):
    all_data.append(chunk)
data_frame = pd.concat(all_data, axis=0)
all_data.clear()

# Strip whitespace from string columns and normalise empty strings to NaN
cols = data_frame.select_dtypes(object).columns
data_frame[cols] = data_frame[cols].apply(lambda x: x.str.strip())
data_frame.replace(to_replace='', value=np.nan, inplace=True)
data_frame.fillna(value=np.nan, inplace=True)
data_frame.insert(0, 'New-column', 1111)

# Convert NaN to None so pymssql inserts NULLs, then build the parameter tuples
sql_data_array = data_frame.replace({np.nan: None}).to_numpy()
sql_data_tuple = tuple(map(tuple, sql_data_array))

try:
    sql = "insert into [db].[schema].[table](column_names)values(%d,%s,%s,%s,%s,%s...)"
    db_cursor.executemany(sql, sql_data_tuple)
    print("loading completed on {}".format(datetime.datetime.now()))
except Exception as e:
    print(e)
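For reference, since the CSV is read with header=None, the target column list in the INSERT still has to be written out by hand, but the long run of placeholders can be generated from the DataFrame's width rather than typed; a minimal sketch (db_connection and the [db].[schema].[table](column_names) part are placeholders, not taken from the post above):

# Build one %s placeholder per column (181 here: 180 data columns plus New-column)
placeholders = ", ".join(["%s"] * len(data_frame.columns))
sql = "insert into [db].[schema].[table](column_names) values ({})".format(placeholders)
db_cursor.executemany(sql, sql_data_tuple)
db_connection.commit()  # assumed connection object; commit once for the whole batch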
I ended up doing the following, and it gave me much better results (1 million rows in 11 minutes), using a Glue 2.0 Python job instead of a Python shell job.
Here is the code I used:
csv_buffer = StringIO()
s3_resource = boto3.resource("s3", region_name=AWS_REGION)

# Read the raw file from S3 in chunks and build one DataFrame
file = s3.get_object(Bucket=S3_BUCKET_NAME, Key=each_file)
for chunk in pd.read_csv(file['Body'], sep=",", header=None, low_memory=False, chunksize=100000):
    all_data.append(chunk)
data_frame = pd.concat(all_data, axis=0)
all_data.clear()

# Same pandas clean-up as before
cols = data_frame.select_dtypes(object).columns
data_frame[cols] = data_frame[cols].apply(lambda x: x.str.strip())
data_frame.replace(to_replace='', value=np.nan, inplace=True)
data_frame.fillna(value=np.nan, inplace=True)
data_frame.insert(0, 'New-column', 1234)

# Write the transformed data back to S3 so Glue can read it through the catalog
data_frame.to_csv(csv_buffer)
result = s3_resource.Object(S3_BUCKET_NAME, 'path in s3').put(Body=csv_buffer.getvalue())

# Load via Glue DynamicFrames and write to the destination catalog table
datasource0 = glueContext.create_dynamic_frame.from_catalog(database="source db name", table_name="source table name",
                                                            transformation_ctx="datasource0")
applymapping1 = ApplyMapping.apply(frame=datasource0, mappings=[mappings], transformation_ctx="applymapping1")
selectfields2 = SelectFields.apply(frame=applymapping1, paths=[column names of destination catalog table],
                                   transformation_ctx="selectfields2")
resolvechoice3 = ResolveChoice.apply(frame=selectfields2, choice="MATCH_CATALOG", database="destination db name",
                                     table_name="destination table name", transformation_ctx="resolvechoice3")
resolvechoice4 = ResolveChoice.apply(frame=resolvechoice3, choice="make_cols", transformation_ctx="resolvechoice4")
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame=resolvechoice4, database="destination db name",
                                                         table_name="destination table name",
                                                         transformation_ctx="datasink5")
job.commit()
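For completeness, the snippet above assumes the standard Glue 2.0 job boilerplate (the glueContext and job objects, the transform imports, and the pandas/boto3 setup); a minimal sketch of that setup, with the region and bucket names as placeholder assumptions, looks roughly like this:

import sys
from io import StringIO

import boto3
import numpy as np
import pandas as pd
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import ApplyMapping, SelectFields, ResolveChoice
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

AWS_REGION = "us-east-1"        # assumption: replace with your region
S3_BUCKET_NAME = "my-bucket"    # assumption: replace with your bucket
s3 = boto3.client("s3", region_name=AWS_REGION)
all_data = []

The actual write goes through the JDBC connection attached to the destination catalog table, which is what replaces the slow per-batch executemany round trips from the pymssql approach.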