我有一个非常大的 CSV 文件,无法完全装入内存。我希望能够将文件读取成块,然后将一系列操作链接在一起来处理结果。最后,我需要将最终结果聚合在一起,保持数据的原始顺序。
操作类似于
GetColumns(start int, end int) , GetRows(start int, end int), SumRows(),
等。因此,我可能有以下代码:
err := Read("input.csv").
With(GetColumns(3, 5)).
With(GetRows(7, 20)).
With(SumRow()).
Write("output.csv")
是否可以使用goroutine来处理每个操作的数据?如何传递每个操作的数据,最后如何获取最终结果并将其写入文件,同时保持数据的原始顺序?
在将整个文件读入内存的情况下,我想我需要一个像这样的接口和结构:
type TableProcessor struct {
Operations []TableOperation
}
type TableOperation interface {
Apply(table [][]string) [][]string
}
func (tp *TableProcessor) With(op TableOperation) *TableProcessor {
tp.Operations = append(tp.Operations, op)
return tp
}
我认为每个操作都必须看起来像这样:
func GetRows(start, end int) TableOperation {
return &GetRowsOperation{Start: start, End: end}
}
type GetRowsOperation struct {
Start, End int
}
func (op *GetRowsOperation) Apply(table [][]string) [][]string {
return table[op.Start:op.End]
}
但是我不确定如果我使用分块,接口和结构应该如何。而且我真的不确定如何将 goroutine 融入其中。
您可能希望在 TableProcessor 上有一个 apply 函数来应用所有 TableOperations 数组,但我认为某些 TableOperations 不能与像 GetRows 这样的 goroutine 一起应用。 所以我添加了 UseGoRoutine 字段来检查 TableOperation 是否应该使用 GoRoutine 并具有以下代码:
func (tp *TableProcessor) Apply() [][]string {
// read the table from csv
var table [][]string
for i, op := range tp.Operations {
if tp.UseGoRoutines[i] {
table = processChunks(table, 1, op.Apply)
} else {
table = op.Apply(table)
}
}
return table
}
然后是 processChunks:
func processChunks(arr [][]string, chunkSize int, function applyFunc) [][]string {
var wg sync.WaitGroup
output := make([][][]string, len(arr))
for i := 0; i < len(arr); i += chunkSize {
end := i + chunkSize
if end > len(arr) {
end = len(arr)
}
wg.Add(1)
go func(i int, chunk [][]string) {
defer wg.Done()
output[i] = function(chunk)
}(i, arr[i:end])
}
wg.Wait()
return function(flatten(output))
}
请注意,您应该调用 apply 函数。 我认为你不能在所有类型的操作中将这个函数与 goroutine 一起使用,但在某些情况下,例如 sum 和 ... 它可以工作 我使用 3D 数组,因为你说保留行的顺序很重要。
然后我使用下面的函数使 3D 数组变平:
func flatten(arr [][][]string) [][]string {
var arr2D [][]string
for _, layer := range arr {
for _, innerArr := range layer {
arr2D = append(arr2D, innerArr)
}
}
return arr2D
}