加入 65,000 个 .csv 文件的高效方法

问题描述 投票:0回答:3

我需要用 julia 语言处理 65,000 个 .csv 文件。

目标是对数据集进行基本统计。

我有一些方法来加入所有数据集

#1 - set a common index and leftjoin() - perform statistics row wise 
#2 - vcat() the dataframes on top of each other - vertically stacked use group by

无论如何,最终的数据框都非常大!并且处理速度变慢

有没有有效的方法来做到这一点?

我考虑执行 #1 或 #2 并将连接操作分成三分之一,假设在 20,000 个连接保存到 .csv 并分块操作后,最后在最后一个操作中连接所有 3 个连接。

不太确定如何复制制作 65k .csv 文件,但基本上在下面我循环遍历目录中的文件,加载 csv,然后

vcat()
到一个 df。更多的问题是是否有更好的方法来管理操作的规模。
vcat()
使某些东西成长。也许我可以提前循环遍历 .csv 文件,获取每个 .csv 的文件尺寸,将完整数据帧初始化为最终输出大小,然后逐行循环遍历每个 .csv 并填充初始化的 df。

using CSV
using DataFrames

# read all files in directory
csv_dir_tmax = cd(readdir, "C:/Users/andrew.bannerman/Desktop/Julia/scripts/GHCN data/ghcnd_all_csv/tmax")

# initialize outputs
tmax_all = DataFrame(Date = [], TMAX = [])
c=1
for c = 1:length(csv_dir_tmax)
    print("Starting csv file ", csv_dir_tmax[c]," - Iteration ",c,"\n")
        if c <= length(csv_dir_tmax)
    csv_tmax = CSV.read(join(["C:/Users/andrew.bannerman/Desktop/Julia/scripts/GHCN data/ghcnd_all_csv/tmax/", csv_dir_tmax[c]]), DataFrame, header=true)
        tmax_all = vcat(tmax_all, csv_tmax)
    end
end
dataframe csv julia
3个回答
3
投票

以下方法应该相对有效(假设数据适合内存):

tmax_all = reduce(vcat, [CSV.read("YOUR_DIR$x", DataFrame) for x in csv_dir_tmax])

0
投票

将最终输出初始化为最终输出的总大小(就像最终构建的 vcat() 一样)。然后按元素填充它似乎效果更好:

# get the dimensions of each .csv files
tmax_all_total_output_size = fill(0, size(csv_dir_tmax,1))
tmin_all_total_output_size = fill(0, size(csv_dir_tmin,1))
tavg_all_total_output_size = fill(0, size(csv_dir_tavg,1))
tmax_dim = Int64[]
tmin_dim = Int64[]
tavg_dim = Int64[]
c=1
for c = 1:length(csv_dir_tmin) # 47484 - last point
    print("Starting csv file ", csv_dir_tmin[c]," - Iteration ",c,"\n")
        if c <= length(csv_dir_tmax)
    tmax_csv = CSV.read(join(["C:/Users/andrew.bannerman/Desktop/Julia/scripts/GHCN data/ghcnd_all_csv/tmax/", csv_dir_tmax[c] ]), DataFrame, header=true)
    global tmax_dim = size(tmax_csv,1)
        tmax_all_total_output_size[c] = tmax_dim
    end
    if c <= length(csv_dir_tmin)
    tmin_csv = CSV.read(join(["C:/Users/andrew.bannerman/Desktop/Julia/scripts/GHCN data/ghcnd_all_csv/tmin/", csv_dir_tmin[c]]), DataFrame, header=true)
    global tmin_dim = size(tmin_csv,1)
        tmin_all_total_output_size[c] = tmin_dim
    end
    if c <= length(csv_dir_tavg)
    tavg_csv = CSV.read(join(["C:/Users/andrew.bannerman/Desktop/Julia/scripts/GHCN data/ghcnd_all_csv/tavg/", csv_dir_tavg[c]]), DataFrame, header=true)
    global tavg_dim = size(tavg_csv,1)
        tavg_all_total_output_size[c] = tavg_dim
    end
end

# sum total dimension of all .csv files
tmax_sum = sum(tmax_all_total_output_size)
tmin_sum = sum(tmin_all_total_output_size)
tavg_sum = sum(tavg_all_total_output_size)

# initialize final output to total final dimension
tmax_date_array = fill(Date("13000101", "yyyymmdd"),tmax_sum)
tmax_array = zeros(tmax_sum)
tmin_date_array = fill(Date("13000101", "yyyymmdd"),tmin_sum)
tmin_array = zeros(tmin_sum)
tavg_date_array = fill(Date("13000101", "yyyymmdd"),tavg_sum)
tavg_array = zeros(tavg_sum)

# initialize outputs
tmax_all = DataFrame(Date = tmax_date_array, TMAX = tmax_array)
tmin_all = DataFrame(Date = tmin_date_array, TMIN = tmin_array)
tavg_all = DataFrame(Date = tavg_date_array, TAVG = tavg_array)
tmax_count = 0
tmin_count = 0
tavg_count = 0

然后开始填充初始化的df。


0
投票

我花了相当多的时间来解决这个任务,处理 PosgreSQL 的 WAL 输出。结果通过 CsvCruncher CLI 工具的功能捕获。

它主要将 CSV 或 JSON 加载到表中,并允许在它们上运行 SQL。

但在此之前,它会预处理具有相同(或相似)结构的大量 CSV 文件,检测结构差异(更改的列),自动检测最小拟合类型,动态索引键以进行聚合等。

让我知道这是否有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.