[在运行DataBricks代码并准备CSV文件并将其加载到ADLS2时,CSV文件被拆分为许多CSV文件并正在加载到ADLS2。
是否可以通过pyspark在ADLS2中合并这些CSV文件。
谢谢
是否可以通过pyspark在ADLS2中合并这些CSV文件。
据我所知,spark数据帧确实将文件分开制作。理论上,您可以使用spark.csv method来接受字符串列表作为参数。
>>> df = spark.read.csv('path')
然后使用df.toPandas().to_csv()方法将对象写入pandas
数据帧。在这种情况下,您可以参考一些线索:Azure Data-bricks : How to read part files and save it as one file to blob?。
但是,恐怕此过程无法容纳如此高的内存消耗。因此,建议您只使用os
包直接进行合并。我测试了以下2个代码段,供您参考。
1st:
import os
path = '/dbfs/mnt/test/'
file_suffix = '.csv'
filtered_files = [file for file in files if file.endswith(file_suffix)]
print(filtered_files)
with open(path + 'final.csv', 'w') as final_file:
for file in filtered_files:
with open(file) as f:
lines = f.readlines()
final_file.writelines(lines[1:])
[第二:
import os
path = '/dbfs/mnt/test/'
file_suffix = '.csv'
filtered_files = [os.path.join(root, name) for root, dirs, files in os.walk(top=path , topdown=False) for name in files if name.endswith(file_suffix)]
print(filtered_files)
with open(path + 'final2.csv', 'w') as final_file:
for file in filtered_files:
with open(file) as f:
lines = f.readlines()
final_file.writelines(lines[1:])
第二个是兼容的层次结构。
另外,我在这里提供一种使用ADF复制活动将多个csv文件传输到ADLS gen2中的一个文件的方法。
请参考此doc,并在ADLS gen2源数据集中配置文件夹路径。然后,将[MergeFiles]设置为copyBehavior属性。(此外,您可以使用wildFileName之类的*.csv
来排除您不想删除的文件触摸特定的文件夹)
将源文件夹中的所有文件合并到一个文件中。如果文件名如果指定,则合并的文件名是指定的名称。除此以外,这是一个自动生成的文件名。