我需要用 julia 语言处理 65,000 个 .csv 文件。
目标是对数据集进行基本统计。
我有一些方法来加入所有数据集
#1 - set a common index and leftjoin() - perform statistics row wise
#2 - vcat() the dataframes on top of each other - vertically stacked use group by
无论如何,最终的数据框都非常大!并且处理速度变慢
有没有有效的方法来做到这一点?
我考虑执行 #1 或 #2 并将连接操作分成三分之一,假设在 20,000 个连接保存到 .csv 并分块操作后,最后在最后一个操作中连接所有 3 个连接。
不太确定如何复制制作 65k .csv 文件,但基本上在下面我循环遍历目录中的文件,加载 csv,然后
vcat()
到一个 df。更多的问题是是否有更好的方法来管理操作的规模。 vcat()
使某些东西成长。也许我可以提前循环遍历 .csv 文件,获取每个 .csv 的文件尺寸,将完整数据帧初始化为最终输出大小,然后逐行循环遍历每个 .csv 并填充初始化的 df。
using CSV
using DataFrames
# read all files in directory
csv_dir_tmax = cd(readdir, "C:/Users/andrew.bannerman/Desktop/Julia/scripts/GHCN data/ghcnd_all_csv/tmax")
# initialize outputs
tmax_all = DataFrame(Date = [], TMAX = [])
c=1
for c = 1:length(csv_dir_tmax)
print("Starting csv file ", csv_dir_tmax[c]," - Iteration ",c,"\n")
if c <= length(csv_dir_tmax)
csv_tmax = CSV.read(join(["C:/Users/andrew.bannerman/Desktop/Julia/scripts/GHCN data/ghcnd_all_csv/tmax/", csv_dir_tmax[c]]), DataFrame, header=true)
tmax_all = vcat(tmax_all, csv_tmax)
end
end
以下方法应该相对有效(假设数据适合内存):
tmax_all = reduce(vcat, [CSV.read("YOUR_DIR$x", DataFrame) for x in csv_dir_tmax])
将最终输出初始化为最终输出的总大小(就像最终构建的 vcat() 一样)。然后按元素填充它似乎效果更好:
# get the dimensions of each .csv files
tmax_all_total_output_size = fill(0, size(csv_dir_tmax,1))
tmin_all_total_output_size = fill(0, size(csv_dir_tmin,1))
tavg_all_total_output_size = fill(0, size(csv_dir_tavg,1))
tmax_dim = Int64[]
tmin_dim = Int64[]
tavg_dim = Int64[]
c=1
for c = 1:length(csv_dir_tmin) # 47484 - last point
print("Starting csv file ", csv_dir_tmin[c]," - Iteration ",c,"\n")
if c <= length(csv_dir_tmax)
tmax_csv = CSV.read(join(["C:/Users/andrew.bannerman/Desktop/Julia/scripts/GHCN data/ghcnd_all_csv/tmax/", csv_dir_tmax[c] ]), DataFrame, header=true)
global tmax_dim = size(tmax_csv,1)
tmax_all_total_output_size[c] = tmax_dim
end
if c <= length(csv_dir_tmin)
tmin_csv = CSV.read(join(["C:/Users/andrew.bannerman/Desktop/Julia/scripts/GHCN data/ghcnd_all_csv/tmin/", csv_dir_tmin[c]]), DataFrame, header=true)
global tmin_dim = size(tmin_csv,1)
tmin_all_total_output_size[c] = tmin_dim
end
if c <= length(csv_dir_tavg)
tavg_csv = CSV.read(join(["C:/Users/andrew.bannerman/Desktop/Julia/scripts/GHCN data/ghcnd_all_csv/tavg/", csv_dir_tavg[c]]), DataFrame, header=true)
global tavg_dim = size(tavg_csv,1)
tavg_all_total_output_size[c] = tavg_dim
end
end
# sum total dimension of all .csv files
tmax_sum = sum(tmax_all_total_output_size)
tmin_sum = sum(tmin_all_total_output_size)
tavg_sum = sum(tavg_all_total_output_size)
# initialize final output to total final dimension
tmax_date_array = fill(Date("13000101", "yyyymmdd"),tmax_sum)
tmax_array = zeros(tmax_sum)
tmin_date_array = fill(Date("13000101", "yyyymmdd"),tmin_sum)
tmin_array = zeros(tmin_sum)
tavg_date_array = fill(Date("13000101", "yyyymmdd"),tavg_sum)
tavg_array = zeros(tavg_sum)
# initialize outputs
tmax_all = DataFrame(Date = tmax_date_array, TMAX = tmax_array)
tmin_all = DataFrame(Date = tmin_date_array, TMIN = tmin_array)
tavg_all = DataFrame(Date = tavg_date_array, TAVG = tavg_array)
tmax_count = 0
tmin_count = 0
tavg_count = 0
然后开始填充初始化的df。
我花了相当多的时间来解决这个任务,处理 PosgreSQL 的 WAL 输出。结果通过 CsvCruncher CLI 工具的功能捕获。
它主要将 CSV 或 JSON 加载到表中,并允许在它们上运行 SQL。
但在此之前,它会预处理具有相同(或相似)结构的大量 CSV 文件,检测结构差异(更改的列),自动检测最小拟合类型,动态索引键以进行聚合等。
让我知道这是否有帮助。