Go - 如何将大文件读取成块并使用多线程进行处理,以及聚合结果

问题描述 投票:0回答:1

我有一个非常大的 CSV 文件,无法完全装入内存。我希望能够将文件读取成块,然后将一系列操作链接在一起来处理结果。最后,我需要将最终结果聚合在一起,保持数据的原始顺序。

操作类似于

GetColumns(start int, end int) , GetRows(start int, end int), SumRows(),
等。因此,我可能有以下代码:

err := Read("input.csv").
        With(GetColumns(3, 5)).
        With(GetRows(7, 20)).
        With(SumRow()).
        Write("output.csv")

是否可以使用goroutine来处理每个操作的数据?如何传递每个操作的数据,最后如何获取最终结果并将其写入文件,同时保持数据的原始顺序?

在将整个文件读入内存的情况下,我想我需要一个像这样的接口和结构:


type TableProcessor struct {
    Operations []TableOperation
}

type TableOperation interface {
    Apply(table [][]string) [][]string
}

func (tp *TableProcessor) With(op TableOperation) *TableProcessor {
    tp.Operations = append(tp.Operations, op)
    return tp
}

我认为每个操作都必须看起来像这样:

func GetRows(start, end int) TableOperation {
    return &GetRowsOperation{Start: start, End: end}
}

type GetRowsOperation struct {
    Start, End int
}

func (op *GetRowsOperation) Apply(table [][]string) [][]string {
    return table[op.Start:op.End]
}

但是我不确定如果我使用分块,接口和结构应该如何。而且我真的不确定如何将 goroutine 融入其中。

csv go pipeline goroutine chunks
1个回答
0
投票

您可能希望在 TableProcessor 上有一个 apply 函数来应用所有 TableOperations 数组,但我认为某些 TableOperations 不能与像 GetRows 这样的 goroutine 一起应用。 所以我添加了 UseGoRoutine 字段来检查 TableOperation 是否应该使用 GoRoutine 并具有以下代码:

func (tp *TableProcessor) Apply() [][]string {
    // read the table from csv
    var table [][]string
    for i, op := range tp.Operations {
        if tp.UseGoRoutines[i] {
            table = processChunks(table, 1, op.Apply)
        } else {
            table = op.Apply(table)
        }
    }

    return table
}

然后是 processChunks:

func processChunks(arr [][]string, chunkSize int, function applyFunc) [][]string {
    var wg sync.WaitGroup

    output := make([][][]string, len(arr))

    for i := 0; i < len(arr); i += chunkSize {
        end := i + chunkSize
        if end > len(arr) {
            end = len(arr)
        }

        wg.Add(1)
        go func(i int, chunk [][]string) {
            defer wg.Done()
            output[i] = function(chunk)
        }(i, arr[i:end])
    }

    wg.Wait()
    return function(flatten(output))
}

请注意,您应该调用 apply 函数。 我认为你不能在所有类型的操作中将这个函数与 goroutine 一起使用,但在某些情况下,例如 sum 和 ... 它可以工作 我使用 3D 数组,因为你说保留行的顺序很重要。

然后我使用下面的函数使 3D 数组变平:

func flatten(arr [][][]string) [][]string {
    var arr2D [][]string
    for _, layer := range arr {
        for _, innerArr := range layer {
            arr2D = append(arr2D, innerArr)
        }
    }

    return arr2D
}
© www.soinside.com 2019 - 2024. All rights reserved.