I am using the official MongoDB Go driver. I read a CSV line by line until I reach 1,000 rows, then I parse the data and insert it into the database. I assumed this would use a roughly constant amount of memory, since each bulk write carries the same amount of data (1,000 contacts). That is not the case: memory grows significantly as the import proceeds. Here is some data on the bulk writes described above:
batchSize = 1000

Contacts - memory consumed by BulkWrite
10K - 14 MB
20K - 30 MB
30K - 59 MB
40K - 137 MB
50K - 241 MB
Can anyone explain why?
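(The question does not show how these figures were measured; a minimal, hypothetical sketch of one way to sample heap growth around the import, using only the standard library, would be:)

package main

import (
	"fmt"
	"runtime"
)

// heapInUse forces a GC and returns the live heap size, so two calls
// bracket only the memory that is still reachable in between.
func heapInUse() int64 {
	runtime.GC()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return int64(m.HeapInuse)
}

func main() {
	before := heapInUse()
	// ... run the CSV import here for 10K/20K/... contacts ...
	after := heapInUse()
	fmt.Printf("heap grew by %.1f MB\n", float64(after-before)/(1<<20))
}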
Here is the code:
func (c *csvProcessor) processCSV(r io.Reader, headerMap map[string]int, emailDB *mongo.Database) error {
	// some code...
	csvReader := csv.NewReader(r)
	for {
		eofReached, err := c.processCSVBatch(csvReader, emailHash, smsHash, headerMap, emailDB)
		if err != nil {
			return errors.Wrap(err, "process CSV batch")
		}
		if eofReached {
			break
		}
	}
	return nil
}
func (c *csvProcessor) processCSVBatch(csvReader *csv.Reader, emailHash map[string]*userData, smsHash map[string]*userData, headerMap map[string]int, emailDB *mongo.Database) (bool, error) {
	var insertUsers, updateUsers, deleteUsers []interface{}
	var isEOFReached bool
	for i := 0; i < processCSVBatchSize; i++ {
		line, err := csvReader.Read()
		if err != nil {
			if err != io.EOF {
				return false, errors.Wrap(err, "read from input")
			}
			isEOFReached = true
			break
		}
		// some code
		insert, update, delete := c.dataMerger.mergeData(
			c.parseUser(line, headerMap),
			emailHash[stringToMD5(line[headerMap["email"]])],
			smsHashVal,
		)
		if insert != nil {
			insertUsers = append(insertUsers, insert)
		}
		if update != nil {
			updateUsers = append(updateUsers, update)
		}
		if delete != nil {
			deleteUsers = append(deleteUsers, delete)
		}
	}
	// update DB
	err := c.mongoDBUserHandler.saveUsers(emailDB, insertUsers, updateUsers, deleteUsers)
	if err != nil {
		return false, errors.Wrap(err, "save users")
	}
	return isEOFReached, nil
}
func (m *mongoDBUserHandler) saveUsers(emailDB *mongo.Database, insert, update, delete []interface{}) error {
	ctx := context.Background()
	// create the slice of write models
	var writes []mongo.WriteModel
	if len(insert) > 0 {
		writes = append(writes, m.getInsertWrites(insert)...)
	}
	if len(update) > 0 {
		writes = append(writes, m.getUpdateWrites(update)...)
	}
	if len(delete) > 0 {
		writes = append(writes, m.getDeleteWrites(delete)...)
	}
	if len(writes) == 0 {
		return nil
	}
	// run bulk write
	_, err := emailDB.
		Collection(userCollection).
		BulkWrite(ctx, writes, options.BulkWrite().SetOrdered(false))
	if err != nil {
		return errors.Wrap(err, "bulk write")
	}
	return nil
}
Here is the copying in disguise:
if len(insert) > 0 {
	writes = append(writes, m.getInsertWrites(insert)...)
}
if len(update) > 0 {
	writes = append(writes, m.getUpdateWrites(update)...)
}
if len(delete) > 0 {
	writes = append(writes, m.getDeleteWrites(delete)...)
}
Very low-hanging fruit: remove those lines and change saveUsers so that its caller builds the writes slice ([]mongo.WriteModel) itself and passes it straight through to BulkWrite. That way you can keep reusing the same backing array from batch to batch, which should save some memory.
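If it helps to see the suggestion spelled out, below is a minimal sketch. The bulkWriter type and its add/flush methods are hypothetical names invented for illustration (the question does not show getInsertWrites/getUpdateWrites/getDeleteWrites), and the bson filters are assumptions about the document shape; only the driver calls themselves (NewInsertOneModel, NewUpdateOneModel, NewDeleteOneModel, BulkWrite) are real API.

package csvimport

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// bulkWriter owns a single writes slice for the whole import, so every
// batch appends into the same backing array instead of building three
// per-batch slices and copying them together in saveUsers.
type bulkWriter struct {
	coll   *mongo.Collection
	writes []mongo.WriteModel
}

// add appends the write models for one merged CSV row directly,
// replacing the separate insertUsers/updateUsers/deleteUsers slices.
func (w *bulkWriter) add(insert, update, del bson.M) {
	if insert != nil {
		w.writes = append(w.writes, mongo.NewInsertOneModel().SetDocument(insert))
	}
	if update != nil {
		w.writes = append(w.writes, mongo.NewUpdateOneModel().
			SetFilter(bson.M{"_id": update["_id"]}). // assumed document shape
			SetUpdate(bson.M{"$set": update}))
	}
	if del != nil {
		w.writes = append(w.writes, mongo.NewDeleteOneModel().SetFilter(del))
	}
}

// flush runs one unordered bulk write, then truncates writes to length
// zero while keeping its capacity for the next batch.
func (w *bulkWriter) flush(ctx context.Context) error {
	if len(w.writes) == 0 {
		return nil
	}
	_, err := w.coll.BulkWrite(ctx, w.writes, options.BulkWrite().SetOrdered(false))
	for i := range w.writes {
		w.writes[i] = nil // drop references so finished models can be GC'd
	}
	w.writes = w.writes[:0]
	return err
}

With this shape, processCSVBatch would call add once per row and flush once per batch. Note the nil-ing loop before writes[:0]: truncation alone keeps the previous batch's models reachable through the shared backing array.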